Merge pull request #838 from aguskov/rt_thr_win
Fixed multithreading on Windows (alloy.py)
This commit is contained in:
97
run_tests.py
97
run_tests.py
@@ -297,8 +297,8 @@ def run_test(testname):
|
|||||||
# pull tests to run from the given queue and run them. Multiple copies of
|
# pull tests to run from the given queue and run them. Multiple copies of
|
||||||
# this function will be running in parallel across all of the CPU cores of
|
# this function will be running in parallel across all of the CPU cores of
|
||||||
# the system.
|
# the system.
|
||||||
def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test_length_arg, counter, mutex, glob_var):
|
def run_tasks_from_queue(queue, queue_ret, queue_error, queue_finish, total_tests_arg, max_test_length_arg, counter, mutex, glob_var):
|
||||||
# This is needed on windows because windows doen't copy globals from parent process whili multiprocessing
|
# This is needed on windows because windows doesn't copy globals from parent process while multiprocessing
|
||||||
global is_windows
|
global is_windows
|
||||||
is_windows = glob_var[0]
|
is_windows = glob_var[0]
|
||||||
global options
|
global options
|
||||||
@@ -320,14 +320,44 @@ def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test
|
|||||||
os.chdir(tmpdir)
|
os.chdir(tmpdir)
|
||||||
else:
|
else:
|
||||||
olddir = ""
|
olddir = ""
|
||||||
|
|
||||||
|
# by default, the thread is presumed to fail
|
||||||
|
queue_error.put('ERROR')
|
||||||
compile_error_files = [ ]
|
compile_error_files = [ ]
|
||||||
run_succeed_files = [ ]
|
run_succeed_files = [ ]
|
||||||
run_error_files = [ ]
|
run_error_files = [ ]
|
||||||
skip_files = [ ]
|
skip_files = [ ]
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
filename = queue.get()
|
if not queue.empty():
|
||||||
if (filename == 'STOP'):
|
filename = queue.get()
|
||||||
|
if check_test(filename):
|
||||||
|
try:
|
||||||
|
(compile_error, run_error) = run_test(filename)
|
||||||
|
except:
|
||||||
|
# This is in case the child has unexpectedly died or some other exception happened
|
||||||
|
# it`s not what we wanted, so we leave ERROR in queue_error
|
||||||
|
print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log)
|
||||||
|
# minus one thread, minus one STOP
|
||||||
|
queue_finish.get()
|
||||||
|
# needed for queue join
|
||||||
|
queue_finish.task_done()
|
||||||
|
# exiting the loop, returning from the thread
|
||||||
|
break
|
||||||
|
|
||||||
|
if compile_error == 0 and run_error == 0:
|
||||||
|
run_succeed_files += [ filename ]
|
||||||
|
if compile_error != 0:
|
||||||
|
compile_error_files += [ filename ]
|
||||||
|
if run_error != 0:
|
||||||
|
run_error_files += [ filename ]
|
||||||
|
|
||||||
|
with mutex:
|
||||||
|
update_progress(filename, total_tests_arg, counter, max_test_length_arg)
|
||||||
|
else:
|
||||||
|
skip_files += [ filename ]
|
||||||
|
|
||||||
|
else:
|
||||||
queue_ret.put((compile_error_files, run_error_files, skip_files, run_succeed_files))
|
queue_ret.put((compile_error_files, run_error_files, skip_files, run_succeed_files))
|
||||||
if is_windows:
|
if is_windows:
|
||||||
try:
|
try:
|
||||||
@@ -341,27 +371,16 @@ def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test
|
|||||||
os.rmdir(tmpdir)
|
os.rmdir(tmpdir)
|
||||||
except:
|
except:
|
||||||
None
|
None
|
||||||
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
if check_test(filename):
|
# the next line is crucial for error indication!
|
||||||
try:
|
# this thread ended correctly, so take ERROR back
|
||||||
(compile_error, run_error) = run_test(filename)
|
queue_error.get()
|
||||||
except:
|
# minus one thread, minus one STOP
|
||||||
print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log)
|
queue_finish.get()
|
||||||
sys.exit(-1) # This is in case the child has unexpectedly died or some other exception happened
|
# needed for queue join
|
||||||
|
queue_finish.task_done()
|
||||||
if compile_error == 0 and run_error == 0:
|
# exiting the loop, returning from the thread
|
||||||
run_succeed_files += [ filename ]
|
break
|
||||||
if compile_error != 0:
|
|
||||||
compile_error_files += [ filename ]
|
|
||||||
if run_error != 0:
|
|
||||||
run_error_files += [ filename ]
|
|
||||||
|
|
||||||
with mutex:
|
|
||||||
update_progress(filename, total_tests_arg, counter, max_test_length_arg)
|
|
||||||
else:
|
|
||||||
skip_files += [ filename ]
|
|
||||||
|
|
||||||
|
|
||||||
def sigint(signum, frame):
|
def sigint(signum, frame):
|
||||||
@@ -684,10 +703,16 @@ def run_tests(options1, args, print_version):
|
|||||||
q = multiprocessing.Queue()
|
q = multiprocessing.Queue()
|
||||||
for fn in files:
|
for fn in files:
|
||||||
q.put(fn)
|
q.put(fn)
|
||||||
for x in range(nthreads):
|
# qret is a queue for returned data
|
||||||
q.put('STOP')
|
|
||||||
qret = multiprocessing.Queue()
|
qret = multiprocessing.Queue()
|
||||||
qskip = multiprocessing.Queue()
|
# qerr is an error indication queue
|
||||||
|
qerr = multiprocessing.Queue()
|
||||||
|
# qfin is a waiting queue: JoinableQueue has join() and task_done() methods
|
||||||
|
qfin = multiprocessing.JoinableQueue()
|
||||||
|
|
||||||
|
# for each thread, there is a STOP in qfin to synchronize execution
|
||||||
|
for x in range(nthreads):
|
||||||
|
qfin.put('STOP')
|
||||||
|
|
||||||
# need to catch sigint so that we can terminate all of the tasks if
|
# need to catch sigint so that we can terminate all of the tasks if
|
||||||
# we're interrupted
|
# we're interrupted
|
||||||
@@ -702,14 +727,14 @@ def run_tests(options1, args, print_version):
|
|||||||
global task_threads
|
global task_threads
|
||||||
task_threads = [0] * nthreads
|
task_threads = [0] * nthreads
|
||||||
for x in range(nthreads):
|
for x in range(nthreads):
|
||||||
task_threads[x] = multiprocessing.Process(target=run_tasks_from_queue, args=(q, qret, qskip, total_tests,
|
task_threads[x] = multiprocessing.Process(target=run_tasks_from_queue, args=(q, qret, qerr, qfin, total_tests,
|
||||||
max_test_length, finished_tests_counter, finished_tests_counter_lock, glob_var))
|
max_test_length, finished_tests_counter, finished_tests_counter_lock, glob_var))
|
||||||
task_threads[x].start()
|
task_threads[x].start()
|
||||||
|
|
||||||
# wait for them to all finish and then return the number that failed
|
# wait for them all to finish and rid the queue of STOPs
|
||||||
# (i.e. return 0 if all is ok)
|
# join() here just waits for synchronization
|
||||||
for t in task_threads:
|
qfin.join()
|
||||||
t.join()
|
|
||||||
if options.non_interactive == False:
|
if options.non_interactive == False:
|
||||||
print_debug("\n", s, run_tests_log)
|
print_debug("\n", s, run_tests_log)
|
||||||
|
|
||||||
@@ -752,9 +777,9 @@ def run_tests(options1, args, print_version):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
for jb in task_threads:
|
# if all threads ended correctly, qerr is empty
|
||||||
if not jb.exitcode == 0:
|
if not qerr.empty():
|
||||||
raise OSError(2, 'Some test subprocess has thrown an exception', '')
|
raise OSError(2, 'Some test subprocess has thrown an exception', '')
|
||||||
|
|
||||||
if options.non_interactive:
|
if options.non_interactive:
|
||||||
print_debug(" Done %d / %d\n" % (finished_tests_counter.value, total_tests), s, run_tests_log)
|
print_debug(" Done %d / %d\n" % (finished_tests_counter.value, total_tests), s, run_tests_log)
|
||||||
|
|||||||
Reference in New Issue
Block a user