"""
This module makes it convenient to write self-verifying HTML and XML
documents. You simply embed special tags in your document, specifying
what programs to run, what output to expect from them, and what input
to provide to them. The tags are in their own namespace, and so they
shouldn't interfere with the rest of your document.
This module doesn't actually require that the document be well-formed,
so it's possible to use this module to test pretty much any document,
as long as you're able to embed the special test tags in it.
Here's a simple example of a self-verifying document:
Hello world!
Type this command:
echo hello world.
The command should print
hello world
back at you.
Here's the complete list of tags:
...
Launch a program and keep it open, so that we can send input to stdin,
and read output from stdout.
Expect the previously-spawned program with the given id to exit.
The "status" attribute is optional, and defaults to zero (i.e. success).
If the program doesn't exit, or if it exits with a different status
code, the test fails.
The "ignore_output" attribute is optional, and defaults to zero
(i.e. do not ignore output). If the program prints anything before it
exits, and ignore_output is zero, then the test will fail.
...
Read the given text from the previously-spawned program with the given
id. If the output doesn't match, or if no output is read, the test
fails.
The "prog" attribute is optional if and only if you use the
"set_default_prog" tag.
The "filter" attribute is optional, and specifies python code to run
before doing the comparison. For example, the filter can modify the
actual text and/or the expected text, by changing the actual_text and
expected_text variables. NOTE: the filter may be called multiple
times.
The "timeout" attribute is optional, and specifies how long to wait
for the result (in seconds). The default timeout is 2 seconds.
...
Write the given text to the stdin of the previously-spawned program
with the given id. If the write fails (e.g. because the program has
exited), the test fails.
The "prog" attribute is optional if and only if you use the
"set_default_prog" tag.
The "filter" attribute is optional, and specifies python code
to run before doing the actual write. The python code can change
the text before it's written by modifying the write_text variable.
For read and write tags that have no "prog" attribute, this specifies
the default prog id.
For read and write tags that have no "filter" attribute, this
specifies the default filter to use.
...
Execute the given python code. The test fails if any exception is
thrown.
...
Execute the given shell command. The test will fail if the command
returns a nonzero exit code. This is equivalent to a "prog" tag
followed by a "prog_exit" tag.
"""
import pexpect, time
import os, re # these are imported for the convenience of embedded python code
# Long-lived global state, shared between tags.  The defaults below are the
# fallbacks the tag handlers use when the corresponding attribute is omitted;
# the_default_progid and the_default_filter are rebound by the
# set_default_prog / set_default_filter handlers.
the_default_timeout = 2
the_default_progid = None
the_default_filter = None
# Short-lived global state, initialized by tags in order to be used by
# any embedded code the tags contain.  Filter code is executed in this
# module's globals, so these names are how it sees and edits the text.
actual_text = None # used by read tag
expected_text = None # used by read tag
write_text = None # used by write tag
# Map from progid to program (instance of class pexpect.spawn).
progs = {}
def tag_exec_python(_content=None, _code=None):
    """Handle the exec_python tag: run embedded python code.

    The code comes either from the "code" attribute (_code) or from the
    tag's text content (_content); exactly one of them must be supplied.
    It is executed in this module's global namespace, so it can read and
    rebind the module-level state shared between tags.

    Raises AssertionError if both or neither source is given, or if the
    executed code itself raises AssertionError. Any other exception raised
    by the code propagates unchanged.

    NOTE: this executes arbitrary code taken from the document under test;
    only run documents you trust.
    """
    assert not (_content and _code), \
        '"code" attribute not allowed with text content'
    assert (_content or _code), \
        'either "code" attribute or text content is required'
    # Fall back to the text content whenever the attribute is missing *or*
    # empty; an empty "code" attribute must not silently win over content.
    if not _code:
        _code = _content
    # str.replace returns a new string, so the result has to be rebound for
    # the newline normalisation to take effect.
    _code = _code.replace("\r\n", "\n")
    # The call form is accepted by both Python 2 and Python 3.
    exec(_code, globals())
