From 9e3f19919aa3e152760f4cd0a7cad348428f3b35 Mon Sep 17 00:00:00 2001
From: Philipp Hagemeister
Date: Sun, 1 Feb 2015 22:38:26 +0100
Subject: [PATCH] [jsinterp] Beef up and add tests

In preparation for #4822, extend jsinterp by a lot.
(We may even have to/want to write a proper interpreter with actual parsing)
---
 test/test_jsinterp.py  |  95 +++++++++++++++++++++++
 youtube_dl/jsinterp.py | 172 ++++++++++++++++++++++++++++++-----------
 2 files changed, 221 insertions(+), 46 deletions(-)
 create mode 100644 test/test_jsinterp.py

diff --git a/test/test_jsinterp.py b/test/test_jsinterp.py
new file mode 100644
index 000000000..b91b8c492
--- /dev/null
+++ b/test/test_jsinterp.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+from __future__ import unicode_literals
+
+# Allow direct execution
+import os
+import sys
+import unittest
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from youtube_dl.jsinterp import JSInterpreter
+
+
+class TestJSInterpreter(unittest.TestCase):
+    def test_basic(self):
+        jsi = JSInterpreter('function x(){;}')
+        self.assertEqual(jsi.call_function('x'), None)
+
+        jsi = JSInterpreter('function x3(){return 42;}')
+        self.assertEqual(jsi.call_function('x3'), 42)
+
+    def test_calc(self):
+        jsi = JSInterpreter('function x4(a){return 2*a+1;}')
+        self.assertEqual(jsi.call_function('x4', 3), 7)
+
+    def test_empty_return(self):
+        jsi = JSInterpreter('function f(){return; y()}')
+        self.assertEqual(jsi.call_function('f'), None)
+
+    def test_morespace(self):
+        jsi = JSInterpreter('function x (a) { return 2 * a + 1 ; }')
+        self.assertEqual(jsi.call_function('x', 3), 7)
+
+        jsi = JSInterpreter('function f () { x = 2 ; return x; }')
+        self.assertEqual(jsi.call_function('f'), 2)
+
+    def test_strange_chars(self):
+        jsi = JSInterpreter('function $_xY1 ($_axY1) { var $_axY2 = $_axY1 + 1; return $_axY2; }')
+        self.assertEqual(jsi.call_function('$_xY1', 20), 21)
+
+    def test_operators(self):
+        jsi = JSInterpreter('function f(){return 1 << 5;}')
+        self.assertEqual(jsi.call_function('f'), 32)
+
+        jsi = JSInterpreter('function f(){return 19 & 21;}')
+        self.assertEqual(jsi.call_function('f'), 17)
+
+        jsi = JSInterpreter('function f(){return 11 >> 2;}')
+        self.assertEqual(jsi.call_function('f'), 2)
+
+    def test_array_access(self):
+        jsi = JSInterpreter('function f(){var x = [1,2,3]; x[0] = 4; x[0] = 5; x[2] = 7; return x;}')
+        self.assertEqual(jsi.call_function('f'), [5, 2, 7])
+
+    def test_parens(self):
+        jsi = JSInterpreter('function f(){return (1) + (2) * ((( (( (((((3)))))) )) ));}')
+        self.assertEqual(jsi.call_function('f'), 7)
+
+        jsi = JSInterpreter('function f(){return (1 + 2) * 3;}')
+        self.assertEqual(jsi.call_function('f'), 9)
+
+    def test_assignments(self):
+        jsi = JSInterpreter('function f(){var x = 20; x = 30 + 1; return x;}')
+        self.assertEqual(jsi.call_function('f'), 31)
+
+        jsi = JSInterpreter('function f(){var x = 20; x += 30 + 1; return x;}')
+        self.assertEqual(jsi.call_function('f'), 51)
+
+        jsi = JSInterpreter('function f(){var x = 20; x -= 30 + 1; return x;}')
+        self.assertEqual(jsi.call_function('f'), -11)
+
+    def test_comments(self):
+        jsi = JSInterpreter('''
+        function x() {
+            var x = /* 1 + */ 2;
+            var y = /* 30
+            * 40 */ 50;
+            return x + y;
+        }
+        ''')
+        self.assertEqual(jsi.call_function('x'), 52)
+
+    def test_precedence(self):
+        jsi = JSInterpreter('''
+        function x() {
+            var a = [10, 20, 30, 40, 50];
+            var b = 6;
+            a[0]=a[b%a.length];
+            return a;
+        }''')
+        self.assertEqual(jsi.call_function('x'), [20, 20, 30, 40, 50])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/youtube_dl/jsinterp.py b/youtube_dl/jsinterp.py
index b4617fbad..49364786b 100644
--- a/youtube_dl/jsinterp.py
+++ b/youtube_dl/jsinterp.py
@@ -1,59 +1,122 @@
 from __future__ import unicode_literals
 
 import json
