Skip to content

Commit

Permalink
Merge pull request #1 from cy20lin/main
Browse files Browse the repository at this point in the history
fix test support on Windows
  • Loading branch information
OldLiu001 committed Jul 8, 2023
2 parents b51d235 + 9e239ee commit ae946aa
Showing 1 changed file with 149 additions and 55 deletions.
204 changes: 149 additions & 55 deletions runtest.py
@@ -1,18 +1,22 @@
#!/usr/bin/env python

from __future__ import print_function
import os, sys, re
import argparse, time
import signal, atexit

from subprocess import Popen, STDOUT, PIPE
from select import select

# Pseudo-TTY and terminal manipulation
import pty, array, fcntl, termios

IS_PY_3 = sys.version_info[0] == 3

if os.name == 'posix':
from select import select
else:
if IS_PY_3:
import threading, queue
from subprocess import TimeoutExpired
else:
import threading
import Queue as queue

debug_file = None
log_file = None

Expand Down Expand Up @@ -83,75 +87,151 @@ def __init__(self, args, no_pty=False, line_break="\n"):
env['TERM'] = 'dumb'
env['INPUTRC'] = '/dev/null'
env['PERL_RL'] = 'false'
if no_pty:
self.p = Popen(args, bufsize=0,
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
preexec_fn=os.setsid,
env=env)
self.stdin = self.p.stdin
self.stdout = self.p.stdout
if os.name == 'posix':
if no_pty:
self.p = Popen(args, bufsize=0,
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
preexec_fn=os.setsid,
env=env)
self.stdin = self.p.stdin
self.stdout = self.p.stdout
else:
# Pseudo-TTY and terminal manipulation
import pty, array, fcntl, termios

# provide tty to get 'interactive' readline to work
master, slave = pty.openpty()

# Set terminal size large so that readline will not send
# ANSI/VT escape codes when the lines are long.
buf = array.array('h', [100, 200, 0, 0])
fcntl.ioctl(master, termios.TIOCSWINSZ, buf, True)

self.p = Popen(args, bufsize=0,
stdin=slave, stdout=slave, stderr=STDOUT,
preexec_fn=os.setsid,
env=env)
# Now close slave so that we will get an exception from
# read when the child exits early
# http://stackoverflow.com/questions/11165521
os.close(slave)
self.stdin = os.fdopen(master, 'r+b', 0)
self.stdout = self.stdin
elif os.name == 'nt':
if no_pty:
from subprocess import CREATE_NEW_PROCESS_GROUP
self.p = Popen(args, bufsize=0,
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
creationflags=CREATE_NEW_PROCESS_GROUP,
env=env)
self.stdin = self.p.stdin
self.stdout = self.p.stdout
else:
raise ValueError('pty not supported on os.name="{}"'.format(os.name))
else:
# provide tty to get 'interactive' readline to work
master, slave = pty.openpty()

# Set terminal size large so that readline will not send
# ANSI/VT escape codes when the lines are long.
buf = array.array('h', [100, 200, 0, 0])
fcntl.ioctl(master, termios.TIOCSWINSZ, buf, True)

self.p = Popen(args, bufsize=0,
stdin=slave, stdout=slave, stderr=STDOUT,
preexec_fn=os.setsid,
env=env)
# Now close slave so that we will get an exception from
# read when the child exits early
# http://stackoverflow.com/questions/11165521
os.close(slave)
self.stdin = os.fdopen(master, 'r+b', 0)
self.stdout = self.stdin
if no_pty:
self.p = Popen(args, bufsize=0,
stdin=PIPE, stdout=PIPE, stderr=STDOUT,
env=env)
self.stdin = self.p.stdin
self.stdout = self.p.stdout
else:
raise ValueError('pty not supported on os.name="{}"'.format(os.name))

#print "started"
self.buf = ""
self.last_prompt = ""

self.line_break = line_break

if os.name == 'posix':
self.q = None
self.t = None
else:
self.q = queue.Queue()
self.t = threading.Thread(target=self._reader, args=())
self.t.daemon = True
self.t.start()

def _reader(self):
try:
f = self.stdout
ok = True
while ok:
try:
new_data = f.read(1)
if len(new_data) == 0: # EOF
ok = False
except Exception as e:
# catch the read exception and send it to queue
ok = False
new_data = e
self.q.put(new_data)
except:
pass

