[ie/youtube] Add log level for pot; do not call is_available() until necessary.

This commit is contained in:
coletdjnz 2025-04-12 11:34:31 +12:00
parent a0e036897b
commit 8e9a6553a6
No known key found for this signature in database
GPG Key ID: 91984263BB39894A
6 changed files with 211 additions and 48 deletions

View File

@ -1786,7 +1786,7 @@ The following extractors use this feature:
##### PO Token settings
* `po_token`: Proof of Origin (PO) Token(s) to use. Comma seperated list of PO Tokens in the format `CLIENT.CONTEXT+PO_TOKEN`, e.g. `youtube:po_token=web.gvs+XXX,web.player=XXX,web_safari.gvs+YYY`. Context can be either `gvs` (Google Video Server URLs) or `player` (Innertube player request)
* `pot_debug`: Enable debug logging for PO Token fetching. Either `true` or `false` (default `false`)
* `pot_log_level`: PO Token provider log level. One of `TRACE`, `DEBUG`, `INFO`, `WARNING`, `ERROR`. Default is `DEBUG` if `-v` is used, otherwise `INFO`
* `fetch_pot`: Policy to use for fetching a PO Token from providers. `always` to always try fetch a PO Token regardless if the client requires one for the given context. `when_required` to only fetch a PO Token if the client requires one for the given context (default)
###### youtubepot-webpo

View File

@ -10,6 +10,8 @@ from yt_dlp.utils.networking import HTTPHeaderDict
class MockLogger(IEContentProviderLogger):
log_level = IEContentProviderLogger.LogLevel.TRACE
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.messages = {}

View File

