made multithreading work correctly on Windows by waiting on a queue instead of thread join() loop

This commit is contained in:
Andrey Guskov
2014-08-06 20:23:54 +04:00
parent 96d3363cb3
commit fbccf0f8b0

View File

@@ -297,8 +297,8 @@ def run_test(testname):
# pull tests to run from the given queue and run them. Multiple copies of # pull tests to run from the given queue and run them. Multiple copies of
# this function will be running in parallel across all of the CPU cores of # this function will be running in parallel across all of the CPU cores of
# the system. # the system.
def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test_length_arg, counter, mutex, glob_var): def run_tasks_from_queue(queue, queue_ret, queue_error, queue_finish, total_tests_arg, max_test_length_arg, counter, mutex, glob_var):
# This is needed on windows because windows doen't copy globals from parent process whili multiprocessing # This is needed on windows because windows doesn't copy globals from parent process while multiprocessing
global is_windows global is_windows
is_windows = glob_var[0] is_windows = glob_var[0]
global options global options
@@ -321,13 +321,37 @@ def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test
else: else:
olddir = "" olddir = ""
queue_error.put('ERROR')
compile_error_files = [ ] compile_error_files = [ ]
run_succeed_files = [ ] run_succeed_files = [ ]
run_error_files = [ ] run_error_files = [ ]
skip_files = [ ] skip_files = [ ]
while True: while True:
filename = queue.get() if not queue.empty():
if (filename == 'STOP'): filename = queue.get()
if check_test(filename):
try:
(compile_error, run_error) = run_test(filename)
except:
print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log)
queue_finish.get()
queue_finish.task_done()
break # This is in case the child has unexpectedly died or some other exception happened
if compile_error == 0 and run_error == 0:
run_succeed_files += [ filename ]
if compile_error != 0:
compile_error_files += [ filename ]
if run_error != 0:
run_error_files += [ filename ]
with mutex:
update_progress(filename, total_tests_arg, counter, max_test_length_arg)
else:
skip_files += [ filename ]
else:
queue_ret.put((compile_error_files, run_error_files, skip_files, run_succeed_files)) queue_ret.put((compile_error_files, run_error_files, skip_files, run_succeed_files))
if is_windows: if is_windows:
try: try:
@@ -342,26 +366,10 @@ def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test
except: except:
None None
sys.exit(0) queue_error.get()
queue_finish.get()
if check_test(filename): queue_finish.task_done()
try: break
(compile_error, run_error) = run_test(filename)
except:
print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log)
sys.exit(-1) # This is in case the child has unexpectedly died or some other exception happened
if compile_error == 0 and run_error == 0:
run_succeed_files += [ filename ]
if compile_error != 0:
compile_error_files += [ filename ]
if run_error != 0:
run_error_files += [ filename ]
with mutex:
update_progress(filename, total_tests_arg, counter, max_test_length_arg)
else:
skip_files += [ filename ]
def sigint(signum, frame): def sigint(signum, frame):
@@ -684,10 +692,12 @@ def run_tests(options1, args, print_version):
q = multiprocessing.Queue() q = multiprocessing.Queue()
for fn in files: for fn in files:
q.put(fn) q.put(fn)
for x in range(nthreads):
q.put('STOP')
qret = multiprocessing.Queue() qret = multiprocessing.Queue()
qskip = multiprocessing.Queue() qerr = multiprocessing.Queue()
qfin = multiprocessing.JoinableQueue()
for x in range(nthreads):
qfin.put('STOP')
# need to catch sigint so that we can terminate all of the tasks if # need to catch sigint so that we can terminate all of the tasks if
# we're interrupted # we're interrupted
@@ -702,14 +712,14 @@ def run_tests(options1, args, print_version):
global task_threads global task_threads
task_threads = [0] * nthreads task_threads = [0] * nthreads
for x in range(nthreads): for x in range(nthreads):
task_threads[x] = multiprocessing.Process(target=run_tasks_from_queue, args=(q, qret, qskip, total_tests, task_threads[x] = multiprocessing.Process(target=run_tasks_from_queue, args=(q, qret, qerr, qfin, total_tests,
max_test_length, finished_tests_counter, finished_tests_counter_lock, glob_var)) max_test_length, finished_tests_counter, finished_tests_counter_lock, glob_var))
task_threads[x].start() task_threads[x].start()
# wait for them to all finish and then return the number that failed # wait for them to all finish and then return the number that failed
# (i.e. return 0 if all is ok) # (i.e. return 0 if all is ok)
for t in task_threads: qfin.join()
t.join()
if options.non_interactive == False: if options.non_interactive == False:
print_debug("\n", s, run_tests_log) print_debug("\n", s, run_tests_log)
@@ -752,9 +762,8 @@ def run_tests(options1, args, print_version):
for jb in task_threads: if not qerr.empty():
if not jb.exitcode == 0: raise OSError(2, 'Some test subprocess has thrown an exception', '')
raise OSError(2, 'Some test subprocess has thrown an exception', '')
if options.non_interactive: if options.non_interactive:
print_debug(" Done %d / %d\n" % (finished_tests_counter.value, total_tests), s, run_tests_log) print_debug(" Done %d / %d\n" % (finished_tests_counter.value, total_tests), s, run_tests_log)