+import operator
 import re
 
 from .utils import (
     ExtractorError,
 )
 
+_OPERATORS = [
+    ('|', operator.or_),
+    ('^', operator.xor),
+    ('&', operator.and_),
+    ('>>', operator.rshift),
+    ('<<', operator.lshift),
+    ('-', operator.sub),
+    ('+', operator.add),
+    ('%', operator.mod),
+    ('/', operator.div),
+    ('*', operator.mul),
+]
+_ASSIGN_OPERATORS = [(op + '=', opfunc) for op, opfunc in _OPERATORS]
+_ASSIGN_OPERATORS.append(('=', lambda cur, right: right))
+
+_NAME_RE = r'[a-zA-Z_$][a-zA-Z_$0-9]*'
+
 
 class JSInterpreter(object):
-    def __init__(self, code):
-        self.code = code
+    def __init__(self, code, objects=None):
+        if objects is None:
+            objects = {}
+        self.code = self._remove_comments(code)
         self._functions = {}
-        self._objects = {}
+        self._objects = objects
 
-    def interpret_statement(self, stmt, local_vars, allow_recursion=20):
+    def _remove_comments(self, code):
+        return re.sub(r'(?s)/\*.*?\*/', '', code)
+
+    def interpret_statement(self, stmt, local_vars, allow_recursion=100):
         if allow_recursion < 0:
             raise ExtractorError('Recursion limit reached')
 
-        if stmt.startswith('var '):
-            stmt = stmt[len('var '):]
-        ass_m = re.match(r'^(?P<out>[a-z]+)(?:\[(?P<index>[^\]]+)\])?' +
-                         r'=(?P<expr>.*)$', stmt)
-        if ass_m:
-            if ass_m.groupdict().get('index'):
-                def assign(val):
-                    lvar = local_vars[ass_m.group('out')]
-                    idx = self.interpret_expression(
-                        ass_m.group('index'), local_vars, allow_recursion)
-                    assert isinstance(idx, int)
-                    lvar[idx] = val
-                    return val
-                expr = ass_m.group('expr')
-            else:
-                def assign(val):
-                    local_vars[ass_m.group('out')] = val
-                    return val
-                expr = ass_m.group('expr')
-        elif stmt.startswith('return '):
-            assign = lambda v: v
-            expr = stmt[len('return '):]
+        should_abort = False
+        stmt = stmt.lstrip()
+        stmt_m = re.match(r'var\s', stmt)
+        if stmt_m:
+            expr = stmt[len(stmt_m.group(0)):]
         else:
-            # Try interpreting it as an expression
-            expr = stmt
-            assign = lambda v: v
+            return_m = re.match(r'return(?:\s+|$)', stmt)
+            if return_m:
+                expr = stmt[len(return_m.group(0)):]
+                should_abort = True
+            else:
+                # Try interpreting it as an expression
+                expr = stmt
 
         v = self.interpret_expression(expr, local_vars, allow_recursion)
-        return assign(v)
+        return v, should_abort
 
     def interpret_expression(self, expr, local_vars, allow_recursion):
+        expr = expr.strip()
+
+        if expr == '':  # Empty expression
+            return None
+
+        if expr.startswith('('):
+            parens_count = 0
+            for m in re.finditer(r'[()]', expr):
+                if m.group(0) == '(':
+                    parens_count += 1
+                else:
+                    parens_count -= 1
+                    if parens_count == 0:
+                        sub_expr = expr[1:m.start()]
+                        sub_result = self.interpret_expression(
+                            sub_expr, local_vars, allow_recursion)
+                        remaining_expr = expr[m.end():].strip()
+                        if not remaining_expr:
+                            return sub_result
+                        else:
+                            expr = json.dumps(sub_result) + remaining_expr
+                        break
+            else:
+                raise ExtractorError('Premature end of parens in %r' % expr)
+
+        for op, opfunc in _ASSIGN_OPERATORS:
+            m = re.match(r'''(?x)
+                (?P<out>%s)(?:\[(?P<index>[^\]]+?)\])?
+                \s*%s
+                (?P<expr>.*)$''' % (_NAME_RE, re.escape(op)), expr)
+            if not m:
+                continue
+            right_val = self.interpret_expression(
+                m.group('expr'), local_vars, allow_recursion - 1)
+
+            if m.groupdict().get('index'):
+                lvar = local_vars[m.group('out')]
+                idx = self.interpret_expression(
+                    m.group('index'), local_vars, allow_recursion)
+                assert isinstance(idx, int)
+                cur = lvar[idx]
+                val = opfunc(cur, right_val)
+                lvar[idx] = val
+                return val
+            else:
+                cur = local_vars.get(m.group('out'))
+                val = opfunc(cur, right_val)
+                local_vars[m.group('out')] = val
+                return val
+
         if expr.isdigit():
             return int(expr)
 