@ -240,9 +240,11 @@ class BaseMockCacheProvider(PoTokenCacheProvider, abc.ABC):
self.store_calls = 0
self.delete_calls = 0
self.get_calls = 0
self.available_called_times = 0
self.available = available
def is_available(self) -> bool:
self.available_called_times += 1
return self.available
def store(self, *args, **kwargs):
@ -303,9 +305,10 @@ class MockMemoryPCP(BaseMockCacheProvider):
return self.cache.get(key, [None])[0]
def create_memory_pcp(ie, logger, provider_key='memory'):
cache = MockMemoryPCP(ie, logger, {})
def create_memory_pcp(ie, logger, provider_key='memory', provider_name='memory', available=True):
cache = MockMemoryPCP(ie, logger, {}, available=available)
cache.PROVIDER_KEY = provider_key
cache.PROVIDER_NAME = provider_name
return cache
@ -921,6 +924,96 @@ class TestPoTokenCache:
assert logger.messages['error'].count("Error occurred with \"unexpected_error\" PO Token cache provider: ValueError('something went wrong'); example bug report message") == 3
def test_cache_provider_unavailable_no_fallback(self, pot_request, ie, logger):
provider = create_memory_pcp(ie, logger, available=False)
cache = PoTokenCache(
cache_providers=[provider],
cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
logger=logger,
)
response = PoTokenResponse(EXAMPLE_PO_TOKEN)
cache.store(pot_request, response)
assert cache.get(pot_request) is None
assert provider.get_calls == 0
assert provider.store_calls == 0
assert provider.available_called_times
def test_cache_provider_unavailable_fallback(self, pot_request, ie, logger):
provider_unavailable = create_memory_pcp(ie, logger, provider_key='unavailable', provider_name='unavailable', available=False)
provider_available = create_memory_pcp(ie, logger, provider_key='available', provider_name='available')
cache = PoTokenCache(
cache_providers=[provider_unavailable, provider_available],
cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
logger=logger,
)
response = PoTokenResponse(EXAMPLE_PO_TOKEN)
cache.store(pot_request, response)
assert cache.get(pot_request) is not None
assert provider_unavailable.get_calls == 0
assert provider_unavailable.store_calls == 0
assert provider_available.get_calls == 1
assert provider_available.store_calls == 1
assert provider_unavailable.available_called_times
assert provider_available.available_called_times
# should not even try to use the provider for the request
assert 'Attempting to fetch a PO Token response from "unavailable" provider' not in logger.messages['trace']
assert 'Attempting to fetch a PO Token response from "available" provider' not in logger.messages['trace']
def test_available_not_called(self, ie, pot_request, logger):
# Test that the available method is not called when provider higher in the list is available
provider_unavailable = create_memory_pcp(
ie, logger, provider_key='unavailable', provider_name='unavailable', available=False)
provider_available = create_memory_pcp(ie, logger, provider_key='available', provider_name='available')
logger.log_level = logger.LogLevel.INFO
cache = PoTokenCache(
cache_providers=[provider_available, provider_unavailable],
cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
logger=logger,
)
response = PoTokenResponse(EXAMPLE_PO_TOKEN)
cache.store(pot_request, response, write_policy=CacheProviderWritePolicy.WRITE_FIRST)
assert cache.get(pot_request) is not None
assert provider_unavailable.get_calls == 0
assert provider_unavailable.store_calls == 0
assert provider_available.get_calls == 1
assert provider_available.store_calls == 1
assert provider_unavailable.available_called_times == 0
assert provider_available.available_called_times
assert 'PO Token Cache Providers: available-0.0.0 (external), unavailable-0.0.0 (external, unavailable)' not in logger.messages.get('trace', [])
def test_available_called_trace(self, ie, pot_request, logger):
# But if logging level is trace should call available (as part of debug logging)
provider_unavailable = create_memory_pcp(
ie, logger, provider_key='unavailable', provider_name='unavailable', available=False)
provider_available = create_memory_pcp(ie, logger, provider_key='available', provider_name='available')
logger.log_level = logger.LogLevel.TRACE
cache = PoTokenCache(
cache_providers=[provider_available, provider_unavailable],
cache_spec_providers=[ExampleCacheSpecProviderPCSP(ie=ie, logger=logger, settings={})],
logger=logger,
)
response = PoTokenResponse(EXAMPLE_PO_TOKEN)
cache.store(pot_request, response, write_policy=CacheProviderWritePolicy.WRITE_FIRST)
assert cache.get(pot_request) is not None
assert provider_unavailable.get_calls == 0
assert provider_unavailable.store_calls == 0
assert provider_available.get_calls == 1
assert provider_available.store_calls == 1
assert provider_unavailable.available_called_times
assert provider_available.available_called_times
assert 'PO Token Cache Providers: available-0.0.0 (external), unavailable-0.0.0 (external, unavailable)' in logger.messages.get('trace', [])
def test_close(self, ie, pot_request, logger):
# Should call close on the cache providers and cache specs
memory_pcp = create_memory_pcp(ie, logger, provider_key='memory')
@ -1172,7 +1265,40 @@ class TestPoTokenRequestDirector:
assert pot_provider.request_called_times == 1
assert pot_provider.available_called_times
# should not even try use the provider for the request
assert 'Provider preferences for this request: success=0' in logger.messages['trace']
assert 'Attempting to fetch a PO Token from "unavailable" provider' not in logger.messages['trace']
assert 'Attempting to fetch a PO Token from "success" provider' in logger.messages['trace']
def test_available_not_called(self, ie, logger, pot_cache, pot_request, pot_provider):
# Test that the available method is not called when provider higher in the list is available
logger.log_level = logger.LogLevel.INFO
director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
provider = UnavailablePTP(ie, logger, {})
director.register_provider(pot_provider)
director.register_provider(provider)
response = director.get_po_token(pot_request)
assert response == EXAMPLE_PO_TOKEN
assert provider.request_called_times == 0
assert provider.available_called_times == 0
assert pot_provider.request_called_times == 1
assert pot_provider.available_called_times == 2
assert 'PO Token Providers: success-0.0.1 (external), unavailable-0.0.0 (external, unavailable)' not in logger.messages.get('trace', [])
def test_available_called_trace(self, ie, logger, pot_cache, pot_request, pot_provider):
# But if logging level is trace should call available (as part of debug logging)
logger.log_level = logger.LogLevel.TRACE
director = PoTokenRequestDirector(logger=logger, cache=pot_cache)
provider = UnavailablePTP(ie, logger, {})
director.register_provider(pot_provider)
director.register_provider(provider)
response = director.get_po_token(pot_request)
assert response == EXAMPLE_PO_TOKEN
assert provider.request_called_times == 0
assert provider.available_called_times == 1
assert pot_provider.request_called_times == 1
assert pot_provider.available_called_times == 3
assert 'PO Token Providers: success-0.0.1 (external), unavailable-0.0.0 (external, unavailable)' in logger.messages['trace']
def test_provider_error_no_fallback_unexpected(self, ie, logger, pot_cache, pot_request):
director = PoTokenRequestDirector(logger=logger, cache=pot_cache)