def tag_exec_shell(_content=None, _command=None, _timeout=None):
    """Handle the exec_shell tag: run a shell command and require success.

    The command comes either from the "command" attribute (_command) or
    from the tag's text content (_content); exactly one must be supplied.
    It is run through "sh -c" under the reserved prog id "__exec_shell__"
    and must exit with status 0 within _timeout seconds (the module default
    timeout when omitted).  Output printed by the command is ignored.

    Raises AssertionError on invalid arguments, or (via tag_prog_exit) when
    the command does not exit in time or exits with a nonzero status.
    """
    if _timeout is None:
        _timeout = the_default_timeout
    _timeout = int(_timeout)
    assert not (_content and _command), \
        '"command" attribute not allowed with text content'
    assert (_content or _command), \
        'either "command" attribute or text content is required'
    if not _command:
        _command = _content
    # Hand the command to sh as a single argv entry instead of embedding it
    # in a hand-quoted 'sh -c "..."' string.  That way the command reaches
    # the shell verbatim and does not depend on quote-escaping rules, which
    # mangled commands that themselves contain double quotes.
    shell = pexpect.spawn("sh", ["-c", _command])
    # Same per-program bookkeeping the read/write handlers rely on.
    shell.leftover_output = ""
    shell.last_write = ""
    progs["__exec_shell__"] = shell
    tag_prog_exit(_prog="__exec_shell__", _status=0, _ignore_output=1,
                  _timeout=_timeout)
def tag_prog(_content=None, _id=None, _command=None):
    """Handle the prog tag: spawn a program and register it under _id.

    The command line is taken from the "command" attribute (_command) or,
    when that is absent, from the tag's text content (_content); supplying
    both or neither is an error.  The spawned pexpect program is stored in
    the module-level progs map and given the two bookkeeping attributes the
    read and write handlers use (leftover_output and last_write).
    """
    assert _id, ' requires an id attribute'
    both_given = _command and _content
    assert not both_given, \
        '"command" attribute not allowed with text content'
    assert _command or _content, \
        'either "command" attribute or text content is required'
    # An empty/missing attribute falls back to the text content.
    command_line = _command or _content
    spawned = pexpect.spawn(command_line)
    progs[_id] = spawned
    spawned.leftover_output = ""
    spawned.last_write = ""
def tag_prog_exit(_prog, _content=None, _status=0, _ignore_output=0, _timeout=None):
    """Handle the prog_exit tag: expect a spawned program to exit.

    _prog          -- id of a program previously registered in progs.
    _content       -- accepted for handler-signature uniformity; unused.
    _status        -- expected exit status (int or numeric string).
    _ignore_output -- nonzero to tolerate output printed before exit
                      (int or numeric string).
    _timeout       -- seconds to wait for EOF; the module default timeout
                      is used when omitted.

    Raises AssertionError if the program does not reach EOF in time, is
    still alive afterwards, printed unexpected output, or exited with a
    different status.
    """
    if _timeout is None:
        _timeout = the_default_timeout
    _timeout = int(_timeout)
    # Attribute values parsed out of a document arrive as strings.  Without
    # conversion "1" never equals the integer exit status and "0" counts as
    # true, so normalise both the same way _timeout is normalised.
    try:
        _status = int(_status)
    except ValueError:
        raise AssertionError('"status" attribute must be an integer, got %r'
                             % (_status,))
    try:
        _ignore_output = int(_ignore_output)
    except ValueError:
        # Non-numeric text keeps its previous meaning (any non-empty value
        # enables the flag) rather than becoming a new failure mode.
        _ignore_output = bool(_ignore_output)
    prog = progs[_prog]
    try:
        prog.expect(pexpect.EOF, timeout=_timeout)
    except pexpect.TIMEOUT:
        raise AssertionError("prog '%s' didn't exit: %s" % (_prog, prog.before))
    if prog.isalive():
        time.sleep(0.5) # there's a time lag between eof and exit
    assert not prog.isalive(), \
        "expected %s to exit, but it didn't" % (_prog)
    assert prog.before == "" or _ignore_output, \
        "%s produced unexpected output at exit: '%s'" % (_prog, prog.before)
    assert prog.exitstatus == _status, \
        "expected exit code %s from %s; got %s: %s" % \
        (_status, _prog, prog.exitstatus, prog.before)
