import unittest |
from test import test_support |
from py.compat import subprocess |
import sys |
import signal |
import os |
import tempfile |
import time |
import re |
|
# Platform switch: true only when running on native Windows.
mswindows = sys.platform == "win32"

# Source fragment spliced into the child scripts of the universal-newlines
# tests.  On Windows it puts the child's stdout into binary mode; on every
# other platform nothing needs to be injected.
SETBINARY = ''
if mswindows:
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                 'os.O_BINARY);')
|
|
|
|
def remove_stderr_debug_decorations(stderr):
    """Return *stderr* with a trailing ``[N refs]`` marker removed.

    Only a marker at the very end of the text (optionally followed by a
    CR and/or LF) is stripped; text without such a marker is returned
    unchanged.
    """
    trailing_refs = re.compile(r"\[\d+ refs\]\r?\n?$")
    return trailing_refs.sub("", stderr)
|
class ProcessTestCase(unittest.TestCase):
    """Tests for the ``subprocess`` module imported by this file.

    Platform-independent tests come first.  POSIX-only and Windows-only
    tests are defined conditionally on the module-level ``mswindows``
    flag, so some test names exist once per platform.
    """

    def mkstemp(self):
        """Wrapper for mkstemp, calling mktemp if mkstemp is not available.

        Returns a ``(fd, filename)`` pair; the caller closes the
        descriptor and removes the file.
        """
        if hasattr(tempfile, "mkstemp"):
            return tempfile.mkstemp()
        # Fallback: O_EXCL makes the open fail instead of silently reusing
        # a file that appeared after mktemp() picked the name.
        fname = tempfile.mktemp()
        return os.open(fname, os.O_RDWR | os.O_CREAT | os.O_EXCL), fname

    #
    # Generic tests
    #
    def test_call_seq(self):
        # call() with a sequence argument returns the child's exit status
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        self.assertEqual(rc, 47)

    def test_call_kwargs(self):
        # call() passes keyword arguments (env here) on to the child; the
        # child exits with status 1 only when it sees FRUIT=banana
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        rc = subprocess.call([sys.executable, "-c",
                              'import sys, os;'
                              'sys.exit(os.getenv("FRUIT")=="banana")'],
                             env=newenv)
        self.assertEqual(rc, 1)

    def test_stdin_none(self):
        # .stdin is None when stdin was not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stdin, None)

    def test_stdout_none(self):
        # .stdout is None when stdout was not redirected; the child's
        # output goes to this process's own stdout
        p = subprocess.Popen([sys.executable, "-c",
                              'print " this bit of output is from a '
                              'test of stdout in a different '
                              'process ..."'],
                             stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stdout, None)

    def test_stderr_none(self):
        # .stderr is None when stderr was not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        p.wait()
        self.assertEqual(p.stderr, None)

    def test_executable(self):
        # args[0] is a bogus name; the program actually run is the one
        # given by the executable argument
        p = subprocess.Popen(["somethingyoudonthave",
                              "-c", "import sys; sys.exit(47)"],
                             executable=sys.executable)
        p.wait()
        self.assertEqual(p.returncode, 47)

    def test_stdin_pipe(self):
        # stdin redirection through a pipe
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=subprocess.PIPE)
        p.stdin.write("pear")
        p.stdin.close()
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_filedes(self):
        # stdin is set to an open file descriptor
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        os.write(d, "pear")
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=d)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_fileobj(self):
        # stdin is set to an open file object
        tf = tempfile.TemporaryFile()
        tf.write("pear")
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=tf)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdout_pipe(self):
        # stdout redirection through a pipe
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=subprocess.PIPE)
        output = p.stdout.read()
        # read() returned at EOF, so the child is done writing; reap it.
        p.wait()
        self.assertEqual(output, "orange")

    def test_stdout_filedes(self):
        # stdout is set to an open file descriptor
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")

    def test_stdout_fileobj(self):
        # stdout is set to an open file object
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")

    def test_stderr_pipe(self):
        # stderr redirection through a pipe
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=subprocess.PIPE)
        errput = p.stderr.read()
        # The pipe is drained; reap the child before asserting.
        p.wait()
        self.assertEqual(remove_stderr_debug_decorations(errput),
                         "strawberry")

    def test_stderr_filedes(self):
        # stderr is set to an open file descriptor
        tf = tempfile.TemporaryFile()
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
                         "strawberry")

    def test_stderr_fileobj(self):
        # stderr is set to an open file object
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(remove_stderr_debug_decorations(tf.read()),
                         "strawberry")

    def test_stdout_stderr_pipe(self):
        # stdout and stderr captured through the same pipe
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        output = p.stdout.read()
        # The pipe is drained; reap the child before asserting.
        p.wait()
        stripped = remove_stderr_debug_decorations(output)
        self.assertEqual(stripped, "appleorange")

    def test_stdout_stderr_file(self):
        # stdout and stderr captured in the same open file
        tf = tempfile.TemporaryFile()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                             stdout=tf,
                             stderr=tf)
        p.wait()
        tf.seek(0)
        output = tf.read()
        stripped = remove_stderr_debug_decorations(output)
        self.assertEqual(stripped, "appleorange")

    def test_cwd(self):
        # The child starts in the directory given by the cwd argument
        tmpdir = os.getenv("TEMP", "/tmp")
        # Canonicalize tmpdir by changing into it and asking the OS what
        # the resulting working directory is called.
        cwd = os.getcwd()
        os.chdir(tmpdir)
        try:
            tmpdir = os.getcwd()
        finally:
            # Always return to the original directory, even on error.
            os.chdir(cwd)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getcwd())'],
                             stdout=subprocess.PIPE,
                             cwd=tmpdir)
        child_cwd = p.stdout.read()
        # The pipe is drained; reap the child before asserting.
        p.wait()
        normcase = os.path.normcase
        self.assertEqual(normcase(child_cwd), normcase(tmpdir))

    def test_env(self):
        # The child sees the environment given by the env argument
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange"
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getenv("FRUIT"))'],
                             stdout=subprocess.PIPE,
                             env=newenv)
        output = p.stdout.read()
        # The pipe is drained; reap the child before asserting.
        p.wait()
        self.assertEqual(output, "orange")

    def test_communicate(self):
        # communicate() feeds stdin and collects both stdout and stderr
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stderr.write("pineapple");'
                              'sys.stdout.write(sys.stdin.read())'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        (stdout, stderr) = p.communicate("banana")
        self.assertEqual(stdout, "banana")
        self.assertEqual(remove_stderr_debug_decorations(stderr),
                         "pineapple")

    def test_communicate_returns(self):
        # communicate() returns (None, None) when nothing is redirected
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertEqual(stderr, None)

    def test_communicate_pipe_buf(self):
        # communicate() with more data than one pipe buffer holds: the
        # parent sends 3*pipe_buf bytes and the child writes 3*pipe_buf
        # bytes to stderr in between two writes to stdout.
        x, y = os.pipe()
        try:
            if mswindows:
                pipe_buf = 512
            else:
                pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        finally:
            # The probe pipe is only needed for the size query.
            os.close(x)
            os.close(y)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(sys.stdin.read(47));'
                              'sys.stderr.write("xyz"*%d);'
                              'sys.stdout.write(sys.stdin.read())' % pipe_buf],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        string_to_write = "abc" * pipe_buf
        (stdout, stderr) = p.communicate(string_to_write)
        self.assertEqual(stdout, string_to_write)

    def test_writes_before_communicate(self):
        # Data written to p.stdin before communicate() is delivered ahead
        # of the data passed to communicate()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(sys.stdin.read())'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        p.stdin.write("banana")
        (stdout, stderr) = p.communicate("split")
        self.assertEqual(stdout, "bananasplit")
        self.assertEqual(remove_stderr_debug_decorations(stderr), "")

    def test_universal_newlines(self):
        # universal_newlines applied to data read directly from p.stdout
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;' + SETBINARY +
                              'sys.stdout.write("line1\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line2\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line3\\r\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line4\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline5");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline6");'],
                             stdout=subprocess.PIPE,
                             universal_newlines=1)
        stdout = p.stdout.read()
        # The pipe is drained; reap the child before asserting.
        p.wait()
        if hasattr(open, 'newlines'):
            # The interpreter exposes newline translation on file objects:
            # every line ending is expected to arrive as "\n".
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
        else:
            # No newline translation available: the bytes are expected
            # exactly as the child wrote them.
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")

    def test_universal_newlines_communicate(self):
        # universal_newlines applied to data returned by communicate()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;' + SETBINARY +
                              'sys.stdout.write("line1\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line2\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line3\\r\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line4\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline5");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline6");'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=1)
        (stdout, stderr) = p.communicate()
        if hasattr(open, 'newlines'):
            # Newline translation available: all endings become "\n".
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
        else:
            # No newline translation available: output is untouched.
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")

    def test_no_leaking(self):
        # Run many children one after another, each with three pipes, to
        # check that handles are released between runs.  The larger count
        # is used only when the "subprocess" test resource is enabled and
        # the platform is not Windows.
        # NOTE(review): 1026 presumably exceeds a typical per-process
        # descriptor limit -- confirm.
        if test_support.is_resource_enabled("subprocess") and not mswindows:
            max_handles = 1026
        else:
            max_handles = 65
        for i in range(max_handles):
            p = subprocess.Popen([sys.executable, "-c",
                                  "import sys;sys.stdout.write(sys.stdin.read())"],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            data = p.communicate("lime")[0]
            self.assertEqual(data, "lime")

    def test_list2cmdline(self):
        # Quoting and backslash-escaping rules of list2cmdline()
        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
                         '"a b c" d e')
        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
                         'ab\\"c \\ d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
                         'a\\\\\\b "de fg" h')
        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
                         'a\\\\\\"b c d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
                         '"a\\\\b c" d e')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
                         '"a\\\\b\\ c" d e')

    def test_poll(self):
        # poll() returns None while the child runs and its exit status
        # afterwards
        p = subprocess.Popen([sys.executable,
                              "-c", "import time; time.sleep(1)"])
        count = 0
        while p.poll() is None:
            time.sleep(0.1)
            count += 1
        # The child sleeps for one second and we poll every 0.1s, so at
        # least a couple of polls are expected before it exits.  This is
        # timing dependent.
        self.assert_(count >= 2)
        # A further poll() must keep reporting the same exit status.
        self.assertEqual(p.poll(), 0)

    def test_wait(self):
        # wait() returns the exit status, also when called a second time
        p = subprocess.Popen([sys.executable,
                              "-c", "import time; time.sleep(2)"])
        self.assertEqual(p.wait(), 0)
        self.assertEqual(p.wait(), 0)

    def test_invalid_bufsize(self):
        # The second positional argument of Popen (a string here) must be
        # rejected with TypeError
        self.assertRaises(TypeError, subprocess.Popen,
                          [sys.executable, "-c", "pass"], "orange")

    #
    # POSIX tests
    #
    if not mswindows:
        def _make_exit47_script(self):
            """Create an executable shell script whose exit status is 47.

            The script execs the running interpreter with a one-liner
            that calls ``sys.exit(47)``.  Returns the script's path; the
            caller removes the file.
            """
            import stat
            f, fname = self.mkstemp()
            try:
                os.write(f, "#!/bin/sh\n")
                os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
                         sys.executable)
            finally:
                os.close(f)
            # Owner read/write/execute only (octal 700).
            os.chmod(fname, stat.S_IRWXU)
            return fname

        def test_exceptions(self):
            # A failure in the child before exec surfaces in the parent
            # as OSError carrying the child's traceback text
            try:
                subprocess.Popen([sys.executable, "-c", ""],
                                 cwd="/this/path/does/not/exist")
            except OSError:
                # sys.exc_info() keeps this parseable by interpreters
                # with differing "except ... as/," syntax.
                e = sys.exc_info()[1]
                # The traceback text must mention the failing os.chdir.
                self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
            else:
                self.fail("Expected OSError")

        def test_run_abort(self):
            # A child killed by SIGABRT reports the negated signal number
            p = subprocess.Popen([sys.executable,
                                  "-c", "import os; os.abort()"])
            p.wait()
            self.assertEqual(-p.returncode, signal.SIGABRT)

        def test_preexec(self):
            # preexec_fn runs in the child before the program starts, so
            # an environment change it makes is visible to the program
            p = subprocess.Popen([sys.executable, "-c",
                                  'import sys,os;'
                                  'sys.stdout.write(os.getenv("FRUIT"))'],
                                 stdout=subprocess.PIPE,
                                 preexec_fn=lambda: os.putenv("FRUIT", "apple"))
            output = p.stdout.read()
            # The pipe is drained; reap the child before asserting.
            p.wait()
            self.assertEqual(output, "apple")

        def test_args_string(self):
            # args given as a string names the program to run
            fname = self._make_exit47_script()
            try:
                p = subprocess.Popen(fname)
                p.wait()
            finally:
                # Remove the script even if starting it failed.
                os.remove(fname)
            self.assertEqual(p.returncode, 47)

        def test_invalid_args(self):
            # startupinfo and creationflags are rejected on this platform
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              startupinfo=47)
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              creationflags=47)

        def test_shell_sequence(self):
            # Run a command through the shell, command given as a sequence
            newenv = os.environ.copy()
            newenv["FRUIT"] = "apple"
            p = subprocess.Popen(["echo $FRUIT"], shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            output = p.stdout.read()
            # The pipe is drained; reap the child before asserting.
            p.wait()
            self.assertEqual(output.strip(), "apple")

        def test_shell_string(self):
            # Run a command through the shell, command given as a string
            newenv = os.environ.copy()
            newenv["FRUIT"] = "apple"
            p = subprocess.Popen("echo $FRUIT", shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            output = p.stdout.read()
            # The pipe is drained; reap the child before asserting.
            p.wait()
            self.assertEqual(output.strip(), "apple")

        def test_call_string(self):
            # call() with a string argument names the program to run
            fname = self._make_exit47_script()
            try:
                rc = subprocess.call(fname)
            finally:
                # Remove the script even if running it failed.
                os.remove(fname)
            self.assertEqual(rc, 47)

    #
    # Windows tests
    #
    if mswindows:
        def test_startupinfo(self):
            # call() accepts a STARTUPINFO asking for a maximized window.
            # Only the absence of an exception is checked.
            STARTF_USESHOWWINDOW = 1
            SW_MAXIMIZE = 3
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags = STARTF_USESHOWWINDOW
            startupinfo.wShowWindow = SW_MAXIMIZE
            subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
                            startupinfo=startupinfo)

        def test_creationflags(self):
            # call() accepts creationflags; the child gets a new console
            CREATE_NEW_CONSOLE = 16
            sys.stderr.write(" a DOS box should flash briefly ...\n")
            subprocess.call(sys.executable +
                            ' -c "import time; time.sleep(0.25)"',
                            creationflags=CREATE_NEW_CONSOLE)

        def test_invalid_args(self):
            # preexec_fn and close_fds are rejected on this platform
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              preexec_fn=lambda: 1)
            self.assertRaises(ValueError, subprocess.call,
                              [sys.executable,
                               "-c", "import sys; sys.exit(47)"],
                              close_fds=True)

        def test_shell_sequence(self):
            # Run a command through the shell, command given as a sequence
            newenv = os.environ.copy()
            newenv["FRUIT"] = "physalis"
            p = subprocess.Popen(["set"], shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            output = p.stdout.read()
            # The pipe is drained; reap the child before asserting.
            p.wait()
            self.assertNotEqual(output.find("physalis"), -1)

        def test_shell_string(self):
            # Run a command through the shell, command given as a string
            newenv = os.environ.copy()
            newenv["FRUIT"] = "physalis"
            p = subprocess.Popen("set", shell=1,
                                 stdout=subprocess.PIPE,
                                 env=newenv)
            output = p.stdout.read()
            # The pipe is drained; reap the child before asserting.
            p.wait()
            self.assertNotEqual(output.find("physalis"), -1)

        def test_call_string(self):
            # call() with a command-line string returns the exit status
            rc = subprocess.call(sys.executable +
                                 ' -c "import sys; sys.exit(47)"')
            self.assertEqual(rc, 47)
|
|
def test_main():
    """Run the ProcessTestCase suite through the regression-test driver.

    After the run, any child processes the tests left behind are
    collected, provided the test support module offers
    ``reap_children`` (older versions do not).
    """
    test_support.run_unittest(ProcessTestCase)
    # Collect leftover children so they do not linger as zombies.
    if hasattr(test_support, "reap_children"):
        test_support.reap_children()


if __name__ == "__main__":
    test_main()
|