From fbccf0f8b0a735f9f8ba857b12094f548fecec54 Mon Sep 17 00:00:00 2001 From: Andrey Guskov Date: Wed, 6 Aug 2014 20:23:54 +0400 Subject: [PATCH 1/2] made multithreading work correctly on Windows by waiting on a queue instead of thread join() loop --- run_tests.py | 75 +++++++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/run_tests.py b/run_tests.py index 363ae5cd..04cede43 100755 --- a/run_tests.py +++ b/run_tests.py @@ -297,8 +297,8 @@ def run_test(testname): # pull tests to run from the given queue and run them. Multiple copies of # this function will be running in parallel across all of the CPU cores of # the system. -def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test_length_arg, counter, mutex, glob_var): - # This is needed on windows because windows doen't copy globals from parent process whili multiprocessing +def run_tasks_from_queue(queue, queue_ret, queue_error, queue_finish, total_tests_arg, max_test_length_arg, counter, mutex, glob_var): + # This is needed on windows because windows doesn't copy globals from parent process while multiprocessing global is_windows is_windows = glob_var[0] global options @@ -321,13 +321,37 @@ def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test else: olddir = "" + queue_error.put('ERROR') compile_error_files = [ ] run_succeed_files = [ ] run_error_files = [ ] skip_files = [ ] + while True: - filename = queue.get() - if (filename == 'STOP'): + if not queue.empty(): + filename = queue.get() + if check_test(filename): + try: + (compile_error, run_error) = run_test(filename) + except: + print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log) + queue_finish.get() + queue_finish.task_done() + break # This is in case the child has unexpectedly died or some other exception happened + + if compile_error == 0 and run_error == 0: + run_succeed_files += [ filename ] + if compile_error != 0: + compile_error_files += [ filename ] + if run_error != 0: + run_error_files += [ filename ] + + with mutex: + update_progress(filename, total_tests_arg, counter, max_test_length_arg) + else: + skip_files += [ filename ] + + else: queue_ret.put((compile_error_files, run_error_files, skip_files, run_succeed_files)) if is_windows: try: @@ -341,27 +365,11 @@ def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test os.rmdir(tmpdir) except: None - - sys.exit(0) - if check_test(filename): - try: - (compile_error, run_error) = run_test(filename) - except: - print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log) - sys.exit(-1) # This is in case the child has unexpectedly died or some other exception happened - - if compile_error == 0 and run_error == 0: - run_succeed_files += [ filename ] - if compile_error != 0: - compile_error_files += [ filename ] - if run_error != 0: - run_error_files += [ filename ] - - with mutex: - update_progress(filename, total_tests_arg, counter, max_test_length_arg) - else: - skip_files += [ filename ] + queue_error.get() + queue_finish.get() + queue_finish.task_done() + break def sigint(signum, frame): @@ -684,10 +692,12 @@ def run_tests(options1, args, print_version): q = multiprocessing.Queue() for fn in files: q.put(fn) - for x in range(nthreads): - q.put('STOP') qret = multiprocessing.Queue() - qskip = multiprocessing.Queue() + qerr = multiprocessing.Queue() + qfin = multiprocessing.JoinableQueue() + + for x in range(nthreads): + qfin.put('STOP') # need to catch sigint so that we can terminate all of the tasks if # we're interrupted @@ -702,14 +712,14 @@ def run_tests(options1, args, print_version): global task_threads task_threads = [0] * nthreads for x in range(nthreads): - task_threads[x] = multiprocessing.Process(target=run_tasks_from_queue, args=(q, qret, qskip, total_tests, + task_threads[x] = multiprocessing.Process(target=run_tasks_from_queue, args=(q, qret, qerr, qfin, total_tests, max_test_length, finished_tests_counter, finished_tests_counter_lock, glob_var)) task_threads[x].start() # wait for them to all finish and then return the number that failed # (i.e. return 0 if all is ok) - for t in task_threads: - t.join() + qfin.join() + if options.non_interactive == False: print_debug("\n", s, run_tests_log) @@ -752,9 +762,8 @@ def run_tests(options1, args, print_version): - for jb in task_threads: - if not jb.exitcode == 0: - raise OSError(2, 'Some test subprocess has thrown an exception', '') + if not qerr.empty(): + raise OSError(2, 'Some test subprocess has thrown an exception', '') if options.non_interactive: print_debug(" Done %d / %d\n" % (finished_tests_counter.value, total_tests), s, run_tests_log) From f87ecf0573222cdaab4703734659f7587113026f Mon Sep 17 00:00:00 2001 From: Andrey Guskov Date: Thu, 7 Aug 2014 19:28:15 +0400 Subject: [PATCH 2/2] added comments to the fix --- run_tests.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/run_tests.py b/run_tests.py index 04cede43..0f2ae3d2 100755 --- a/run_tests.py +++ b/run_tests.py @@ -320,7 +320,8 @@ def run_tasks_from_queue(queue, queue_ret, queue_error, queue_finish, total_test os.chdir(tmpdir) else: olddir = "" - + + # by default, the thread is presumed to fail queue_error.put('ERROR') compile_error_files = [ ] run_succeed_files = [ ] @@ -334,10 +335,15 @@ def run_tasks_from_queue(queue, queue_ret, queue_error, queue_finish, total_test try: (compile_error, run_error) = run_test(filename) except: + # This is in case the child has unexpectedly died or some other exception happened + # it`s not what we wanted, so we leave ERROR in queue_error print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log) + # minus one thread, minus one STOP queue_finish.get() + # needed for queue join queue_finish.task_done() - break # This is in case the child has unexpectedly died or some other exception happened + # exiting the loop, returning from the thread + break if compile_error == 0 and run_error == 0: run_succeed_files += [ filename ] @@ -366,9 +372,14 @@ def run_tasks_from_queue(queue, queue_ret, queue_error, queue_finish, total_test except: None + # the next line is crucial for error indication! + # this thread ended correctly, so take ERROR back queue_error.get() + # minus one thread, minus one STOP queue_finish.get() + # needed for queue join queue_finish.task_done() + # exiting the loop, returning from the thread break @@ -692,10 +703,14 @@ def run_tests(options1, args, print_version): q = multiprocessing.Queue() for fn in files: q.put(fn) + # qret is a queue for returned data qret = multiprocessing.Queue() + # qerr is an error indication queue qerr = multiprocessing.Queue() + # qfin is a waiting queue: JoinableQueue has join() and task_done() methods qfin = multiprocessing.JoinableQueue() + # for each thread, there is a STOP in qfin to synchronize execution for x in range(nthreads): qfin.put('STOP') @@ -716,8 +731,8 @@ def run_tests(options1, args, print_version): max_test_length, finished_tests_counter, finished_tests_counter_lock, glob_var)) task_threads[x].start() - # wait for them to all finish and then return the number that failed - # (i.e. return 0 if all is ok) + # wait for them all to finish and rid the queue of STOPs + # join() here just waits for synchronization qfin.join() if options.non_interactive == False: @@ -762,6 +777,7 @@ def run_tests(options1, args, print_version): + # if all threads ended correctly, qerr is empty if not qerr.empty(): raise OSError(2, 'Some test subprocess has thrown an exception', '')