diff --git a/yt_dlp/YoutubeDL.py b/yt_dlp/YoutubeDL.py index a9a8e4133e..7a6b22a85d 100644 --- a/yt_dlp/YoutubeDL.py +++ b/yt_dlp/YoutubeDL.py @@ -3542,7 +3542,8 @@ class YoutubeDL: 'writing DASH m4a. Only some players support this container', FFmpegFixupM4aPP) ffmpeg_fixup(downloader == 'hlsnative' and not self.params.get('hls_use_mpegts') - or info_dict.get('is_live') and self.params.get('hls_use_mpegts') is None, + or info_dict.get('is_live') and self.params.get('hls_use_mpegts') is None + or downloader == 'niconico_live', 'Possible MPEG-TS in MP4 container or malformed AAC timestamps', FFmpegFixupM3u8PP) ffmpeg_fixup(downloader == 'dashsegments' diff --git a/yt_dlp/downloader/niconico.py b/yt_dlp/downloader/niconico.py index 462c6e2d63..62bed646f3 100644 --- a/yt_dlp/downloader/niconico.py +++ b/yt_dlp/downloader/niconico.py @@ -1,12 +1,22 @@ +import contextlib import json +import math import threading import time from . import get_suitable_downloader from .common import FileDownloader from .external import FFmpegFD +from ..downloader.fragment import FragmentFD from ..networking import Request -from ..utils import DownloadError, str_or_none, try_get +from ..networking.exceptions import network_exceptions +from ..utils import ( + DownloadError, + RetryManager, + str_or_none, + traverse_obj, + urljoin, +) class NiconicoDmcFD(FileDownloader): @@ -56,85 +66,184 @@ class NiconicoDmcFD(FileDownloader): return success -class NiconicoLiveFD(FileDownloader): - """ Downloads niconico live without being stopped """ +class NiconicoLiveFD(FragmentFD): + """ Downloads niconico live streams through FFmpeg and timeshift VODs fragment by fragment """ - def real_download(self, filename, info_dict): - video_id = info_dict['video_id'] - ws_url = info_dict['url'] - ws_extractor = info_dict['ws'] - ws_origin_host = info_dict['origin'] - live_quality = info_dict.get('live_quality', 'high') - live_latency = info_dict.get('live_latency', 'high') - dl = FFmpegFD(self.ydl, self.params or {}) + _PER_FRAGMENT_DOWNLOAD_RATIO = 0.1 + 
_WEBSOCKET_RECONNECT_DELAY = 10 - new_info_dict = info_dict.copy() - new_info_dict.update({ - 'protocol': 'm3u8', - }) + @contextlib.contextmanager + def _ws_context(self, info_dict): + """ Keep a WebSocket session running in a background thread and close it when leaving the context """ - def communicate_ws(reconnect): - if reconnect: - ws = self.ydl.urlopen(Request(ws_url, headers={'Origin': f'https://{ws_origin_host}'})) - if self.ydl.params.get('verbose', False): - self.to_screen('[debug] Sending startWatching request') - ws.send(json.dumps({ - 'type': 'startWatching', - 'data': { - 'stream': { - 'quality': live_quality, - 'protocol': 'hls+fmp4', - 'latency': live_latency, - 'chasePlay': False, - }, - 'room': { - 'protocol': 'webSocket', - 'commentable': True, - }, - 'reconnect': True, + video_id = info_dict['id'] + format_id = info_dict['format_id'] + live_latency = info_dict['downloader_options']['live_latency'] + ws_url = info_dict['downloader_options']['ws_url'] + + self.ws = None + + self.m3u8_lock = threading.Event() # An Event, not a lock: set once a stream URL has been received, cleared while reconnecting + self.m3u8_url = None + + def communicate_ws(): + self.ws = self.ydl.urlopen(Request(ws_url, headers=info_dict.get('http_headers'))) + if self.ydl.params.get('verbose', False): + self.write_debug('Sending HLS server request') + self.ws.send(json.dumps({ + 'type': 'startWatching', + 'data': { + 'stream': { + 'quality': format_id, + 'protocol': 'hls', + 'latency': live_latency, + 'chasePlay': False, }, - })) - else: - ws = ws_extractor - with ws: + 'room': { + 'protocol': 'webSocket', + 'commentable': True, + }, + }, + })) + with self.ws: while True: - recv = ws.recv() + recv = self.ws.recv() if not recv: continue data = json.loads(recv) - if not data or not isinstance(data, dict): + if not isinstance(data, dict): continue if data.get('type') == 'ping': # pong back - ws.send(r'{"type":"pong"}') - ws.send(r'{"type":"keepSeat"}') + self.ws.send(r'{"type":"pong"}') + self.ws.send(r'{"type":"keepSeat"}') + elif data.get('type') == 'stream': + self.m3u8_url = data['data']['uri'] + 
self.m3u8_lock.set() elif data.get('type') == 'disconnect': self.write_debug(data) - return True + return elif data.get('type') == 'error': self.write_debug(data) - message = try_get(data, lambda x: x['body']['code'], str) or recv - return DownloadError(message) + message = traverse_obj(data, ('data', 'code')) or recv + raise DownloadError(message) elif self.ydl.params.get('verbose', False): if len(recv) > 100: recv = recv[:100] + '...' - self.to_screen(f'[debug] Server said: {recv}') + self.write_debug(f'Server said: {recv}') + + stopped = threading.Event() def ws_main(): - reconnect = False - while True: + while not stopped.is_set(): try: - ret = communicate_ws(reconnect) - if ret is True: - return - except BaseException as e: - self.to_screen('[{}] {}: Connection error occured, reconnecting after 10 seconds: {}'.format('niconico:live', video_id, str_or_none(e))) - time.sleep(10) - continue - finally: - reconnect = True + communicate_ws() + break # The server sent a disconnect message, so do not reconnect + except BaseException as e: # Including TransportError + if stopped.is_set(): + break + + self.m3u8_lock.clear() # The m3u8 URL may change after reconnecting, so make readers wait for the new one + + self.to_screen('[{}] {}: Connection error occurred, reconnecting after {} seconds: {}'.format( + 'niconico:live', video_id, self._WEBSOCKET_RECONNECT_DELAY, str_or_none(e))) + stopped.wait(self._WEBSOCKET_RECONNECT_DELAY) # Unlike time.sleep(), returns early once the context is being left + + self.m3u8_lock.set() # Unblock anyone still waiting in _master_m3u8_url() thread = threading.Thread(target=ws_main, daemon=True) thread.start() - return dl.download(filename, new_info_dict) + try: + yield self + finally: + stopped.set() + if self.ws: + self.ws.close() + thread.join() + + def _master_m3u8_url(self): + """ Wait for and return the latest master m3u8 URL, which is refreshed after a WebSocket reconnection to prevent HTTP 403 (None if the WebSocket thread ended before receiving one) """ + + self.m3u8_lock.wait() + return self.m3u8_url + + def real_download(self, filename, info_dict): + with self._ws_context(info_dict) as ws_context: + # live: hand the stream over to FFmpeg while the WebSocket session stays open + if info_dict.get('is_live'): + info_dict = info_dict.copy() + info_dict['protocol'] = 'm3u8' + return 
FFmpegFD(self.ydl, self.params or {}).download(filename, info_dict) + + # timeshift VOD: downloaded fragment by fragment below + from ..extractor.niconico import NiconicoIE + ie = NiconicoIE(self.ydl) + + video_id = info_dict['id'] + + # Read the total duration and the per-fragment duration from the m3u8 at info_dict['url'] + total_duration = 0 + fragment_duration = 0 + for line in ie._download_webpage(info_dict['url'], video_id, note='Downloading m3u8').splitlines(): + if '#STREAM-DURATION' in line: + total_duration = int(float(line.split(':')[1])) + if '#EXT-X-TARGETDURATION' in line: + fragment_duration = int(line.split(':')[1]) + if not (total_duration and fragment_duration): + raise DownloadError('Unable to get required video info') + + ctx = { + 'filename': filename, + 'total_frags': math.ceil(total_duration / fragment_duration), + } + + self._prepare_and_start_frag_download(ctx, info_dict) + + downloaded_duration = ctx['fragment_index'] * fragment_duration + while True: + if downloaded_duration > total_duration: + break + + retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry) + for retry in retry_manager: + try: + # Refresh master m3u8 (if possible) to get the new URL of the previously-chosen format + media_m3u8_url = ie._extract_m3u8_formats( + ws_context._master_m3u8_url(), video_id, note=False, + query={'start': downloaded_duration}, live=False)[0]['url'] + + # Collect the fragment URLs (the lines not starting with '#') listed in the media m3u8 + media_m3u8 = ie._download_webpage( + media_m3u8_url, video_id, note=False, errnote='Unable to download media m3u8') + fragment_urls = traverse_obj(media_m3u8.splitlines(), ( + lambda _, v: not v.startswith('#'), {lambda url: urljoin(media_m3u8_url, url)})) + + with self.DurationLimiter(len(fragment_urls) * fragment_duration * self._PER_FRAGMENT_DOWNLOAD_RATIO): # Make each batch take at least this many seconds + for fragment_url in fragment_urls: + success = self._download_fragment(ctx, fragment_url, info_dict) + if not success: + return False + self._append_fragment(ctx, self._read_fragment(ctx)) + downloaded_duration += fragment_duration + + except (DownloadError, *network_exceptions) as err: + 
retry.error = err + continue + + if retry_manager.error: + return False + + return self._finish_frag_download(ctx, info_dict) + + class DurationLimiter: # Context manager that makes the enclosed block take at least `target` seconds + def __init__(self, target): + self.target = target + + def __enter__(self): + self.start = time.time() + + def __exit__(self, *exc): + remaining = self.target - (time.time() - self.start) + if remaining > 0: + time.sleep(remaining) diff --git a/yt_dlp/extractor/niconico.py b/yt_dlp/extractor/niconico.py index 29fc1da1e2..d956349e34 100644 --- a/yt_dlp/extractor/niconico.py +++ b/yt_dlp/extractor/niconico.py @@ -7,7 +7,6 @@ import time import urllib.parse from .common import InfoExtractor, SearchInfoExtractor -from ..networking import Request from ..networking.exceptions import HTTPError from ..utils import ( ExtractorError, @@ -32,12 +31,56 @@ from ..utils import ( ) -class NiconicoIE(InfoExtractor): - IE_NAME = 'niconico' - IE_DESC = 'ニコニコ動画' +class NiconicoBaseIE(InfoExtractor): + _NETRC_MACHINE = 'niconico' _GEO_COUNTRIES = ['JP'] _GEO_BYPASS = False + def _perform_login(self, username, password): + login_ok = True + login_form_strs = { + 'mail_tel': username, + 'password': password, + } + self._request_webpage( + 'https://account.nicovideo.jp/login', None, + note='Acquiring Login session') + page = self._download_webpage( + 'https://account.nicovideo.jp/login/redirector?show_button_twitter=1&site=niconico&show_button_facebook=1', None, + note='Logging in', errnote='Unable to log in', + data=urlencode_postdata(login_form_strs), + headers={ + 'Referer': 'https://account.nicovideo.jp/login', + 'Content-Type': 'application/x-www-form-urlencoded', + }) + if 'oneTimePw' in page: + post_url = self._search_regex( + r'