-        if expr.isalpha():
-            return local_vars[expr]
+        var_m = re.match(
+            r'(?!if|return|true|false)(?P<name>%s)$' % _NAME_RE,
+            expr)
+        if var_m:
+            return local_vars[var_m.group('name')]
 
         try:
             return json.loads(expr)
@@ -61,7 +124,7 @@ def interpret_expression(self, expr, local_vars, allow_recursion):
             pass
 
         m = re.match(
-            r'^(?P<var>[$a-zA-Z0-9_]+)\.(?P<member>[^(]+)(?:\(+(?P<args>[^()]*)\))?$',
+            r'(?P<var>%s)\.(?P<member>[^(]+)(?:\(+(?P<args>[^()]*)\))?$' % _NAME_RE,
             expr)
         if m:
             variable = m.group('var')
@@ -114,23 +177,31 @@ def interpret_expression(self, expr, local_vars, allow_recursion):
             return obj[member](argvals)
 
         m = re.match(
-            r'^(?P<in>[a-z]+)\[(?P<idx>.+)\]$', expr)
+            r'(?P<in>%s)\[(?P<idx>.+)\]$' % _NAME_RE, expr)
         if m:
             val = local_vars[m.group('in')]
             idx = self.interpret_expression(
                 m.group('idx'), local_vars, allow_recursion - 1)
             return val[idx]
 
-        m = re.match(r'^(?P<a>.+?)(?P<op>[%])(?P<b>.+?)$', expr)
-        if m:
-            a = self.interpret_expression(
-                m.group('a'), local_vars, allow_recursion)
-            b = self.interpret_expression(
-                m.group('b'), local_vars, allow_recursion)
-            return a % b
+        for op, opfunc in _OPERATORS:
+            m = re.match(r'(?P<x>.+?)%s(?P<y>.+)' % re.escape(op), expr)
+            if not m:
+                continue
+            x, abort = self.interpret_statement(
+                m.group('x'), local_vars, allow_recursion - 1)
+            if abort:
+                raise ExtractorError(
+                    'Premature left-side return of %s in %r' % (op, expr))
+            y, abort = self.interpret_statement(
+                m.group('y'), local_vars, allow_recursion - 1)
+            if abort:
+                raise ExtractorError(
+                    'Premature right-side return of %s in %r' % (op, expr))
+            return opfunc(x, y)
 
         m = re.match(
-            r'^(?P<func>[a-zA-Z$]+)\((?P<args>[a-z0-9,]+)\)$', expr)
+            r'^(?P<func>%s)\((?P<args>[a-zA-Z0-9_$,]+)\)$' % _NAME_RE, expr)
         if m:
             fname = m.group('func')
             argvals = tuple([
@@ -139,6 +210,7 @@ def interpret_expression(self, expr, local_vars, allow_recursion):
             if fname not in self._functions:
                 self._functions[fname] = self.extract_function(fname)
             return self._functions[fname](argvals)
+
         raise ExtractorError('Unsupported JS expression %r' % expr)
 
     def extract_object(self, objname):
@@ -162,9 +234,11 @@ def extract_object(self, objname):
 
     def extract_function(self, funcname):
         func_m = re.search(
-            (r'(?:function %s|[{;]%s\s*=\s*function)' % (
-                re.escape(funcname), re.escape(funcname))) +
-            r'\((?P<args>[a-z,]+)\){(?P<code>[^}]+)}',
+            r'''(?x)
+                (?:function\s+%s|[{;]%s\s*=\s*function)\s*
+                \((?P<args>[^)]*)\)\s*
+                \{(?P<code>[^}]+)\}''' % (
+                re.escape(funcname), re.escape(funcname)),
             self.code)
         if func_m is None:
             raise ExtractorError('Could not find JS function %r' % funcname)
@@ -172,10 +246,16 @@ def extract_function(self, funcname):
 
         return self.build_function(argnames, func_m.group('code'))
 
+    def call_function(self, funcname, *args):
+        f = self.extract_function(funcname)
+        return f(args)
+
     def build_function(self, argnames, code):
         def resf(args):
             local_vars = dict(zip(argnames, args))
             for stmt in code.split(';'):
-                res = self.interpret_statement(stmt, local_vars)
+                res, abort = self.interpret_statement(stmt, local_vars)
+                if abort:
+                    break
             return res
         return resf