Compare commits

..

No commits in common. "e3f0d8b731b40176bcc632bf92cfe5149402b202" and "c8680b65f79cfeb23b342b70ffe1e233902f7933" have entirely different histories.

2 changed files with 26 additions and 101 deletions

View File

@ -1,6 +1,5 @@
import functools
import json
import random
import re
import urllib.parse
@ -364,20 +363,6 @@ class DailymotionIE(DailymotionBaseInfoExtractor):
continue
yield update_url(player_url, query=query_string)
@staticmethod
def _generate_blockbuster_headers():
# Randomize our HTTP header fingerprint to bust the HTTP Error 403 block
# See https://github.com/yt-dlp/yt-dlp/issues/15526
def random_letters(minimum, maximum):
# Omit vowels so we don't generate valid header names like 'authorization', etc
return ''.join(random.choices('bcdfghjklmnpqrstvwxz', k=random.randint(minimum, maximum)))
return {
random_letters(8, 24): random_letters(16, 32)
for _ in range(random.randint(2, 8))
}
def _real_extract(self, url):
url, smuggled_data = unsmuggle_url(url)
video_id, is_playlist, playlist_id = self._match_valid_url(url).group('id', 'is_playlist', 'playlist_id')
@ -440,8 +425,7 @@ class DailymotionIE(DailymotionBaseInfoExtractor):
continue
if media_type == 'application/x-mpegURL':
fmt, subs = self._extract_m3u8_formats_and_subtitles(
media_url, video_id, 'mp4', live=is_live, m3u8_id='hls',
fatal=False, headers=self._generate_blockbuster_headers())
media_url, video_id, 'mp4', live=is_live, m3u8_id='hls', fatal=False)
formats.extend(fmt)
self._merge_subtitles(subs, target=subtitles)
else:

View File

@ -1,6 +1,4 @@
import base64
import functools
import hashlib
import itertools
import json
import random
@ -17,7 +15,6 @@ from ..utils import (
UnsupportedError,
UserNotLive,
determine_ext,
extract_attributes,
filter_dict,
format_field,
int_or_none,
@ -28,13 +25,13 @@ from ..utils import (
qualities,
srt_subtitles_timecode,
str_or_none,
traverse_obj,
truncate_string,
try_call,
try_get,
url_or_none,
urlencode_postdata,
)
from ..utils.traversal import find_element, require, traverse_obj
class TikTokBaseIE(InfoExtractor):
@ -220,94 +217,38 @@ class TikTokBaseIE(InfoExtractor):
raise ExtractorError('Unable to extract aweme detail info', video_id=aweme_id)
return self._parse_aweme_video_app(aweme_detail)
def _solve_challenge_and_set_cookie(self, webpage):
challenge_data = traverse_obj(webpage, (
{find_element(id='cs', html=True)}, {extract_attributes}, 'class',
filter, {lambda x: f'{x}==='}, {base64.b64decode}, {json.loads}))
if not challenge_data:
if 'Please wait...' in webpage:
raise ExtractorError('Unable to extract challenge data')
raise ExtractorError('Unexpected response from webpage request')
self.to_screen('Solving JS challenge using native Python implementation')
expected_digest = traverse_obj(challenge_data, (
'v', 'c', {str}, {base64.b64decode},
{require('challenge expected digest')}))
base_hash = traverse_obj(challenge_data, (
'v', 'a', {str}, {base64.b64decode},
{hashlib.sha256}, {require('challenge base hash')}))
for i in range(1_000_001):
number = str(i).encode()
test_hash = base_hash.copy()
test_hash.update(number)
if test_hash.digest() == expected_digest:
challenge_data['d'] = base64.b64encode(number).decode()
break
else:
raise ExtractorError('Unable to solve JS challenge')
cookie_value = base64.b64encode(
json.dumps(challenge_data, separators=(',', ':')).encode()).decode()
# At time of writing, the cookie name was _wafchallengeid
cookie_name = traverse_obj(webpage, (
{find_element(id='wci', html=True)}, {extract_attributes},
'class', {require('challenge cookie name')}))
# Actual JS sets Max-Age=1, but we need to adjust for --sleep-requests and Python slowness
expire_time = int(time.time()) + (self.get_param('sleep_interval_requests') or 0) + 2
self._set_cookie('.tiktok.com', cookie_name, cookie_value, expire_time=expire_time)
def _extract_web_data_and_status(self, url, video_id, fatal=True):
video_data, status = {}, -1
def get_webpage(note='Downloading webpage'):
res = self._download_webpage_handle(url, video_id, note, fatal=fatal, impersonate=True)
if res is False:
return False
webpage, urlh = res
if urllib.parse.urlparse(urlh.url).path == '/login':
message = 'TikTok is requiring login for access to this content'
if fatal:
self.raise_login_required(message)
self.report_warning(f'{message}. {self._login_hint()}', video_id=video_id)
return False
return webpage
webpage = get_webpage()
if webpage is False:
res = self._download_webpage_handle(url, video_id, fatal=fatal, impersonate=True)
if res is False:
return video_data, status
universal_data = self._get_universal_data(webpage, video_id)
if not universal_data:
try:
self._solve_challenge_and_set_cookie(webpage)
except ExtractorError as e:
if fatal:
raise
self.report_warning(e.orig_msg, video_id=video_id)
return video_data, status
webpage = get_webpage(note='Downloading webpage with challenge cookie')
if webpage is False:
return video_data, status
universal_data = self._get_universal_data(webpage, video_id)
if not universal_data:
message = 'Unable to extract universal data for rehydration'
webpage, urlh = res
if urllib.parse.urlparse(urlh.url).path == '/login':
message = 'TikTok is requiring login for access to this content'
if fatal:
raise ExtractorError(message)
self.report_warning(message, video_id=video_id)
self.raise_login_required(message)
self.report_warning(f'{message}. {self._login_hint()}')
return video_data, status
status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0
video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict}))
if universal_data := self._get_universal_data(webpage, video_id):
self.write_debug('Found universal data for rehydration')
status = traverse_obj(universal_data, ('webapp.video-detail', 'statusCode', {int})) or 0
video_data = traverse_obj(universal_data, ('webapp.video-detail', 'itemInfo', 'itemStruct', {dict}))
elif sigi_data := self._get_sigi_state(webpage, video_id):
self.write_debug('Found sigi state data')
status = traverse_obj(sigi_data, ('VideoPage', 'statusCode', {int})) or 0
video_data = traverse_obj(sigi_data, ('ItemModule', video_id, {dict}))
elif next_data := self._search_nextjs_data(webpage, video_id, default={}):
self.write_debug('Found next.js data')
status = traverse_obj(next_data, ('props', 'pageProps', 'statusCode', {int})) or 0
video_data = traverse_obj(next_data, ('props', 'pageProps', 'itemInfo', 'itemStruct', {dict}))
elif fatal:
raise ExtractorError('Unable to extract webpage video data')
if not traverse_obj(video_data, ('video', {dict})) and traverse_obj(video_data, ('isContentClassified', {bool})):
message = 'This post may not be comfortable for some audiences. Log in for access'