From f49b551a0c4c25358d2afaeda4ee63989d2d56ab Mon Sep 17 00:00:00 2001 From: bashonly <88596187+bashonly@users.noreply.github.com> Date: Sun, 28 Jun 2026 12:35:50 -0500 Subject: [PATCH] [ie/instagram] Rework extractor (#17075) Closes #7165 Closes #8408 Closes #17074 Authored by: bashonly --- README.md | 2 +- yt_dlp/extractor/instagram.py | 347 ++++++++++++++++------------------ 2 files changed, 162 insertions(+), 187 deletions(-) diff --git a/README.md b/README.md index 5400f97af7..a3af033142 100644 --- a/README.md +++ b/README.md @@ -1916,7 +1916,7 @@ The following extractors use this feature: * `dr`: dynamic range to ignore - one or more of `sdr`, `hdr10`, `dv` #### instagram -* `app_id`: The value of the `X-IG-App-ID` header used for API requests. Default is the web app ID, `936619743392459` +* `app_id`: The value of the `X-IG-App-ID` header used for API requests. Can be the actual ID number, `ios`, or `web` (default) #### niconicochannelplus * `max_comments`: Maximum number of comments to extract - default is `120` diff --git a/yt_dlp/extractor/instagram.py b/yt_dlp/extractor/instagram.py index 98f70c2672..9e1fc59c52 100644 --- a/yt_dlp/extractor/instagram.py +++ b/yt_dlp/extractor/instagram.py @@ -1,3 +1,4 @@ +import functools import hashlib import itertools import json @@ -16,11 +17,10 @@ from ..utils import ( get_element_by_attribute, int_or_none, join_nonempty, - lowercase_escape, str_or_none, - str_to_int, traverse_obj, url_or_none, + urlencode_postdata, ) _ENCODING_CHARS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_' @@ -41,13 +41,31 @@ def _id_to_pk(shortcode): class InstagramBaseIE(InfoExtractor): _API_BASE_URL = 'https://i.instagram.com/api/v1' + _BASE_URL = 'https://www.instagram.com/' _LOGIN_URL = 'https://www.instagram.com/accounts/login' + _APP_IDS = { + 'ios': '124024574287414', + 'web': '936619743392459', # default + } + + @property + def _is_logged_in(self): + return bool(self._get_cookies(self._BASE_URL).get('sessionid')) + + @functools.cached_property + def _app_id(self): + user_input = self._configuration_arg('app_id', [None], ie_key=InstagramIE)[0] + return self._APP_IDS.get(user_input, user_input or self._APP_IDS['web']) + + @property + def _is_web_app(self): + return self._app_id == self._APP_IDS['web'] @property def _api_headers(self): return { - 'X-IG-App-ID': self._configuration_arg('app_id', ['936619743392459'], ie_key=InstagramIE)[0], - 'X-ASBD-ID': '198387', + 'X-IG-App-ID': self._app_id, + 'X-ASBD-ID': '359341', 'X-IG-WWW-Claim': '0', 'Origin': 'https://www.instagram.com', 'Accept': '*/*', @@ -106,71 +124,83 @@ class InstagramBaseIE(InfoExtractor): } def _extract_product_media(self, product_media): - media_id = product_media.get('code') or _pk_to_id(product_media.get('pk')) - vcodec = product_media.get('video_codec') - dash_manifest_raw = product_media.get('video_dash_manifest') - videos_list = product_media.get('video_versions') - if not (dash_manifest_raw or videos_list): - return {} + video_id = traverse_obj(product_media, ('pk', {_pk_to_id})) - formats = [{ - 'format_id': fmt.get('id'), - 'url': fmt.get('url'), - 'width': fmt.get('width'), - 'height': fmt.get('height'), - 'vcodec': vcodec, - } for fmt in videos_list or []] - if dash_manifest_raw: - formats.extend(self._parse_mpd_formats(self._parse_xml(dash_manifest_raw, media_id), mpd_id='dash')) + formats = traverse_obj(product_media, ('video_versions', lambda _, v: url_or_none(v['url']), { + 'url': 'url', + 'format_id': (('id', {str}), ('type', {int}, {str_or_none}), any), + 'width': ('width', {int_or_none}), + 'height': ('height', {int_or_none}), + })) + format_info = traverse_obj(product_media, { + 'vcodec': ('video_codec', {str}), + 'acodec': ('has_audio', {bool}, {lambda x: 'none' if x is False else None}), + }) + for f in formats: + f.update(format_info) + + dash = traverse_obj(product_media, ('video_dash_manifest', {str})) + if dash: + formats.extend(self._parse_mpd_formats(self._parse_xml(dash, video_id), mpd_id='dash')) - thumbnails = [{ - 'url': thumbnail.get('url'), - 'width': thumbnail.get('width'), - 'height': thumbnail.get('height'), - } for thumbnail in traverse_obj(product_media, ('image_versions2', 'candidates')) or []] return { - 'id': media_id, - 'duration': float_or_none(product_media.get('video_duration')), + 'id': video_id, 'formats': formats, - 'thumbnails': thumbnails, + 'duration': traverse_obj(product_media, ('video_duration', {float_or_none})), + 'thumbnails': list(reversed(traverse_obj(product_media, ( + 'image_versions2', 'candidates', + lambda _, v: url_or_none(v['url']), { + 'url': 'url', + 'width': ('width', {int}), + 'height': 'height', + }, + )))), } - def _extract_product(self, product_info): + def _extract_product(self, product_info, video_id=None, get_comments=True): if isinstance(product_info, list): product_info = product_info[0] - user_info = product_info.get('user') or {} info_dict = { - 'id': _pk_to_id(traverse_obj(product_info, 'pk', 'id', expected_type=str_or_none)[:19]), - 'title': product_info.get('title') or f'Video by {user_info.get("username")}', - 'description': traverse_obj(product_info, ('caption', 'text'), expected_type=str_or_none), - 'timestamp': int_or_none(product_info.get('taken_at')), - 'channel': user_info.get('username'), - 'uploader': user_info.get('full_name'), - 'uploader_id': str_or_none(user_info.get('pk')), - 'view_count': int_or_none(product_info.get('view_count')), - 'like_count': int_or_none(product_info.get('like_count')), - 'comment_count': int_or_none(product_info.get('comment_count')), - '__post_extractor': self.extract_comments(_pk_to_id(product_info.get('pk'))), + 'id': video_id, + 'title': format_field(product_info, [('user', 'username', {str})], 'Video by %s', default=None), + **traverse_obj(product_info, { + 'id': ('pk', {_pk_to_id}), + 'title': ('title', {str}), + 'description': ('caption', 'text', {str}), + 'channel': ('user', 'username', {str}), + 'uploader_id': ('user', 'pk', {str_or_none}), + 'uploader': ('user', 'full_name', {str}), + 'timestamp': ('taken_at', {int_or_none}), + 'view_count': ('view_count', {int_or_none}), + 'like_count': ('like_count', {int_or_none}), + 'comment_count': ('comment_count', {int_or_none}), + }), 'http_headers': { 'Referer': 'https://www.instagram.com/', }, } - carousel_media = product_info.get('carousel_media') - if carousel_media: + + if carousel_media := traverse_obj(product_info, ('carousel_media', ..., {dict})): + comments = None + if get_comments and self.get_param('getcomments'): + comments = self._get_comments(info_dict.get('id')) + return { '_type': 'playlist', **info_dict, - 'title': f'Post by {user_info.get("username")}', + 'title': format_field(info_dict, 'channel', 'Post by %s', default=None), 'entries': [{ **info_dict, **self._extract_product_media(product_media), + 'comments': comments, } for product_media in carousel_media], } return { **info_dict, **self._extract_product_media(product_info), + '__post_extractor': self.extract_comments(info_dict.get('id')) if get_comments else None, } def _get_comments(self, video_id): @@ -182,9 +212,9 @@ class InstagramBaseIE(InfoExtractor): for comment_dict in comment_data or []: yield { 'author': traverse_obj(comment_dict, ('node', 'owner', 'username'), ('user', 'username')), - 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id'), ('user', 'pk')), + 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id'), ('user', 'pk'), expected_type=str_or_none), 'author_thumbnail': traverse_obj(comment_dict, ('node', 'owner', 'profile_pic_url'), ('user', 'profile_pic_url'), expected_type=url_or_none), - 'id': traverse_obj(comment_dict, ('node', 'id'), 'pk'), + 'id': traverse_obj(comment_dict, ('node', 'id'), 'pk', expected_type=str_or_none), 'text': traverse_obj(comment_dict, ('node', 'text'), 'text'), 'like_count': traverse_obj(comment_dict, ('node', 'edge_liked_by', 'count'), 'comment_like_count', expected_type=int_or_none), 'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), 'created_at', expected_type=int_or_none), @@ -350,6 +380,8 @@ class InstagramIE(InstagramBaseIE): 'only_matching': True, }] + _lsd_token = None + @classmethod def _extract_embed_urls(cls, url, webpage): res = tuple(super()._extract_embed_urls(url, webpage)) @@ -361,165 +393,107 @@ class InstagramIE(InstagramBaseIE): if mobj: return [mobj.group('link')] + def _real_initialize(self): + if self._is_logged_in: + return + if not self._lsd_token: + webpage = self._download_webpage( + self._BASE_URL, None, 'Setting up session', + impersonate=True, require_impersonation=True) + eqmc = self._search_json( + r']* id="__eqmc"[^>]*>', webpage, 'eqmc JSON', None, default={}) + self._lsd_token = ( + traverse_obj(eqmc, ('l', {str})) + or self._search_regex(r'\["LSD",\[\],\{"token":"([^"]+)"', webpage, 'LSD token')) + def _real_extract(self, url): video_id, url = self._match_valid_url(url).group('id', 'url') - media, webpage = {}, '' + media_id = str(_id_to_pk(video_id)) - if self._get_cookies(url).get('sessionid'): - info = traverse_obj(self._download_json( - f'{self._API_BASE_URL}/media/{_id_to_pk(video_id)}/info/', video_id, - fatal=False, errnote='Video info extraction failed', - note='Downloading video info', headers=self._api_headers), ('items', 0)) - if info: - media.update(info) - return self._extract_product(media) + if self._is_logged_in: + return self._extract_product(self._download_json( + f'{self._API_BASE_URL}/media/{media_id}/info/', video_id, + 'Downloading video info', 'Video info extraction failed', + impersonate=self._is_web_app, headers=self._api_headers)['items'][0]) api_check = self._download_json( - f'{self._API_BASE_URL}/web/get_ruling_for_content/?content_type=MEDIA&target_id={_id_to_pk(video_id)}', - video_id, headers=self._api_headers, fatal=False, note='Setting up session', errnote=False) or {} - csrf_token = self._get_cookies('https://www.instagram.com').get('csrftoken') + f'{self._API_BASE_URL}/web/get_ruling_for_content/', video_id, + 'Checking post accessibility', errnote=False, fatal=False, + impersonate=True, require_impersonation=True, headers=self._api_headers, + query={'content_type': 'MEDIA', 'target_id': media_id}) or {} + csrf_token = self._get_cookies('https://www.instagram.com').get('csrftoken') if not csrf_token: - self.report_warning('No csrf token set by Instagram API', video_id) + self.report_warning('No CSRF token set by Instagram API', video_id) else: csrf_token = csrf_token.value if api_check.get('status') == 'ok' else None if not csrf_token: self.report_warning('Instagram API is not granting access', video_id) - variables = { - 'shortcode': video_id, - 'child_comment_count': 3, - 'fetch_comment_count': 40, - 'parent_comment_count': 24, - 'has_threaded_comments': True, - } - general_info = self._download_json( - 'https://www.instagram.com/graphql/query/', video_id, fatal=False, errnote=False, - headers={ + response = self._download_json( + 'https://www.instagram.com/api/graphql', video_id, + impersonate=True, require_impersonation=True, + headers=filter_dict({ **self._api_headers, - 'X-CSRFToken': csrf_token or '', + 'X-FB-Friendly-Name': 'PolarisLoggedOutDesktopWWWPostRootContentQuery', + 'X-CSRFToken': csrf_token, + 'X-FB-LSD': self._lsd_token, 'X-Requested-With': 'XMLHttpRequest', 'Referer': url, - }, query={ - 'doc_id': '8845758582119845', - 'variables': json.dumps(variables, separators=(',', ':')), - }) + }), data=urlencode_postdata({ + 'av': '0', + '__d': 'www', + '__user': '0', + 'dpr': '1', + 'lsd': self._lsd_token, + 'fb_api_caller_class': 'RelayModern', + 'fb_api_req_friendly_name': 'PolarisLoggedOutDesktopWWWPostRootContentQuery', + 'server_timestamps': 'true', + 'variables': json.dumps({'media_id': media_id}, separators=(',', ':')), + 'doc_id': '27130156389949648', + })) - if not general_info: - self.report_warning('General metadata extraction failed (some metadata might be missing).', video_id) - webpage, urlh = self._download_webpage_handle(url, video_id) - shared_data = self._search_json( - r'window\._sharedData\s*=', webpage, 'shared data', video_id, fatal=False) or {} + media = traverse_obj(response, ('data', 'xig_polaris_media', {dict})) + product_info = traverse_obj(media, ('if_not_gated_logged_out', {dict})) + if not product_info: + error = join_nonempty('title', 'description', delim=': ', from_dict=api_check) + if 'Restricted Video' in error: + self.raise_login_required(error) + elif error: + raise ExtractorError(error, expected=True) + elif len(video_id) > 28: + # It's a private post (video_id == shortcode + 28 extra characters) + # Only raise after getting empty response; sometimes "long"-shortcode posts are public + self.raise_login_required( + 'This content is only available for registered users who follow this account') + raise ExtractorError( + 'Instagram sent an empty media response. Check if this post is accessible in your ' + f'browser without being logged-in. If it is not, then u{self._login_hint()[1:]}. ' + 'Otherwise, if the post is accessible in browser without being logged-in' + f'{bug_reports_message(before=",")}', expected=True) - if shared_data and self._LOGIN_URL not in urlh.url: - media.update(traverse_obj( - shared_data, ('entry_data', 'PostPage', 0, 'graphql', 'shortcode_media'), - ('entry_data', 'PostPage', 0, 'media'), expected_type=dict) or {}) - else: - self.report_warning('Main webpage is locked behind the login page. Retrying with embed webpage (some metadata might be missing).') - webpage = self._download_webpage( - f'{url}/embed/', video_id, note='Downloading embed webpage', fatal=False) or '' - additional_data = self._search_json( - r'window\.__additionalDataLoaded\s*\(\s*[^,]+,', webpage, 'additional data', video_id, fatal=False) - if not additional_data and not media: - self.raise_login_required('Requested content is not available, rate-limit reached or login required') + info_dict = self._extract_product(product_info, video_id=video_id, get_comments=False) + is_playlist = info_dict.get('_type') == 'playlist' + if not is_playlist and not info_dict.get('formats'): + self.raise_no_formats('There is no video in this post', expected=True) - product_item = traverse_obj(additional_data, ('items', 0), expected_type=dict) - if product_item: - media.update(product_item) - return self._extract_product(media) - - media.update(traverse_obj( - additional_data, ('graphql', 'shortcode_media'), 'shortcode_media', expected_type=dict) or {}) + comments = traverse_obj(media, ( + 'comments_connection', 'edges', lambda _, v: v['node']['text'], 'node', { + 'author': ('user', 'username', {str}), + 'author_id': ('user', 'pk', {str_or_none}), + 'id': ('pk', {str_or_none}), + 'text': ('text', {str}), + 'timestamp': ('created_at', {int_or_none}), + 'like_count': ('comment_like_count', {int_or_none}), + })) + if is_playlist: + for entry in info_dict['entries']: + entry['comments'] = comments else: - xdt_shortcode_media = traverse_obj(general_info, ('data', 'xdt_shortcode_media', {dict})) or {} - if not xdt_shortcode_media: - error = join_nonempty('title', 'description', delim=': ', from_dict=api_check) - if 'Restricted Video' in error: - self.raise_login_required(error) - elif error: - raise ExtractorError(error, expected=True) - elif len(video_id) > 28: - # It's a private post (video_id == shortcode + 28 extra characters) - # Only raise after getting empty response; sometimes "long"-shortcode posts are public - self.raise_login_required( - 'This content is only available for registered users who follow this account') - raise ExtractorError( - 'Instagram sent an empty media response. Check if this post is accessible in your ' - f'browser without being logged-in. If it is not, then u{self._login_hint()[1:]}. ' - 'Otherwise, if the post is accessible in browser without being logged-in' - f'{bug_reports_message(before=",")}', expected=True) - media.update(xdt_shortcode_media) + info_dict['comments'] = comments - username = traverse_obj(media, ('owner', 'username')) or self._search_regex( - r'"owner"\s*:\s*{\s*"username"\s*:\s*"(.+?)"', webpage, 'username', fatal=False) - - description = ( - traverse_obj(media, ('edge_media_to_caption', 'edges', 0, 'node', 'text'), expected_type=str) - or media.get('caption')) - if not description: - description = self._search_regex( - r'"caption"\s*:\s*"(.+?)"', webpage, 'description', default=None) - if description is not None: - description = lowercase_escape(description) - - video_url = media.get('video_url') - if not video_url: - nodes = traverse_obj(media, ('edge_sidecar_to_children', 'edges', ..., 'node'), expected_type=dict) or [] - if nodes: - return self.playlist_result( - self._extract_nodes(nodes, True), video_id, - format_field(username, None, 'Post by %s'), description) - raise ExtractorError('There is no video in this post', expected=True) - - formats = [{ - 'url': video_url, - 'width': self._get_dimension('width', media, webpage), - 'height': self._get_dimension('height', media, webpage), - }] - dash = traverse_obj(media, ('dash_info', 'video_dash_manifest')) - if dash: - formats.extend(self._parse_mpd_formats(self._parse_xml(dash, video_id), mpd_id='dash')) - - comment_data = traverse_obj(media, ('edge_media_to_parent_comment', 'edges')) - comments = [{ - 'author': traverse_obj(comment_dict, ('node', 'owner', 'username')), - 'author_id': traverse_obj(comment_dict, ('node', 'owner', 'id')), - 'id': traverse_obj(comment_dict, ('node', 'id')), - 'text': traverse_obj(comment_dict, ('node', 'text')), - 'timestamp': traverse_obj(comment_dict, ('node', 'created_at'), expected_type=int_or_none), - } for comment_dict in comment_data] if comment_data else None - - display_resources = ( - media.get('display_resources') - or [{'src': media.get(key)} for key in ('display_src', 'display_url')] - or [{'src': self._og_search_thumbnail(webpage)}]) - thumbnails = [{ - 'url': thumbnail['src'], - 'width': thumbnail.get('config_width'), - 'height': thumbnail.get('config_height'), - } for thumbnail in display_resources if thumbnail.get('src')] - - return { - 'id': video_id, - 'formats': formats, - 'title': media.get('title') or f'Video by {username}', - 'description': description, - 'duration': float_or_none(media.get('video_duration')), - 'timestamp': traverse_obj(media, 'taken_at_timestamp', 'date', expected_type=int_or_none), - 'uploader_id': traverse_obj(media, ('owner', 'id')), - 'uploader': traverse_obj(media, ('owner', 'full_name')), - 'channel': username, - 'like_count': self._get_count(media, 'likes', 'preview_like') or str_to_int(self._search_regex( - r'data-log-event="likeCountClick"[^>]*>[^\d]*([\d,\.]+)', webpage, 'like count', fatal=False)), - 'comment_count': self._get_count(media, 'comments', 'preview_comment', 'to_comment', 'to_parent_comment'), - 'comments': comments, - 'thumbnails': thumbnails, - 'http_headers': { - 'Referer': 'https://www.instagram.com/', - }, - } + return info_dict class InstagramPlaylistBaseIE(InstagramBaseIE): @@ -697,12 +671,12 @@ class InstagramStoryIE(InstagramBaseIE): if username == 'highlights' and not story_id: # story id is only mandatory for highlights raise ExtractorError('Input URL is missing a highlight ID', expected=True) display_id = story_id or username - story_info = self._download_webpage(url, display_id) + story_info = self._download_webpage(url, display_id, impersonate=self._is_web_app) user_info = self._search_json(r'"user":', story_info, 'user info', display_id, fatal=False) if not user_info: self.raise_login_required('This content is unreachable') - user_id = traverse_obj(user_info, 'pk', 'id', expected_type=str) + user_id = traverse_obj(user_info, 'pk', 'id', expected_type=str_or_none) if username == 'highlights': story_info_url = f'highlight:{story_id}' else: @@ -712,7 +686,8 @@ class InstagramStoryIE(InstagramBaseIE): videos = traverse_obj(self._download_json( f'{self._API_BASE_URL}/feed/reels_media/?reel_ids={story_info_url}', - display_id, errnote=False, fatal=False, headers=self._api_headers), 'reels') + display_id, errnote=False, fatal=False, impersonate=self._is_web_app, + headers=self._api_headers), 'reels') if not videos: self.raise_login_required('You need to log in to access this content') user_info = traverse_obj(videos, (user_id, 'user', {dict})) or {}