View File

@ -505,3 +505,10 @@ def test_register_cache_provider_preference(ie):
return 1
assert len(_pot_cache_provider_preferences.value) == before + 1
def test_logger_log_level(logger):
assert logger.LogLevel('INFO') == logger.LogLevel.INFO
assert logger.LogLevel('debuG') == logger.LogLevel.DEBUG
assert logger.LogLevel(10) == logger.LogLevel.DEBUG
assert logger.LogLevel('UNKNOWN') == logger.LogLevel.INFO

View File

@ -44,30 +44,34 @@ if typing.TYPE_CHECKING:
class YoutubeIEContentProviderLogger(IEContentProviderLogger):
def __init__(self, ie, prefix, enable_trace=False):
def __init__(self, ie, prefix, log_level=IEContentProviderLogger.LogLevel.INFO):
self.__ie = ie
self.prefix = prefix
self.enable_trace = enable_trace
self.log_level = log_level
def _format_msg(self, message: str):
prefixstr = format_field(self.prefix, None, '[%s] ')
return f'{prefixstr}{message}'
def trace(self, message: str):
if self.enable_trace:
if self.log_level <= self.LogLevel.TRACE:
self.__ie.write_debug(self._format_msg('TRACE: ' + message))
def debug(self, message: str):
self.__ie.write_debug(self._format_msg(message))
if self.log_level <= self.LogLevel.DEBUG:
self.__ie.write_debug(self._format_msg(message))
def info(self, message: str):
self.__ie.to_screen(self._format_msg(message))
if self.log_level <= self.LogLevel.INFO:
self.__ie.to_screen(self._format_msg(message))
def warning(self, message: str, *, once=False):
self.__ie.report_warning(self._format_msg(message), only_once=once)
if self.log_level <= self.LogLevel.WARNING:
self.__ie.report_warning(self._format_msg(message), only_once=once)
def error(self, message: str):
self.__ie._downloader.report_error(self._format_msg(message), is_error=False)
if self.log_level <= self.LogLevel.ERROR:
self.__ie._downloader.report_error(self._format_msg(message), is_error=False)
class PoTokenCache:
@ -90,20 +94,21 @@ class PoTokenCache:
self.logger = logger
def _get_available_providers(self) -> Iterable[PoTokenCacheProvider]:
return (provider for provider in self.cache_providers.values() if provider.is_available())
def _get_cache_providers(self, request: PoTokenRequest) -> list[PoTokenCacheProvider]:
"""Sorts cache providers by preference, given a request"""
def _get_cache_providers(self, request: PoTokenRequest) -> Iterable[PoTokenCacheProvider]:
"""Sorts available cache providers by preference, given a request"""
preferences = {
provider: sum(pref(provider, request) for pref in self.cache_provider_preferences)
for provider in self._get_available_providers()
for provider in self.cache_providers.values()
}
self.logger.trace(f'Available PO Token Providers: {provider_display_list(self._get_available_providers())}')
self.logger.trace('Cache Provider preferences for this request: {}'.format(', '.join(
f'{provider.PROVIDER_KEY}={pref}' for provider, pref in preferences.items())))
if self.logger.log_level <= self.logger.LogLevel.TRACE:
# calling is_available() for every PO Token provider upfront may have some overhead
self.logger.trace(f'PO Token Cache Providers: {provider_display_list(self.cache_providers.values())}')
self.logger.trace('Cache Provider preferences for this request: {}'.format(', '.join(
f'{provider.PROVIDER_KEY}={pref}' for provider, pref in preferences.items())))
return sorted(self._get_available_providers(), key=preferences.get, reverse=True)
return (
provider for provider in sorted(self.cache_providers.values(), key=preferences.get, reverse=True) if provider.is_available()
)
def _get_cache_spec(self, request: PoTokenRequest) -> PoTokenCacheSpec | None:
for provider in self.cache_spec_providers.values():
@ -150,6 +155,7 @@ class PoTokenCache:
for idx, provider in enumerate(self._get_cache_providers(request)):
try:
self.logger.trace(f'Attempting to fetch PO Token response from "{provider.PROVIDER_NAME}" cache provider')
cache_response = provider.get(cache_key)
if not cache_response:
continue
@ -199,24 +205,22 @@ class PoTokenCache:
self.logger.trace(f'Using write policy: {write_policy}')
for idx, provider in enumerate(self._get_cache_providers(request)):
# WRITE_FIRST should not write to lower priority providers in the case the highest priority provider fails
if idx > 0 and write_policy == CacheProviderWritePolicy.WRITE_FIRST:
return
try:
self.logger.trace(
f'Caching PO Token response in "{provider.PROVIDER_NAME}" cache provider (key={cache_key}, expires_at={cache_response.expires_at})',
)
provider.store(key=cache_key, value=json.dumps(dataclasses.asdict(cache_response)), expires_at=cache_response.expires_at)
except PoTokenCacheProviderError as e:
self.logger.warning(
f'Error from "{provider.PROVIDER_NAME}" PO Token cache provider: {e!r}{provider_bug_report_message(provider) if not e.expected else ""}')
continue
except Exception as e:
self.logger.error(
f'Error occurred with "{provider.PROVIDER_NAME}" PO Token cache provider: {e!r}{provider_bug_report_message(provider)}',
)
continue
# WRITE_FIRST should not write to lower priority providers in the case the highest priority provider fails
if idx == 0 and write_policy == CacheProviderWritePolicy.WRITE_FIRST:
return
def close(self):
for provider in self.cache_providers.values():
@ -239,21 +243,21 @@ class PoTokenRequestDirector:
def register_preference(self, preference):
self.preferences.append(preference)
def _get_available_providers(self) -> list[PoTokenProvider]:
return [provider for provider in self.providers.values() if provider.is_available()]
def _get_providers(self, request: PoTokenRequest) -> list[PoTokenProvider]:
def _get_providers(self, request: PoTokenRequest) -> Iterable[PoTokenProvider]:
"""Sorts available providers by preference, given a request"""
preferences = {
provider: sum(pref(provider, request) for pref in self.preferences)
for provider in self._get_available_providers()
if provider.is_available()
for provider in self.providers.values()
}
self.logger.trace(f'Available PO Token Providers: {provider_display_list(self._get_available_providers())}')
self.logger.trace('Provider preferences for this request: {}'.format(', '.join(
f'{provider.PROVIDER_NAME}={pref}' for provider, pref in preferences.items())))
if self.logger.log_level <= self.logger.LogLevel.TRACE:
# calling is_available() for every PO Token provider upfront may have some overhead
self.logger.trace(f'PO Token Providers: {provider_display_list(self.providers.values())}')
self.logger.trace('Provider preferences for this request: {}'.format(', '.join(
f'{provider.PROVIDER_NAME}={pref}' for provider, pref in preferences.items())))
return sorted(self._get_available_providers(), key=preferences.get, reverse=True)
return (
provider for provider in sorted(self.providers.values(), key=preferences.get, reverse=True) if provider.is_available()
)
def _get_po_token(self, request) -> PoTokenResponse | None:
for provider in self._get_providers(request):
@ -322,45 +326,49 @@ def initialize_pot_director(ie):
if not ie._downloader:
raise ExtractorError('Downloader not set', expected=False)
enable_trace = ie._configuration_arg('pot_debug', ['false'], ie_key='youtube', casesense=False)[0] == 'true'
log_level = min(
IEContentProviderLogger.LogLevel(ie._configuration_arg('pot_log_level', ['INFO'], ie_key='youtube', casesense=False)[0].upper()),
IEContentProviderLogger.LogLevel.DEBUG if ie._downloader.params.get('verbose', False) else IEContentProviderLogger.LogLevel.INFO,
)
cache_providers = []
for cache_provider in _pot_cache_providers.value.values():
settings = traverse_obj(ie._downloader.params, ('extractor_args', f'{EXTRACTOR_ARG_PREFIX}-{cache_provider.PROVIDER_KEY.lower()}'))
cache_provider_logger = YoutubeIEContentProviderLogger(ie, f'pot:cache:{cache_provider.PROVIDER_NAME}', enable_trace=enable_trace)
cache_provider_logger = YoutubeIEContentProviderLogger(ie, f'pot:cache:{cache_provider.PROVIDER_NAME}', log_level=log_level)
cache_providers.append(cache_provider(ie, cache_provider_logger, settings or {}))
cache_spec_providers = []
for cache_spec_provider in _pot_pcs_providers.value.values():
settings = traverse_obj(ie._downloader.params, ('extractor_args', f'{EXTRACTOR_ARG_PREFIX}-{cache_spec_provider.PROVIDER_KEY.lower()}'))
cache_spec_provider_logger = YoutubeIEContentProviderLogger(ie, f'pot:cache:spec:{cache_spec_provider.PROVIDER_NAME}', enable_trace=enable_trace)
cache_spec_provider_logger = YoutubeIEContentProviderLogger(ie, f'pot:cache:spec:{cache_spec_provider.PROVIDER_NAME}', log_level=log_level)
cache_spec_providers.append(cache_spec_provider(ie, cache_spec_provider_logger, settings or {}))
cache = PoTokenCache(
logger=YoutubeIEContentProviderLogger(ie, 'pot:cache', enable_trace=enable_trace),
logger=YoutubeIEContentProviderLogger(ie, 'pot:cache', log_level=log_level),
cache_providers=cache_providers,
cache_spec_providers=cache_spec_providers,
cache_provider_preferences=list(_pot_cache_provider_preferences.value),
)
director = PoTokenRequestDirector(
logger=YoutubeIEContentProviderLogger(ie, 'pot', enable_trace=enable_trace),
logger=YoutubeIEContentProviderLogger(ie, 'pot', log_level=log_level),
cache=cache,
)
for provider in _pot_providers.value.values():
settings = traverse_obj(ie._downloader.params, ('extractor_args', f'{EXTRACTOR_ARG_PREFIX}-{provider.PROVIDER_KEY.lower()}'))
logger = YoutubeIEContentProviderLogger(ie, f'pot:{provider.PROVIDER_NAME}', enable_trace=enable_trace)
logger = YoutubeIEContentProviderLogger(ie, f'pot:{provider.PROVIDER_NAME}', log_level=log_level)
director.register_provider(provider(ie, logger, settings or {}))
for preference in _ptp_preferences.value:
director.register_preference(preference)
director.logger.debug(f'PO Token Providers: {provider_display_list(director.providers.values())}')
director.logger.debug(f'PO Token Cache Providers: {provider_display_list(cache.cache_providers.values())}')
director.logger.debug(f'PO Token Cache Spec Providers: {provider_display_list(cache.cache_spec_providers.values())}')
director.logger.trace(f'Registered {len(director.preferences)} provider preferences')
director.logger.trace(f'Registered {len(cache.cache_provider_preferences)} cache provider preferences')
if director.logger.log_level <= director.logger.LogLevel.DEBUG:
director.logger.debug(f'PO Token Providers: {provider_display_list(director.providers.values())}')
director.logger.debug(f'PO Token Cache Providers: {provider_display_list(cache.cache_providers.values())}')
director.logger.debug(f'PO Token Cache Spec Providers: {provider_display_list(cache.cache_spec_providers.values())}')
director.logger.trace(f'Registered {len(director.preferences)} provider preferences')
director.logger.trace(f'Registered {len(cache.cache_provider_preferences)} cache provider preferences')
return director

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import abc
import enum
import functools
from yt_dlp.extractor.common import InfoExtractor
@ -11,6 +12,25 @@ from yt_dlp.version import __version__
class IEContentProviderLogger(abc.ABC):
class LogLevel(enum.IntEnum):
TRACE = 0
DEBUG = 10
INFO = 20
WARNING = 30
ERROR = 40
@classmethod
def _missing_(cls, value):
if isinstance(value, str):
value = value.upper()
if value in dir(cls):
return cls[value]
return cls.INFO
log_level = LogLevel.INFO
@abc.abstractmethod
def trace(self, message: str):
pass