def read_to_prompt(self, prompts, timeout):
end_time = time.time() + timeout
while time.time() < end_time:
[outs,_,_] = select([self.stdout], [], [], 1)
if self.stdout in outs:
while True:
current_timeout = max(end_time - time.time(), 0.)
if current_timeout == 0.:
break
if os.name == 'posix':
[outs,_,_] = select([self.stdout], [], [], 1)
if self.stdout not in outs:
break
new_data = self.stdout.read(1)
new_data = new_data.decode("utf-8") if IS_PY_3 else new_data
#print("new_data: '%s'" % new_data)
debug(new_data)
# Perform newline cleanup
self.buf += new_data.replace("\r", "")
for prompt in prompts:
regexp = re.compile(prompt)
match = regexp.search(self.buf)
if match:
end = match.end()
buf = self.buf[0:match.start()]
self.buf = self.buf[end:]
self.last_prompt = prompt
return buf
else:
try:
new_data = self.q.get(timeout=current_timeout)
except queue.Empty:
break
if isinstance(new_data, Exception):
raise new_data
if len(new_data) == 0: # EOF
break
new_data = new_data.decode("utf-8") if IS_PY_3 else new_data
#print("new_data: '%s'" % new_data)
debug(new_data)
# Perform newline cleanup
self.buf += new_data.replace("\r", "")
for prompt in prompts:
regexp = re.compile(prompt)
match = regexp.search(self.buf)
if match:
end = match.end()
buf = self.buf[0:match.start()]
self.buf = self.buf[end:]
self.last_prompt = prompt
return buf
# MAYBE we should distinguish EOF from TIMEOUT,
# return None for both cases currently
return None

def writeline(self, str):
def _to_bytes(s):
return bytes(s, "utf-8") if IS_PY_3 else s

self.stdin.write(_to_bytes(str.replace('\r', '\x16\r') + self.line_break))

def cleanup(self):
#print "cleaning up"
if self.p:
try:
os.killpg(self.p.pid, signal.SIGTERM)
if os.name == 'posix':
os.killpg(self.p.pid, signal.SIGTERM)
elif os.name == 'nt':
self.p.send_signal(signal.CTRL_BREAK_EVENT)
else:
self.p.terminate()
if IS_PY_3:
try:
self.p.communicate(timeout=1.0)
except TimeoutExpired:
self.p.kill()
except OSError:
pass
self.p = None
self.stdin = None
self.stdout = None

class TestReader:
def __init__(self, test_file):
Expand Down Expand Up @@ -267,6 +347,14 @@ def assert_prompt(runner, prompts, timeout):
class TestTimeout(Exception):
pass

def has_any_match(expects, res):
success = False
for expect in expects:
success = re.search(expect, res, re.S)
if success:
break
return success

while t.next():
if args.deferrable == False and t.deferrable:
log(t.deferrable)
Expand All @@ -287,8 +375,15 @@ class TestTimeout(Exception):
# The repeated form is to get around an occasional OS X issue
# where the form is repeated.
# https://github.com/kanaka/mal/issues/30
expects = [".*%s%s%s" % (sep, t.out, re.escape(t.ret)),
".*%s.*%s%s%s" % (sep, sep, t.out, re.escape(t.ret))]
if os.name == 'posix':
expects = [".*%s%s%s" % (sep, t.out, re.escape(t.ret)),
".*%s.*%s%s%s" % (sep, sep, t.out, re.escape(t.ret))]
elif os.name == 'nt':
expects = ["%s%s" % (t.out, re.escape(t.ret))]
else:
expects = ["%s%s" % (t.out, re.escape(t.ret)),
".*%s%s%s" % (sep, t.out, re.escape(t.ret)),
".*%s.*%s%s%s" % (sep, sep, t.out, re.escape(t.ret))]

r.writeline(t.form)
try:
Expand All @@ -302,8 +397,7 @@ class TestTimeout(Exception):
elif (t.ret == "" and t.out == ""):
log(" -> SUCCESS (result ignored)")
pass_cnt += 1
elif (re.search(expects[0], res, re.S) or
re.search(expects[1], res, re.S)):
elif has_any_match(expects, res):
log(" -> SUCCESS")
pass_cnt += 1
else:
Expand Down

0 comments on commit ae946aa

Please sign in to comment.