def tag_set_default_prog(_content=None, _prog=None):
    """Handle the set_default_prog tag.

    Records _prog in the module-level the_default_progid, which the read
    and write handlers fall back to when they get no "prog" attribute.
    The tag must be empty and must carry a "prog" attribute.
    """
    global the_default_progid
    has_content = bool(_content)
    assert not has_content, 'text content is not allowed'
    assert _prog, '"prog" attribute is required'
    the_default_progid = _prog
def tag_set_default_filter(_content=None, _filter=None):
    """Handle the set_default_filter tag.

    Records _filter in the module-level the_default_filter, which the read
    and write handlers fall back to when they get no "filter" attribute.
    The tag must be empty and must carry a "filter" attribute.
    """
    global the_default_filter
    has_content = bool(_content)
    assert not has_content, 'text content is not allowed'
    assert _filter, '"filter" attribute is required'
    the_default_filter = _filter
def tag_read(_content=None, _text=None, _filter=None, _prog=None, _timeout=None):
    """Handle the read tag: expect specific output from a spawned program.

    The expected text comes from the "text" attribute (_text) or from the
    tag's text content (_content); exactly one must be supplied.  _prog,
    _filter and _timeout fall back to the module-level defaults when omitted.

    Output is accumulated from the program (starting with whatever an
    earlier read left over) until it starts with the expected text, the
    program reaches EOF, or the timeout elapses.  On a match, the unmatched
    remainder is saved on the program object for the next read.

    Raises AssertionError on invalid arguments, on a read timeout, on EOF
    without a match, or when the overall timeout elapses without a match.
    """
    if _prog == None: _prog = the_default_progid
    if _timeout == None: _timeout = the_default_timeout
    if _filter == None: _filter = the_default_filter
    _timeout = int(_timeout)
    assert _prog, '"prog" attribute is required'
    assert not (_content and _text), \
        '"text" attribute not allowed with text content'
    assert _content or _text, \
        'either "text" attribute or text content is required'
    if _text == None: _text = _content
    prog = progs[_prog]
    # The comparison state lives in module globals so that filter code,
    # which is executed in this module's globals, can read and rewrite it.
    global actual_text, expected_text
    expected_text = _text
    # Start from output that a previous read consumed but did not match.
    actual_text = prog.leftover_output
    start_time = time.time()
    while time.time() - start_time < _timeout:
        got_eof = 0
        try:
            actual_text += prog.read_nonblocking(timeout=_timeout)
        except pexpect.EOF:
            # Remember EOF but still compare what has been collected so far.
            got_eof = 1
        except pexpect.TIMEOUT:
            raise AssertionError("timed out; got '%s'" % actual_text)
        # The filter runs on every pass through the loop, so it may be
        # applied several times to the same (growing) text.
        if _filter:
            exec _filter in globals()
        # normalize newlines
        actual_text = actual_text.replace("\r\n", "\n")
        expected_text = expected_text.replace("\r\n", "\n")
        if actual_text.startswith(expected_text):
            # remove the part that matches, and leave the rest for later
            actual_text = actual_text[len(expected_text):]
            prog.leftover_output = actual_text
            return
        # if pexpect echoes back the text we just sent, ignore it
        if actual_text.startswith(prog.last_write + expected_text):
            actual_text = actual_text[len(prog.last_write)+len(expected_text):]
            prog.leftover_output = actual_text
            return
        # No more output can arrive after EOF, so a mismatch here is final.
        assert not got_eof, \
            "expected '%s', got '%s'" % (expected_text, actual_text)
    # The overall time budget ran out without the expected text appearing.
    raise AssertionError("expected '%s', got '%s'" %
                         (expected_text, actual_text))
