fix: youtube: Polymer UI and JSON endpoints for playlists

We already had a few copies of Polymer-style pagination handling logic
for certain circumstances, but now we're forced into using it for all
playlists since we can no longer disable Polymer. Refactor the logic to
move it to the parent class for all entry lists (including e.g. search
results, feeds, and list of playlists), and generify a bit to cover the
child classes' use cases.
This commit is contained in:
Luc Ritchie 2020-11-10 03:38:26 -05:00
parent 651bae3d23
commit 9833e7a015

View File

@ -36,6 +36,7 @@
get_element_by_attribute, get_element_by_attribute,
get_element_by_id, get_element_by_id,
int_or_none, int_or_none,
js_to_json,
mimetype2ext, mimetype2ext,
orderedSet, orderedSet,
parse_codecs, parse_codecs,
@ -70,6 +71,8 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
_LOGIN_REQUIRED = False _LOGIN_REQUIRED = False
_PLAYLIST_ID_RE = r'(?:PL|LL|EC|UU|FL|RD|UL|TL|PU|OLAK5uy_)[0-9A-Za-z-_]{10,}' _PLAYLIST_ID_RE = r'(?:PL|LL|EC|UU|FL|RD|UL|TL|PU|OLAK5uy_)[0-9A-Za-z-_]{10,}'
_INITIAL_DATA_RE = r'(?:window\["ytInitialData"\]|ytInitialData)\W?=\W?({.*?});'
_YTCFG_DATA_RE = r"ytcfg.set\(({.*?})\)"
_YOUTUBE_CLIENT_HEADERS = { _YOUTUBE_CLIENT_HEADERS = {
'x-youtube-client-name': '1', 'x-youtube-client-name': '1',
@ -274,7 +277,6 @@ def warn(message):
def _download_webpage_handle(self, *args, **kwargs): def _download_webpage_handle(self, *args, **kwargs):
query = kwargs.get('query', {}).copy() query = kwargs.get('query', {}).copy()
query['disable_polymer'] = 'true'
kwargs['query'] = query kwargs['query'] = query
return super(YoutubeBaseInfoExtractor, self)._download_webpage_handle( return super(YoutubeBaseInfoExtractor, self)._download_webpage_handle(
*args, **compat_kwargs(kwargs)) *args, **compat_kwargs(kwargs))
@ -297,15 +299,60 @@ def _real_initialize(self):
class YoutubeEntryListBaseInfoExtractor(YoutubeBaseInfoExtractor): class YoutubeEntryListBaseInfoExtractor(YoutubeBaseInfoExtractor):
# Extract entries from page with "Load more" button def _find_entries_in_json(self, extracted):
entries = []
c = {}
def _real_find(obj):
if obj is None or isinstance(obj, str):
return
if type(obj) is list:
for elem in obj:
_real_find(elem)
if type(obj) is dict:
if self._is_entry(obj):
entries.append(obj)
return
if 'continuationCommand' in obj:
c['continuation'] = obj
return
for _, o in obj.items():
_real_find(o)
_real_find(extracted)
return entries, try_get(c, lambda x: x["continuation"])
def _entries(self, page, playlist_id): def _entries(self, page, playlist_id):
more_widget_html = content_html = page seen = []
yt_conf = {}
for m in re.finditer(self._YTCFG_DATA_RE, page):
parsed = self._parse_json(m.group(1), playlist_id,
transform_source=js_to_json, fatal=False)
if parsed:
yt_conf.update(parsed)
data_json = self._parse_json(self._search_regex(self._INITIAL_DATA_RE, page, 'ytInitialData'), None)
for page_num in itertools.count(1): for page_num in itertools.count(1):
for entry in self._process_page(content_html): entries, continuation = self._find_entries_in_json(data_json)
processed = self._process_entries(entries, seen)
if not processed:
break
for entry in processed:
yield entry yield entry
mobj = re.search(r'data-uix-load-more-href="/?(?P<more>[^"]+)"', more_widget_html) if not continuation or not yt_conf:
if not mobj: break
continuation_token = try_get(continuation, lambda x: x['continuationCommand']['token'])
continuation_url = try_get(continuation, lambda x: x['commandMetadata']['webCommandMetadata']['apiUrl'])
if not continuation_token or not continuation_url:
break break
count = 0 count = 0
@ -314,12 +361,22 @@ def _entries(self, page, playlist_id):
try: try:
# Downloading page may result in intermittent 5xx HTTP error # Downloading page may result in intermittent 5xx HTTP error
# that is usually worked around with a retry # that is usually worked around with a retry
more = self._download_json( data_json = self._download_json(
'https://www.youtube.com/%s' % mobj.group('more'), playlist_id, 'https://www.youtube.com%s' % continuation_url,
'Downloading page #%s%s' playlist_id,
% (page_num, ' (retry #%d)' % count if count else ''), 'Downloading page #%s%s' % (page_num, ' (retry #%d)' % count if count else ''),
transform_source=uppercase_escape, transform_source=uppercase_escape,
headers=self._YOUTUBE_CLIENT_HEADERS) query={
'key': try_get(yt_conf, lambda x: x['INNERTUBE_API_KEY'])
},
data=bytes(json.dumps({
'context': try_get(yt_conf, lambda x: x['INNERTUBE_CONTEXT']),
'continuation': continuation_token
}), encoding='utf-8'),
headers={
'Content-Type': 'application/json'
}
)
break break
except ExtractorError as e: except ExtractorError as e:
if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503): if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503):
@ -328,31 +385,30 @@ def _entries(self, page, playlist_id):
continue continue
raise raise
content_html = more['content_html'] def _extract_title(self, renderer):
if not content_html.strip(): title = try_get(renderer, lambda x: x['title']['runs'][0]['text'], compat_str)
# Some webpages show a "Load more" button but they don't if title:
# have more videos return title
break return try_get(renderer, lambda x: x['title']['simpleText'], compat_str)
more_widget_html = more['load_more_widget_html']
class YoutubePlaylistBaseInfoExtractor(YoutubeEntryListBaseInfoExtractor): class YoutubePlaylistBaseInfoExtractor(YoutubeEntryListBaseInfoExtractor):
def _process_page(self, content): def _is_entry(self, obj):
for video_id, video_title in self.extract_videos_from_page(content): return 'videoId' in obj
yield self.url_result(video_id, 'Youtube', video_id, video_title)
def extract_videos_from_page_impl(self, video_re, page, ids_in_page, titles_in_page): def _process_entries(self, entries, seen):
for mobj in re.finditer(video_re, page): ids_in_page = []
# The link with index 0 is not the first video of the playlist (not sure if still actual) titles_in_page = []
if 'index' in mobj.groupdict() and mobj.group('id') == '0': for renderer in entries:
video_id = try_get(renderer, lambda x: x['videoId'])
video_title = self._extract_title(renderer)
if video_id is None or video_title is None:
# we do not have a videoRenderer or title extraction broke
continue continue
video_id = mobj.group('id')
video_title = unescapeHTML(
mobj.group('title')) if 'title' in mobj.groupdict() else None
if video_title:
video_title = video_title.strip() video_title = video_title.strip()
if video_title == '► Play all':
video_title = None
try: try:
idx = ids_in_page.index(video_id) idx = ids_in_page.index(video_id)
if video_title and not titles_in_page[idx]: if video_title and not titles_in_page[idx]:
@ -361,19 +417,16 @@ def extract_videos_from_page_impl(self, video_re, page, ids_in_page, titles_in_p
ids_in_page.append(video_id) ids_in_page.append(video_id)
titles_in_page.append(video_title) titles_in_page.append(video_title)
def extract_videos_from_page(self, page): for video_id, video_title in zip(ids_in_page, titles_in_page):
ids_in_page = [] yield self.url_result(video_id, 'Youtube', video_id, video_title)
titles_in_page = []
self.extract_videos_from_page_impl(
self._VIDEO_RE, page, ids_in_page, titles_in_page)
return zip(ids_in_page, titles_in_page)
class YoutubePlaylistsBaseInfoExtractor(YoutubeEntryListBaseInfoExtractor): class YoutubePlaylistsBaseInfoExtractor(YoutubeEntryListBaseInfoExtractor):
def _process_page(self, content): def _is_entry(self, obj):
for playlist_id in orderedSet(re.findall( return 'playlistId' in obj
r'<h3[^>]+class="[^"]*yt-lockup-title[^"]*"[^>]*><a[^>]+href="/?playlist\?list=([0-9A-Za-z-_]{10,})"',
content)): def _process_entries(self, entries, seen):
for playlist_id in orderedSet(try_get(r, lambda x: x['playlistId']) for r in entries):
yield self.url_result( yield self.url_result(
'https://www.youtube.com/playlist?list=%s' % playlist_id, 'YoutubePlaylist') 'https://www.youtube.com/playlist?list=%s' % playlist_id, 'YoutubePlaylist')
@ -3240,11 +3293,7 @@ class YoutubePlaylistsIE(YoutubePlaylistsBaseInfoExtractor):
}] }]
class YoutubeSearchBaseInfoExtractor(YoutubePlaylistBaseInfoExtractor): class YoutubeSearchIE(SearchInfoExtractor, YoutubePlaylistBaseInfoExtractor):
_VIDEO_RE = r'href="\s*/watch\?v=(?P<id>[0-9A-Za-z_-]{11})(?:[^"]*"[^>]+\btitle="(?P<title>[^"]+))?'
class YoutubeSearchIE(SearchInfoExtractor, YoutubeSearchBaseInfoExtractor):
IE_DESC = 'YouTube.com searches' IE_DESC = 'YouTube.com searches'
# there doesn't appear to be a real limit, for example if you search for # there doesn't appear to be a real limit, for example if you search for
# 'python' you get more than 8.000.000 results # 'python' you get more than 8.000.000 results
@ -3341,11 +3390,10 @@ class YoutubeSearchDateIE(YoutubeSearchIE):
_SEARCH_PARAMS = 'CAI%3D' _SEARCH_PARAMS = 'CAI%3D'
class YoutubeSearchURLIE(YoutubeSearchBaseInfoExtractor): class YoutubeSearchURLIE(YoutubePlaylistBaseInfoExtractor):
IE_DESC = 'YouTube.com search URLs' IE_DESC = 'YouTube.com search URLs'
IE_NAME = 'youtube:search_url' IE_NAME = 'youtube:search_url'
_VALID_URL = r'https?://(?:www\.)?youtube\.com/results\?(.*?&)?(?:search_query|q)=(?P<query>[^&]+)(?:[&]|$)' _VALID_URL = r'https?://(?:www\.)?youtube\.com/results\?(.*?&)?(?:search_query|q)=(?P<query>[^&]+)(?:[&]|$)'
_SEARCH_DATA = r'(?:window\["ytInitialData"\]|ytInitialData)\W?=\W?({.*?});'
_TESTS = [{ _TESTS = [{
'url': 'https://www.youtube.com/results?baz=bar&search_query=youtube-dl+test+video&filters=video&lclk=video', 'url': 'https://www.youtube.com/results?baz=bar&search_query=youtube-dl+test+video&filters=video&lclk=video',
'playlist_mincount': 5, 'playlist_mincount': 5,
@ -3357,28 +3405,14 @@ class YoutubeSearchURLIE(YoutubeSearchBaseInfoExtractor):
'only_matching': True, 'only_matching': True,
}] }]
def _find_videos_in_json(self, extracted): def _process_json_dict(self, obj, videos, c):
videos = []
def _real_find(obj):
if obj is None or isinstance(obj, str):
return
if type(obj) is list:
for elem in obj:
_real_find(elem)
if type(obj) is dict:
if "videoId" in obj: if "videoId" in obj:
videos.append(obj) videos.append(obj)
return return
for _, o in obj.items(): if "nextContinuationData" in obj:
_real_find(o) c["continuation"] = obj["nextContinuationData"]
return
_real_find(extracted)
return videos
def extract_videos_from_page_impl(self, page, ids_in_page, titles_in_page): def extract_videos_from_page_impl(self, page, ids_in_page, titles_in_page):
search_response = self._parse_json(self._search_regex(self._SEARCH_DATA, page, 'ytInitialData'), None) search_response = self._parse_json(self._search_regex(self._SEARCH_DATA, page, 'ytInitialData'), None)
@ -3413,7 +3447,8 @@ def _real_extract(self, url):
mobj = re.match(self._VALID_URL, url) mobj = re.match(self._VALID_URL, url)
query = compat_urllib_parse_unquote_plus(mobj.group('query')) query = compat_urllib_parse_unquote_plus(mobj.group('query'))
webpage = self._download_webpage(url, query) webpage = self._download_webpage(url, query)
return self.playlist_result(self._process_page(webpage), playlist_title=query) data_json = self._process_initial_data(webpage)
return self.playlist_result(self._process_data(data_json), playlist_title=query)
class YoutubeShowIE(YoutubePlaylistsBaseInfoExtractor): class YoutubeShowIE(YoutubePlaylistsBaseInfoExtractor):
@ -3435,14 +3470,12 @@ def _real_extract(self, url):
'https://www.youtube.com/show/%s/playlists' % playlist_id) 'https://www.youtube.com/show/%s/playlists' % playlist_id)
class YoutubeFeedsInfoExtractor(YoutubeBaseInfoExtractor): class YoutubeFeedsInfoExtractor(YoutubePlaylistBaseInfoExtractor):
""" """
Base class for feed extractors Base class for feed extractors
Subclasses must define the _FEED_NAME and _PLAYLIST_TITLE properties. Subclasses must define the _FEED_NAME and _PLAYLIST_TITLE properties.
""" """
_LOGIN_REQUIRED = True _LOGIN_REQUIRED = True
_FEED_DATA = r'(?:window\["ytInitialData"\]|ytInitialData)\W?=\W?({.*?});'
_YTCFG_DATA = r"ytcfg.set\(({.*?})\)"
@property @property
def IE_NAME(self): def IE_NAME(self):
@ -3451,53 +3484,15 @@ def IE_NAME(self):
def _real_initialize(self): def _real_initialize(self):
self._login() self._login()
def _find_videos_in_json(self, extracted): def _process_entries(self, entries, seen):
videos = []
c = {}
def _real_find(obj):
if obj is None or isinstance(obj, str):
return
if type(obj) is list:
for elem in obj:
_real_find(elem)
if type(obj) is dict:
if "videoId" in obj:
videos.append(obj)
return
if "nextContinuationData" in obj:
c["continuation"] = obj["nextContinuationData"]
return
for _, o in obj.items():
_real_find(o)
_real_find(extracted)
return videos, try_get(c, lambda x: x["continuation"])
def _entries(self, page):
info = []
yt_conf = self._parse_json(self._search_regex(self._YTCFG_DATA, page, 'ytcfg.set', default="null"), None, fatal=False)
search_response = self._parse_json(self._search_regex(self._FEED_DATA, page, 'ytInitialData'), None)
for page_num in itertools.count(1):
video_info, continuation = self._find_videos_in_json(search_response)
new_info = [] new_info = []
for v in entries:
for v in video_info:
v_id = try_get(v, lambda x: x['videoId']) v_id = try_get(v, lambda x: x['videoId'])
if not v_id: if not v_id:
continue continue
have_video = False have_video = False
for old in info: for old in seen:
if old['videoId'] == v_id: if old['videoId'] == v_id:
have_video = True have_video = True
break break
@ -3506,41 +3501,18 @@ def _entries(self, page):
new_info.append(v) new_info.append(v)
if not new_info: if not new_info:
break return
info.extend(new_info)
seen.extend(new_info)
for video in new_info: for video in new_info:
yield self.url_result(try_get(video, lambda x: x['videoId']), YoutubeIE.ie_key(), video_title=try_get(video, lambda x: x['title']['runs'][0]['text']) or try_get(video, lambda x: x['title']['simpleText'])) yield self.url_result(try_get(video, lambda x: x['videoId']), YoutubeIE.ie_key(), video_title=self._extract_title(video))
if not continuation or not yt_conf:
break
search_response = self._download_json(
'https://www.youtube.com/browse_ajax', self._PLAYLIST_TITLE,
'Downloading page #%s' % page_num,
transform_source=uppercase_escape,
query={
"ctoken": try_get(continuation, lambda x: x["continuation"]),
"continuation": try_get(continuation, lambda x: x["continuation"]),
"itct": try_get(continuation, lambda x: x["clickTrackingParams"])
},
headers={
"X-YouTube-Client-Name": try_get(yt_conf, lambda x: x["INNERTUBE_CONTEXT_CLIENT_NAME"]),
"X-YouTube-Client-Version": try_get(yt_conf, lambda x: x["INNERTUBE_CONTEXT_CLIENT_VERSION"]),
"X-Youtube-Identity-Token": try_get(yt_conf, lambda x: x["ID_TOKEN"]),
"X-YouTube-Device": try_get(yt_conf, lambda x: x["DEVICE"]),
"X-YouTube-Page-CL": try_get(yt_conf, lambda x: x["PAGE_CL"]),
"X-YouTube-Page-Label": try_get(yt_conf, lambda x: x["PAGE_BUILD_LABEL"]),
"X-YouTube-Variants-Checksum": try_get(yt_conf, lambda x: x["VARIANTS_CHECKSUM"]),
})
def _real_extract(self, url): def _real_extract(self, url):
page = self._download_webpage( page = self._download_webpage(
'https://www.youtube.com/feed/%s' % self._FEED_NAME, 'https://www.youtube.com/feed/%s' % self._FEED_NAME,
self._PLAYLIST_TITLE) self._PLAYLIST_TITLE)
return self.playlist_result( return self.playlist_result(self._entries(page, self._PLAYLIST_TITLE),
self._entries(page), playlist_title=self._PLAYLIST_TITLE) playlist_title=self._PLAYLIST_TITLE)
class YoutubeWatchLaterIE(YoutubePlaylistIE): class YoutubeWatchLaterIE(YoutubePlaylistIE):