From bc83b4b06cd2648276c7f075754ace8be22f889a Mon Sep 17 00:00:00 2001 From: Lesmiscore Date: Mon, 18 Jul 2022 22:06:54 +0900 Subject: [PATCH] [extractor/AbemaTVTitle] Implement paging (#4376) Authored by: Lesmiscore --- yt_dlp/extractor/abematv.py | 232 ++++++++++++++++++++---------------- 1 file changed, 132 insertions(+), 100 deletions(-) diff --git a/yt_dlp/extractor/abematv.py b/yt_dlp/extractor/abematv.py index ec1af1d0c..d8ad78705 100644 --- a/yt_dlp/extractor/abematv.py +++ b/yt_dlp/extractor/abematv.py @@ -1,5 +1,6 @@ import base64 import binascii +import functools import hashlib import hmac import io @@ -20,11 +21,11 @@ decode_base_n, int_or_none, intlist_to_bytes, + OnDemandPagedList, request_to_url, time_seconds, traverse_obj, update_url_query, - urljoin, ) # NOTE: network handler related code is temporary thing until network stack overhaul PRs are merged (#2861/#2862) @@ -145,17 +146,106 @@ def abematv_license_open(self, url): class AbemaTVBaseIE(InfoExtractor): + _USERTOKEN = None + _DEVICE_ID = None + _MEDIATOKEN = None + + _SECRETKEY = b'v+Gjs=25Aw5erR!J8ZuvRrCx*rGswhB&qdHd_SYerEWdU&a?3DzN9BRbp5KwY4hEmcj5#fykMjJ=AuWz5GSMY-d@H7DMEh3M@9n2G552Us$$k9cD=3TxwWe86!x#Zyhe' + + @classmethod + def _generate_aks(cls, deviceid): + deviceid = deviceid.encode('utf-8') + # add 1 hour and then drop minute and secs + ts_1hour = int((time_seconds(hours=9) // 3600 + 1) * 3600) + time_struct = time.gmtime(ts_1hour) + ts_1hour_str = str(ts_1hour).encode('utf-8') + + tmp = None + + def mix_once(nonce): + nonlocal tmp + h = hmac.new(cls._SECRETKEY, digestmod=hashlib.sha256) + h.update(nonce) + tmp = h.digest() + + def mix_tmp(count): + nonlocal tmp + for i in range(count): + mix_once(tmp) + + def mix_twist(nonce): + nonlocal tmp + mix_once(base64.urlsafe_b64encode(tmp).rstrip(b'=') + nonce) + + mix_once(cls._SECRETKEY) + mix_tmp(time_struct.tm_mon) + mix_twist(deviceid) + mix_tmp(time_struct.tm_mday % 5) + mix_twist(ts_1hour_str) + mix_tmp(time_struct.tm_hour % 5) + + return base64.urlsafe_b64encode(tmp).rstrip(b'=').decode('utf-8') + + def _get_device_token(self): + if self._USERTOKEN: + return self._USERTOKEN + + AbemaTVBaseIE._DEVICE_ID = str(uuid.uuid4()) + aks = self._generate_aks(self._DEVICE_ID) + user_data = self._download_json( + 'https://api.abema.io/v1/users', None, note='Authorizing', + data=json.dumps({ + 'deviceId': self._DEVICE_ID, + 'applicationKeySecret': aks, + }).encode('utf-8'), + headers={ + 'Content-Type': 'application/json', + }) + AbemaTVBaseIE._USERTOKEN = user_data['token'] + + # don't allow adding it 2 times or more, though it's guarded + remove_opener(self._downloader, AbemaLicenseHandler) + add_opener(self._downloader, AbemaLicenseHandler(self)) + + return self._USERTOKEN + + def _get_media_token(self, invalidate=False, to_show=True): + if not invalidate and self._MEDIATOKEN: + return self._MEDIATOKEN + + AbemaTVBaseIE._MEDIATOKEN = self._download_json( + 'https://api.abema.io/v1/media/token', None, note='Fetching media token' if to_show else False, + query={ + 'osName': 'android', + 'osVersion': '6.0.1', + 'osLang': 'ja_JP', + 'osTimezone': 'Asia/Tokyo', + 'appId': 'tv.abema', + 'appVersion': '3.27.1' + }, headers={ + 'Authorization': f'bearer {self._get_device_token()}', + })['token'] + + return self._MEDIATOKEN + + def _call_api(self, endpoint, video_id, query=None, note='Downloading JSON metadata'): + return self._download_json( + f'https://api.abema.io/{endpoint}', video_id, query=query or {}, + note=note, + headers={ + 'Authorization': f'bearer {self._get_device_token()}', + }) + def _extract_breadcrumb_list(self, webpage, video_id): for jld in re.finditer( r'(?is)]+type=(["\']?)application/ld\+json\1[^>]*>(?P.+?)', webpage): jsonld = self._parse_json(jld.group('json_ld'), video_id, fatal=False) - if jsonld: - if jsonld.get('@type') != 'BreadcrumbList': - continue - trav = traverse_obj(jsonld, ('itemListElement', ..., 'name')) - if trav: - return trav + if traverse_obj(jsonld, '@type') != 'BreadcrumbList': + continue + items = traverse_obj(jsonld, ('itemListElement', ..., 'name')) + if items: + return items return [] @@ -207,87 +297,7 @@ class AbemaTVIE(AbemaTVBaseIE): }, 'skip': 'Not supported until yt-dlp implements native live downloader OR AbemaTV can start a local HTTP server', }] - _USERTOKEN = None - _DEVICE_ID = None _TIMETABLE = None - _MEDIATOKEN = None - - _SECRETKEY = b'v+Gjs=25Aw5erR!J8ZuvRrCx*rGswhB&qdHd_SYerEWdU&a?3DzN9BRbp5KwY4hEmcj5#fykMjJ=AuWz5GSMY-d@H7DMEh3M@9n2G552Us$$k9cD=3TxwWe86!x#Zyhe' - - def _generate_aks(self, deviceid): - deviceid = deviceid.encode('utf-8') - # add 1 hour and then drop minute and secs - ts_1hour = int((time_seconds(hours=9) // 3600 + 1) * 3600) - time_struct = time.gmtime(ts_1hour) - ts_1hour_str = str(ts_1hour).encode('utf-8') - - tmp = None - - def mix_once(nonce): - nonlocal tmp - h = hmac.new(self._SECRETKEY, digestmod=hashlib.sha256) - h.update(nonce) - tmp = h.digest() - - def mix_tmp(count): - nonlocal tmp - for i in range(count): - mix_once(tmp) - - def mix_twist(nonce): - nonlocal tmp - mix_once(base64.urlsafe_b64encode(tmp).rstrip(b'=') + nonce) - - mix_once(self._SECRETKEY) - mix_tmp(time_struct.tm_mon) - mix_twist(deviceid) - mix_tmp(time_struct.tm_mday % 5) - mix_twist(ts_1hour_str) - mix_tmp(time_struct.tm_hour % 5) - - return base64.urlsafe_b64encode(tmp).rstrip(b'=').decode('utf-8') - - def _get_device_token(self): - if self._USERTOKEN: - return self._USERTOKEN - - self._DEVICE_ID = str(uuid.uuid4()) - aks = self._generate_aks(self._DEVICE_ID) - user_data = self._download_json( - 'https://api.abema.io/v1/users', None, note='Authorizing', - data=json.dumps({ - 'deviceId': self._DEVICE_ID, - 'applicationKeySecret': aks, - }).encode('utf-8'), - headers={ - 'Content-Type': 'application/json', - }) - self._USERTOKEN = user_data['token'] - - # don't allow adding it 2 times or more, though it's guarded - remove_opener(self._downloader, AbemaLicenseHandler) - add_opener(self._downloader, AbemaLicenseHandler(self)) - - return self._USERTOKEN - - def _get_media_token(self, invalidate=False, to_show=True): - if not invalidate and self._MEDIATOKEN: - return self._MEDIATOKEN - - self._MEDIATOKEN = self._download_json( - 'https://api.abema.io/v1/media/token', None, note='Fetching media token' if to_show else False, - query={ - 'osName': 'android', - 'osVersion': '6.0.1', - 'osLang': 'ja_JP', - 'osTimezone': 'Asia/Tokyo', - 'appId': 'tv.abema', - 'appVersion': '3.27.1' - }, headers={ - 'Authorization': 'bearer ' + self._get_device_token() - })['token'] - - return self._MEDIATOKEN def _perform_login(self, username, password): if '@' in username: # don't strictly check if it's email address or not @@ -301,13 +311,13 @@ def _perform_login(self, username, password): method: username, 'password': password }).encode('utf-8'), headers={ - 'Authorization': 'bearer ' + self._get_device_token(), + 'Authorization': f'bearer {self._get_device_token()}', 'Origin': 'https://abema.tv', 'Referer': 'https://abema.tv/', 'Content-Type': 'application/json', }) - self._USERTOKEN = login_response['token'] + AbemaTVBaseIE._USERTOKEN = login_response['token'] self._get_media_token(True) def _real_extract(self, url): @@ -442,6 +452,7 @@ def _real_extract(self, url): class AbemaTVTitleIE(AbemaTVBaseIE): _VALID_URL = r'https?://abema\.tv/video/title/(?P[^?/]+)' + _PAGE_SIZE = 25 _TESTS = [{ 'url': 'https://abema.tv/video/title/90-1597', @@ -457,18 +468,39 @@ class AbemaTVTitleIE(AbemaTVBaseIE): 'title': '真心が届く~僕とスターのオフィス・ラブ!?~', }, 'playlist_mincount': 16, + }, { + 'url': 'https://abema.tv/video/title/25-102', + 'info_dict': { + 'id': '25-102', + 'title': 'ソードアート・オンライン アリシゼーション', + }, + 'playlist_mincount': 24, }] + def _fetch_page(self, playlist_id, series_version, page): + programs = self._call_api( + f'v1/video/series/{playlist_id}/programs', playlist_id, + note=f'Downloading page {page + 1}', + query={ + 'seriesVersion': series_version, + 'offset': str(page * self._PAGE_SIZE), + 'order': 'seq', + 'limit': str(self._PAGE_SIZE), + }) + yield from ( + self.url_result(f'https://abema.tv/video/episode/{x}') + for x in traverse_obj(programs, ('programs', ..., 'id'), default=[])) + + def _entries(self, playlist_id, series_version): + return OnDemandPagedList( + functools.partial(self._fetch_page, playlist_id, series_version), + self._PAGE_SIZE) + def _real_extract(self, url): - video_id = self._match_id(url) - webpage = self._download_webpage(url, video_id) + playlist_id = self._match_id(url) + series_info = self._call_api(f'v1/video/series/{playlist_id}', playlist_id) - playlist_title, breadcrumb = None, self._extract_breadcrumb_list(webpage, video_id) - if breadcrumb: - playlist_title = breadcrumb[-1] - - playlist = [ - self.url_result(urljoin('https://abema.tv/', mobj.group(1))) - for mobj in re.finditer(r'