def tag_write(_content=None, _text=None, _filter=None, _prog=None):
    """Handle the write tag: send text to a spawned program's stdin.

    The text comes from the "text" attribute (_text) or from the tag's text
    content (_content); exactly one must be supplied.  _prog and _filter
    fall back to the module-level defaults when omitted.

    Before writing, the optional filter code is executed in this module's
    globals; it can change what is sent by rebinding the global write_text.

    Raises AssertionError on invalid arguments.
    """
    # The filter runs in the module globals, so the text must be published
    # through the *global* write_text.  Without this declaration the
    # assignment below would create a local that the filter can neither see
    # nor modify.
    global write_text
    if _prog is None:
        _prog = the_default_progid
    assert _prog, '"prog" attribute is required'
    assert not (_content and _text), \
        '"text" attribute not allowed with text content'
    assert _content or _text, \
        'either "text" attribute or text content is required'
    # Fall back to the text content whenever the attribute is missing or
    # empty; an empty "text" attribute must not win over real content.
    if not _text:
        _text = _content
    if _filter is None:
        _filter = the_default_filter
    prog = progs[_prog]
    write_text = _text
    if _filter:
        # The call form is accepted by both Python 2 and Python 3.
        exec(_filter, globals())
    prog.write(write_text)
    # pexpect sometimes echoes back the text you just sent to the program.
    # so we keep a copy of it and ignore it if it shows up.
    prog.last_write = write_text
#############################################################################
# Parsing
class tag:
    # Plain record describing one parsed test tag:
    #   name         -- tag name; the runner looks up the handler "tag_" + name
    #   attributes   -- mapping passed to the handler as keyword arguments
    #   text_content -- text passed to the handler as _content
    def __init__(self, name, attributes, text_content):
        self.name, self.attributes, self.text_content = (
            name, attributes, text_content)
# returns a (tag, text) pair, where tag is an instance of the tag class and
# text is the remaining text after the tag definition.
def parse_next_tag(text):
def consume_whitespace(text):
    """Return text with leading spaces, newlines, carriage returns and tabs removed."""
    # One C-level pass instead of re-slicing the string once per character;
    # the character set is exactly the one the original loop skipped.
    return text.lstrip(" \n\r\t")
def consume_word(text):
    """Split off the leading word of text.

    Returns a (word, rest) pair.  The word ends at the first whitespace
    character or at one of "/", "=", ">", "<"; it is empty when text is
    empty or starts with such a delimiter.
    """
    # Newline, carriage return and tab end a word just like a space does,
    # matching the characters consume_whitespace skips; otherwise a tag
    # whose attributes are wrapped onto the next line gets a name with a
    # newline embedded in it.
    delimiters = " \n\r\t/=><"
    end = 0
    while end < len(text) and text[end] not in delimiters:
        end += 1
    return text[:end], text[end:]
def consume_quoted_string(text):
    """Split off a leading quoted string from text.

    text must start with a single or double quote.  Returns a
    (value, rest) pair where value is the text between the opening quote
    and the next occurrence of the same quote character, and rest is
    everything after that closing quote.

    Raises AssertionError if text does not start with a quote or the
    closing quote is missing.
    """
    assert text != "" and text[0] in ["'", '"'], \
        "expected a quoted string but found \"%s\"" % text
    matching_quote = text[0]
    closing = text.find(matching_quote, 1)
    # Report a missing closing quote as a test failure (AssertionError)
    # instead of running off the end of the string with an IndexError.
    assert closing >= 0, "unterminated quoted string: %s" % text
    return text[1:closing], text[closing + 1:]
# Returns a (name, value, remaining_text) triple, or
# (None, None, remaining_text) if no name="value" attribute is found.
# NOTE: when a word is not followed by "=", that word has already been
# consumed and is not part of the returned remaining text.
def consume_attribute(text):
    name, text = consume_word(text)
    # startswith also covers the empty-string case.
    if text.startswith("="):
        value, text = consume_quoted_string(text[1:])
        return name, value, text
    return None, None, text
