From 60bdf1ef8a92576e775b27a80464384d03e3e5b3 Mon Sep 17 00:00:00 2001 From: Matt Pharr Date: Wed, 17 Aug 2011 13:14:32 +0100 Subject: [PATCH] Modify rt example to also do a set of runs with tasks + SPMD together. --- examples/rt/Makefile | 20 ++- examples/rt/rt.cpp | 41 ++++- examples/rt/rt.ispc | 59 ++++++- examples/rt/rt.vcxproj | 3 +- examples/rt/tasks_concrt.cpp | 141 ++++++++++++++++ examples/rt/tasks_gcd.cpp | 103 ++++++++++++ examples/rt/tasks_pthreads.cpp | 295 +++++++++++++++++++++++++++++++++ 7 files changed, 643 insertions(+), 19 deletions(-) create mode 100644 examples/rt/tasks_concrt.cpp create mode 100644 examples/rt/tasks_gcd.cpp create mode 100644 examples/rt/tasks_pthreads.cpp diff --git a/examples/rt/Makefile b/examples/rt/Makefile index d0f18909..bc9ee25d 100644 --- a/examples/rt/Makefile +++ b/examples/rt/Makefile @@ -1,4 +1,16 @@ +ARCH = $(shell uname) + +TASK_CXX=tasks_pthreads.cpp +TASK_LIB=-lpthread + +ifeq ($(ARCH), Darwin) + TASK_CXX=tasks_gcd.cpp + TASK_LIB= +endif + +TASK_OBJ=$(addprefix objs/, $(TASK_CXX:.cpp=.o)) + CXX=g++ -m64 CXXFLAGS=-Iobjs/ -O3 -Wall ISPC=ispc @@ -14,11 +26,13 @@ dirs: clean: /bin/rm -rf objs *~ rt -rt: dirs objs/rt.o objs/rt_serial.o objs/rt_ispc.o - $(CXX) $(CXXFLAGS) -o $@ objs/rt.o objs/rt_ispc.o objs/rt_serial.o -lm +rt: dirs objs/rt.o objs/rt_serial.o objs/rt_ispc.o $(TASK_OBJ) + $(CXX) $(CXXFLAGS) -o $@ objs/rt.o objs/rt_ispc.o objs/rt_serial.o $(TASK_OBJ) -lm $(TASK_LIB) -objs/%.o: %.cpp objs/rt_ispc.h +objs/%.o: %.cpp $(CXX) $< $(CXXFLAGS) -c -o $@ +objs/rt.o: objs/rt_ispc.h + objs/%_ispc.h objs/%_ispc.o: %.ispc $(ISPC) $(ISPCFLAGS) $< -o objs/$*_ispc.o -h objs/$*_ispc.h diff --git a/examples/rt/rt.cpp b/examples/rt/rt.cpp index df9582bd..57f4268f 100644 --- a/examples/rt/rt.cpp +++ b/examples/rt/rt.cpp @@ -134,6 +134,9 @@ int main(int argc, char *argv[]) { ensureTargetISAIsSupported(); + extern void TasksInit(); + TasksInit(); + #define READ(var, n) \ if (fread(&(var), sizeof(var), n, f) != (unsigned int)n) { \ fprintf(stderr, "Unexpected EOF reading scene file\n"); \ @@ -215,7 +218,7 @@ int main(int argc, char *argv[]) { } fclose(f); - // round image resolution up to multiple of 4 to makethings easy for + // round image resolution up to multiple of 4 to make things easy for // the code that assigns pixels to ispc program instances height = (height + 3) & ~3; width = (width + 3) & ~3; @@ -226,19 +229,42 @@ int main(int argc, char *argv[]) { float *image = new float[width*height]; // - // Run 3 iterations with ispc, record the minimum time + // Run 3 iterations with ispc + 1 core, record the minimum time // double minTimeISPC = 1e30; for (int i = 0; i < 3; ++i) { reset_and_start_timer(); - raytrace(width, height, raster2camera, camera2world, - image, id, nodes, triangles); + raytrace_ispc(width, height, raster2camera, camera2world, + image, id, nodes, triangles); double dt = get_elapsed_mcycles(); minTimeISPC = std::min(dt, minTimeISPC); } - printf("[rt ispc]:\t\t\t[%.3f] million cycles for %d x %d image\n", minTimeISPC, width, height); + printf("[rt ispc, 1 core]:\t\t[%.3f] million cycles for %d x %d image\n", + minTimeISPC, width, height); - writeImage(id, image, width, height, "rt-ispc.ppm"); + writeImage(id, image, width, height, "rt-ispc-1core.ppm"); + + memset(id, 0, width*height*sizeof(int)); + memset(image, 0, width*height*sizeof(float)); + + // + // Run 3 iterations with ispc + 1 core, record the minimum time + // + double minTimeISPCtasks = 1e30; + for (int i = 0; i < 3; ++i) { + reset_and_start_timer(); + raytrace_ispc_tasks(width, height, raster2camera, camera2world, + image, id, nodes, triangles); + double dt = get_elapsed_mcycles(); + minTimeISPCtasks = std::min(dt, minTimeISPCtasks); + } + printf("[rt ispc + tasks]:\t\t[%.3f] million cycles for %d x %d image\n", + minTimeISPCtasks, width, height); + + writeImage(id, image, width, height, "rt-ispc-tasks.ppm"); + + memset(id, 0, width*height*sizeof(int)); + memset(image, 0, width*height*sizeof(float)); // // And 3 iterations with the serial implementation, reporting the @@ -254,7 +280,8 @@ int main(int argc, char *argv[]) { } printf("[rt serial]:\t\t\t[%.3f] million cycles for %d x %d image\n", minTimeSerial, width, height); - printf("\t\t\t\t(%.2fx speedup from ISPC)\n", minTimeSerial / minTimeISPC); + printf("\t\t\t\t(%.2fx speedup from ISPC, %.2f from ISPC + tasks)\n", + minTimeSerial / minTimeISPC, minTimeSerial / minTimeISPCtasks); writeImage(id, image, width, height, "rt-serial.ppm"); diff --git a/examples/rt/rt.ispc b/examples/rt/rt.ispc index ca150594..8e5ddf5e 100644 --- a/examples/rt/rt.ispc +++ b/examples/rt/rt.ispc @@ -226,20 +226,21 @@ bool BVHIntersect(const LinearBVHNode nodes[], const Triangle tris[], } -export void raytrace(uniform int width, uniform int height, - const uniform float raster2camera[4][4], - const uniform float camera2world[4][4], - uniform float image[], uniform int id[], - const LinearBVHNode nodes[], - const Triangle triangles[]) { +static void raytrace_tile(uniform int x0, uniform int x1, + uniform int y0, uniform int y1, uniform int width, + const uniform float raster2camera[4][4], + const uniform float camera2world[4][4], + uniform float image[], uniform int id[], + const LinearBVHNode nodes[], + const Triangle triangles[]) { static const uniform float udx[16] = { 0, 1, 0, 1, 2, 3, 2, 3, 0, 1, 0, 1, 2, 3, 2, 3 }; static const uniform float udy[16] = { 0, 0, 1, 1, 0, 0, 1, 1, 2, 2, 3, 3, 2, 2, 3, 3 }; // The outer loops are always over blocks of 4x4 pixels - for (uniform int y = 0; y < height; y += 4) { - for (uniform int x = 0; x < width; x += 4) { + for (uniform int y = y0; y < y1; y += 4) { + for (uniform int x = x0; x < x1; x += 4) { // Now we have a block of 4x4=16 pixels to process; it will // take 16/programCount iterations of this loop to process // them. @@ -261,3 +262,45 @@ export void raytrace(uniform int width, uniform int height, } } } + + +export void raytrace_ispc(uniform int width, uniform int height, + const uniform float raster2camera[4][4], + const uniform float camera2world[4][4], + uniform float image[], uniform int id[], + const LinearBVHNode nodes[], + const Triangle triangles[]) { + raytrace_tile(0, width, 0, height, width, raster2camera, camera2world, image, + id, nodes, triangles); +} + + +task void raytrace_tile_task(uniform int x0, uniform int x1, + uniform int y0, uniform int y1, uniform int width, + const uniform float raster2camera[4][4], + const uniform float camera2world[4][4], + uniform float image[], uniform int id[], + const LinearBVHNode nodes[], + const Triangle triangles[]) { + raytrace_tile(x0, x1, y0, y1, width, raster2camera, camera2world, image, + id, nodes, triangles); +} + + +export void raytrace_ispc_tasks(uniform int width, uniform int height, + const uniform float raster2camera[4][4], + const uniform float camera2world[4][4], + uniform float image[], uniform int id[], + const LinearBVHNode nodes[], + const Triangle triangles[]) { + uniform int dx = 16, dy = 16; + for (uniform int y = 0; y < height; y += dy) { + uniform int y1 = min(y + dy, height); + for (uniform int x = 0; x < width; x += dx) { + uniform int x1 = min(x + dx, width); + launch < raytrace_tile_task(x, x1, y, y1, width, raster2camera, + camera2world, image, id, nodes, + triangles) >; + } + } +} diff --git a/examples/rt/rt.vcxproj b/examples/rt/rt.vcxproj index 6b347a7a..f082beb9 100755 --- a/examples/rt/rt.vcxproj +++ b/examples/rt/rt.vcxproj @@ -164,8 +164,9 @@ ispc -O2 %(Filename).ispc -o %(Filename).obj -h %(Filename)_ispc.h + - + \ No newline at end of file diff --git a/examples/rt/tasks_concrt.cpp b/examples/rt/tasks_concrt.cpp new file mode 100644 index 00000000..d40b40ae --- /dev/null +++ b/examples/rt/tasks_concrt.cpp @@ -0,0 +1,141 @@ +/* + Copyright (c) 2010-2011, Intel Corporation + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + * Neither the name of Intel Corporation nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +/* Simple task system implementation for ispc based on Microsoft's + Concurrency Runtime. */ + +#include +#include +using namespace Concurrency; +#include +#include +#include +#include + +// ispc expects these functions to have C linkage / not be mangled +extern "C" { + void ISPCLaunch(void *f, void *data); + void ISPCSync(); + void *ISPCMalloc(int64_t size, int32_t alignment); + void ISPCFree(void *ptr); +} + +typedef void (*TaskFuncType)(void *, int, int); + +struct TaskInfo { + TaskFuncType ispcFunc; + void *ispcData; +}; + +// This is a simple implementation that just aborts if more than MAX_TASKS +// are launched. It could easily be extended to be more general... + +#define MAX_TASKS 32768 +static int taskOffset; +static TaskInfo taskInfo[MAX_TASKS]; +static event *events[MAX_TASKS]; +static CRITICAL_SECTION criticalSection; +static bool initialized = false; + +void +TasksInit() { + InitializeCriticalSection(&criticalSection); + for (int i = 0; i < MAX_TASKS; ++i) + events[i] = new event; + initialized = true; +} + + +void __cdecl +lRunTask(LPVOID param) { + TaskInfo *ti = (TaskInfo *)param; + + // Actually run the task. + // FIXME: like the tasks_gcd.cpp implementation, this is passing bogus + // values for the threadIndex and threadCount builtins, which in turn + // will cause bugs in code that uses those. FWIW this example doesn't + // use them... + int threadIndex = 0; + int threadCount = 1; + ti->ispcFunc(ti->ispcData, threadIndex, threadCount); + + // Signal the event that this task is done + int taskNum = ti - &taskInfo[0]; + events[taskNum]->set(); +} + + +void +ISPCLaunch(void *func, void *data) { + if (!initialized) { + fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); + exit(1); + } + + // Get a TaskInfo struct for this task + EnterCriticalSection(&criticalSection); + TaskInfo *ti = &taskInfo[taskOffset++]; + assert(taskOffset < MAX_TASKS); + LeaveCriticalSection(&criticalSection); + + // And pass it on to the Concurrency Runtime... + ti->ispcFunc = (TaskFuncType)func; + ti->ispcData = data; + CurrentScheduler::ScheduleTask(lRunTask, ti); +} + + +void ISPCSync() { + if (!initialized) { + fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); + exit(1); + } + + event::wait_for_multiple(&events[0], taskOffset, true, + COOPERATIVE_TIMEOUT_INFINITE); + + for (int i = 0; i < taskOffset; ++i) + events[i]->reset(); + + taskOffset = 0; +} + + +void *ISPCMalloc(int64_t size, int32_t alignment) { + return _aligned_malloc(size, alignment); +} + + +void ISPCFree(void *ptr) { + _aligned_free(ptr); +} diff --git a/examples/rt/tasks_gcd.cpp b/examples/rt/tasks_gcd.cpp new file mode 100644 index 00000000..f759cc37 --- /dev/null +++ b/examples/rt/tasks_gcd.cpp @@ -0,0 +1,103 @@ +/* + Copyright (c) 2010-2011, Intel Corporation + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + * Neither the name of Intel Corporation nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +/* A simple task system for ispc programs based on Apple's Grand Central + Dispatch. */ + +#include +#include +#include + +static bool initialized = false; +static dispatch_queue_t gcdQueue; +static dispatch_group_t gcdGroup; + +// ispc expects these functions to have C linkage / not be mangled +extern "C" { + void ISPCLaunch(void *f, void *data); + void ISPCSync(); +} + +struct TaskInfo { + void *func; + void *data; +}; + + +void +TasksInit() { + gcdQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); + gcdGroup = dispatch_group_create(); + initialized = true; +} + + +static void +lRunTask(void *ti) { + typedef void (*TaskFuncType)(void *, int, int); + TaskInfo *taskInfo = (TaskInfo *)ti; + + TaskFuncType func = (TaskFuncType)(taskInfo->func); + + // FIXME: these are bogus values; may cause bugs in code that depends + // on them having unique values in different threads. + int threadIndex = 0; + int threadCount = 1; + // Actually run the task + func(taskInfo->data, threadIndex, threadCount); + + // FIXME: taskInfo leaks... +} + + +void ISPCLaunch(void *func, void *data) { + if (!initialized) { + fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); + exit(1); + } + TaskInfo *ti = new TaskInfo; + ti->func = func; + ti->data = data; + dispatch_group_async_f(gcdGroup, gcdQueue, ti, lRunTask); +} + + +void ISPCSync() { + if (!initialized) { + fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); + exit(1); + } + + // Wait for all of the tasks in the group to complete before returning + dispatch_group_wait(gcdGroup, DISPATCH_TIME_FOREVER); +} diff --git a/examples/rt/tasks_pthreads.cpp b/examples/rt/tasks_pthreads.cpp new file mode 100644 index 00000000..6e95dbff --- /dev/null +++ b/examples/rt/tasks_pthreads.cpp @@ -0,0 +1,295 @@ +/* + Copyright (c) 2010-2011, Intel Corporation + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + * Neither the name of Intel Corporation nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ispc expects these functions to have C linkage / not be mangled +extern "C" { + void ISPCLaunch(void *f, void *data); + void ISPCSync(); +} + + +static int nThreads; +static pthread_t *threads; +static pthread_mutex_t taskQueueMutex; +static std::vector > taskQueue; +static sem_t *workerSemaphore; +static uint32_t numUnfinishedTasks; +static pthread_mutex_t tasksRunningConditionMutex; +static pthread_cond_t tasksRunningCondition; + +static void *lTaskEntry(void *arg); + +/** Figure out how many CPU cores there are in the system + */ +static int +lNumCPUCores() { +#if defined(__linux__) + return sysconf(_SC_NPROCESSORS_ONLN); +#else + // Mac + int mib[2]; + mib[0] = CTL_HW; + size_t length = 2; + if (sysctlnametomib("hw.logicalcpu", mib, &length) == -1) { + fprintf(stderr, "sysctlnametomib() filed. Guessing 2 cores."); + return 2; + } + assert(length == 2); + + int nCores = 0; + size_t size = sizeof(nCores); + + if (sysctl(mib, 2, &nCores, &size, NULL, 0) == -1) { + fprintf(stderr, "sysctl() to find number of cores present failed. Guessing 2."); + return 2; + } + return nCores; +#endif +} + +void +TasksInit() { + nThreads = lNumCPUCores(); + + threads = new pthread_t[nThreads]; + + int err; + if ((err = pthread_mutex_init(&taskQueueMutex, NULL)) != 0) { + fprintf(stderr, "Error creating mutex: %s\n", strerror(err)); + exit(1); + } + + char name[32]; + sprintf(name, "ispc_task.%d", (int)getpid()); + workerSemaphore = sem_open(name, O_CREAT, S_IRUSR|S_IWUSR, 0); + if (!workerSemaphore) { + fprintf(stderr, "Error creating semaphore: %s\n", strerror(err)); + exit(1); + } + + if ((err = pthread_cond_init(&tasksRunningCondition, NULL)) != 0) { + fprintf(stderr, "Error creating condition variable: %s\n", strerror(err)); + exit(1); + } + + if ((err = pthread_mutex_init(&tasksRunningConditionMutex, NULL)) != 0) { + fprintf(stderr, "Error creating mutex: %s\n", strerror(err)); + exit(1); + } + + for (int i = 0; i < nThreads; ++i) { + err = pthread_create(&threads[i], NULL, &lTaskEntry, reinterpret_cast(i)); + if (err != 0) { + fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err)); + exit(1); + } + } +} + + +void +ISPCLaunch(void *f, void *d) { + if (threads == NULL) { + fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); + exit(1); + } + + // + // Acquire mutex, add task + // + int err; + if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); + exit(1); + } + + taskQueue.push_back(std::make_pair(f, d)); + + if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); + exit(1); + } + + // + // Update count of number of tasks left to run + // + if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); + exit(1); + } + + ++numUnfinishedTasks; + + if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); + exit(1); + } + + // + // Post to the worker semaphore to wake up worker threads that are + // sleeping waiting for tasks to show up + // + if ((err = sem_post(workerSemaphore)) != 0) { + fprintf(stderr, "Error from sem_post: %s\n", strerror(err)); + exit(1); + } +} + + +static void * +lTaskEntry(void *arg) { + int threadIndex = int(reinterpret_cast(arg)); + int threadCount = nThreads; + + while (true) { + int err; + if ((err = sem_wait(workerSemaphore)) != 0) { + fprintf(stderr, "Error from sem_wait: %s\n", strerror(err)); + exit(1); + } + + std::pair myTask; + // + // Acquire mutex, get task + // + if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); + exit(1); + } + if (taskQueue.size() == 0) { + // + // Task queue is empty, go back and wait on the semaphore + // + if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); + exit(1); + } + continue; + } + + myTask = taskQueue.back(); + taskQueue.pop_back(); + + if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); + exit(1); + } + + // + // Do work for _myTask_ + // + typedef void (*TaskFunType)(void *, int, int); + TaskFunType func = (TaskFunType)myTask.first; + func(myTask.second, threadIndex, threadCount); + + // + // Decrement the number of unfinished tasks counter + // + if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); + exit(1); + } + + int unfinished = --numUnfinishedTasks; + if (unfinished == 0) { + // + // Signal the "no more tasks are running" condition if all of + // them are done. + // + int err; + if ((err = pthread_cond_signal(&tasksRunningCondition)) != 0) { + fprintf(stderr, "Error from pthread_cond_signal: %s\n", strerror(err)); + exit(1); + } + } + + if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); + exit(1); + } + } + + pthread_exit(NULL); + return 0; +} + + +void ISPCSync() { + if (threads == NULL) { + fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); + exit(1); + } + + int err; + if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); + exit(1); + } + + // As long as there are tasks running, wait on the condition variable; + // doing so causes this thread to go to sleep until someone signals on + // the tasksRunningCondition condition variable. + while (numUnfinishedTasks > 0) { + if ((err = pthread_cond_wait(&tasksRunningCondition, + &tasksRunningConditionMutex)) != 0) { + fprintf(stderr, "Error from pthread_cond_wait: %s\n", strerror(err)); + exit(1); + } + } + + // We acquire ownership of the condition variable mutex when the above + // pthread_cond_wait returns. + // FIXME: is there a lurking issue here if numUnfinishedTasks gets back + // to zero by the time we get to ISPCSync() and thence we're trying to + // unlock a mutex we don't have a lock on? + if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { + fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); + exit(1); + } +}