diff --git a/run_tests.py b/run_tests.py index 363ae5cd..0f2ae3d2 100755 --- a/run_tests.py +++ b/run_tests.py @@ -297,8 +297,8 @@ def run_test(testname): # pull tests to run from the given queue and run them. Multiple copies of # this function will be running in parallel across all of the CPU cores of # the system. -def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test_length_arg, counter, mutex, glob_var): - # This is needed on windows because windows doen't copy globals from parent process whili multiprocessing +def run_tasks_from_queue(queue, queue_ret, queue_error, queue_finish, total_tests_arg, max_test_length_arg, counter, mutex, glob_var): + # This is needed on windows because windows doesn't copy globals from parent process while multiprocessing global is_windows is_windows = glob_var[0] global options @@ -320,14 +320,44 @@ def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test os.chdir(tmpdir) else: olddir = "" - + + # by default, the thread is presumed to fail + queue_error.put('ERROR') compile_error_files = [ ] run_succeed_files = [ ] run_error_files = [ ] skip_files = [ ] + while True: - filename = queue.get() - if (filename == 'STOP'): + if not queue.empty(): + filename = queue.get() + if check_test(filename): + try: + (compile_error, run_error) = run_test(filename) + except: + # This is in case the child has unexpectedly died or some other exception happened + # it`s not what we wanted, so we leave ERROR in queue_error + print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log) + # minus one thread, minus one STOP + queue_finish.get() + # needed for queue join + queue_finish.task_done() + # exiting the loop, returning from the thread + break + + if compile_error == 0 and run_error == 0: + run_succeed_files += [ filename ] + if compile_error != 0: + compile_error_files += [ filename ] + if run_error != 0: + run_error_files += [ filename ] + + with mutex: + update_progress(filename, total_tests_arg, counter, max_test_length_arg) + else: + skip_files += [ filename ] + + else: queue_ret.put((compile_error_files, run_error_files, skip_files, run_succeed_files)) if is_windows: try: @@ -341,27 +371,16 @@ def run_tasks_from_queue(queue, queue_ret, queue_skip, total_tests_arg, max_test os.rmdir(tmpdir) except: None - - sys.exit(0) - if check_test(filename): - try: - (compile_error, run_error) = run_test(filename) - except: - print_debug("ERROR: run_test function raised an exception: %s\n" % (sys.exc_info()[1]), s, run_tests_log) - sys.exit(-1) # This is in case the child has unexpectedly died or some other exception happened - - if compile_error == 0 and run_error == 0: - run_succeed_files += [ filename ] - if compile_error != 0: - compile_error_files += [ filename ] - if run_error != 0: - run_error_files += [ filename ] - - with mutex: - update_progress(filename, total_tests_arg, counter, max_test_length_arg) - else: - skip_files += [ filename ] + # the next line is crucial for error indication! + # this thread ended correctly, so take ERROR back + queue_error.get() + # minus one thread, minus one STOP + queue_finish.get() + # needed for queue join + queue_finish.task_done() + # exiting the loop, returning from the thread + break def sigint(signum, frame): @@ -684,10 +703,16 @@ def run_tests(options1, args, print_version): q = multiprocessing.Queue() for fn in files: q.put(fn) - for x in range(nthreads): - q.put('STOP') + # qret is a queue for returned data qret = multiprocessing.Queue() - qskip = multiprocessing.Queue() + # qerr is an error indication queue + qerr = multiprocessing.Queue() + # qfin is a waiting queue: JoinableQueue has join() and task_done() methods + qfin = multiprocessing.JoinableQueue() + + # for each thread, there is a STOP in qfin to synchronize execution + for x in range(nthreads): + qfin.put('STOP') # need to catch sigint so that we can terminate all of the tasks if # we're interrupted @@ -702,14 +727,14 @@ def run_tests(options1, args, print_version): global task_threads task_threads = [0] * nthreads for x in range(nthreads): - task_threads[x] = multiprocessing.Process(target=run_tasks_from_queue, args=(q, qret, qskip, total_tests, + task_threads[x] = multiprocessing.Process(target=run_tasks_from_queue, args=(q, qret, qerr, qfin, total_tests, max_test_length, finished_tests_counter, finished_tests_counter_lock, glob_var)) task_threads[x].start() - # wait for them to all finish and then return the number that failed - # (i.e. return 0 if all is ok) - for t in task_threads: - t.join() + # wait for them all to finish and rid the queue of STOPs + # join() here just waits for synchronization + qfin.join() + if options.non_interactive == False: print_debug("\n", s, run_tests_log) @@ -752,9 +777,9 @@ def run_tests(options1, args, print_version): - for jb in task_threads: - if not jb.exitcode == 0: - raise OSError(2, 'Some test subprocess has thrown an exception', '') + # if all threads ended correctly, qerr is empty + if not qerr.empty(): + raise OSError(2, 'Some test subprocess has thrown an exception', '') if options.non_interactive: print_debug(" Done %d / %d\n" % (finished_tests_counter.value, total_tests), s, run_tests_log)