index = text.find(""" and text[0] == "/":
# no matching end tag expected
assert text[1] == ">"
text = text[2:]
elif text<>"" and text[0] == ">":
index = text.find("" % tag_name)
assert index >= 0, "no matching end tag found for " % tag_name
text_content = text[1:index]
# TODO: should use httmllib here...
text_content = text_content.replace('<', '<')
text_content = text_content.replace('>', '>')
text_content = text_content.replace('&', '&')
text_content = text_content.replace(' ', ' ')
index += len("" % tag_name)
text = text[index:]
else:
raise AssertionError("expected end of tag but found \"%s\"" %
(tag_name, text))
return tag(tag_name, tag_attributes, text_content), text
#####################################################################
# Testing
import unittest
# Self-tests for the document runner.  Each case feeds a small document to
# execute_text and checks that it either passes (do_test) or fails with an
# error message containing a given fragment (fail_test).
# NOTE(review): several of the embedded test documents below look as if
# their markup has been lost (e.g. empty strings) -- confirm against the
# canonical copy of this file before relying on them.
class test(unittest.TestCase):
    def test_pass_empty_doc1(self):
        self.do_test("")
    def test_pass_empty_doc2(self):
        self.do_test("blah")
    def test_pass_empty_doc3(self):
        self.do_test(" blah ")
    def test_pass_single_tag(self):
        self.do_test("blah")
    def test_pass_hello_world(self):
        self.do_test("echo hello world" \
                     "hello world\n" \
                     "")
    def test_pass_quotes(self):
        self.do_test('')
    def test_fail_extra_output(self):
        self.fail_test("echo hello world" \
                       "hello" \
                       "",
                       "unexpected output at exit: ' world")
    def test_exec_python(self):
        self.do_test("")
        self.do_test("")
        self.fail_test("xxx", "got 'hello")
        self.do_test("")
        self.do_test("xxx")
    def test_fail_broken_tag1(self):
        self.fail_test("", "unrecognized tag")
    def test_fail_unrecognized_attribute(self):
        self.fail_test("", "blah")
    # Helper: the document must execute without raising.
    def do_test(self, text):
        execute_text(text)
    # Helper: executing the document must raise an AssertionError whose
    # message contains error_msg; anything else fails the test.
    def fail_test(self, text, error_msg):
        try:
            execute_text(text)
        except AssertionError, e:
            e = e.__str__()
            assert e.find(error_msg) != -1, \
                "Expected '%s'; got '%s'" % (error_msg, e)
            return
        # Reached only when execute_text raised nothing at all.
        self.fail("test should have failed, but didn't")
#####################################################################
# Toplevel logic
import sys
def execute_text(text):
while 1:
tag, text = parse_next_tag(text)
if not tag: break # no more tags to process
try:
tag_handler = globals()["tag_"+tag.name]
except KeyError:
raise AssertionError("unrecognized tag: " % tag.name)
# tag_desc is a string description of the tag, used in error messages
tag_desc = "%s" % (tag.text_content, tag.name)
else:
tag_desc += "/>"
try:
tag_handler(_content=tag.text_content, **tag.attributes)
except AssertionError, e:
raise AssertionError("error in tag %s: %s" % (tag_desc, e))
except TypeError, e:
raise AssertionError("error in tag %s: %s" % (tag_desc, e))
def execute_file(filename):
    """Read the document stored in filename and execute the tags it contains.

    Raises AssertionError, with the filename prefixed to the message, when
    executing the document fails.  Errors opening or reading the file
    propagate unchanged.
    """
    try:
        # Close the file deterministically instead of leaving the handle
        # to the garbage collector.
        with open(filename) as source:
            contents = source.read()
        execute_text(contents)
    except AssertionError as e:
        raise AssertionError("%s: %s" % (filename, e))
def main():
    """Command-line entry point.

    With no arguments, print a usage line and return.  With "--test" as the
    first argument, run this module's unit tests.  Otherwise treat every
    argument as a document file, execute each one, print a failure or OK
    line per file, and exit with status 1 if any file failed (0 otherwise).
    """
    this_script = sys.argv[0]
    if len(sys.argv) < 2:
        print("Usage: %s file1 [file2 [file3 ...]]" % this_script)
        return
    if sys.argv[1] == "--test":
        # Drop the script name so that unittest sees "--test" as argv[0]
        # and any remaining arguments as its own.
        sys.argv = sys.argv[1:]
        unittest.main()
        return
    exit_status = 0
    for filename in sys.argv[1:]:
        try:
            execute_file(filename)
        except AssertionError as e:
            print("%s: %s" % (this_script, e))
            exit_status = 1
        else:
            print("%s: %s: OK" % (this_script, filename))
    sys.exit(exit_status)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()