[ie/gem.cbc.ca] Fix login support (#12414)

Closes #12406
Authored by: bashonly
This commit is contained in:
bashonly 2025-02-23 03:56:47 -06:00 committed by GitHub
parent 6933f5670c
commit eb1417786a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,17 +1,17 @@
import base64
import functools
import json
import re
import time
import urllib.parse
from .common import InfoExtractor
from ..networking import HEADRequest
from ..networking.exceptions import HTTPError
from ..utils import (
ExtractorError,
float_or_none,
int_or_none,
js_to_json,
jwt_decode_hs256,
mimetype2ext,
orderedSet,
parse_age_limit,
@ -24,6 +24,7 @@
update_url,
url_basename,
url_or_none,
urlencode_postdata,
)
from ..utils.traversal import require, traverse_obj, trim_str
@ -608,66 +609,82 @@ class CBCGemIE(CBCGemBaseIE):
'only_matching': True,
}]
_TOKEN_API_KEY = '3f4beddd-2061-49b0-ae80-6f1f2ed65b37'
_CLIENT_ID = 'fc05b0ee-3865-4400-a3cc-3da82c330c23'
_refresh_token = None
_access_token = None
_claims_token = None
def _new_claims_token(self, email, password):
data = json.dumps({
'email': email,
'password': password,
}).encode()
headers = {'content-type': 'application/json'}
query = {'apikey': self._TOKEN_API_KEY}
resp = self._download_json('https://api.loginradius.com/identity/v2/auth/login',
None, data=data, headers=headers, query=query)
access_token = resp['access_token']
@functools.cached_property
def _ropc_settings(self):
return self._download_json(
'https://services.radio-canada.ca/ott/catalog/v1/gem/settings', None,
'Downloading site settings', query={'device': 'web'})['identityManagement']['ropc']
query = {
'access_token': access_token,
'apikey': self._TOKEN_API_KEY,
'jwtapp': 'jwt',
}
resp = self._download_json('https://cloud-api.loginradius.com/sso/jwt/api/token',
None, headers=headers, query=query)
sig = resp['signature']
def _is_jwt_expired(self, token):
return jwt_decode_hs256(token)['exp'] - time.time() < 300
data = json.dumps({'jwt': sig}).encode()
headers = {'content-type': 'application/json', 'ott-device-type': 'web'}
resp = self._download_json('https://services.radio-canada.ca/ott/cbc-api/v2/token',
None, data=data, headers=headers, expected_status=426)
cbc_access_token = resp['accessToken']
def _call_oauth_api(self, oauth_data, note='Refreshing access token'):
response = self._download_json(
self._ropc_settings['url'], None, note, data=urlencode_postdata({
'client_id': self._CLIENT_ID,
**oauth_data,
'scope': self._ropc_settings['scopes'],
}))
self._refresh_token = response['refresh_token']
self._access_token = response['access_token']
self.cache.store(self._NETRC_MACHINE, 'token_data', [self._refresh_token, self._access_token])
headers = {'content-type': 'application/json', 'ott-device-type': 'web', 'ott-access-token': cbc_access_token}
resp = self._download_json('https://services.radio-canada.ca/ott/cbc-api/v2/profile',
None, headers=headers, expected_status=426)
return resp['claimsToken']
def _perform_login(self, username, password):
if not self._refresh_token:
self._refresh_token, self._access_token = self.cache.load(
self._NETRC_MACHINE, 'token_data', default=[None, None])
def _get_claims_token_expiry(self):
# Token is a JWT
# JWT is decoded here and 'exp' field is extracted
# It is a Unix timestamp for when the token expires
b64_data = self._claims_token.split('.')[1]
data = base64.urlsafe_b64decode(b64_data + '==')
return json.loads(data)['exp']
def claims_token_expired(self):
exp = self._get_claims_token_expiry()
# It will expire in less than 10 seconds, or has already expired
return exp - time.time() < 10
def claims_token_valid(self):
return self._claims_token is not None and not self.claims_token_expired()
def _get_claims_token(self, email, password):
if not self.claims_token_valid():
self._claims_token = self._new_claims_token(email, password)
self.cache.store(self._NETRC_MACHINE, 'claims_token', self._claims_token)
return self._claims_token
def _real_initialize(self):
if self.claims_token_valid():
return
if self._refresh_token and self._access_token:
self.write_debug('Using cached refresh token')
if not self._claims_token:
self._claims_token = self.cache.load(self._NETRC_MACHINE, 'claims_token')
return
try:
self._call_oauth_api({
'grant_type': 'password',
'username': username,
'password': password,
}, note='Logging in')
except ExtractorError as e:
if isinstance(e.cause, HTTPError) and e.cause.status == 400:
raise ExtractorError('Invalid username and/or password', expected=True)
raise
def _fetch_access_token(self):
if self._is_jwt_expired(self._access_token):
try:
self._call_oauth_api({
'grant_type': 'refresh_token',
'refresh_token': self._refresh_token,
})
except ExtractorError:
self._refresh_token, self._access_token = None, None
self.cache.store(self._NETRC_MACHINE, 'token_data', [None, None])
self.report_warning('Refresh token has been invalidated; retrying with credentials')
self._perform_login(*self._get_login_info())
return self._access_token
def _fetch_claims_token(self):
if not self._get_login_info()[0]:
return None
if not self._claims_token or self._is_jwt_expired(self._claims_token):
self._claims_token = self._download_json(
'https://services.radio-canada.ca/ott/subscription/v2/gem/Subscriber/profile',
None, 'Downloading claims token', query={'device': 'web'},
headers={'Authorization': f'Bearer {self._fetch_access_token()}'})['claimsToken']
self.cache.store(self._NETRC_MACHINE, 'claims_token', self._claims_token)
else:
self.write_debug('Using cached claims token')
return self._claims_token
def _real_extract(self, url):
video_id, season_number = self._match_valid_url(url).group('id', 'season')
@ -675,14 +692,10 @@ def _real_extract(self, url):
item_info = traverse_obj(video_info, (
'content', ..., 'lineups', ..., 'items',
lambda _, v: v['url'] == video_id, any, {require('item info')}))
media_id = item_info['idMedia']
email, password = self._get_login_info()
if email and password:
claims_token = self._get_claims_token(email, password)
headers = {'x-claims-token': claims_token}
else:
headers = {}
if claims_token := self._fetch_claims_token():
headers['x-claims-token'] = claims_token
m3u8_info = self._download_json(
'https://services.radio-canada.ca/media/validation/v2/',
@ -695,7 +708,7 @@ def _real_extract(self, url):
'tech': 'hls',
'manifestVersion': '2',
'manifestType': 'desktop',
'idMedia': media_id,
'idMedia': item_info['idMedia'],
})
if m3u8_info.get('errorCode') == 1: