""" This module makes it convenient to write self-verifying HTML and XML documents. You simply embed special tags in your document, specifying what programs to run, what output to expect from them, and what input to provide to them. The tags are in their own namespace, and so they shouldn't interfere with the rest of your document. This module doesn't actually require that the document be well-formed, so it's possible to use this module to test pretty much any document, as long as you're able to embed the special test tags in it. Here's a simple example of a self-verifying document: Hello world!

Type this command: echo hello world. The command should print hello world back at you. Here's the complete list of tags: ... Launch a program and keep it open, so that we can send input to stdin, and read output from stdout. Expect the previously-spawned program with the given id to exit. The "status" attribute is optional, and defaults to zero (i.e. success). If the program doesn't exit, or if it exits with a different status code, the test fails. The "ignore_output" attribute is optional, and defaults to zero (i.e. do not ignore output). If the program prints anything before it exits, and ignore_output is zero, then the test will fail. ... Read the given text from the previously-spawned program with the given id. If the output doesn't match, or if no output is read, the test fails. The "prog" attribute is optional if and only if you use the tag. The "filter" attribute is optional, and specifes python code to run before doing the comparison. For example, the filter can modify the actual text and/or the expected text, by changing the actual_text and expected_text variables. NOTE: the filter may be called multiple times. The "timeout" attribute is optional, and specifies how long to wait for the result (in seconds). The default timeout is 2 seconds. ... Write the given text to the stdin of the previously-spawned program with the given id. If the write fails (e.g. because the program has exited), the test fails. The "prog" attribute is optional if and only if you use the tag. The "filter" attribute is optional, and specifies python code to run before doing the actual write. The python code can change the text before it's written by modifying the write_text variable. For read and write tags that have no "prog" attribute, this specifies the default prog id. For read and write tags that have no "filter" attribute, this specifies the default filter to use. ... Execute the given python code. The test fails if any exception is thrown. ... Execute the given shell command. The test will fail if the command returns a nonzero exit code. This is equivalent to followed by . """ import pexpect, time import os, re # these are imported for the convenience of embedded python code # long-lived global state, shared between tags. the_default_timeout = 2 the_default_progid = None the_default_filter = None # short-lived global state, initialized by tags in order to be used by # any embedded code the tags contain. actual_text = None # used by read tag expected_text = None # used by read tag write_text = None # used by write tag # map from progid to program (instance of class pexpect.spawn) progs = {} def tag_exec_python(_content=None, _code=None): assert not (_content and _code), \ '"code" attribute not allowed with text content' assert (_content or _code), \ 'either "code" attribute or text content is required' if _code == None: _code = _content _code.replace("\r\n", "\n") exec _code in globals() def tag_exec_shell(_content=None, _command=None, _timeout=None): if _timeout == None: _timeout = the_default_timeout _timeout = int(_timeout) assert not (_content and _command), \ '"command" attribute not allowed with text content' assert (_content or _command), \ 'either "command" attribute or text content is required' if not _command: _command = _content _command = 'sh -c \"' + _command.replace('"', '\\"') + '"' tag_prog(_command=_command, _id="__exec_shell__") tag_prog_exit(_prog="__exec_shell__", _status=0, _ignore_output=1, _timeout=_timeout) def tag_prog(_content=None, _id=None, _command=None): assert _id, ' requires an id attribute' assert not (_command and _content), \ '"command" attribute not allowed with text content' assert _command or _content, \ 'either "command" attribute or text content is required' if not _command: _command = _content prog = pexpect.spawn(_command) progs[_id] = prog prog.leftover_output = "" prog.last_write = "" def tag_prog_exit(_prog, _content=None, _status=0, _ignore_output=0, _timeout=None): if _timeout == None: _timeout = the_default_timeout _timeout = int(_timeout) prog = progs[_prog] try: prog.expect(pexpect.EOF, timeout=_timeout) except pexpect.TIMEOUT: raise AssertionError("prog '%s' didn't exit: %s" % (_prog, prog.before)) if prog.isalive(): time.sleep(0.5) # there's a time lag between eof and exit assert not prog.isalive(), \ "expected %s to exit, but it didn't" % (_prog) assert prog.before == "" or _ignore_output, \ "%s produced unexpected output at exit: '%s'" % (_prog, prog.before) assert prog.exitstatus == _status, \ "expected exit code %s from %s; got %s: %s" % \ (_status, _prog, prog.exitstatus, prog.before) def tag_set_default_prog(_content=None, _prog=None): assert not _content, 'text content is not allowed' assert _prog, '"prog" attribute is required' global the_default_progid the_default_progid = _prog def tag_set_default_filter(_content=None, _filter=None): assert not _content, 'text content is not allowed' assert _filter, '"filter" attribute is required' global the_default_filter the_default_filter = _filter def tag_read(_content=None, _text=None, _filter=None, _prog=None, _timeout=None): if _prog == None: _prog = the_default_progid if _timeout == None: _timeout = the_default_timeout if _filter == None: _filter = the_default_filter _timeout = int(_timeout) assert _prog, '"prog" attribute is required' assert not (_content and _text), \ '"text" attribute not allowed with text content' assert _content or _text, \ 'either "text" attribute or text content is required' if _text == None: _text = _content prog = progs[_prog] global actual_text, expected_text expected_text = _text actual_text = prog.leftover_output start_time = time.time() while time.time() - start_time < _timeout: got_eof = 0 try: actual_text += prog.read_nonblocking(timeout=_timeout) except pexpect.EOF: got_eof = 1 except pexpect.TIMEOUT: raise AssertionError("timed out; got '%s'" % actual_text) if _filter: exec _filter in globals() # normalize newlines actual_text = actual_text.replace("\r\n", "\n") expected_text = expected_text.replace("\r\n", "\n") if actual_text.startswith(expected_text): # remove the part that matches, and leave the rest for later actual_text = actual_text[len(expected_text):] prog.leftover_output = actual_text return # if pexpect echoes back the text we just sent, ignore it if actual_text.startswith(prog.last_write + expected_text): actual_text = actual_text[len(prog.last_write)+len(expected_text):] prog.leftover_output = actual_text return assert not got_eof, \ "expected '%s', got '%s'" % (expected_text, actual_text) raise AssertionError("expected '%s', got '%s'" % (expected_text, actual_text)) def tag_write(_content=None, _text=None, _filter=None, _prog=None): if _prog == None: _prog = the_default_progid assert _prog, '"prog" attribute is required' assert not (_content and _text), \ '"text" attribute not allowed with text content' assert _content or _text, \ 'either "text" attribute or text content is required' if _text == None: _text = _content if _filter == None: _filter = the_default_filter prog = progs[_prog] write_text = _text if _filter: exec _filter in globals() prog.write(write_text) # pexpect sometimes echoes back the text you just sent to the program. # so we keep a copy of it and ignore it if it shows up. prog.last_write = write_text ############################################################################# # Parsing class tag: def __init__(self, name, attributes, text_content): self.name = name self.attributes = attributes self.text_content = text_content # returns a (tag, text) pair, where tag is an instance of the tag class and # text is the remaining text after the tag definition. def parse_next_tag(text): def consume_whitespace(text): while text<>"" and text[0] in [" ", "\n", "\r", "\t"]: text = text[1:] return text def consume_word(text): word = "" while text<>"" and text[0] not in [" ", "/", "=", ">", "<"]: word += text[0] text = text[1:] return word, text def consume_quoted_string(text): assert text<>"" and text[0] in ["'", '"'] matching_quote = text[0] text = text[1:] result = "" while text[0] != matching_quote: result += text[0] text = text[1:] text = text[1:] # skip past matching quote return result, text # returns a (name, value) pair, or (None, None) if no attribute found def consume_attribute(text): name, text = consume_word(text) if text<>"" and text[0] == "=": text = text[1:] value, text = consume_quoted_string(text) return name, value, text return None, None, text index = text.find(""" and text[0] == "/": # no matching end tag expected assert text[1] == ">" text = text[2:] elif text<>"" and text[0] == ">": index = text.find("" % tag_name) assert index >= 0, "no matching end tag found for " % tag_name text_content = text[1:index] # TODO: should use httmllib here... text_content = text_content.replace('<', '<') text_content = text_content.replace('>', '>') text_content = text_content.replace('&', '&') text_content = text_content.replace(' ', ' ') index += len("" % tag_name) text = text[index:] else: raise AssertionError("expected end of tag but found \"%s\"" % (tag_name, text)) return tag(tag_name, tag_attributes, text_content), text ##################################################################### # Testing import unittest class test(unittest.TestCase): def test_pass_empty_doc1(self): self.do_test("") def test_pass_empty_doc2(self): self.do_test("blah") def test_pass_empty_doc3(self): self.do_test("

blah ") def test_pass_single_tag(self): self.do_test("blah") def test_pass_hello_world(self): self.do_test("echo hello world" \ "hello world\n" \ "") def test_pass_quotes(self): self.do_test('') def test_fail_extra_output(self): self.fail_test("echo hello world" \ "hello" \ "", "unexpected output at exit: ' world") def test_exec_python(self): self.do_test("") self.do_test("") self.fail_test("xxx", "got 'hello") self.do_test("") self.do_test("xxx") def test_fail_broken_tag1(self): self.fail_test("", "unrecognized tag") def test_fail_unrecognized_attribute(self): self.fail_test("", "blah") def do_test(self, text): execute_text(text) def fail_test(self, text, error_msg): try: execute_text(text) except AssertionError, e: e = e.__str__() assert e.find(error_msg) != -1, \ "Expected '%s'; got '%s'" % (error_msg, e) return self.fail("test should have failed, but didn't") ##################################################################### # Toplevel logic import sys def execute_text(text): while 1: tag, text = parse_next_tag(text) if not tag: break # no more tags to process try: tag_handler = globals()["tag_"+tag.name] except KeyError: raise AssertionError("unrecognized tag: " % tag.name) # tag_desc is a string description of the tag, used in error messages tag_desc = "