From 206c85114623b281875649b9d6f41a7f9d236dd4 Mon Sep 17 00:00:00 2001 From: Matt Pharr Date: Wed, 17 Aug 2011 14:31:45 +0100 Subject: [PATCH] Various improvements to example task systems in examples/. - Only have a single copy of all of the tasks_*.cpp sample implementations, stored in examples/. - Reduce dynamic storage allocation and locking in task launch code paths. - Don't have a hard limit of the number of tasks that can be launched on Windows (fix issue #85). --- examples/mandelbrot_tasks/Makefile | 13 +- examples/mandelbrot_tasks/mandelbrot.cpp | 3 - .../mandelbrot_tasks/mandelbrot_tasks.vcxproj | 4 +- examples/mandelbrot_tasks/tasks_gcd.cpp | 103 ------ examples/mandelbrot_tasks/tasks_pthreads.cpp | 295 ------------------ examples/rt/Makefile | 13 +- examples/rt/rt.cpp | 3 - examples/rt/rt.vcxproj | 6 +- examples/rt/tasks_concrt.cpp | 141 --------- examples/rt/tasks_gcd.cpp | 103 ------ examples/stencil/Makefile | 13 +- examples/stencil/stencil.cpp | 3 - examples/stencil/stencil.vcxproj | 6 +- examples/stencil/tasks_concrt.cpp | 141 --------- examples/stencil/tasks_pthreads.cpp | 295 ------------------ examples/taskinfo.h | 180 +++++++++++ .../{mandelbrot_tasks => }/tasks_concrt.cpp | 75 ++--- examples/{stencil => }/tasks_gcd.cpp | 58 ++-- examples/{rt => }/tasks_pthreads.cpp | 111 ++++--- examples/volume_rendering/Makefile | 13 +- examples/volume_rendering/tasks_concrt.cpp | 141 --------- examples/volume_rendering/tasks_gcd.cpp | 103 ------ examples/volume_rendering/tasks_pthreads.cpp | 295 ------------------ examples/volume_rendering/volume.cpp | 3 - examples/volume_rendering/volume.vcxproj | 2 +- 25 files changed, 322 insertions(+), 1801 deletions(-) delete mode 100644 examples/mandelbrot_tasks/tasks_gcd.cpp delete mode 100644 examples/mandelbrot_tasks/tasks_pthreads.cpp delete mode 100644 examples/rt/tasks_concrt.cpp delete mode 100644 examples/rt/tasks_gcd.cpp delete mode 100644 examples/stencil/tasks_concrt.cpp delete mode 100644 examples/stencil/tasks_pthreads.cpp create mode 100644 examples/taskinfo.h rename examples/{mandelbrot_tasks => }/tasks_concrt.cpp (59%) rename examples/{stencil => }/tasks_gcd.cpp (78%) rename examples/{rt => }/tasks_pthreads.cpp (83%) delete mode 100644 examples/volume_rendering/tasks_concrt.cpp delete mode 100644 examples/volume_rendering/tasks_gcd.cpp delete mode 100644 examples/volume_rendering/tasks_pthreads.cpp diff --git a/examples/mandelbrot_tasks/Makefile b/examples/mandelbrot_tasks/Makefile index 263cf750..718009c6 100644 --- a/examples/mandelbrot_tasks/Makefile +++ b/examples/mandelbrot_tasks/Makefile @@ -1,18 +1,18 @@ ARCH = $(shell uname) -TASK_CXX=tasks_pthreads.cpp +TASK_CXX=../tasks_pthreads.cpp TASK_LIB=-lpthread ifeq ($(ARCH), Darwin) - TASK_CXX=tasks_gcd.cpp + TASK_CXX=../tasks_gcd.cpp TASK_LIB= endif -TASK_OBJ=$(addprefix objs/, $(TASK_CXX:.cpp=.o)) +TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o))) -CXX=g++ -m64 -CXXFLAGS=-Iobjs/ -O3 -Wall +CXX=g++ +CXXFLAGS=-Iobjs/ -O3 -Wall -m64 ISPC=ispc ISPCFLAGS=-O2 --target=sse4x2 --arch=x86-64 @@ -32,6 +32,9 @@ mandelbrot: dirs objs/mandelbrot.o objs/mandelbrot_serial.o objs/mandelbrot_ispc objs/%.o: %.cpp $(CXX) $< $(CXXFLAGS) -c -o $@ +objs/%.o: ../%.cpp + $(CXX) $< $(CXXFLAGS) -c -o $@ + objs/mandelbrot.o: objs/mandelbrot_ispc.h objs/%_ispc.h objs/%_ispc.o: %.ispc diff --git a/examples/mandelbrot_tasks/mandelbrot.cpp b/examples/mandelbrot_tasks/mandelbrot.cpp index fb36a1a9..a7d2a032 100644 --- a/examples/mandelbrot_tasks/mandelbrot.cpp +++ b/examples/mandelbrot_tasks/mandelbrot.cpp @@ -110,9 +110,6 @@ int main() { ensureTargetISAIsSupported(); - extern void TasksInit(); - TasksInit(); - int maxIterations = 512; int *buf = new int[width*height]; diff --git a/examples/mandelbrot_tasks/mandelbrot_tasks.vcxproj b/examples/mandelbrot_tasks/mandelbrot_tasks.vcxproj index 81c339b0..07e5f54c 100755 --- a/examples/mandelbrot_tasks/mandelbrot_tasks.vcxproj +++ b/examples/mandelbrot_tasks/mandelbrot_tasks.vcxproj @@ -1,4 +1,4 @@ - + @@ -143,7 +143,7 @@ - + diff --git a/examples/mandelbrot_tasks/tasks_gcd.cpp b/examples/mandelbrot_tasks/tasks_gcd.cpp deleted file mode 100644 index f759cc37..00000000 --- a/examples/mandelbrot_tasks/tasks_gcd.cpp +++ /dev/null @@ -1,103 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -/* A simple task system for ispc programs based on Apple's Grand Central - Dispatch. */ - -#include -#include -#include - -static bool initialized = false; -static dispatch_queue_t gcdQueue; -static dispatch_group_t gcdGroup; - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); -} - -struct TaskInfo { - void *func; - void *data; -}; - - -void -TasksInit() { - gcdQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); - gcdGroup = dispatch_group_create(); - initialized = true; -} - - -static void -lRunTask(void *ti) { - typedef void (*TaskFuncType)(void *, int, int); - TaskInfo *taskInfo = (TaskInfo *)ti; - - TaskFuncType func = (TaskFuncType)(taskInfo->func); - - // FIXME: these are bogus values; may cause bugs in code that depends - // on them having unique values in different threads. - int threadIndex = 0; - int threadCount = 1; - // Actually run the task - func(taskInfo->data, threadIndex, threadCount); - - // FIXME: taskInfo leaks... -} - - -void ISPCLaunch(void *func, void *data) { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - TaskInfo *ti = new TaskInfo; - ti->func = func; - ti->data = data; - dispatch_group_async_f(gcdGroup, gcdQueue, ti, lRunTask); -} - - -void ISPCSync() { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // Wait for all of the tasks in the group to complete before returning - dispatch_group_wait(gcdGroup, DISPATCH_TIME_FOREVER); -} diff --git a/examples/mandelbrot_tasks/tasks_pthreads.cpp b/examples/mandelbrot_tasks/tasks_pthreads.cpp deleted file mode 100644 index 6e95dbff..00000000 --- a/examples/mandelbrot_tasks/tasks_pthreads.cpp +++ /dev/null @@ -1,295 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); -} - - -static int nThreads; -static pthread_t *threads; -static pthread_mutex_t taskQueueMutex; -static std::vector > taskQueue; -static sem_t *workerSemaphore; -static uint32_t numUnfinishedTasks; -static pthread_mutex_t tasksRunningConditionMutex; -static pthread_cond_t tasksRunningCondition; - -static void *lTaskEntry(void *arg); - -/** Figure out how many CPU cores there are in the system - */ -static int -lNumCPUCores() { -#if defined(__linux__) - return sysconf(_SC_NPROCESSORS_ONLN); -#else - // Mac - int mib[2]; - mib[0] = CTL_HW; - size_t length = 2; - if (sysctlnametomib("hw.logicalcpu", mib, &length) == -1) { - fprintf(stderr, "sysctlnametomib() filed. Guessing 2 cores."); - return 2; - } - assert(length == 2); - - int nCores = 0; - size_t size = sizeof(nCores); - - if (sysctl(mib, 2, &nCores, &size, NULL, 0) == -1) { - fprintf(stderr, "sysctl() to find number of cores present failed. Guessing 2."); - return 2; - } - return nCores; -#endif -} - -void -TasksInit() { - nThreads = lNumCPUCores(); - - threads = new pthread_t[nThreads]; - - int err; - if ((err = pthread_mutex_init(&taskQueueMutex, NULL)) != 0) { - fprintf(stderr, "Error creating mutex: %s\n", strerror(err)); - exit(1); - } - - char name[32]; - sprintf(name, "ispc_task.%d", (int)getpid()); - workerSemaphore = sem_open(name, O_CREAT, S_IRUSR|S_IWUSR, 0); - if (!workerSemaphore) { - fprintf(stderr, "Error creating semaphore: %s\n", strerror(err)); - exit(1); - } - - if ((err = pthread_cond_init(&tasksRunningCondition, NULL)) != 0) { - fprintf(stderr, "Error creating condition variable: %s\n", strerror(err)); - exit(1); - } - - if ((err = pthread_mutex_init(&tasksRunningConditionMutex, NULL)) != 0) { - fprintf(stderr, "Error creating mutex: %s\n", strerror(err)); - exit(1); - } - - for (int i = 0; i < nThreads; ++i) { - err = pthread_create(&threads[i], NULL, &lTaskEntry, reinterpret_cast(i)); - if (err != 0) { - fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err)); - exit(1); - } - } -} - - -void -ISPCLaunch(void *f, void *d) { - if (threads == NULL) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // - // Acquire mutex, add task - // - int err; - if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - taskQueue.push_back(std::make_pair(f, d)); - - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - - // - // Update count of number of tasks left to run - // - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - ++numUnfinishedTasks; - - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - // - // Post to the worker semaphore to wake up worker threads that are - // sleeping waiting for tasks to show up - // - if ((err = sem_post(workerSemaphore)) != 0) { - fprintf(stderr, "Error from sem_post: %s\n", strerror(err)); - exit(1); - } -} - - -static void * -lTaskEntry(void *arg) { - int threadIndex = int(reinterpret_cast(arg)); - int threadCount = nThreads; - - while (true) { - int err; - if ((err = sem_wait(workerSemaphore)) != 0) { - fprintf(stderr, "Error from sem_wait: %s\n", strerror(err)); - exit(1); - } - - std::pair myTask; - // - // Acquire mutex, get task - // - if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - if (taskQueue.size() == 0) { - // - // Task queue is empty, go back and wait on the semaphore - // - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - continue; - } - - myTask = taskQueue.back(); - taskQueue.pop_back(); - - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - - // - // Do work for _myTask_ - // - typedef void (*TaskFunType)(void *, int, int); - TaskFunType func = (TaskFunType)myTask.first; - func(myTask.second, threadIndex, threadCount); - - // - // Decrement the number of unfinished tasks counter - // - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - int unfinished = --numUnfinishedTasks; - if (unfinished == 0) { - // - // Signal the "no more tasks are running" condition if all of - // them are done. - // - int err; - if ((err = pthread_cond_signal(&tasksRunningCondition)) != 0) { - fprintf(stderr, "Error from pthread_cond_signal: %s\n", strerror(err)); - exit(1); - } - } - - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - } - - pthread_exit(NULL); - return 0; -} - - -void ISPCSync() { - if (threads == NULL) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - int err; - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - // As long as there are tasks running, wait on the condition variable; - // doing so causes this thread to go to sleep until someone signals on - // the tasksRunningCondition condition variable. - while (numUnfinishedTasks > 0) { - if ((err = pthread_cond_wait(&tasksRunningCondition, - &tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_cond_wait: %s\n", strerror(err)); - exit(1); - } - } - - // We acquire ownership of the condition variable mutex when the above - // pthread_cond_wait returns. - // FIXME: is there a lurking issue here if numUnfinishedTasks gets back - // to zero by the time we get to ISPCSync() and thence we're trying to - // unlock a mutex we don't have a lock on? - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } -} diff --git a/examples/rt/Makefile b/examples/rt/Makefile index bc9ee25d..d3ca759d 100644 --- a/examples/rt/Makefile +++ b/examples/rt/Makefile @@ -1,18 +1,18 @@ ARCH = $(shell uname) -TASK_CXX=tasks_pthreads.cpp +TASK_CXX=../tasks_pthreads.cpp TASK_LIB=-lpthread ifeq ($(ARCH), Darwin) - TASK_CXX=tasks_gcd.cpp + TASK_CXX=../tasks_gcd.cpp TASK_LIB= endif -TASK_OBJ=$(addprefix objs/, $(TASK_CXX:.cpp=.o)) +TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o))) -CXX=g++ -m64 -CXXFLAGS=-Iobjs/ -O3 -Wall +CXX=g++ +CXXFLAGS=-Iobjs/ -O3 -Wall -m64 ISPC=ispc ISPCFLAGS=-O2 --target=sse4x2 --arch=x86-64 @@ -32,6 +32,9 @@ rt: dirs objs/rt.o objs/rt_serial.o objs/rt_ispc.o $(TASK_OBJ) objs/%.o: %.cpp $(CXX) $< $(CXXFLAGS) -c -o $@ +objs/%.o: ../%.cpp + $(CXX) $< $(CXXFLAGS) -c -o $@ + objs/rt.o: objs/rt_ispc.h objs/%_ispc.h objs/%_ispc.o: %.ispc diff --git a/examples/rt/rt.cpp b/examples/rt/rt.cpp index 57f4268f..688d8249 100644 --- a/examples/rt/rt.cpp +++ b/examples/rt/rt.cpp @@ -134,9 +134,6 @@ int main(int argc, char *argv[]) { ensureTargetISAIsSupported(); - extern void TasksInit(); - TasksInit(); - #define READ(var, n) \ if (fread(&(var), sizeof(var), n, f) != (unsigned int)n) { \ fprintf(stderr, "Unexpected EOF reading scene file\n"); \ diff --git a/examples/rt/rt.vcxproj b/examples/rt/rt.vcxproj index f082beb9..426144b4 100755 --- a/examples/rt/rt.vcxproj +++ b/examples/rt/rt.vcxproj @@ -1,4 +1,4 @@ - + @@ -164,9 +164,9 @@ ispc -O2 %(Filename).ispc -o %(Filename).obj -h %(Filename)_ispc.h - + - \ No newline at end of file + diff --git a/examples/rt/tasks_concrt.cpp b/examples/rt/tasks_concrt.cpp deleted file mode 100644 index d40b40ae..00000000 --- a/examples/rt/tasks_concrt.cpp +++ /dev/null @@ -1,141 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -/* Simple task system implementation for ispc based on Microsoft's - Concurrency Runtime. */ - -#include -#include -using namespace Concurrency; -#include -#include -#include -#include - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); - void *ISPCMalloc(int64_t size, int32_t alignment); - void ISPCFree(void *ptr); -} - -typedef void (*TaskFuncType)(void *, int, int); - -struct TaskInfo { - TaskFuncType ispcFunc; - void *ispcData; -}; - -// This is a simple implementation that just aborts if more than MAX_TASKS -// are launched. It could easily be extended to be more general... - -#define MAX_TASKS 32768 -static int taskOffset; -static TaskInfo taskInfo[MAX_TASKS]; -static event *events[MAX_TASKS]; -static CRITICAL_SECTION criticalSection; -static bool initialized = false; - -void -TasksInit() { - InitializeCriticalSection(&criticalSection); - for (int i = 0; i < MAX_TASKS; ++i) - events[i] = new event; - initialized = true; -} - - -void __cdecl -lRunTask(LPVOID param) { - TaskInfo *ti = (TaskInfo *)param; - - // Actually run the task. - // FIXME: like the tasks_gcd.cpp implementation, this is passing bogus - // values for the threadIndex and threadCount builtins, which in turn - // will cause bugs in code that uses those. FWIW this example doesn't - // use them... - int threadIndex = 0; - int threadCount = 1; - ti->ispcFunc(ti->ispcData, threadIndex, threadCount); - - // Signal the event that this task is done - int taskNum = ti - &taskInfo[0]; - events[taskNum]->set(); -} - - -void -ISPCLaunch(void *func, void *data) { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // Get a TaskInfo struct for this task - EnterCriticalSection(&criticalSection); - TaskInfo *ti = &taskInfo[taskOffset++]; - assert(taskOffset < MAX_TASKS); - LeaveCriticalSection(&criticalSection); - - // And pass it on to the Concurrency Runtime... - ti->ispcFunc = (TaskFuncType)func; - ti->ispcData = data; - CurrentScheduler::ScheduleTask(lRunTask, ti); -} - - -void ISPCSync() { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - event::wait_for_multiple(&events[0], taskOffset, true, - COOPERATIVE_TIMEOUT_INFINITE); - - for (int i = 0; i < taskOffset; ++i) - events[i]->reset(); - - taskOffset = 0; -} - - -void *ISPCMalloc(int64_t size, int32_t alignment) { - return _aligned_malloc(size, alignment); -} - - -void ISPCFree(void *ptr) { - _aligned_free(ptr); -} diff --git a/examples/rt/tasks_gcd.cpp b/examples/rt/tasks_gcd.cpp deleted file mode 100644 index f759cc37..00000000 --- a/examples/rt/tasks_gcd.cpp +++ /dev/null @@ -1,103 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -/* A simple task system for ispc programs based on Apple's Grand Central - Dispatch. */ - -#include -#include -#include - -static bool initialized = false; -static dispatch_queue_t gcdQueue; -static dispatch_group_t gcdGroup; - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); -} - -struct TaskInfo { - void *func; - void *data; -}; - - -void -TasksInit() { - gcdQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); - gcdGroup = dispatch_group_create(); - initialized = true; -} - - -static void -lRunTask(void *ti) { - typedef void (*TaskFuncType)(void *, int, int); - TaskInfo *taskInfo = (TaskInfo *)ti; - - TaskFuncType func = (TaskFuncType)(taskInfo->func); - - // FIXME: these are bogus values; may cause bugs in code that depends - // on them having unique values in different threads. - int threadIndex = 0; - int threadCount = 1; - // Actually run the task - func(taskInfo->data, threadIndex, threadCount); - - // FIXME: taskInfo leaks... -} - - -void ISPCLaunch(void *func, void *data) { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - TaskInfo *ti = new TaskInfo; - ti->func = func; - ti->data = data; - dispatch_group_async_f(gcdGroup, gcdQueue, ti, lRunTask); -} - - -void ISPCSync() { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // Wait for all of the tasks in the group to complete before returning - dispatch_group_wait(gcdGroup, DISPATCH_TIME_FOREVER); -} diff --git a/examples/stencil/Makefile b/examples/stencil/Makefile index 6d6a9f62..7b40a489 100644 --- a/examples/stencil/Makefile +++ b/examples/stencil/Makefile @@ -1,18 +1,18 @@ ARCH = $(shell uname) -TASK_CXX=tasks_pthreads.cpp +TASK_CXX=../tasks_pthreads.cpp TASK_LIB=-lpthread ifeq ($(ARCH), Darwin) - TASK_CXX=tasks_gcd.cpp + TASK_CXX=../tasks_gcd.cpp TASK_LIB= endif -TASK_OBJ=$(addprefix objs/, $(TASK_CXX:.cpp=.o)) +TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o))) -CXX=g++ -m64 -CXXFLAGS=-Iobjs/ -O3 -Wall +CXX=g++ +CXXFLAGS=-Iobjs/ -O3 -Wall -m64 ISPC=ispc ISPCFLAGS=-O2 --target=sse4x2 --arch=x86-64 @@ -32,6 +32,9 @@ stencil: dirs objs/stencil.o objs/stencil_serial.o objs/stencil_ispc.o $(TASK_OB objs/%.o: %.cpp $(CXX) $< $(CXXFLAGS) -c -o $@ +objs/%.o: ../%.cpp + $(CXX) $< $(CXXFLAGS) -c -o $@ + objs/stencil.o: objs/stencil_ispc.h objs/%_ispc.h objs/%_ispc.o: %.ispc diff --git a/examples/stencil/stencil.cpp b/examples/stencil/stencil.cpp index 302fef2b..a3ce5712 100644 --- a/examples/stencil/stencil.cpp +++ b/examples/stencil/stencil.cpp @@ -102,9 +102,6 @@ void InitData(int Nx, int Ny, int Nz, float *A[2], float *vsq) { int main() { ensureTargetISAIsSupported(); - extern void TasksInit(); - TasksInit(); - int Nx = 256, Ny = 256, Nz = 256; int width = 4; float *Aserial[2], *Aispc[2]; diff --git a/examples/stencil/stencil.vcxproj b/examples/stencil/stencil.vcxproj index 9968955e..f045d08a 100755 --- a/examples/stencil/stencil.vcxproj +++ b/examples/stencil/stencil.vcxproj @@ -1,4 +1,4 @@ - + @@ -164,9 +164,9 @@ ispc -O2 %(Filename).ispc -o %(Filename).obj -h %(Filename)_ispc.h - + - \ No newline at end of file + diff --git a/examples/stencil/tasks_concrt.cpp b/examples/stencil/tasks_concrt.cpp deleted file mode 100644 index b70d5cbe..00000000 --- a/examples/stencil/tasks_concrt.cpp +++ /dev/null @@ -1,141 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -/* Simple task system implementation for ispc based on Microsoft's - Concurrency Runtime. */ - -#include -#include -using namespace Concurrency; -#include -#include -#include -#include - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); - void *ISPCMalloc(int64_t size, int32_t alignment); - void ISPCFree(void *ptr); -} - -typedef void (*TaskFuncType)(void *, int, int); - -struct TaskInfo { - TaskFuncType ispcFunc; - void *ispcData; -}; - -// This is a simple implementation that just aborts if more than MAX_TASKS -// are launched. It could easily be extended to be more general... - -#define MAX_TASKS 4096 -static int taskOffset; -static TaskInfo taskInfo[MAX_TASKS]; -static event *events[MAX_TASKS]; -static CRITICAL_SECTION criticalSection; -static bool initialized = false; - -void -TasksInit() { - InitializeCriticalSection(&criticalSection); - for (int i = 0; i < MAX_TASKS; ++i) - events[i] = new event; - initialized = true; -} - - -void __cdecl -lRunTask(LPVOID param) { - TaskInfo *ti = (TaskInfo *)param; - - // Actually run the task. - // FIXME: like the tasks_gcd.cpp implementation, this is passing bogus - // values for the threadIndex and threadCount builtins, which in turn - // will cause bugs in code that uses those. FWIW this example doesn't - // use them... - int threadIndex = 0; - int threadCount = 1; - ti->ispcFunc(ti->ispcData, threadIndex, threadCount); - - // Signal the event that this task is done - int taskNum = ti - &taskInfo[0]; - events[taskNum]->set(); -} - - -void -ISPCLaunch(void *func, void *data) { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // Get a TaskInfo struct for this task - EnterCriticalSection(&criticalSection); - TaskInfo *ti = &taskInfo[taskOffset++]; - assert(taskOffset < MAX_TASKS); - LeaveCriticalSection(&criticalSection); - - // And pass it on to the Concurrency Runtime... - ti->ispcFunc = (TaskFuncType)func; - ti->ispcData = data; - CurrentScheduler::ScheduleTask(lRunTask, ti); -} - - -void ISPCSync() { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - event::wait_for_multiple(&events[0], taskOffset, true, - COOPERATIVE_TIMEOUT_INFINITE); - - for (int i = 0; i < taskOffset; ++i) - events[i]->reset(); - - taskOffset = 0; -} - - -void *ISPCMalloc(int64_t size, int32_t alignment) { - return _aligned_malloc(size, alignment); -} - - -void ISPCFree(void *ptr) { - _aligned_free(ptr); -} diff --git a/examples/stencil/tasks_pthreads.cpp b/examples/stencil/tasks_pthreads.cpp deleted file mode 100644 index 6e95dbff..00000000 --- a/examples/stencil/tasks_pthreads.cpp +++ /dev/null @@ -1,295 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); -} - - -static int nThreads; -static pthread_t *threads; -static pthread_mutex_t taskQueueMutex; -static std::vector > taskQueue; -static sem_t *workerSemaphore; -static uint32_t numUnfinishedTasks; -static pthread_mutex_t tasksRunningConditionMutex; -static pthread_cond_t tasksRunningCondition; - -static void *lTaskEntry(void *arg); - -/** Figure out how many CPU cores there are in the system - */ -static int -lNumCPUCores() { -#if defined(__linux__) - return sysconf(_SC_NPROCESSORS_ONLN); -#else - // Mac - int mib[2]; - mib[0] = CTL_HW; - size_t length = 2; - if (sysctlnametomib("hw.logicalcpu", mib, &length) == -1) { - fprintf(stderr, "sysctlnametomib() filed. Guessing 2 cores."); - return 2; - } - assert(length == 2); - - int nCores = 0; - size_t size = sizeof(nCores); - - if (sysctl(mib, 2, &nCores, &size, NULL, 0) == -1) { - fprintf(stderr, "sysctl() to find number of cores present failed. Guessing 2."); - return 2; - } - return nCores; -#endif -} - -void -TasksInit() { - nThreads = lNumCPUCores(); - - threads = new pthread_t[nThreads]; - - int err; - if ((err = pthread_mutex_init(&taskQueueMutex, NULL)) != 0) { - fprintf(stderr, "Error creating mutex: %s\n", strerror(err)); - exit(1); - } - - char name[32]; - sprintf(name, "ispc_task.%d", (int)getpid()); - workerSemaphore = sem_open(name, O_CREAT, S_IRUSR|S_IWUSR, 0); - if (!workerSemaphore) { - fprintf(stderr, "Error creating semaphore: %s\n", strerror(err)); - exit(1); - } - - if ((err = pthread_cond_init(&tasksRunningCondition, NULL)) != 0) { - fprintf(stderr, "Error creating condition variable: %s\n", strerror(err)); - exit(1); - } - - if ((err = pthread_mutex_init(&tasksRunningConditionMutex, NULL)) != 0) { - fprintf(stderr, "Error creating mutex: %s\n", strerror(err)); - exit(1); - } - - for (int i = 0; i < nThreads; ++i) { - err = pthread_create(&threads[i], NULL, &lTaskEntry, reinterpret_cast(i)); - if (err != 0) { - fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err)); - exit(1); - } - } -} - - -void -ISPCLaunch(void *f, void *d) { - if (threads == NULL) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // - // Acquire mutex, add task - // - int err; - if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - taskQueue.push_back(std::make_pair(f, d)); - - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - - // - // Update count of number of tasks left to run - // - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - ++numUnfinishedTasks; - - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - // - // Post to the worker semaphore to wake up worker threads that are - // sleeping waiting for tasks to show up - // - if ((err = sem_post(workerSemaphore)) != 0) { - fprintf(stderr, "Error from sem_post: %s\n", strerror(err)); - exit(1); - } -} - - -static void * -lTaskEntry(void *arg) { - int threadIndex = int(reinterpret_cast(arg)); - int threadCount = nThreads; - - while (true) { - int err; - if ((err = sem_wait(workerSemaphore)) != 0) { - fprintf(stderr, "Error from sem_wait: %s\n", strerror(err)); - exit(1); - } - - std::pair myTask; - // - // Acquire mutex, get task - // - if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - if (taskQueue.size() == 0) { - // - // Task queue is empty, go back and wait on the semaphore - // - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - continue; - } - - myTask = taskQueue.back(); - taskQueue.pop_back(); - - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - - // - // Do work for _myTask_ - // - typedef void (*TaskFunType)(void *, int, int); - TaskFunType func = (TaskFunType)myTask.first; - func(myTask.second, threadIndex, threadCount); - - // - // Decrement the number of unfinished tasks counter - // - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - int unfinished = --numUnfinishedTasks; - if (unfinished == 0) { - // - // Signal the "no more tasks are running" condition if all of - // them are done. - // - int err; - if ((err = pthread_cond_signal(&tasksRunningCondition)) != 0) { - fprintf(stderr, "Error from pthread_cond_signal: %s\n", strerror(err)); - exit(1); - } - } - - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - } - - pthread_exit(NULL); - return 0; -} - - -void ISPCSync() { - if (threads == NULL) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - int err; - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - // As long as there are tasks running, wait on the condition variable; - // doing so causes this thread to go to sleep until someone signals on - // the tasksRunningCondition condition variable. - while (numUnfinishedTasks > 0) { - if ((err = pthread_cond_wait(&tasksRunningCondition, - &tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_cond_wait: %s\n", strerror(err)); - exit(1); - } - } - - // We acquire ownership of the condition variable mutex when the above - // pthread_cond_wait returns. - // FIXME: is there a lurking issue here if numUnfinishedTasks gets back - // to zero by the time we get to ISPCSync() and thence we're trying to - // unlock a mutex we don't have a lock on? - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } -} diff --git a/examples/taskinfo.h b/examples/taskinfo.h new file mode 100644 index 00000000..4806bdaa --- /dev/null +++ b/examples/taskinfo.h @@ -0,0 +1,180 @@ +/* + Copyright (c) 2011, Intel Corporation + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + * Neither the name of Intel Corporation nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS + IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER + OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef TASKINFO_H +#define TASKINFO_H 1 + +#ifdef _MSC_VER +#define ISPC_IS_WINDOWS +#elif defined(__linux__) +#define ISPC_IS_LINUX +#elif defined(__APPLE__) +#define ISPC_IS_APPLE +#endif + +#ifdef ISPC_IS_WINDOWS +#define NOMINMAX +#include +#include +using namespace Concurrency; +#endif // ISPC_IS_WINDOWS + +#if (__SIZEOF_POINTER__ == 4) || defined(__i386__) || defined(_WIN32) +#define ISPC_POINTER_BYTES 4 +#elif (__SIZEOF_POINTER__ == 8) || defined(__x86_64__) || defined(__amd64__) || defined(_WIN64) +#define ISPC_POINTER_BYTES 8 +#else +#error "Pointer size unknown!" +#endif // __SIZEOF_POINTER__ + +#include +#include +#include +#include + +typedef struct TaskInfo { + void *func; + void *data; +#if defined(ISPC_IS_WINDOWS) + event taskEvent; +#endif +} TaskInfo; + + +#ifndef ISPC_IS_WINDOWS +static int32_t +lAtomicCompareAndSwap32(volatile int32_t *v, int32_t newValue, int32_t oldValue) { + int32_t result; + __asm__ __volatile__("lock\ncmpxchgl %2,%1" + : "=a"(result), "=m"(*v) + : "q"(newValue), "0"(oldValue) + : "memory"); + __asm__ __volatile__("mfence":::"memory"); + return result; +} +#endif // !ISPC_IS_WINDOWS + + +static void * +lAtomicCompareAndSwapPointer(void **v, void *newValue, void *oldValue) { +#ifdef ISPC_IS_WINDOWS + return InterlockedCompareExchangePointer(v, newValue, oldValue); +#else + void *result; +#if (ISPC_POINTER_BYTES == 4) + __asm__ __volatile__("lock\ncmpxchgd %2,%1" + : "=a"(result), "=m"(*v) + : "q"(newValue), "0"(oldValue) + : "memory"); +#else + __asm__ __volatile__("lock\ncmpxchgq %2,%1" + : "=a"(result), "=m"(*v) + : "q"(newValue), "0"(oldValue) + : "memory"); +#endif // ISPC_POINTER_BYTES + __asm__ __volatile__("mfence":::"memory"); + return result; +#endif // ISPC_IS_WINDOWS +} + + +#ifndef ISPC_IS_WINDOWS +static int32_t +lAtomicAdd32(volatile int32_t *v, int32_t delta) { + // Do atomic add with gcc x86 inline assembly + int32_t origValue; + __asm__ __volatile__("lock\n" + "xaddl %0,%1" + : "=r"(origValue), "=m"(*v) : "0"(delta) + : "memory"); + return origValue; +} +#endif + +#define LOG_TASK_QUEUE_CHUNK_SIZE 13 +#define MAX_TASK_QUEUE_CHUNKS 1024 +#define TASK_QUEUE_CHUNK_SIZE (1<> LOG_TASK_QUEUE_CHUNK_SIZE); + int offset = myCoord & (TASK_QUEUE_CHUNK_SIZE-1); + if (index == MAX_TASK_QUEUE_CHUNKS) { + fprintf(stderr, "A total of %d tasks have been launched--the simple " + "built-in task system can handle no more. Exiting.", myCoord); + exit(1); + } + + if (taskInfo[index] == NULL) { + TaskInfo *newChunk = new TaskInfo[TASK_QUEUE_CHUNK_SIZE]; + if (lAtomicCompareAndSwapPointer((void **)&taskInfo[index], newChunk, + NULL) != NULL) { + // failure--someone else got it, but that's cool + assert(taskInfo[index] != NULL); + free(newChunk); + } + } + + return &taskInfo[index][offset]; +} + + +static inline void +lResetTaskInfo() { + nextTaskInfoCoordinate = 0; +} + +#endif // TASKINFO_H diff --git a/examples/mandelbrot_tasks/tasks_concrt.cpp b/examples/tasks_concrt.cpp similarity index 59% rename from examples/mandelbrot_tasks/tasks_concrt.cpp rename to examples/tasks_concrt.cpp index b70d5cbe..dac9cdae 100644 --- a/examples/mandelbrot_tasks/tasks_concrt.cpp +++ b/examples/tasks_concrt.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2010-2011, Intel Corporation + Copyright (c) 2011, Intel Corporation All rights reserved. Redistribution and use in source and binary forms, with or without @@ -31,6 +31,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "taskinfo.h" + /* Simple task system implementation for ispc based on Microsoft's Concurrency Runtime. */ @@ -41,6 +43,7 @@ using namespace Concurrency; #include #include #include +#include // ispc expects these functions to have C linkage / not be mangled extern "C" { @@ -50,84 +53,44 @@ extern "C" { void ISPCFree(void *ptr); } -typedef void (*TaskFuncType)(void *, int, int); - -struct TaskInfo { - TaskFuncType ispcFunc; - void *ispcData; -}; - -// This is a simple implementation that just aborts if more than MAX_TASKS -// are launched. It could easily be extended to be more general... - -#define MAX_TASKS 4096 -static int taskOffset; -static TaskInfo taskInfo[MAX_TASKS]; -static event *events[MAX_TASKS]; -static CRITICAL_SECTION criticalSection; -static bool initialized = false; - -void -TasksInit() { - InitializeCriticalSection(&criticalSection); - for (int i = 0; i < MAX_TASKS; ++i) - events[i] = new event; - initialized = true; -} - void __cdecl lRunTask(LPVOID param) { TaskInfo *ti = (TaskInfo *)param; // Actually run the task. - // FIXME: like the tasks_gcd.cpp implementation, this is passing bogus + // FIXME: like the GCD implementation for OS X, this is passing bogus // values for the threadIndex and threadCount builtins, which in turn - // will cause bugs in code that uses those. FWIW this example doesn't - // use them... + // will cause bugs in code that uses those. int threadIndex = 0; int threadCount = 1; - ti->ispcFunc(ti->ispcData, threadIndex, threadCount); + TaskFuncType func = (TaskFuncType)ti->func; + func(ti->data, threadIndex, threadCount); // Signal the event that this task is done - int taskNum = ti - &taskInfo[0]; - events[taskNum]->set(); + ti->taskEvent.set(); } void ISPCLaunch(void *func, void *data) { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // Get a TaskInfo struct for this task - EnterCriticalSection(&criticalSection); - TaskInfo *ti = &taskInfo[taskOffset++]; - assert(taskOffset < MAX_TASKS); - LeaveCriticalSection(&criticalSection); - - // And pass it on to the Concurrency Runtime... - ti->ispcFunc = (TaskFuncType)func; - ti->ispcData = data; + TaskInfo *ti = lGetTaskInfo(); + ti->func = (TaskFuncType)func; + ti->data = data; + ti->taskEvent.reset(); CurrentScheduler::ScheduleTask(lRunTask, ti); } void ISPCSync() { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); + for (int i = 0; i < nextTaskInfoCoordinate; ++i) { + int index = (i >> LOG_TASK_QUEUE_CHUNK_SIZE); + int offset = i & (TASK_QUEUE_CHUNK_SIZE-1); + taskInfo[index][offset].taskEvent.wait(); + taskInfo[index][offset].taskEvent.reset(); } - event::wait_for_multiple(&events[0], taskOffset, true, - COOPERATIVE_TIMEOUT_INFINITE); - - for (int i = 0; i < taskOffset; ++i) - events[i]->reset(); - - taskOffset = 0; + lResetTaskInfo(); } diff --git a/examples/stencil/tasks_gcd.cpp b/examples/tasks_gcd.cpp similarity index 78% rename from examples/stencil/tasks_gcd.cpp rename to examples/tasks_gcd.cpp index f759cc37..99e616a0 100644 --- a/examples/stencil/tasks_gcd.cpp +++ b/examples/tasks_gcd.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2010-2011, Intel Corporation + Copyright (c) 2011, Intel Corporation All rights reserved. Redistribution and use in source and binary forms, with or without @@ -31,61 +31,57 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "taskinfo.h" + /* A simple task system for ispc programs based on Apple's Grand Central Dispatch. */ - #include #include -#include -static bool initialized = false; +static int initialized = 0; +static volatile int32_t lock = 0; static dispatch_queue_t gcdQueue; static dispatch_group_t gcdGroup; // ispc expects these functions to have C linkage / not be mangled -extern "C" { +extern "C" { void ISPCLaunch(void *f, void *data); void ISPCSync(); } -struct TaskInfo { - void *func; - void *data; -}; - - -void -TasksInit() { - gcdQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); - gcdGroup = dispatch_group_create(); - initialized = true; -} - static void lRunTask(void *ti) { - typedef void (*TaskFuncType)(void *, int, int); TaskInfo *taskInfo = (TaskInfo *)ti; - - TaskFuncType func = (TaskFuncType)(taskInfo->func); - // FIXME: these are bogus values; may cause bugs in code that depends // on them having unique values in different threads. int threadIndex = 0; int threadCount = 1; + TaskFuncType func = (TaskFuncType)(taskInfo->func); + // Actually run the task func(taskInfo->data, threadIndex, threadCount); - - // FIXME: taskInfo leaks... } void ISPCLaunch(void *func, void *data) { if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); + while (1) { + if (lAtomicCompareAndSwap32(&lock, 1, 0) == 0) { + if (!initialized) { + gcdQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); + gcdGroup = dispatch_group_create(); + lInitTaskInfo(); + __asm__ __volatile__("mfence":::"memory"); + initialized = 1; + } + lock = 0; + break; + } + } } - TaskInfo *ti = new TaskInfo; + + TaskInfo *ti = lGetTaskInfo(); ti->func = func; ti->data = data; dispatch_group_async_f(gcdGroup, gcdQueue, ti, lRunTask); @@ -93,11 +89,11 @@ void ISPCLaunch(void *func, void *data) { void ISPCSync() { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } + if (!initialized) + return; // Wait for all of the tasks in the group to complete before returning dispatch_group_wait(gcdGroup, DISPATCH_TIME_FOREVER); + + lResetTaskInfo(); } diff --git a/examples/rt/tasks_pthreads.cpp b/examples/tasks_pthreads.cpp similarity index 83% rename from examples/rt/tasks_pthreads.cpp rename to examples/tasks_pthreads.cpp index 6e95dbff..9ce9d827 100644 --- a/examples/rt/tasks_pthreads.cpp +++ b/examples/tasks_pthreads.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2010-2011, Intel Corporation + Copyright (c) 2011, Intel Corporation All rights reserved. Redistribution and use in source and binary forms, with or without @@ -31,6 +31,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "taskinfo.h" #include #include #include @@ -45,7 +46,18 @@ #include #include #include -#include + +static int initialized = 0; +static volatile int32_t lock = 0; + +static int nThreads; +static pthread_t *threads; +static pthread_mutex_t taskQueueMutex; +static int nextTaskToRun; +static sem_t *workerSemaphore; +static uint32_t numUnfinishedTasks; +static pthread_mutex_t tasksRunningConditionMutex; +static pthread_cond_t tasksRunningCondition; // ispc expects these functions to have C linkage / not be mangled extern "C" { @@ -53,51 +65,21 @@ extern "C" { void ISPCSync(); } - -static int nThreads; -static pthread_t *threads; -static pthread_mutex_t taskQueueMutex; -static std::vector > taskQueue; -static sem_t *workerSemaphore; -static uint32_t numUnfinishedTasks; -static pthread_mutex_t tasksRunningConditionMutex; -static pthread_cond_t tasksRunningCondition; - static void *lTaskEntry(void *arg); /** Figure out how many CPU cores there are in the system */ static int lNumCPUCores() { -#if defined(__linux__) return sysconf(_SC_NPROCESSORS_ONLN); -#else - // Mac - int mib[2]; - mib[0] = CTL_HW; - size_t length = 2; - if (sysctlnametomib("hw.logicalcpu", mib, &length) == -1) { - fprintf(stderr, "sysctlnametomib() filed. Guessing 2 cores."); - return 2; - } - assert(length == 2); - - int nCores = 0; - size_t size = sizeof(nCores); - - if (sysctl(mib, 2, &nCores, &size, NULL, 0) == -1) { - fprintf(stderr, "sysctl() to find number of cores present failed. Guessing 2."); - return 2; - } - return nCores; -#endif } -void -TasksInit() { + +static void +lTasksInit() { nThreads = lNumCPUCores(); - threads = new pthread_t[nThreads]; + threads = (pthread_t *)malloc(nThreads * sizeof(pthread_t)); int err; if ((err = pthread_mutex_init(&taskQueueMutex, NULL)) != 0) { @@ -124,7 +106,7 @@ TasksInit() { } for (int i = 0; i < nThreads; ++i) { - err = pthread_create(&threads[i], NULL, &lTaskEntry, reinterpret_cast(i)); + err = pthread_create(&threads[i], NULL, &lTaskEntry, (void *)(i)); if (err != 0) { fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err)); exit(1); @@ -135,21 +117,35 @@ TasksInit() { void ISPCLaunch(void *f, void *d) { - if (threads == NULL) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); + int err; + + if (!initialized) { + while (1) { + if (lAtomicCompareAndSwap32(&lock, 1, 0) == 0) { + if (!initialized) { + lTasksInit(); + __asm__ __volatile__("mfence":::"memory"); + initialized = 1; + } + lock = 0; + break; + } + } } // // Acquire mutex, add task // - int err; if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); exit(1); } - taskQueue.push_back(std::make_pair(f, d)); + // Need a mutex here to ensure we get this filled in before a worker + // grabs it and starts running... + TaskInfo *ti = lGetTaskInfo(); + ti->func = f; + ti->data = d; if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); @@ -164,6 +160,7 @@ ISPCLaunch(void *f, void *d) { exit(1); } + // FIXME: is this redundant with nextTaskInfoCoordinate? ++numUnfinishedTasks; if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { @@ -184,17 +181,17 @@ ISPCLaunch(void *f, void *d) { static void * lTaskEntry(void *arg) { - int threadIndex = int(reinterpret_cast(arg)); + int threadIndex = (int)arg; int threadCount = nThreads; + TaskFuncType func; - while (true) { + while (1) { int err; if ((err = sem_wait(workerSemaphore)) != 0) { fprintf(stderr, "Error from sem_wait: %s\n", strerror(err)); exit(1); } - std::pair myTask; // // Acquire mutex, get task // @@ -202,7 +199,8 @@ lTaskEntry(void *arg) { fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); exit(1); } - if (taskQueue.size() == 0) { + + if (nextTaskToRun == nextTaskInfoCoordinate) { // // Task queue is empty, go back and wait on the semaphore // @@ -213,8 +211,10 @@ lTaskEntry(void *arg) { continue; } - myTask = taskQueue.back(); - taskQueue.pop_back(); + int runCoord = nextTaskToRun++; + int index = (runCoord >> LOG_TASK_QUEUE_CHUNK_SIZE); + int offset = runCoord & (TASK_QUEUE_CHUNK_SIZE-1); + TaskInfo *myTask = &taskInfo[index][offset]; if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); @@ -224,9 +224,8 @@ lTaskEntry(void *arg) { // // Do work for _myTask_ // - typedef void (*TaskFunType)(void *, int, int); - TaskFunType func = (TaskFunType)myTask.first; - func(myTask.second, threadIndex, threadCount); + func = (TaskFuncType)myTask->func; + func(myTask->data, threadIndex, threadCount); // // Decrement the number of unfinished tasks counter @@ -236,6 +235,8 @@ lTaskEntry(void *arg) { exit(1); } + // FIXME: can this be a comparison of (nextTaskToRun == nextTaskInfoCoordinate)? + // (I don't think so--think there is a race...) int unfinished = --numUnfinishedTasks; if (unfinished == 0) { // @@ -261,11 +262,6 @@ lTaskEntry(void *arg) { void ISPCSync() { - if (threads == NULL) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - int err; if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); @@ -283,6 +279,9 @@ void ISPCSync() { } } + lResetTaskInfo(); + nextTaskToRun = 0; + // We acquire ownership of the condition variable mutex when the above // pthread_cond_wait returns. // FIXME: is there a lurking issue here if numUnfinishedTasks gets back diff --git a/examples/volume_rendering/Makefile b/examples/volume_rendering/Makefile index ecd7d421..17880557 100644 --- a/examples/volume_rendering/Makefile +++ b/examples/volume_rendering/Makefile @@ -1,18 +1,18 @@ ARCH = $(shell uname) -TASK_CXX=tasks_pthreads.cpp +TASK_CXX=../tasks_pthreads.cpp TASK_LIB=-lpthread ifeq ($(ARCH), Darwin) - TASK_CXX=tasks_gcd.cpp + TASK_CXX=../tasks_gcd.cpp TASK_LIB= endif -TASK_OBJ=$(addprefix objs/, $(TASK_CXX:.cpp=.o)) +TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o))) -CXX=g++ -m64 -CXXFLAGS=-Iobjs/ -O3 -Wall +CXX=g++ +CXXFLAGS=-Iobjs/ -O3 -Wall -m64 ISPC=ispc ISPCFLAGS=-O2 --target=sse4x2 --arch=x86-64 @@ -32,6 +32,9 @@ volume: dirs objs/volume.o objs/volume_serial.o objs/volume_ispc.o $(TASK_OBJ) objs/%.o: %.cpp $(CXX) $< $(CXXFLAGS) -c -o $@ +objs/%.o: ../%.cpp + $(CXX) $< $(CXXFLAGS) -c -o $@ + objs/volume.o: objs/volume_ispc.h objs/%_ispc.h objs/%_ispc.o: %.ispc diff --git a/examples/volume_rendering/tasks_concrt.cpp b/examples/volume_rendering/tasks_concrt.cpp deleted file mode 100644 index d40b40ae..00000000 --- a/examples/volume_rendering/tasks_concrt.cpp +++ /dev/null @@ -1,141 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -/* Simple task system implementation for ispc based on Microsoft's - Concurrency Runtime. */ - -#include -#include -using namespace Concurrency; -#include -#include -#include -#include - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); - void *ISPCMalloc(int64_t size, int32_t alignment); - void ISPCFree(void *ptr); -} - -typedef void (*TaskFuncType)(void *, int, int); - -struct TaskInfo { - TaskFuncType ispcFunc; - void *ispcData; -}; - -// This is a simple implementation that just aborts if more than MAX_TASKS -// are launched. It could easily be extended to be more general... - -#define MAX_TASKS 32768 -static int taskOffset; -static TaskInfo taskInfo[MAX_TASKS]; -static event *events[MAX_TASKS]; -static CRITICAL_SECTION criticalSection; -static bool initialized = false; - -void -TasksInit() { - InitializeCriticalSection(&criticalSection); - for (int i = 0; i < MAX_TASKS; ++i) - events[i] = new event; - initialized = true; -} - - -void __cdecl -lRunTask(LPVOID param) { - TaskInfo *ti = (TaskInfo *)param; - - // Actually run the task. - // FIXME: like the tasks_gcd.cpp implementation, this is passing bogus - // values for the threadIndex and threadCount builtins, which in turn - // will cause bugs in code that uses those. FWIW this example doesn't - // use them... - int threadIndex = 0; - int threadCount = 1; - ti->ispcFunc(ti->ispcData, threadIndex, threadCount); - - // Signal the event that this task is done - int taskNum = ti - &taskInfo[0]; - events[taskNum]->set(); -} - - -void -ISPCLaunch(void *func, void *data) { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // Get a TaskInfo struct for this task - EnterCriticalSection(&criticalSection); - TaskInfo *ti = &taskInfo[taskOffset++]; - assert(taskOffset < MAX_TASKS); - LeaveCriticalSection(&criticalSection); - - // And pass it on to the Concurrency Runtime... - ti->ispcFunc = (TaskFuncType)func; - ti->ispcData = data; - CurrentScheduler::ScheduleTask(lRunTask, ti); -} - - -void ISPCSync() { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - event::wait_for_multiple(&events[0], taskOffset, true, - COOPERATIVE_TIMEOUT_INFINITE); - - for (int i = 0; i < taskOffset; ++i) - events[i]->reset(); - - taskOffset = 0; -} - - -void *ISPCMalloc(int64_t size, int32_t alignment) { - return _aligned_malloc(size, alignment); -} - - -void ISPCFree(void *ptr) { - _aligned_free(ptr); -} diff --git a/examples/volume_rendering/tasks_gcd.cpp b/examples/volume_rendering/tasks_gcd.cpp deleted file mode 100644 index f759cc37..00000000 --- a/examples/volume_rendering/tasks_gcd.cpp +++ /dev/null @@ -1,103 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -/* A simple task system for ispc programs based on Apple's Grand Central - Dispatch. */ - -#include -#include -#include - -static bool initialized = false; -static dispatch_queue_t gcdQueue; -static dispatch_group_t gcdGroup; - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); -} - -struct TaskInfo { - void *func; - void *data; -}; - - -void -TasksInit() { - gcdQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); - gcdGroup = dispatch_group_create(); - initialized = true; -} - - -static void -lRunTask(void *ti) { - typedef void (*TaskFuncType)(void *, int, int); - TaskInfo *taskInfo = (TaskInfo *)ti; - - TaskFuncType func = (TaskFuncType)(taskInfo->func); - - // FIXME: these are bogus values; may cause bugs in code that depends - // on them having unique values in different threads. - int threadIndex = 0; - int threadCount = 1; - // Actually run the task - func(taskInfo->data, threadIndex, threadCount); - - // FIXME: taskInfo leaks... -} - - -void ISPCLaunch(void *func, void *data) { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - TaskInfo *ti = new TaskInfo; - ti->func = func; - ti->data = data; - dispatch_group_async_f(gcdGroup, gcdQueue, ti, lRunTask); -} - - -void ISPCSync() { - if (!initialized) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // Wait for all of the tasks in the group to complete before returning - dispatch_group_wait(gcdGroup, DISPATCH_TIME_FOREVER); -} diff --git a/examples/volume_rendering/tasks_pthreads.cpp b/examples/volume_rendering/tasks_pthreads.cpp deleted file mode 100644 index 6e95dbff..00000000 --- a/examples/volume_rendering/tasks_pthreads.cpp +++ /dev/null @@ -1,295 +0,0 @@ -/* - Copyright (c) 2010-2011, Intel Corporation - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - * Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - * Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - * Neither the name of Intel Corporation nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS - IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A - PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER - OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, - EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, - PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -*/ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// ispc expects these functions to have C linkage / not be mangled -extern "C" { - void ISPCLaunch(void *f, void *data); - void ISPCSync(); -} - - -static int nThreads; -static pthread_t *threads; -static pthread_mutex_t taskQueueMutex; -static std::vector > taskQueue; -static sem_t *workerSemaphore; -static uint32_t numUnfinishedTasks; -static pthread_mutex_t tasksRunningConditionMutex; -static pthread_cond_t tasksRunningCondition; - -static void *lTaskEntry(void *arg); - -/** Figure out how many CPU cores there are in the system - */ -static int -lNumCPUCores() { -#if defined(__linux__) - return sysconf(_SC_NPROCESSORS_ONLN); -#else - // Mac - int mib[2]; - mib[0] = CTL_HW; - size_t length = 2; - if (sysctlnametomib("hw.logicalcpu", mib, &length) == -1) { - fprintf(stderr, "sysctlnametomib() filed. Guessing 2 cores."); - return 2; - } - assert(length == 2); - - int nCores = 0; - size_t size = sizeof(nCores); - - if (sysctl(mib, 2, &nCores, &size, NULL, 0) == -1) { - fprintf(stderr, "sysctl() to find number of cores present failed. Guessing 2."); - return 2; - } - return nCores; -#endif -} - -void -TasksInit() { - nThreads = lNumCPUCores(); - - threads = new pthread_t[nThreads]; - - int err; - if ((err = pthread_mutex_init(&taskQueueMutex, NULL)) != 0) { - fprintf(stderr, "Error creating mutex: %s\n", strerror(err)); - exit(1); - } - - char name[32]; - sprintf(name, "ispc_task.%d", (int)getpid()); - workerSemaphore = sem_open(name, O_CREAT, S_IRUSR|S_IWUSR, 0); - if (!workerSemaphore) { - fprintf(stderr, "Error creating semaphore: %s\n", strerror(err)); - exit(1); - } - - if ((err = pthread_cond_init(&tasksRunningCondition, NULL)) != 0) { - fprintf(stderr, "Error creating condition variable: %s\n", strerror(err)); - exit(1); - } - - if ((err = pthread_mutex_init(&tasksRunningConditionMutex, NULL)) != 0) { - fprintf(stderr, "Error creating mutex: %s\n", strerror(err)); - exit(1); - } - - for (int i = 0; i < nThreads; ++i) { - err = pthread_create(&threads[i], NULL, &lTaskEntry, reinterpret_cast(i)); - if (err != 0) { - fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err)); - exit(1); - } - } -} - - -void -ISPCLaunch(void *f, void *d) { - if (threads == NULL) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - // - // Acquire mutex, add task - // - int err; - if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - taskQueue.push_back(std::make_pair(f, d)); - - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - - // - // Update count of number of tasks left to run - // - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - ++numUnfinishedTasks; - - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - // - // Post to the worker semaphore to wake up worker threads that are - // sleeping waiting for tasks to show up - // - if ((err = sem_post(workerSemaphore)) != 0) { - fprintf(stderr, "Error from sem_post: %s\n", strerror(err)); - exit(1); - } -} - - -static void * -lTaskEntry(void *arg) { - int threadIndex = int(reinterpret_cast(arg)); - int threadCount = nThreads; - - while (true) { - int err; - if ((err = sem_wait(workerSemaphore)) != 0) { - fprintf(stderr, "Error from sem_wait: %s\n", strerror(err)); - exit(1); - } - - std::pair myTask; - // - // Acquire mutex, get task - // - if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - if (taskQueue.size() == 0) { - // - // Task queue is empty, go back and wait on the semaphore - // - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - continue; - } - - myTask = taskQueue.back(); - taskQueue.pop_back(); - - if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err)); - exit(1); - } - - // - // Do work for _myTask_ - // - typedef void (*TaskFunType)(void *, int, int); - TaskFunType func = (TaskFunType)myTask.first; - func(myTask.second, threadIndex, threadCount); - - // - // Decrement the number of unfinished tasks counter - // - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - int unfinished = --numUnfinishedTasks; - if (unfinished == 0) { - // - // Signal the "no more tasks are running" condition if all of - // them are done. - // - int err; - if ((err = pthread_cond_signal(&tasksRunningCondition)) != 0) { - fprintf(stderr, "Error from pthread_cond_signal: %s\n", strerror(err)); - exit(1); - } - } - - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - } - - pthread_exit(NULL); - return 0; -} - - -void ISPCSync() { - if (threads == NULL) { - fprintf(stderr, "You must call TasksInit() before launching tasks.\n"); - exit(1); - } - - int err; - if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } - - // As long as there are tasks running, wait on the condition variable; - // doing so causes this thread to go to sleep until someone signals on - // the tasksRunningCondition condition variable. - while (numUnfinishedTasks > 0) { - if ((err = pthread_cond_wait(&tasksRunningCondition, - &tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_cond_wait: %s\n", strerror(err)); - exit(1); - } - } - - // We acquire ownership of the condition variable mutex when the above - // pthread_cond_wait returns. - // FIXME: is there a lurking issue here if numUnfinishedTasks gets back - // to zero by the time we get to ISPCSync() and thence we're trying to - // unlock a mutex we don't have a lock on? - if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) { - fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err)); - exit(1); - } -} diff --git a/examples/volume_rendering/volume.cpp b/examples/volume_rendering/volume.cpp index 261ccace..cb9eb739 100644 --- a/examples/volume_rendering/volume.cpp +++ b/examples/volume_rendering/volume.cpp @@ -174,9 +174,6 @@ int main(int argc, char *argv[]) { ensureTargetISAIsSupported(); - extern void TasksInit(); - TasksInit(); - // // Load viewing data and the volume density data // diff --git a/examples/volume_rendering/volume.vcxproj b/examples/volume_rendering/volume.vcxproj index b436ccb9..fecf79d6 100755 --- a/examples/volume_rendering/volume.vcxproj +++ b/examples/volume_rendering/volume.vcxproj @@ -143,7 +143,7 @@ - +