| #!/usr/bin/env python |
| # Copyright 2018 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Unit tests for the client_replay.CommandSequence class.""" |
| |
| import optparse |
| import os |
| import StringIO |
| import sys |
| import unittest |
| |
| import client_replay |
| |
| _THIS_DIR = os.path.abspath(os.path.dirname(__file__)) |
| _PARENT_DIR = os.path.join(_THIS_DIR, os.pardir) |
| _TEST_DIR = os.path.join(_PARENT_DIR, "test") |
| # pylint: disable=g-import-not-at-top |
| sys.path.insert(1, _TEST_DIR) |
| import unittest_util |
| sys.path.remove(_TEST_DIR) |
| |
| sys.path.insert(1, _PARENT_DIR) |
| import util |
| sys.path.insert(1, _PARENT_DIR) |
| # pylint: enable=g-import-not-at-top |
| |
| _SESSION_ID = "b15232d5497ec0d8300a5a1ea56f33ce" |
| _SESSION_ID_ALT = "a81dc5521092a5ba132b9c0b6cf6e84f" |
| |
| _NO_PARAMS = ("[1531428669.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] " |
| "COMMAND GetTitle {\n\n}\n" |
| "[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] " |
| "RESPONSE GetTitle\n") |
| _WITH_PARAMS = ('[1531428669.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] ' |
| 'COMMAND GetTitle {\n"param1": 7\n}\n' |
| '[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] ' |
| 'RESPONSE GetTitle {\n"param2": 42\n}\n') |
| _COMMAND_ONLY = ('[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] ' |
| 'COMMAND GetTitle {\n"param1": 7\n}\n') |
| _RESPONSE_ONLY = ('[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] ' |
| 'RESPONSE GetTitle {\n"param2": 42\n}\n') |
| _PAYLOAD_SCRIPT = ('[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce]' |
| ' RESPONSE GetTitle {\n"param2": "function(){func()}"\n}\n') |
| _BAD_SCRIPT = ('[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce]' |
| ' RESPONSE GetTitle {\n"param2": "))}\\})}/{)}({(})}"\n}\n') |
| _MULTI_SESSION = ('[1531428669.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] ' |
| 'COMMAND GetSessions {\n\n}\n' |
| '[1531428669.535][INFO]: [a81dc5521092a5ba132b9c0b6cf6e84f] ' |
| 'COMMAND GetSessions {\n\n}\n' |
| '[1531428670.535][INFO]: [b15232d5497ec0d8300a5a1ea56f33ce] ' |
| 'RESPONSE GetSessions {\n"param2": 42\n}\n' |
| '[1531428670.535][INFO]: [a81dc5521092a5ba132b9c0b6cf6e84f] ' |
| 'RESPONSE GetSessions {\n"param2": 42\n}\n' + _COMMAND_ONLY) |
| |
| _WINDOW_IDS = ["CDwindow-00", "CDwindow-98", "other thing"] |
| _ELEMENT_ID = {"ELEMENT": "0.87-1"} |
| _ELEMENT_IDS = [{"ELEMENT": "0.87-1"}, {"ELEMENT": "0.87-2"}] |
| |
| |
| class ChromeDriverClientReplayUnitTest(unittest.TestCase): |
| """base class for test cases""" |
| |
| def __init__(self, *args, **kwargs): |
| super(ChromeDriverClientReplayUnitTest, self).__init__(*args, **kwargs) |
| |
| def testNextCommandEmptyParams(self): |
| string_buffer = StringIO.StringIO(_NO_PARAMS) |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| command = command_sequence.NextCommand(None) |
| response = command_sequence._last_response |
| |
| self.assertEqual(command.name, "GetTitle") |
| self.assertEqual(command.GetPayloadPrimitive(), {"sessionId": _SESSION_ID}) |
| self.assertEqual(command.session_id, _SESSION_ID) |
| |
| self.assertEqual(response.name, "GetTitle") |
| self.assertEqual(response.GetPayloadPrimitive(), None) |
| self.assertEqual(response.session_id, _SESSION_ID) |
| |
| def testNextCommandWithParams(self): |
| string_buffer = StringIO.StringIO(_WITH_PARAMS) |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| command = command_sequence.NextCommand(None) |
| response = command_sequence._last_response |
| |
| self.assertEqual(command.name, "GetTitle") |
| self.assertEqual(command.GetPayloadPrimitive(), {"param1": 7, |
| "sessionId": _SESSION_ID}) |
| self.assertEqual(command.session_id, _SESSION_ID) |
| |
| self.assertEqual(response.name, "GetTitle") |
| self.assertEqual(response.GetPayloadPrimitive(), {"param2": 42}) |
| self.assertEqual(response.session_id, _SESSION_ID) |
| |
| def testParserGetNext(self): |
| string_buffer = StringIO.StringIO(_WITH_PARAMS) |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| command = command_sequence._parser.GetNext() |
| |
| self.assertEqual(command.name, "GetTitle") |
| self.assertEqual(command.GetPayloadPrimitive(), {"param1": 7}) |
| self.assertEqual(command.session_id, _SESSION_ID) |
| |
| def testIngestLoggedResponse(self): |
| string_buffer = StringIO.StringIO(_RESPONSE_ONLY) |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| response = command_sequence._parser.GetNext() |
| |
| self.assertEqual(response.name, "GetTitle") |
| self.assertEqual(response.GetPayloadPrimitive(), {"param2": 42}) |
| self.assertEqual(response.session_id, _SESSION_ID) |
| |
| def testGetPayload_simple(self): |
| string_buffer = StringIO.StringIO(_RESPONSE_ONLY) |
| header = string_buffer.readline() |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| payload_string = command_sequence._parser._GetPayloadString(header) |
| self.assertEqual(payload_string, '{"param2": 42\n}\n') |
| |
| def testGetPayload_script(self): |
| string_buffer = StringIO.StringIO(_PAYLOAD_SCRIPT) |
| header = string_buffer.readline() |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| payload_string = command_sequence._parser._GetPayloadString(header) |
| self.assertEqual(payload_string, '{"param2": "function(){func()}"\n}\n') |
| |
| def testGetPayload_badscript(self): |
| string_buffer = StringIO.StringIO(_BAD_SCRIPT) |
| header = string_buffer.readline() |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| payload_string = command_sequence._parser._GetPayloadString(header) |
| self.assertEqual(payload_string, '{"param2": "))}\\})}/{)}({(})}"\n}\n') |
| |
| def testSubstitutePayloadIds_element(self): |
| id_map = {"0.78-1": "0.00-0", "0.78-2": "0.00-1"} |
| substituted = {"ELEMENT": "0.78-1"} |
| client_replay._ReplaceWindowAndElementIds(substituted, id_map) |
| self.assertEqual(substituted, {"ELEMENT": "0.00-0"}) |
| |
| def testSubstitutePayloadIds_elements(self): |
| id_map = {"0.78-1": "0.00-0", "0.78-2": "0.00-1"} |
| substituted = [{"ELEMENT": "0.78-1"}, {"ELEMENT": "0.78-2"}] |
| client_replay._ReplaceWindowAndElementIds(substituted, id_map) |
| self.assertEqual(substituted, |
| [{"ELEMENT": "0.00-0"}, {"ELEMENT": "0.00-1"}]) |
| |
| def testSubstitutePayloadIds_windows(self): |
| id_map = {"CDwindow-98": "CDwindow-00"} |
| substituted = ["CDwindow-98"] |
| client_replay._ReplaceWindowAndElementIds(substituted, id_map) |
| self.assertEqual(substituted, ["CDwindow-00"]) |
| |
| def testSubstitutePayloadIds_recursion(self): |
| id_map = {"0.78-1": "0.00-0", "0.78-2": "0.00-1"} |
| substituted = {"args": [{"1": "0.78-1", "2": "0.78-2"}]} |
| client_replay._ReplaceWindowAndElementIds(substituted, id_map) |
| self.assertEqual(substituted, {"args": [{"1": "0.00-0", "2": "0.00-1"}]}) |
| |
| def testGetAnyElementids_window(self): |
| ids = client_replay._GetAnyElementIds(_WINDOW_IDS) |
| self.assertEqual(ids, ["CDwindow-00", "CDwindow-98"]) |
| |
| def testGetAnyElementids_element(self): |
| ids = client_replay._GetAnyElementIds(_ELEMENT_ID) |
| self.assertEqual(ids, ["0.87-1"]) |
| |
| def testGetAnyElementids_elements(self): |
| ids = client_replay._GetAnyElementIds(_ELEMENT_IDS) |
| self.assertEqual(ids, ["0.87-1", "0.87-2"]) |
| |
| def testGetAnyElementids_string(self): |
| ids = client_replay._GetAnyElementIds("true") |
| self.assertEqual(ids, None) |
| |
| def testGetAnyElementids_invalid(self): |
| ids = client_replay._GetAnyElementIds("[ gibberish ]") |
| self.assertEqual(ids, None) |
| |
| def testCountChar_positive(self): |
| self.assertEqual(client_replay._CountChar("{;{;{)]", "{", "}"), 3) |
| |
| def testCountChar_onebrace(self): |
| self.assertEqual(client_replay._CountChar("{", "{", "}"), 1) |
| |
| def testCountChar_nothing(self): |
| self.assertEqual(client_replay._CountChar("", "{", "}"), 0) |
| |
| def testCountChar_negative(self): |
| self.assertEqual(client_replay._CountChar("}){((}{(/)}=}", "{", "}"), -2) |
| |
| def testCountChar_quotes(self): |
| self.assertEqual( |
| client_replay._CountChar('[[][]"[[]]]]]"[[]', "[", "]"), 2) |
| |
| def testReplaceUrl_simple(self): |
| base_url = "https://base.url.test.com:0000" |
| payload = {"url": "https://localhost:12345/"} |
| client_replay._ReplaceUrl(payload, base_url) |
| self.assertEqual(payload, {"url": "https://base.url.test.com:0000/"}) |
| |
| def testReplaceUrl_nothing(self): |
| payload = {"url": "https://localhost:12345/"} |
| client_replay._ReplaceUrl(payload, None) |
| self.assertEqual(payload, {"url": "https://localhost:12345/"}) |
| |
| def testReplaceBinary(self): |
| payload_dict = { |
| "desiredCapabilities": { |
| "goog:chromeOptions": { |
| "binary": "/path/to/logged binary/with spaces/" |
| }, |
| "other_things": ["some", "uninteresting", "strings"] |
| } |
| } |
| payload_replaced = { |
| "desiredCapabilities": { |
| "goog:chromeOptions": { |
| "binary": "replacement_binary" |
| }, |
| "other_things": ["some", "uninteresting", "strings"] |
| } |
| } |
| client_replay._ReplaceBinary(payload_dict, "replacement_binary") |
| self.assertEqual(payload_replaced, payload_dict) |
| |
| def testReplaceBinary_none(self): |
| payload_dict = { |
| "desiredCapabilities": { |
| "goog:chromeOptions": { |
| "binary": "/path/to/logged binary/with spaces/" |
| }, |
| "other_things": ["some", "uninteresting", "strings"] |
| } |
| } |
| payload_replaced = { |
| "desiredCapabilities": { |
| "goog:chromeOptions": {}, |
| "other_things": ["some", "uninteresting", "strings"] |
| } |
| } |
| client_replay._ReplaceBinary(payload_dict, None) |
| self.assertEqual(payload_replaced, payload_dict) |
| |
| def testReplaceBinary_nocapabilities(self): |
| payload_dict = {"desiredCapabilities": {}} |
| payload_replaced = { |
| "desiredCapabilities": { |
| "goog:chromeOptions": { |
| "binary": "replacement_binary" |
| } |
| } |
| } |
| client_replay._ReplaceBinary(payload_dict, "replacement_binary") |
| self.assertEqual(payload_replaced, payload_dict) |
| |
| def testGetCommandName(self): |
| self.assertEqual(client_replay._GetCommandName(_PAYLOAD_SCRIPT), "GetTitle") |
| |
| def testGetSessionId(self): |
| self.assertEqual(client_replay._GetSessionId(_PAYLOAD_SCRIPT), |
| _SESSION_ID) |
| |
| def testParseCommand_true(self): |
| string_buffer = StringIO.StringIO(_COMMAND_ONLY) |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| self.assertTrue(command_sequence._parser.GetNext().IsCommand()) |
| |
| def testParseCommand_false(self): |
| string_buffer = StringIO.StringIO(_RESPONSE_ONLY) |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| self.assertFalse(command_sequence._parser.GetNext().IsCommand()) |
| |
| def testParseResponse_true(self): |
| string_buffer = StringIO.StringIO(_RESPONSE_ONLY) |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| self.assertTrue(command_sequence._parser.GetNext().IsResponse()) |
| |
| def testParseResponse_false(self): |
| string_buffer = StringIO.StringIO(_COMMAND_ONLY) |
| command_sequence = client_replay.CommandSequence() |
| command_sequence._parser = client_replay._Parser(string_buffer) |
| self.assertFalse(command_sequence._parser.GetNext().IsResponse()) |
| |
| def testHandleGetSessions(self): |
| string_buffer = StringIO.StringIO(_MULTI_SESSION) |
| command_sequence = client_replay.CommandSequence(string_buffer) |
| first_command = command_sequence._parser.GetNext() |
| command = command_sequence._HandleGetSessions( |
| first_command) |
| responses = command_sequence._last_response |
| |
| self.assertEqual(command.name, "GetSessions") |
| self.assertEqual(command.GetPayloadPrimitive(), {}) |
| self.assertEqual(command.session_id, _SESSION_ID) |
| |
| self.assertEqual(responses.name, "GetSessions") |
| self.assertEqual(responses.GetPayloadPrimitive(), [ |
| { |
| "capabilities": {"param2": 42}, |
| "id": _SESSION_ID |
| }, { |
| "capabilities": {"param2": 42}, |
| "id": _SESSION_ID_ALT |
| } |
| ]) |
| self.assertEqual(responses.session_id, "") |
| self.assertEqual(command_sequence._parser._saved_log_entry.name, "GetTitle") |
| |
| |
| def main(): |
| parser = optparse.OptionParser() |
| parser.add_option( |
| "", "--filter", type="string", default="*", |
| help=('Filter for specifying what tests to run, "*" will run all. E.g., ' |
| "*testReplaceUrl_nothing")) |
| parser.add_option( |
| "", "--isolated-script-test-output", |
| help="JSON output file used by swarming") |
| # this option is ignored |
| parser.add_option("--isolated-script-test-perf-output", type=str) |
| |
| options, _ = parser.parse_args() |
| |
| all_tests_suite = unittest.defaultTestLoader.loadTestsFromModule( |
| sys.modules[__name__]) |
| tests = unittest_util.FilterTestSuite(all_tests_suite, options.filter) |
| result = unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(tests) |
| |
| if options.isolated_script_test_output: |
| util.WriteResultToJSONFile(tests, result, |
| options.isolated_script_test_output) |
| |
| sys.exit(len(result.failures) + len(result.errors)) |
| |
| if __name__ == "__main__": |
| main() |