diff --git a/examples/tasksys.cpp b/examples/tasksys.cpp
index 4ce5d354..d747140b 100644
--- a/examples/tasksys.cpp
+++ b/examples/tasksys.cpp
@@ -1,5 +1,5 @@
 /*
-  Copyright (c) 2011, Intel Corporation
+  Copyright (c) 2011-2012, Intel Corporation
   All rights reserved.
 
   Redistribution and use in source and binary forms, with or without
@@ -40,21 +40,68 @@
    Runtime Requirements" for information about the task-related entrypoints
    that are implemented here.
 
-   There are three task systems in this file: one built using Microsoft's
-   Concurrency Runtime, one built with Apple's Grand Central Dispatch, and
-   one built on top of bare pthreads.
+   There are several task systems in this file, built using:
+     - Microsoft's Concurrency Runtime (ISPC_USE_CONCRT)
+     - Apple's Grand Central Dispatch (ISPC_USE_GCD)
+     - bare pthreads (ISPC_USE_PTHREADS, ISPC_USE_PTHREADS_FULLY_SUBSCRIBED)
+     - Cilk Plus (ISPC_USE_CILK)
+     - TBB (ISPC_USE_TBB_TASK_GROUP, ISPC_USE_TBB_PARALLEL_FOR)
+     - OpenMP (ISPC_USE_OMP)
+
+   The task system implementation can be selected at compile time by defining
+   the appropriate preprocessor symbol on the command line (e.g.,
+   -D ISPC_USE_TBB_TASK_GROUP).  Not all combinations of platform and task
+   system are meaningful.  If no task system is requested, a reasonable
+   default for the platform is selected.  These are the task systems that
+   can be selected:
+
+#define ISPC_USE_GCD
+#define ISPC_USE_CONCRT
+#define ISPC_USE_PTHREADS
+#define ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
+#define ISPC_USE_CILK
+#define ISPC_USE_OMP
+#define ISPC_USE_TBB_TASK_GROUP
+#define ISPC_USE_TBB_PARALLEL_FOR
+
+   The ISPC_USE_PTHREADS_FULLY_SUBSCRIBED model essentially takes over the
+   machine by assigning one pthread to each hyper-thread and then using
+   spinlocks and atomics for task management.  This model is useful for KNC,
+   where tasks can take over the machine, but less so when other tasks also
+   need to run on the machine.
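+
+   As a concrete illustration, a build might select the TBB task_group
+   backend like this (hypothetical command lines; the exact compiler and
+   linker flags depend on your toolchain):
+
+       c++ -D ISPC_USE_TBB_TASK_GROUP -c tasksys.cpp -o tasksys.o
+       c++ main.o program_ispc.o tasksys.o -ltbb -o program
+
+   Omitting the -D flag entirely falls back to the per-platform defaults
+   chosen just below.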
+
+ */
+#if !(defined ISPC_USE_CONCRT || defined ISPC_USE_GCD || \
+      defined ISPC_USE_PTHREADS || defined ISPC_USE_PTHREADS_FULLY_SUBSCRIBED || \
+      defined ISPC_USE_TBB_TASK_GROUP || defined ISPC_USE_TBB_PARALLEL_FOR || \
+      defined ISPC_USE_OMP || defined ISPC_USE_CILK)
+
+    // If no task model is chosen on the compiler command line, pick a
+    // reasonable default for the platform.
+    #if defined(_WIN32) || defined(_WIN64)
+        #define ISPC_USE_CONCRT
+    #elif defined(__linux__)
+        #define ISPC_USE_PTHREADS
+    #elif defined(__APPLE__)
+        #define ISPC_USE_GCD
+    #endif
+    #if defined(__KNC__)
+        #define ISPC_USE_PTHREADS
+    #endif
+
+#endif // No task model specified on compiler cmdline
+
 #if defined(_WIN32) || defined(_WIN64)
-  #define ISPC_IS_WINDOWS
-  #define ISPC_USE_CONCRT
+#define ISPC_IS_WINDOWS
 #elif defined(__linux__)
-  #define ISPC_IS_LINUX
-  #define ISPC_USE_PTHREADS
+#define ISPC_IS_LINUX
 #elif defined(__APPLE__)
-  #define ISPC_IS_APPLE
-  #define ISPC_USE_GCD
+#define ISPC_IS_APPLE
 #endif
+#if defined(__KNC__)
+#define ISPC_IS_KNC
+#endif
 
 #define DBG(x)
 
@@ -83,9 +130,37 @@
 #include <vector>
 #include <algorithm>
 #endif // ISPC_USE_PTHREADS
+#ifdef ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
+#include <pthread.h>
+#include <semaphore.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+#include <sched.h>
+#include <stack>
+//#include <cpuid.h>
+#include <immintrin.h>
+#endif // ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
+#ifdef ISPC_USE_TBB_PARALLEL_FOR
+  #include <tbb/parallel_for.h>
+#endif // ISPC_USE_TBB_PARALLEL_FOR
+#ifdef ISPC_USE_TBB_TASK_GROUP
+  #include <tbb/task_group.h>
+#endif // ISPC_USE_TBB_TASK_GROUP
+#ifdef ISPC_USE_CILK
+  #include <cilk/cilk.h>
+#endif // ISPC_USE_CILK
+#ifdef ISPC_USE_OMP
+  #include <omp.h>
+#endif // ISPC_USE_OMP
 #ifdef ISPC_IS_LINUX
 #include <malloc.h>
 #endif // ISPC_IS_LINUX
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -107,6 +182,13 @@ struct TaskInfo {
 #endif
 };
 
+// ispc expects these functions to have C linkage / not be mangled
+extern "C" {
+    void ISPCLaunch(void **handlePtr, void *f, void *data, int count);
+    void *ISPCAlloc(void **handlePtr, int64_t size, int32_t alignment);
+    void ISPCSync(void *handle);
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // TaskGroupBase
 
@@ -181,7 +263,7 @@ inline TaskGroupBase::~TaskGroupBase() {
     // Note: don't delete memBuffers[0], since it points to the start of
     // the "mem" member!
     for (int i = 1; i < NUM_MEM_BUFFERS; ++i)
-        delete[] memBuffers[i];
+        delete[](memBuffers[i]);
 }
 
@@ -224,10 +306,10 @@ TaskGroupBase::GetTaskInfo(int index) {
 inline void *
 TaskGroupBase::AllocMemory(int64_t size, int32_t alignment) {
     char *basePtr = memBuffers[curMemBuffer];
-    int64_t iptr = (int64_t)(basePtr + curMemBufferOffset);
+    intptr_t iptr = (intptr_t)(basePtr + curMemBufferOffset);
     iptr = (iptr + (alignment-1)) & ~(alignment-1);
 
-    int newOffset = int(iptr + size - (int64_t)basePtr);
+    int newOffset = int(iptr - (intptr_t)basePtr + size);
     if (newOffset < memBufferSize[curMemBuffer]) {
         curMemBufferOffset = newOffset;
         return (char *)iptr;
@@ -249,14 +331,6 @@ TaskGroupBase::AllocMemory(int64_t size, int32_t alignment) {
 ///////////////////////////////////////////////////////////////////////////
 // Atomics and the like
 
-#ifndef ISPC_IS_WINDOWS
-static inline void
-lMemFence() {
-    __asm__ __volatile__("mfence":::"memory");
-}
-#endif // !ISPC_IS_WINDOWS
-
-
 #if (__SIZEOF_POINTER__ == 4) || defined(__i386__) || defined(_WIN32)
 #define ISPC_POINTER_BYTES 4
 #elif (__SIZEOF_POINTER__ == 8) || defined(__x86_64__) || defined(__amd64__) || defined(_WIN64)
@@ -266,6 +340,15 @@ lMemFence() {
 #endif // __SIZEOF_POINTER__
 
+static inline void
+lMemFence() {
+    // The Windows atomic functions already include the fence, and KNC
+    // doesn't need the memory barrier, so only emit mfence elsewhere.
+#if !defined(ISPC_IS_KNC) && !defined(ISPC_IS_WINDOWS)
+    __asm__ __volatile__("mfence":::"memory");
+#endif
+}
+
 static void *
 lAtomicCompareAndSwapPointer(void **v, void *newValue, void *oldValue) {
 #ifdef ISPC_IS_WINDOWS
@@ -288,11 +371,11 @@ lAtomicCompareAndSwapPointer(void **v, void *newValue, void *oldValue) {
 #endif // ISPC_IS_WINDOWS
 }
 
-
-
-#ifndef ISPC_IS_WINDOWS
 static int32_t
 lAtomicCompareAndSwap32(volatile int32_t *v, int32_t newValue, int32_t oldValue) {
+#ifdef ISPC_IS_WINDOWS
+    return InterlockedCompareExchange((volatile LONG *)v, newValue, oldValue);
+#else
     int32_t result;
     __asm__ __volatile__("lock\ncmpxchgl %2,%1"
                          : "=a"(result), "=m"(*v)
@@ -300,9 +383,22 @@ lAtomicCompareAndSwap32(volatile int32_t *v, int32_t newValue, int32_t oldValue)
                          : "memory");
     lMemFence();
     return result;
+#endif // ISPC_IS_WINDOWS
 }
-#endif // !ISPC_IS_WINDOWS
 
+static inline int32_t
+lAtomicAdd(volatile int32_t *v, int32_t delta) {
+#ifdef ISPC_IS_WINDOWS
+    // InterlockedExchangeAdd() returns the *original* value, matching the
+    // xaddl path below.  (InterlockedAdd() would return the new value.)
+    return InterlockedExchangeAdd((volatile LONG *)v, delta);
+#else
+    int32_t origValue;
+    __asm__ __volatile__("lock\n"
+                         "xaddl %0,%1"
+                         : "=r"(origValue), "=m"(*v) : "0"(delta)
+                         : "memory");
+    return origValue;
+#endif
+}
 
 ///////////////////////////////////////////////////////////////////////////
@@ -366,6 +462,50 @@ private:
 #endif // ISPC_USE_PTHREADS
 
+#ifdef ISPC_USE_CILK
+
+class TaskGroup : public TaskGroupBase {
+public:
+    void Launch(int baseIndex, int count);
+    void Sync();
+};
+
+#endif // ISPC_USE_CILK
+
+#ifdef ISPC_USE_OMP
+
+class TaskGroup : public TaskGroupBase {
+public:
+    void Launch(int baseIndex, int count);
+    void Sync();
+};
+
+#endif // ISPC_USE_OMP
+
+#ifdef ISPC_USE_TBB_PARALLEL_FOR
+
+class TaskGroup : public TaskGroupBase {
+public:
+    void Launch(int baseIndex, int count);
+    void Sync();
+};
+
+#endif // ISPC_USE_TBB_PARALLEL_FOR
+
+#ifdef ISPC_USE_TBB_TASK_GROUP
+
+class TaskGroup : public TaskGroupBase {
+public:
+    void Launch(int baseIndex, int count);
+    void Sync();
+private:
+    tbb::task_group tbbTaskGroup;
+};
+
+#endif // ISPC_USE_TBB_TASK_GROUP
 
 ///////////////////////////////////////////////////////////////////////////
 // Grand Central Dispatch
@@ -487,18 +627,6 @@ static pthread_mutex_t taskSysMutex;
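+// The pthreads worker code below hands out task indices and retires task
+// groups with lAtomicAdd(), relying on it returning the value from *before*
+// the addition (x86 xadd semantics).  On a C++11 compiler the equivalent
+// primitive would be std::atomic<>::fetch_add, which has the same return
+// convention; a minimal sketch (hypothetical helper, not used by this file):
+//
+//     #include <atomic>
+//     static inline int32_t lAtomicAdd11(std::atomic<int32_t> *v, int32_t delta) {
+//         return v->fetch_add(delta);   // returns the pre-increment value
+//     }
+//
+// e.g. three threads concurrently calling lAtomicAdd(&next, 1) with next == 0
+// receive the distinct tickets 0, 1, and 2.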
 static std::vector<TaskGroup *> activeTaskGroups;
 static sem_t *workerSemaphore;
 
-
-static inline int32_t
-lAtomicAdd(int32_t *v, int32_t delta) {
-    int32_t origValue;
-    __asm__ __volatile__("lock\n"
-                         "xaddl %0,%1"
-                         : "=r"(origValue), "=m"(*v) : "0"(delta)
-                         : "memory");
-    return origValue;
-}
-
-
 static void *
 lTaskEntry(void *arg) {
     int threadIndex = (int)((int64_t)arg);
@@ -724,11 +852,15 @@ TaskGroup::Sync() {
             exit(1);
         }
         // FIXME: We basically end up busy-waiting here, which is
-        // extra wasteful in a world with hyperthreading.  It would
+        // extra wasteful in a world with hyper-threading.  It would
         // be much better to put this thread to sleep on a
         // condition variable that was signaled when the last task
         // in this group was finished.
-        sleep(0);
+#ifndef ISPC_IS_KNC
+        usleep(1);
+#else
+        _mm_delay_32(8);
+#endif
         continue;
     }
@@ -772,6 +904,124 @@ TaskGroup::Sync() {
 
 #endif // ISPC_USE_PTHREADS
 
 ///////////////////////////////////////////////////////////////////////////
+// Cilk Plus
+
+#ifdef ISPC_USE_CILK
+
+static void
+InitTaskSystem() {
+    // No initialization needed
+}
+
+inline void
+TaskGroup::Launch(int baseIndex, int count) {
+    cilk_for (int i = 0; i < count; i++) {
+        TaskInfo *ti = GetTaskInfo(baseIndex + i);
+
+        // Actually run the task.
+        // Cilk does not expose the task -> thread mapping, so we pretend
+        // it's 1:1.
+        ti->func(ti->data, ti->taskIndex, ti->taskCount, ti->taskIndex,
+                 ti->taskCount);
+    }
+}
+
+inline void
+TaskGroup::Sync() {
+}
+
+#endif // ISPC_USE_CILK
+
+///////////////////////////////////////////////////////////////////////////
+// OpenMP
+
+#ifdef ISPC_USE_OMP
+
+static void
+InitTaskSystem() {
+    // No initialization needed
+}
+
+inline void
+TaskGroup::Launch(int baseIndex, int count) {
+#pragma omp parallel for
+    for (int i = 0; i < count; i++) {
+        TaskInfo *ti = GetTaskInfo(baseIndex + i);
+
+        // Actually run the task.
+        int threadIndex = omp_get_thread_num();
+        int threadCount = omp_get_num_threads();
+        ti->func(ti->data, threadIndex, threadCount, ti->taskIndex,
+                 ti->taskCount);
+    }
+}
+
+inline void
+TaskGroup::Sync() {
+}
+
+#endif // ISPC_USE_OMP
+
+///////////////////////////////////////////////////////////////////////////
+// Threading Building Blocks
+
+#ifdef ISPC_USE_TBB_PARALLEL_FOR
+
+static void
+InitTaskSystem() {
+    // No initialization needed by default
+    //tbb::task_scheduler_init();
+}
+
+inline void
+TaskGroup::Launch(int baseIndex, int count) {
+    tbb::parallel_for(0, count, [=](int i) {
+        TaskInfo *ti = GetTaskInfo(baseIndex + i);
+
+        // Actually run the task.
+        // TBB does not expose the task -> thread mapping, so we pretend
+        // it's 1:1.
+        int threadIndex = ti->taskIndex;
+        int threadCount = ti->taskCount;
+
+        ti->func(ti->data, threadIndex, threadCount, ti->taskIndex,
+                 ti->taskCount);
+    });
+}
+
+inline void
+TaskGroup::Sync() {
+}
+
+#endif // ISPC_USE_TBB_PARALLEL_FOR
+
+#ifdef ISPC_USE_TBB_TASK_GROUP
+
+static void
+InitTaskSystem() {
+    // No initialization needed by default
+    //tbb::task_scheduler_init();
+}
+
+inline void
+TaskGroup::Launch(int baseIndex, int count) {
+    for (int i = 0; i < count; i++) {
+        tbbTaskGroup.run([=]() {
+            TaskInfo *ti = GetTaskInfo(baseIndex + i);
+
+            // TBB does not expose the task -> thread mapping, so we pretend
+            // it's 1:1.
+            int threadIndex = ti->taskIndex;
+            int threadCount = ti->taskCount;
+            ti->func(ti->data, threadIndex, threadCount, ti->taskIndex,
+                     ti->taskCount);
+        });
+    }
+}
+
+inline void
+TaskGroup::Sync() {
+    tbbTaskGroup.wait();
+}
+
+#endif // ISPC_USE_TBB_TASK_GROUP
+
 ///////////////////////////////////////////////////////////////////////////
 
+#ifndef ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
+
 #define MAX_FREE_TASK_GROUPS 64
 static TaskGroup *freeTaskGroups[MAX_FREE_TASK_GROUPS];
 
@@ -810,13 +1060,6 @@ FreeTaskGroup(TaskGroup *tg) {
 
 ///////////////////////////////////////////////////////////////////////////
 
-// ispc expects these functions to have C linkage / not be mangled
-extern "C" {
-    void ISPCLaunch(void **handlePtr, void *f, void *data, int count);
-    void *ISPCAlloc(void **handlePtr, int64_t size, int32_t alignment);
-    void ISPCSync(void *handle);
-}
-
 void
 ISPCLaunch(void **taskGroupPtr, void *func, void *data, int count) {
     TaskGroup *taskGroup;
@@ -863,3 +1106,250 @@ ISPCAlloc(void **taskGroupPtr, int64_t size, int32_t alignment) {
 
     return taskGroup->AllocMemory(size, alignment);
 }
+
+#else // ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
+
+#define MAX_LIVE_TASKS 1024
+
+pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// Small structure used to hold the data for each task
+struct Task {
+public:
+    TaskFuncType func;
+    void *data;
+    volatile int32_t taskIndex;
+    int taskCount;
+
+    volatile int32_t numDone;
+    int liveIndex;              // index in live task queue
+
+    inline int noMoreWork() { return taskIndex >= taskCount; }
+    /*! given thread is done working on this task --> decrease num locks */
+    // inline void lock() { lAtomicAdd(&locks,1); }
+    // inline void unlock() { lAtomicAdd(&locks,-1); }
+    inline int nextJob() { return lAtomicAdd(&taskIndex, 1); }
+    inline int numJobs() { return taskCount; }
+    inline void schedule(int idx) { taskIndex = 0; numDone = 0; liveIndex = idx; }
+    inline void run(int idx, int threadIdx);
+    inline void markOneDone() { lAtomicAdd(&numDone, 1); }
+    inline void wait()
+    {
+        // Help out with the remaining jobs, then wait until every job in
+        // this task has been marked done.
+        while (!noMoreWork()) {
+            int next = nextJob();
+            if (next < numJobs()) run(next, 0);
+        }
+        while (numDone != taskCount) {
+#ifndef ISPC_IS_KNC
+            usleep(1);
+#else
+            _mm_delay_32(8);
+#endif
+        }
+    }
+};
+
+///////////////////////////////////////////////////////////////////////////
+class TaskSys {
+    static int numThreadsRunning;
+    struct LiveTask
+    {
+        volatile int32_t locks; /*!< num locks on this task.  gets
+                                     initialized to numThreadsRunning+1, then
+                                     counted down by every thread that sees
+                                     this.  this value is only valid when
+                                     'active' is set to true */
+        volatile int active;    /*! workers will spin on this until it
+                                    becomes active */
+        Task *task;
+
+        inline void doneWithThis() { lAtomicAdd(&locks, -1); }
+        LiveTask() : locks(-1), active(0) {}
+    };
+
+public:
+    volatile int nextScheduleIndex;  /*!
+                                         next index in the task queue
+                                         where we'll insert a live task */
+
+    // inline int inc_begin() { int old = begin; begin = (begin+1)%MAX_TASKS; return old; }
+    // inline int inc_end() { int old = end; end = (end+1)%MAX_TASKS; return old; }
+
+    LiveTask taskQueue[MAX_LIVE_TASKS];
+    std::stack<Task *> taskMem;
+
+    static TaskSys *global;
+
+    TaskSys() : nextScheduleIndex(0)
+    {
+        TaskSys::global = this;
+        Task *mem = new Task[MAX_LIVE_TASKS]; //< could actually be more than _live_ tasks
+        for (int i = 0; i < MAX_LIVE_TASKS; i++)
+            taskMem.push(mem + i);
+        createThreads();
+    }
+
+    inline Task *allocOne()
+    {
+        pthread_mutex_lock(&mutex);
+        if (taskMem.empty()) {
+            fprintf(stderr, "Out of tasks: more than %d tasks live at once\n",
+                    MAX_LIVE_TASKS);
+            exit(1);
+        }
+        Task *task = taskMem.top();
+        taskMem.pop();
+        pthread_mutex_unlock(&mutex);
+        return task;
+    }
+
+    int nThreads;
+    pthread_t *thread;
+
+    static void init() { if (global == NULL) new TaskSys; }
+    void createThreads();
+    void threadFct();
+
+    void schedule(Task *task)
+    {
+        pthread_mutex_lock(&mutex);
+        int liveIndex = nextScheduleIndex;
+        nextScheduleIndex = (nextScheduleIndex + 1) % MAX_LIVE_TASKS;
+        while (taskQueue[liveIndex].active) {
+            // Queue slot still in use; wait for its task group to retire.
+#ifndef ISPC_IS_KNC
+            usleep(1);
+#else
+            _mm_delay_32(8);
+#endif
+        }
+        taskQueue[liveIndex].task = task;
+        task->schedule(liveIndex);
+        taskQueue[liveIndex].locks = numThreadsRunning+1; // num _worker_ threads plus creator
+        taskQueue[liveIndex].active = true;
+        pthread_mutex_unlock(&mutex);
+    }
+
+    void sync(Task *task)
+    {
+        task->wait();
+        int liveIndex = task->liveIndex;
+        while (taskQueue[liveIndex].locks > 1) {
+#ifndef ISPC_IS_KNC
+            usleep(1);
+#else
+            _mm_delay_32(8);
+#endif
+        }
+        _mm_free(task->data);
+        pthread_mutex_lock(&mutex);
+        taskMem.push(task); // recycle task index
+        taskQueue[liveIndex].active = false;
+        pthread_mutex_unlock(&mutex);
+    }
+};
+
+
+void TaskSys::threadFct()
+{
+    int myIndex = 0; //lAtomicAdd(&threadIdx,1);
+    while (1) {
+        while (!taskQueue[myIndex].active) {
+#ifndef ISPC_IS_KNC
+            usleep(4);
+#else
+            _mm_delay_32(32);
+#endif
+            continue;
+        }
+
+        Task *mine = taskQueue[myIndex].task;
+        while (!mine->noMoreWork()) {
+            int job = mine->nextJob();
+            if (job >= mine->numJobs()) break;
+            mine->run(job, myIndex);
+        }
+        taskQueue[myIndex].doneWithThis();
+        myIndex = (myIndex + 1) % MAX_LIVE_TASKS;
+    }
+}
+
+
+inline void Task::run(int idx, int threadIdx) {
+    (*this->func)(data, threadIdx, TaskSys::global->nThreads, idx, taskCount);
+    markOneDone();
+}
+
+
+void *_threadFct(void *data) {
+    ((TaskSys *)data)->threadFct();
+    return NULL;
+}
+
+
+void TaskSys::createThreads()
+{
+    init();
+    int reserved = 4;  // hardware threads to leave for the OS and main thread
+    int minid = 2;
+    nThreads = sysconf(_SC_NPROCESSORS_ONLN) - reserved;
+
+    thread = (pthread_t *)malloc(nThreads * sizeof(pthread_t));
+
+    numThreadsRunning = 0;
+    for (int i = 0; i < nThreads; ++i) {
+        pthread_attr_t attr;
+        pthread_attr_init(&attr);
+        pthread_attr_setstacksize(&attr, 2*1024*1024);
+
+        // Pin each worker to its own hardware thread, starting at 'minid'.
+        int threadID = minid + i;
+        cpu_set_t cpuset;
+        CPU_ZERO(&cpuset);
+        CPU_SET(threadID, &cpuset);
+        int ret = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
+        if (ret != 0)
+            fprintf(stderr, "Warning: could not set affinity for pthread %d: %s\n",
+                    i, strerror(ret));
+
+        int err = pthread_create(&thread[i], &attr, &_threadFct, this);
+        if (err != 0) {
+            fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err));
+            exit(1);
+        }
+        ++numThreadsRunning;
+    }
+}
+
+TaskSys *TaskSys::global = NULL;
+int TaskSys::numThreadsRunning = 0;
+
+///////////////////////////////////////////////////////////////////////////
+
+void ISPCLaunch(void **taskGroupPtr, void *func, void *data, int count)
+{
+    Task *ti = *(Task **)taskGroupPtr;
+    ti->func = (TaskFuncType)func;
+    ti->data = data;
+    ti->taskIndex = 0;
+    ti->taskCount = count;
+    TaskSys::global->schedule(ti);
+}
+
+void ISPCSync(void *h)
+{
+    Task *task = (Task *)h;
+    assert(task);
+    TaskSys::global->sync(task);
+}
+
+void *ISPCAlloc(void **taskGroupPtr, int64_t size, int32_t alignment)
+{
+    TaskSys::init();
+    Task *task = TaskSys::global->allocOne();
+    *taskGroupPtr = task;
+    task->data = _mm_malloc(size, alignment);
+    return task->data; //*taskGroupPtr;
+}
+
+#endif // ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
diff --git a/ispc.h b/ispc.h
index a023cdfc..e376df46 100644
--- a/ispc.h
+++ b/ispc.h
@@ -51,6 +51,9 @@
 #elif defined(__APPLE__)
 #define ISPC_IS_APPLE
 #endif
+#if defined(__KNC__)
+#define ISPC_IS_KNC
+#endif
 
 #include <stdint.h>
 #include <stdio.h>
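
To see how the three entry points implemented in this file fit together, here
is a small stand-alone driver in the style in which ispc-generated code uses
them (a hypothetical sketch: myTask, args, and the launch count of 8 are
invented for illustration; TaskFuncType matches the signature the task systems
above call tasks with):

    #include <stdint.h>
    #include <stdio.h>

    // Entry points implemented by tasksys.cpp (C linkage, as declared there).
    extern "C" {
        void  ISPCLaunch(void **handlePtr, void *f, void *data, int count);
        void *ISPCAlloc(void **handlePtr, int64_t size, int32_t alignment);
        void  ISPCSync(void *handle);
    }

    // Task signature used throughout tasksys.cpp.
    typedef void (*TaskFuncType)(void *data, int threadIndex, int threadCount,
                                 int taskIndex, int taskCount);

    static void myTask(void *data, int threadIndex, int threadCount,
                       int taskIndex, int taskCount) {
        int *value = (int *)data;
        printf("task %d/%d on thread %d/%d saw %d\n",
               taskIndex, taskCount, threadIndex, threadCount, *value);
    }

    int main() {
        void *handle = NULL;
        // 1. Per-launch storage is obtained through ISPCAlloc(), which also
        //    initializes the opaque handle.
        int *args = (int *)ISPCAlloc(&handle, sizeof(int), 32);
        *args = 42;
        // 2. A group of 8 tasks is launched against that handle...
        ISPCLaunch(&handle, (void *)myTask, args, 8);
        // 3. ...and ISPCSync() blocks until the whole group has finished.
        ISPCSync(handle);
        return 0;
    }

Whichever backend is compiled in, the contract is the same: every launch group
is allocated, launched any number of times, and then synced exactly once.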