mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2026-04-30 12:36:03 +00:00
Deprecate bytes_to_intlist/intlist_to_bytes
This commit is contained in:
parent
a6783a3b99
commit
3a91f39558
@ -11,13 +11,12 @@ import codecs
|
|||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from yt_dlp.aes import aes_encrypt, key_expansion
|
from yt_dlp.aes import aes_encrypt, key_expansion
|
||||||
from yt_dlp.utils import intlist_to_bytes
|
|
||||||
|
|
||||||
secret_msg = b'Secret message goes here'
|
secret_msg = b'Secret message goes here'
|
||||||
|
|
||||||
|
|
||||||
def hex_str(int_list):
|
def hex_str(int_list):
|
||||||
return codecs.encode(intlist_to_bytes(int_list), 'hex')
|
return codecs.encode(bytes(int_list), 'hex')
|
||||||
|
|
||||||
|
|
||||||
def openssl_encode(algo, key, iv):
|
def openssl_encode(algo, key, iv):
|
||||||
|
|||||||
@ -313,6 +313,8 @@ banned-from = [
|
|||||||
"yt_dlp.compat.compat_urllib_parse_urlparse".msg = "Use `urllib.parse.urlparse` instead."
|
"yt_dlp.compat.compat_urllib_parse_urlparse".msg = "Use `urllib.parse.urlparse` instead."
|
||||||
"yt_dlp.compat.compat_shlex_quote".msg = "Use `yt_dlp.utils.shell_quote` instead."
|
"yt_dlp.compat.compat_shlex_quote".msg = "Use `yt_dlp.utils.shell_quote` instead."
|
||||||
"yt_dlp.utils.error_to_compat_str".msg = "Use `str` instead."
|
"yt_dlp.utils.error_to_compat_str".msg = "Use `str` instead."
|
||||||
|
"yt_dlp.utils.bytes_to_intlist".msg = "Use `list` instead."
|
||||||
|
"yt_dlp.utils.intlist_to_bytes".msg = "Use `bytes` instead."
|
||||||
|
|
||||||
[tool.autopep8]
|
[tool.autopep8]
|
||||||
max_line_length = 120
|
max_line_length = 120
|
||||||
|
|||||||
@ -27,7 +27,6 @@ from yt_dlp.aes import (
|
|||||||
pad_block,
|
pad_block,
|
||||||
)
|
)
|
||||||
from yt_dlp.dependencies import Cryptodome
|
from yt_dlp.dependencies import Cryptodome
|
||||||
from yt_dlp.utils import bytes_to_intlist, intlist_to_bytes
|
|
||||||
|
|
||||||
# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
|
# the encrypted data can be generate with 'devscripts/generate_aes_testdata.py'
|
||||||
|
|
||||||
@ -40,33 +39,33 @@ class TestAES(unittest.TestCase):
|
|||||||
def test_encrypt(self):
|
def test_encrypt(self):
|
||||||
msg = b'message'
|
msg = b'message'
|
||||||
key = list(range(16))
|
key = list(range(16))
|
||||||
encrypted = aes_encrypt(bytes_to_intlist(msg), key)
|
encrypted = aes_encrypt(list(msg), key)
|
||||||
decrypted = intlist_to_bytes(aes_decrypt(encrypted, key))
|
decrypted = bytes(aes_decrypt(encrypted, key))
|
||||||
self.assertEqual(decrypted, msg)
|
self.assertEqual(decrypted, msg)
|
||||||
|
|
||||||
def test_cbc_decrypt(self):
|
def test_cbc_decrypt(self):
|
||||||
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
|
data = b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\x27\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd'
|
||||||
decrypted = intlist_to_bytes(aes_cbc_decrypt(bytes_to_intlist(data), self.key, self.iv))
|
decrypted = bytes(aes_cbc_decrypt(list(data), self.key, self.iv))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
if Cryptodome.AES:
|
if Cryptodome.AES:
|
||||||
decrypted = aes_cbc_decrypt_bytes(data, intlist_to_bytes(self.key), intlist_to_bytes(self.iv))
|
decrypted = aes_cbc_decrypt_bytes(data, bytes(self.key), bytes(self.iv))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
|
|
||||||
def test_cbc_encrypt(self):
|
def test_cbc_encrypt(self):
|
||||||
data = bytes_to_intlist(self.secret_msg)
|
data = list(self.secret_msg)
|
||||||
encrypted = intlist_to_bytes(aes_cbc_encrypt(data, self.key, self.iv))
|
encrypted = bytes(aes_cbc_encrypt(data, self.key, self.iv))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
encrypted,
|
encrypted,
|
||||||
b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd')
|
b'\x97\x92+\xe5\x0b\xc3\x18\x91ky9m&\xb3\xb5@\xe6\'\xc2\x96.\xc8u\x88\xab9-[\x9e|\xf1\xcd')
|
||||||
|
|
||||||
def test_ctr_decrypt(self):
|
def test_ctr_decrypt(self):
|
||||||
data = bytes_to_intlist(b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
|
data = list(b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
|
||||||
decrypted = intlist_to_bytes(aes_ctr_decrypt(data, self.key, self.iv))
|
decrypted = bytes(aes_ctr_decrypt(data, self.key, self.iv))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
|
|
||||||
def test_ctr_encrypt(self):
|
def test_ctr_encrypt(self):
|
||||||
data = bytes_to_intlist(self.secret_msg)
|
data = list(self.secret_msg)
|
||||||
encrypted = intlist_to_bytes(aes_ctr_encrypt(data, self.key, self.iv))
|
encrypted = bytes(aes_ctr_encrypt(data, self.key, self.iv))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
encrypted,
|
encrypted,
|
||||||
b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
|
b'\x03\xc7\xdd\xd4\x8e\xb3\xbc\x1a*O\xdc1\x12+8Aio\xd1z\xb5#\xaf\x08')
|
||||||
@ -75,47 +74,47 @@ class TestAES(unittest.TestCase):
|
|||||||
data = b'\x159Y\xcf5eud\x90\x9c\x85&]\x14\x1d\x0f.\x08\xb4T\xe4/\x17\xbd'
|
data = b'\x159Y\xcf5eud\x90\x9c\x85&]\x14\x1d\x0f.\x08\xb4T\xe4/\x17\xbd'
|
||||||
authentication_tag = b'\xe8&I\x80rI\x07\x9d}YWuU@:e'
|
authentication_tag = b'\xe8&I\x80rI\x07\x9d}YWuU@:e'
|
||||||
|
|
||||||
decrypted = intlist_to_bytes(aes_gcm_decrypt_and_verify(
|
decrypted = bytes(aes_gcm_decrypt_and_verify(
|
||||||
bytes_to_intlist(data), self.key, bytes_to_intlist(authentication_tag), self.iv[:12]))
|
list(data), self.key, list(authentication_tag), self.iv[:12]))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
if Cryptodome.AES:
|
if Cryptodome.AES:
|
||||||
decrypted = aes_gcm_decrypt_and_verify_bytes(
|
decrypted = aes_gcm_decrypt_and_verify_bytes(
|
||||||
data, intlist_to_bytes(self.key), authentication_tag, intlist_to_bytes(self.iv[:12]))
|
data, bytes(self.key), authentication_tag, bytes(self.iv[:12]))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
|
|
||||||
def test_decrypt_text(self):
|
def test_decrypt_text(self):
|
||||||
password = intlist_to_bytes(self.key).decode()
|
password = bytes(self.key).decode()
|
||||||
encrypted = base64.b64encode(
|
encrypted = base64.b64encode(
|
||||||
intlist_to_bytes(self.iv[:8])
|
bytes(self.iv[:8])
|
||||||
+ b'\x17\x15\x93\xab\x8d\x80V\xcdV\xe0\t\xcdo\xc2\xa5\xd8ksM\r\xe27N\xae',
|
+ b'\x17\x15\x93\xab\x8d\x80V\xcdV\xe0\t\xcdo\xc2\xa5\xd8ksM\r\xe27N\xae',
|
||||||
).decode()
|
).decode()
|
||||||
decrypted = (aes_decrypt_text(encrypted, password, 16))
|
decrypted = (aes_decrypt_text(encrypted, password, 16))
|
||||||
self.assertEqual(decrypted, self.secret_msg)
|
self.assertEqual(decrypted, self.secret_msg)
|
||||||
|
|
||||||
password = intlist_to_bytes(self.key).decode()
|
password = bytes(self.key).decode()
|
||||||
encrypted = base64.b64encode(
|
encrypted = base64.b64encode(
|
||||||
intlist_to_bytes(self.iv[:8])
|
bytes(self.iv[:8])
|
||||||
+ b'\x0b\xe6\xa4\xd9z\x0e\xb8\xb9\xd0\xd4i_\x85\x1d\x99\x98_\xe5\x80\xe7.\xbf\xa5\x83',
|
+ b'\x0b\xe6\xa4\xd9z\x0e\xb8\xb9\xd0\xd4i_\x85\x1d\x99\x98_\xe5\x80\xe7.\xbf\xa5\x83',
|
||||||
).decode()
|
).decode()
|
||||||
decrypted = (aes_decrypt_text(encrypted, password, 32))
|
decrypted = (aes_decrypt_text(encrypted, password, 32))
|
||||||
self.assertEqual(decrypted, self.secret_msg)
|
self.assertEqual(decrypted, self.secret_msg)
|
||||||
|
|
||||||
def test_ecb_encrypt(self):
|
def test_ecb_encrypt(self):
|
||||||
data = bytes_to_intlist(self.secret_msg)
|
data = list(self.secret_msg)
|
||||||
encrypted = intlist_to_bytes(aes_ecb_encrypt(data, self.key))
|
encrypted = bytes(aes_ecb_encrypt(data, self.key))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
encrypted,
|
encrypted,
|
||||||
b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
|
b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
|
||||||
|
|
||||||
def test_ecb_decrypt(self):
|
def test_ecb_decrypt(self):
|
||||||
data = bytes_to_intlist(b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
|
data = list(b'\xaa\x86]\x81\x97>\x02\x92\x9d\x1bR[[L/u\xd3&\xd1(h\xde{\x81\x94\xba\x02\xae\xbd\xa6\xd0:')
|
||||||
decrypted = intlist_to_bytes(aes_ecb_decrypt(data, self.key, self.iv))
|
decrypted = bytes(aes_ecb_decrypt(data, self.key, self.iv))
|
||||||
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
self.assertEqual(decrypted.rstrip(b'\x08'), self.secret_msg)
|
||||||
|
|
||||||
def test_key_expansion(self):
|
def test_key_expansion(self):
|
||||||
key = '4f6bdaa39e2f8cb07f5e722d9edef314'
|
key = '4f6bdaa39e2f8cb07f5e722d9edef314'
|
||||||
|
|
||||||
self.assertEqual(key_expansion(bytes_to_intlist(bytearray.fromhex(key))), [
|
self.assertEqual(key_expansion(list(bytearray.fromhex(key))), [
|
||||||
0x4F, 0x6B, 0xDA, 0xA3, 0x9E, 0x2F, 0x8C, 0xB0, 0x7F, 0x5E, 0x72, 0x2D, 0x9E, 0xDE, 0xF3, 0x14,
|
0x4F, 0x6B, 0xDA, 0xA3, 0x9E, 0x2F, 0x8C, 0xB0, 0x7F, 0x5E, 0x72, 0x2D, 0x9E, 0xDE, 0xF3, 0x14,
|
||||||
0x53, 0x66, 0x20, 0xA8, 0xCD, 0x49, 0xAC, 0x18, 0xB2, 0x17, 0xDE, 0x35, 0x2C, 0xC9, 0x2D, 0x21,
|
0x53, 0x66, 0x20, 0xA8, 0xCD, 0x49, 0xAC, 0x18, 0xB2, 0x17, 0xDE, 0x35, 0x2C, 0xC9, 0x2D, 0x21,
|
||||||
0x8C, 0xBE, 0xDD, 0xD9, 0x41, 0xF7, 0x71, 0xC1, 0xF3, 0xE0, 0xAF, 0xF4, 0xDF, 0x29, 0x82, 0xD5,
|
0x8C, 0xBE, 0xDD, 0xD9, 0x41, 0xF7, 0x71, 0xC1, 0xF3, 0xE0, 0xAF, 0xF4, 0xDF, 0x29, 0x82, 0xD5,
|
||||||
|
|||||||
@ -69,7 +69,6 @@ from yt_dlp.utils import (
|
|||||||
get_elements_html_by_class,
|
get_elements_html_by_class,
|
||||||
get_elements_text_and_html_by_attribute,
|
get_elements_text_and_html_by_attribute,
|
||||||
int_or_none,
|
int_or_none,
|
||||||
intlist_to_bytes,
|
|
||||||
iri_to_uri,
|
iri_to_uri,
|
||||||
is_html,
|
is_html,
|
||||||
join_nonempty,
|
join_nonempty,
|
||||||
@ -1310,11 +1309,6 @@ class TestUtil(unittest.TestCase):
|
|||||||
self.assertEqual(clean_html('a:\n "b"'), 'a: "b"')
|
self.assertEqual(clean_html('a:\n "b"'), 'a: "b"')
|
||||||
self.assertEqual(clean_html('a<br>\xa0b'), 'a\nb')
|
self.assertEqual(clean_html('a<br>\xa0b'), 'a\nb')
|
||||||
|
|
||||||
def test_intlist_to_bytes(self):
|
|
||||||
self.assertEqual(
|
|
||||||
intlist_to_bytes([0, 1, 127, 128, 255]),
|
|
||||||
b'\x00\x01\x7f\x80\xff')
|
|
||||||
|
|
||||||
def test_args_to_str(self):
|
def test_args_to_str(self):
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
args_to_str(['foo', 'ba/r', '-baz', '2 be', '']),
|
args_to_str(['foo', 'ba/r', '-baz', '2 be', '']),
|
||||||
|
|||||||
@ -3,7 +3,6 @@ from math import ceil
|
|||||||
|
|
||||||
from .compat import compat_ord
|
from .compat import compat_ord
|
||||||
from .dependencies import Cryptodome
|
from .dependencies import Cryptodome
|
||||||
from .utils import bytes_to_intlist, intlist_to_bytes
|
|
||||||
|
|
||||||
if Cryptodome.AES:
|
if Cryptodome.AES:
|
||||||
def aes_cbc_decrypt_bytes(data, key, iv):
|
def aes_cbc_decrypt_bytes(data, key, iv):
|
||||||
@ -17,15 +16,15 @@ if Cryptodome.AES:
|
|||||||
else:
|
else:
|
||||||
def aes_cbc_decrypt_bytes(data, key, iv):
|
def aes_cbc_decrypt_bytes(data, key, iv):
|
||||||
""" Decrypt bytes with AES-CBC using native implementation since pycryptodome is unavailable """
|
""" Decrypt bytes with AES-CBC using native implementation since pycryptodome is unavailable """
|
||||||
return intlist_to_bytes(aes_cbc_decrypt(*map(bytes_to_intlist, (data, key, iv))))
|
return bytes(aes_cbc_decrypt(*map(list, (data, key, iv))))
|
||||||
|
|
||||||
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
|
def aes_gcm_decrypt_and_verify_bytes(data, key, tag, nonce):
|
||||||
""" Decrypt bytes with AES-GCM using native implementation since pycryptodome is unavailable """
|
""" Decrypt bytes with AES-GCM using native implementation since pycryptodome is unavailable """
|
||||||
return intlist_to_bytes(aes_gcm_decrypt_and_verify(*map(bytes_to_intlist, (data, key, tag, nonce))))
|
return bytes(aes_gcm_decrypt_and_verify(*map(list, (data, key, tag, nonce))))
|
||||||
|
|
||||||
|
|
||||||
def aes_cbc_encrypt_bytes(data, key, iv, **kwargs):
|
def aes_cbc_encrypt_bytes(data, key, iv, **kwargs):
|
||||||
return intlist_to_bytes(aes_cbc_encrypt(*map(bytes_to_intlist, (data, key, iv)), **kwargs))
|
return bytes(aes_cbc_encrypt(*map(list, (data, key, iv)), **kwargs))
|
||||||
|
|
||||||
|
|
||||||
BLOCK_SIZE_BYTES = 16
|
BLOCK_SIZE_BYTES = 16
|
||||||
@ -221,7 +220,7 @@ def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
|
|||||||
j0 = [*nonce, 0, 0, 0, 1]
|
j0 = [*nonce, 0, 0, 0, 1]
|
||||||
else:
|
else:
|
||||||
fill = (BLOCK_SIZE_BYTES - (len(nonce) % BLOCK_SIZE_BYTES)) % BLOCK_SIZE_BYTES + 8
|
fill = (BLOCK_SIZE_BYTES - (len(nonce) % BLOCK_SIZE_BYTES)) % BLOCK_SIZE_BYTES + 8
|
||||||
ghash_in = nonce + [0] * fill + bytes_to_intlist((8 * len(nonce)).to_bytes(8, 'big'))
|
ghash_in = nonce + [0] * fill + list((8 * len(nonce)).to_bytes(8, 'big'))
|
||||||
j0 = ghash(hash_subkey, ghash_in)
|
j0 = ghash(hash_subkey, ghash_in)
|
||||||
|
|
||||||
# TODO: add nonce support to aes_ctr_decrypt
|
# TODO: add nonce support to aes_ctr_decrypt
|
||||||
@ -235,8 +234,8 @@ def aes_gcm_decrypt_and_verify(data, key, tag, nonce):
|
|||||||
hash_subkey,
|
hash_subkey,
|
||||||
data
|
data
|
||||||
+ [0] * (BLOCK_SIZE_BYTES - len(data) + pad_len) # pad
|
+ [0] * (BLOCK_SIZE_BYTES - len(data) + pad_len) # pad
|
||||||
+ bytes_to_intlist((0 * 8).to_bytes(8, 'big') # length of associated data
|
+ list((0 * 8).to_bytes(8, 'big') # length of associated data
|
||||||
+ ((len(data) * 8).to_bytes(8, 'big'))), # length of data
|
+ ((len(data) * 8).to_bytes(8, 'big'))), # length of data
|
||||||
)
|
)
|
||||||
|
|
||||||
if tag != aes_ctr_encrypt(s_tag, key, j0):
|
if tag != aes_ctr_encrypt(s_tag, key, j0):
|
||||||
@ -300,8 +299,8 @@ def aes_decrypt_text(data, password, key_size_bytes):
|
|||||||
"""
|
"""
|
||||||
NONCE_LENGTH_BYTES = 8
|
NONCE_LENGTH_BYTES = 8
|
||||||
|
|
||||||
data = bytes_to_intlist(base64.b64decode(data))
|
data = list(base64.b64decode(data))
|
||||||
password = bytes_to_intlist(password.encode())
|
password = list(password.encode())
|
||||||
|
|
||||||
key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password))
|
key = password[:key_size_bytes] + [0] * (key_size_bytes - len(password))
|
||||||
key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key_expansion(key)) * (key_size_bytes // BLOCK_SIZE_BYTES)
|
key = aes_encrypt(key[:BLOCK_SIZE_BYTES], key_expansion(key)) * (key_size_bytes // BLOCK_SIZE_BYTES)
|
||||||
@ -310,7 +309,7 @@ def aes_decrypt_text(data, password, key_size_bytes):
|
|||||||
cipher = data[NONCE_LENGTH_BYTES:]
|
cipher = data[NONCE_LENGTH_BYTES:]
|
||||||
|
|
||||||
decrypted_data = aes_ctr_decrypt(cipher, key, nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES))
|
decrypted_data = aes_ctr_decrypt(cipher, key, nonce + [0] * (BLOCK_SIZE_BYTES - NONCE_LENGTH_BYTES))
|
||||||
return intlist_to_bytes(decrypted_data)
|
return bytes(decrypted_data)
|
||||||
|
|
||||||
|
|
||||||
RCON = (0x8d, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80, 0x1b, 0x36)
|
RCON = (0x8d, 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80, 0x1b, 0x36)
|
||||||
|
|||||||
@ -18,10 +18,8 @@ from ..networking.exceptions import TransportError
|
|||||||
from ..utils import (
|
from ..utils import (
|
||||||
ExtractorError,
|
ExtractorError,
|
||||||
OnDemandPagedList,
|
OnDemandPagedList,
|
||||||
bytes_to_intlist,
|
|
||||||
decode_base_n,
|
decode_base_n,
|
||||||
int_or_none,
|
int_or_none,
|
||||||
intlist_to_bytes,
|
|
||||||
time_seconds,
|
time_seconds,
|
||||||
traverse_obj,
|
traverse_obj,
|
||||||
update_url_query,
|
update_url_query,
|
||||||
@ -72,15 +70,15 @@ class AbemaLicenseRH(RequestHandler):
|
|||||||
})
|
})
|
||||||
|
|
||||||
res = decode_base_n(license_response['k'], table=self._STRTABLE)
|
res = decode_base_n(license_response['k'], table=self._STRTABLE)
|
||||||
encvideokey = bytes_to_intlist(struct.pack('>QQ', res >> 64, res & 0xffffffffffffffff))
|
encvideokey = list(struct.pack('>QQ', res >> 64, res & 0xffffffffffffffff))
|
||||||
|
|
||||||
h = hmac.new(
|
h = hmac.new(
|
||||||
binascii.unhexlify(self._HKEY),
|
binascii.unhexlify(self._HKEY),
|
||||||
(license_response['cid'] + self.ie._DEVICE_ID).encode(),
|
(license_response['cid'] + self.ie._DEVICE_ID).encode(),
|
||||||
digestmod=hashlib.sha256)
|
digestmod=hashlib.sha256)
|
||||||
enckey = bytes_to_intlist(h.digest())
|
enckey = list(h.digest())
|
||||||
|
|
||||||
return intlist_to_bytes(aes_ecb_decrypt(encvideokey, enckey))
|
return bytes(aes_ecb_decrypt(encvideokey, enckey))
|
||||||
|
|
||||||
|
|
||||||
class AbemaTVBaseIE(InfoExtractor):
|
class AbemaTVBaseIE(InfoExtractor):
|
||||||
|
|||||||
@ -11,11 +11,9 @@ from ..networking.exceptions import HTTPError
|
|||||||
from ..utils import (
|
from ..utils import (
|
||||||
ExtractorError,
|
ExtractorError,
|
||||||
ass_subtitles_timecode,
|
ass_subtitles_timecode,
|
||||||
bytes_to_intlist,
|
|
||||||
bytes_to_long,
|
bytes_to_long,
|
||||||
float_or_none,
|
float_or_none,
|
||||||
int_or_none,
|
int_or_none,
|
||||||
intlist_to_bytes,
|
|
||||||
join_nonempty,
|
join_nonempty,
|
||||||
long_to_bytes,
|
long_to_bytes,
|
||||||
parse_iso8601,
|
parse_iso8601,
|
||||||
@ -198,7 +196,7 @@ Format: Marked,Start,End,Style,Name,MarginL,MarginR,MarginV,Effect,Text'''
|
|||||||
|
|
||||||
links_url = try_get(options, lambda x: x['video']['url']) or (video_base_url + 'link')
|
links_url = try_get(options, lambda x: x['video']['url']) or (video_base_url + 'link')
|
||||||
self._K = ''.join(random.choices('0123456789abcdef', k=16))
|
self._K = ''.join(random.choices('0123456789abcdef', k=16))
|
||||||
message = bytes_to_intlist(json.dumps({
|
message = list(json.dumps({
|
||||||
'k': self._K,
|
'k': self._K,
|
||||||
't': token,
|
't': token,
|
||||||
}))
|
}))
|
||||||
@ -207,7 +205,7 @@ Format: Marked,Start,End,Style,Name,MarginL,MarginR,MarginV,Effect,Text'''
|
|||||||
# a different random padding
|
# a different random padding
|
||||||
links_data = None
|
links_data = None
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
padded_message = intlist_to_bytes(pkcs1pad(message, 128))
|
padded_message = bytes(pkcs1pad(message, 128))
|
||||||
n, e = self._RSA_KEY
|
n, e = self._RSA_KEY
|
||||||
encrypted_message = long_to_bytes(pow(bytes_to_long(padded_message), e, n))
|
encrypted_message = long_to_bytes(pow(bytes_to_long(padded_message), e, n))
|
||||||
authorization = base64.b64encode(encrypted_message).decode()
|
authorization = base64.b64encode(encrypted_message).decode()
|
||||||
|
|||||||
@ -8,10 +8,8 @@ import time
|
|||||||
from .common import InfoExtractor
|
from .common import InfoExtractor
|
||||||
from ..aes import aes_encrypt
|
from ..aes import aes_encrypt
|
||||||
from ..utils import (
|
from ..utils import (
|
||||||
bytes_to_intlist,
|
|
||||||
determine_ext,
|
determine_ext,
|
||||||
int_or_none,
|
int_or_none,
|
||||||
intlist_to_bytes,
|
|
||||||
join_nonempty,
|
join_nonempty,
|
||||||
smuggle_url,
|
smuggle_url,
|
||||||
strip_jsonp,
|
strip_jsonp,
|
||||||
@ -277,8 +275,8 @@ class AnvatoIE(InfoExtractor):
|
|||||||
server_time = self._server_time(access_key, video_id)
|
server_time = self._server_time(access_key, video_id)
|
||||||
input_data = f'{server_time}~{md5_text(video_data_url)}~{md5_text(server_time)}'
|
input_data = f'{server_time}~{md5_text(video_data_url)}~{md5_text(server_time)}'
|
||||||
|
|
||||||
auth_secret = intlist_to_bytes(aes_encrypt(
|
auth_secret = bytes(aes_encrypt(
|
||||||
bytes_to_intlist(input_data[:64]), bytes_to_intlist(self._AUTH_KEY)))
|
list(input_data[:64]), list(self._AUTH_KEY)))
|
||||||
query = {
|
query = {
|
||||||
'X-Anvato-Adst-Auth': base64.b64encode(auth_secret).decode('ascii'),
|
'X-Anvato-Adst-Auth': base64.b64encode(auth_secret).decode('ascii'),
|
||||||
'rtyp': 'fp',
|
'rtyp': 'fp',
|
||||||
|
|||||||
@ -4,8 +4,6 @@ from .common import InfoExtractor
|
|||||||
from ..aes import aes_cbc_decrypt, unpad_pkcs7
|
from ..aes import aes_cbc_decrypt, unpad_pkcs7
|
||||||
from ..utils import (
|
from ..utils import (
|
||||||
ExtractorError,
|
ExtractorError,
|
||||||
bytes_to_intlist,
|
|
||||||
intlist_to_bytes,
|
|
||||||
unified_strdate,
|
unified_strdate,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -68,10 +66,10 @@ class ShemarooMeIE(InfoExtractor):
|
|||||||
data_json = self._download_json('https://www.shemaroome.com/users/user_all_lists', video_id, data=data.encode())
|
data_json = self._download_json('https://www.shemaroome.com/users/user_all_lists', video_id, data=data.encode())
|
||||||
if not data_json.get('status'):
|
if not data_json.get('status'):
|
||||||
raise ExtractorError('Premium videos cannot be downloaded yet.', expected=True)
|
raise ExtractorError('Premium videos cannot be downloaded yet.', expected=True)
|
||||||
url_data = bytes_to_intlist(base64.b64decode(data_json['new_play_url']))
|
url_data = list(base64.b64decode(data_json['new_play_url']))
|
||||||
key = bytes_to_intlist(base64.b64decode(data_json['key']))
|
key = list(base64.b64decode(data_json['key']))
|
||||||
iv = [0] * 16
|
iv = [0] * 16
|
||||||
m3u8_url = unpad_pkcs7(intlist_to_bytes(aes_cbc_decrypt(url_data, key, iv))).decode('ascii')
|
m3u8_url = unpad_pkcs7(bytes(aes_cbc_decrypt(url_data, key, iv))).decode('ascii')
|
||||||
headers = {'stream_key': data_json['stream_key']}
|
headers = {'stream_key': data_json['stream_key']}
|
||||||
formats, m3u8_subs = self._extract_m3u8_formats_and_subtitles(m3u8_url, video_id, fatal=False, headers=headers)
|
formats, m3u8_subs = self._extract_m3u8_formats_and_subtitles(m3u8_url, video_id, fatal=False, headers=headers)
|
||||||
for fmt in formats:
|
for fmt in formats:
|
||||||
|
|||||||
@ -9,6 +9,7 @@ passthrough_module(__name__, '.._legacy', callback=lambda attr: warnings.warn(
|
|||||||
del passthrough_module
|
del passthrough_module
|
||||||
|
|
||||||
|
|
||||||
|
import struct
|
||||||
from ._utils import preferredencoding
|
from ._utils import preferredencoding
|
||||||
|
|
||||||
|
|
||||||
@ -37,3 +38,18 @@ def decodeOption(optval):
|
|||||||
|
|
||||||
def error_to_compat_str(err):
|
def error_to_compat_str(err):
|
||||||
return str(err)
|
return str(err)
|
||||||
|
|
||||||
|
|
||||||
|
def bytes_to_intlist(bs):
|
||||||
|
if not bs:
|
||||||
|
return []
|
||||||
|
if isinstance(bs[0], int): # Python 3
|
||||||
|
return list(bs)
|
||||||
|
else:
|
||||||
|
return [ord(c) for c in bs]
|
||||||
|
|
||||||
|
|
||||||
|
def intlist_to_bytes(xs):
|
||||||
|
if not xs:
|
||||||
|
return b''
|
||||||
|
return struct.pack('%dB' % len(xs), *xs)
|
||||||
|
|||||||
@ -1503,21 +1503,6 @@ def deprecation_warning(msg, *, printer=None, stacklevel=0, **kwargs):
|
|||||||
deprecation_warning._cache = set()
|
deprecation_warning._cache = set()
|
||||||
|
|
||||||
|
|
||||||
def bytes_to_intlist(bs):
|
|
||||||
if not bs:
|
|
||||||
return []
|
|
||||||
if isinstance(bs[0], int): # Python 3
|
|
||||||
return list(bs)
|
|
||||||
else:
|
|
||||||
return [ord(c) for c in bs]
|
|
||||||
|
|
||||||
|
|
||||||
def intlist_to_bytes(xs):
|
|
||||||
if not xs:
|
|
||||||
return b''
|
|
||||||
return struct.pack('%dB' % len(xs), *xs)
|
|
||||||
|
|
||||||
|
|
||||||
class LockingUnsupportedError(OSError):
|
class LockingUnsupportedError(OSError):
|
||||||
msg = 'File locking is not supported'
|
msg = 'File locking is not supported'
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user