47 lines
1.7 KiB
Python

from __future__ import annotations
from yt_dlp.extractor.youtube.pot._provider import BuiltInIEContentProvider
from yt_dlp.extractor.youtube.pot.builtin.utils import ContentBindingType, get_webpo_content_binding
from yt_dlp.extractor.youtube.pot.cache import (
CacheProviderWritePolicy,
PoTokenCacheSpec,
PoTokenCacheSpecProvider,
register_pcsp,
)
from yt_dlp.extractor.youtube.pot.provider import (
PoTokenRequest,
)
from yt_dlp.utils import traverse_obj
@register_pcsp
class WebPoPCSP(PoTokenCacheSpecProvider, BuiltInIEContentProvider):
PROVIDER_NAME = 'webpo'
def generate_cache_spec(self, request: PoTokenRequest) -> PoTokenCacheSpec | None:
bind_to_visitor_id = self.get_setting('bind_to_visitor_id', default=['true'])[0] == 'true'
content_binding, content_binding_type = get_webpo_content_binding(
request, bind_to_visitor_id=bind_to_visitor_id)
if not content_binding:
return None
write_policy = CacheProviderWritePolicy.WRITE_ALL
if content_binding_type == ContentBindingType.VIDEO_ID:
write_policy = CacheProviderWritePolicy.WRITE_FIRST
return PoTokenCacheSpec(
key_bindings={
't': 'webpo',
'cb': content_binding,
'cbt': content_binding_type.value,
'ip': traverse_obj(request.innertube_context, ('client', 'remoteHost')),
'sa': request.request_source_address,
'px': request.request_proxy,
},
# Integrity token response usually states it has a ttl of 12 hours (43200 seconds).
# We will default to 6 hours to be safe.
default_ttl=21600,
write_policy=write_policy,
)