Merge pull request #369 from jduprat/master

Task system updates
Matt Pharr
2012-08-28 14:01:37 -07:00
2 changed files with 539 additions and 46 deletions


@@ -1,5 +1,5 @@
/*
Copyright (c) 2011-2012, Intel Corporation
All rights reserved.
Redistribution and use in source and binary forms, with or without
@@ -40,21 +40,68 @@
Runtime Requirements" for information about the task-related entrypoints
that are implemented here.
There are several task systems in this file, built using:
- Microsoft's Concurrency Runtime (ISPC_USE_CONCRT)
- Apple's Grand Central Dispatch (ISPC_USE_GCD)
- bare pthreads (ISPC_USE_PTHREADS, ISPC_USE_PTHREADS_FULLY_SUBSCRIBED)
- Cilk Plus (ISPC_USE_CILK)
- TBB (ISPC_USE_TBB_TASK_GROUP, ISPC_USE_TBB_PARALLEL_FOR)
- OpenMP (ISPC_USE_OMP)
The task system implementation can be selected at compile time by defining
the appropriate preprocessor symbol on the command line (e.g.: -D ISPC_USE_TBB_TASK_GROUP).
Not all combinations of platform and task system are meaningful.
If no task system is requested, a reasonable default for the platform
is selected. Here are the task systems that can be selected:
#define ISPC_USE_GCD
#define ISPC_USE_CONCRT
#define ISPC_USE_PTHREADS
#define ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
#define ISPC_USE_CILK
#define ISPC_USE_OMP
#define ISPC_USE_TBB_TASK_GROUP
#define ISPC_USE_TBB_PARALLEL_FOR
The ISPC_USE_PTHREADS_FULLY_SUBSCRIBED model essentially takes over the machine
by assigning one pthread to each hyper-thread and then using spinlocks and atomics
for task management. This model is useful on KNC, where tasks can take over
the machine, but less so when other processes also need to run on the machine.
*/
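Since at most one task system should be in effect, a translation unit can enforce this at build time. A minimal sketch of such a guard (an editorial illustration, not part of the original file):

#if (defined(ISPC_USE_CONCRT) + defined(ISPC_USE_GCD) + \
     defined(ISPC_USE_PTHREADS) + defined(ISPC_USE_PTHREADS_FULLY_SUBSCRIBED) + \
     defined(ISPC_USE_CILK) + defined(ISPC_USE_OMP) + \
     defined(ISPC_USE_TBB_TASK_GROUP) + defined(ISPC_USE_TBB_PARALLEL_FOR)) > 1
#error "Define at most one ISPC_USE_* task-system symbol"
#endif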
#if !(defined ISPC_USE_CONCRT || defined ISPC_USE_GCD || \
defined ISPC_USE_PTHREADS || defined ISPC_USE_PTHREADS_FULLY_SUBSCRIBED || \
defined ISPC_USE_TBB_TASK_GROUP || defined ISPC_USE_TBB_PARALLEL_FOR || \
defined ISPC_USE_OMP || defined ISPC_USE_CILK )
// If no task model chosen from the compiler cmdline, pick a reasonable default
#if defined(_WIN32) || defined(_WIN64)
#define ISPC_USE_CONCRT
#elif defined(__linux__)
#define ISPC_USE_PTHREADS
#elif defined(__APPLE__)
#define ISPC_USE_GCD
#endif
#if defined(__KNC__)
#define ISPC_USE_PTHREADS
#endif
#endif // No task model specified on compiler cmdline
#if defined(_WIN32) || defined(_WIN64)
#define ISPC_IS_WINDOWS
#elif defined(__linux__)
#define ISPC_IS_LINUX
#elif defined(__APPLE__)
#define ISPC_IS_APPLE
#endif
#if defined(__KNC__)
#define ISPC_IS_KNC
#endif
#define DBG(x)
@@ -83,9 +130,37 @@
#include <vector>
#include <algorithm>
#endif // ISPC_USE_PTHREADS
#ifdef ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <sys/sysctl.h>
#include <vector>
#include <algorithm>
//#include <stdexcept>
#include <stack>
#endif // ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
#ifdef ISPC_USE_TBB_PARALLEL_FOR
#include <tbb/parallel_for.h>
#endif // ISPC_USE_TBB_PARALLEL_FOR
#ifdef ISPC_USE_TBB_TASK_GROUP
#include <tbb/task_group.h>
#endif // ISPC_USE_TBB_TASK_GROUP
#ifdef ISPC_USE_CILK
#include <cilk/cilk.h>
#endif // ISPC_USE_CILK
#ifdef ISPC_USE_OMP
#include <omp.h>
#endif // ISPC_USE_OMP
#ifdef ISPC_IS_LINUX
#include <malloc.h>
#endif // ISPC_IS_LINUX
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
@@ -107,6 +182,13 @@ struct TaskInfo {
#endif
};
// ispc expects these functions to have C linkage / not be mangled
extern "C" {
void ISPCLaunch(void **handlePtr, void *f, void *data, int count);
void *ISPCAlloc(void **handlePtr, int64_t size, int32_t alignment);
void ISPCSync(void *handle);
}
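For orientation, here is a sketch of the calling sequence that ispc-generated code follows for a launch/sync pair (illustrative only; task_fn, the size, and the alignment below are hypothetical stand-ins):

// Hypothetical task function matching the signature these task systems invoke.
static void task_fn(void *data, int threadIndex, int threadCount,
                    int taskIndex, int taskCount) {
    // ... per-task work over taskIndex in [0, taskCount) ...
}

static void example_launch(int nTasks) {
    void *handle = NULL;                         // one handle per task group
    void *args = ISPCAlloc(&handle, 64, 32);     // space for task arguments
    // ... fill args ...
    ISPCLaunch(&handle, (void *)task_fn, args, nTasks);
    ISPCSync(handle);                            // returns once all tasks finish
}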
///////////////////////////////////////////////////////////////////////////
// TaskGroupBase
@@ -181,7 +263,7 @@ inline TaskGroupBase::~TaskGroupBase() {
// Note: don't delete memBuffers[0], since it points to the start of
// the "mem" member!
for (int i = 1; i < NUM_MEM_BUFFERS; ++i)
delete[](memBuffers[i]);
}
@@ -224,10 +306,10 @@ TaskGroupBase::GetTaskInfo(int index) {
inline void *
TaskGroupBase::AllocMemory(int64_t size, int32_t alignment) {
char *basePtr = memBuffers[curMemBuffer];
intptr_t iptr = (intptr_t)(basePtr + curMemBufferOffset);
iptr = (iptr + (alignment-1)) & ~(alignment-1);
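// e.g. alignment = 32, iptr = 0x1004: (0x1004 + 31) & ~31 == 0x1020, the
// next 32-byte boundary (power-of-two alignments are assumed).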
int newOffset = int(iptr - (intptr_t)basePtr + size);
if (newOffset < memBufferSize[curMemBuffer]) {
curMemBufferOffset = newOffset;
return (char *)iptr;
@@ -249,14 +331,6 @@ TaskGroupBase::AllocMemory(int64_t size, int32_t alignment) {
///////////////////////////////////////////////////////////////////////////
// Atomics and the like
#if (__SIZEOF_POINTER__ == 4) || defined(__i386__) || defined(_WIN32)
#define ISPC_POINTER_BYTES 4
#elif (__SIZEOF_POINTER__ == 8) || defined(__x86_64__) || defined(__amd64__) || defined(_WIN64)
@@ -266,6 +340,15 @@ lMemFence() {
#endif // __SIZEOF_POINTER__
static inline void
lMemFence() {
// Windows atomic functions already contain the fence
// KNC doesn't need the memory barrier
#if !defined(ISPC_IS_KNC) && !defined(ISPC_IS_WINDOWS)
__asm__ __volatile__("mfence":::"memory");
#endif
}
static void *
lAtomicCompareAndSwapPointer(void **v, void *newValue, void *oldValue) {
#ifdef ISPC_IS_WINDOWS
@@ -288,11 +371,11 @@ lAtomicCompareAndSwapPointer(void **v, void *newValue, void *oldValue) {
#endif // ISPC_IS_WINDOWS
}
static int32_t
lAtomicCompareAndSwap32(volatile int32_t *v, int32_t newValue, int32_t oldValue) {
#ifdef ISPC_IS_WINDOWS
return InterlockedCompareExchange((volatile LONG *)v, newValue, oldValue);
#else
int32_t result;
__asm__ __volatile__("lock\ncmpxchgl %2,%1"
: "=a"(result), "=m"(*v)
@@ -300,9 +383,22 @@ lAtomicCompareAndSwap32(volatile int32_t *v, int32_t newValue, int32_t oldValue)
: "memory");
lMemFence();
return result;
#endif // ISPC_IS_WINDOWS
}
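// As with cmpxchg, the value returned is what *v held before the attempted
// swap; the exchange succeeded iff that value equals oldValue.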
static inline int32_t
lAtomicAdd(volatile int32_t *v, int32_t delta) {
#ifdef ISPC_IS_WINDOWS
// InterlockedExchangeAdd returns the value prior to the addition, matching
// the xaddl path below (InterlockedAdd would return the post-add value).
return InterlockedExchangeAdd((volatile LONG *)v, delta);
#else
int32_t origValue;
__asm__ __volatile__("lock\n"
"xaddl %0,%1"
: "=r"(origValue), "=m"(*v) : "0"(delta)
: "memory");
return origValue;
#endif
}
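// Note: like std::atomic<int32_t>::fetch_add, lAtomicAdd returns the value
// held *before* the addition; e.g. with v == 5, lAtomicAdd(&v, 1) returns 5
// and leaves v == 6.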
///////////////////////////////////////////////////////////////////////////
@@ -366,6 +462,50 @@ private:
#endif // ISPC_USE_PTHREADS
#ifdef ISPC_USE_CILK
class TaskGroup : public TaskGroupBase {
public:
void Launch(int baseIndex, int count);
void Sync();
};
#endif // ISPC_USE_CILK
#ifdef ISPC_USE_OMP
class TaskGroup : public TaskGroupBase {
public:
void Launch(int baseIndex, int count);
void Sync();
};
#endif // ISPC_USE_OMP
#ifdef ISPC_USE_TBB_PARALLEL_FOR
class TaskGroup : public TaskGroupBase {
public:
void Launch(int baseIndex, int count);
void Sync();
};
#endif // ISPC_USE_TBB_PARALLEL_FOR
#ifdef ISPC_USE_TBB_TASK_GROUP
class TaskGroup : public TaskGroupBase {
public:
void Launch(int baseIndex, int count);
void Sync();
private:
tbb::task_group tbbTaskGroup;
};
#endif // ISPC_USE_TBB_TASK_GROUP
///////////////////////////////////////////////////////////////////////////
// Grand Central Dispatch
@@ -487,18 +627,6 @@ static pthread_mutex_t taskSysMutex;
static std::vector<TaskGroup *> activeTaskGroups;
static sem_t *workerSemaphore;
static void *
lTaskEntry(void *arg) {
int threadIndex = (int)((int64_t)arg);
@@ -724,11 +852,15 @@ TaskGroup::Sync() {
exit(1);
}
// FIXME: We basically end up busy-waiting here, which is
// extra wasteful in a world with hyper-threading. It would
// be much better to put this thread to sleep on a
// condition variable that was signaled when the last task
// in this group was finished.
#ifndef ISPC_IS_KNC
usleep(1);
#else
_mm_delay_32(8);
#endif
continue;
}
@@ -772,6 +904,124 @@ TaskGroup::Sync() {
#endif // ISPC_USE_PTHREADS
///////////////////////////////////////////////////////////////////////////
// Cilk Plus
#ifdef ISPC_USE_CILK
static void
InitTaskSystem() {
// No initialization needed
}
inline void
TaskGroup::Launch(int baseIndex, int count) {
cilk_for(int i = 0; i < count; i++) {
TaskInfo *ti = GetTaskInfo(baseIndex + i);
// Actually run the task.
// Cilk does not expose the task -> thread mapping so we pretend it's 1:1
ti->func(ti->data, ti->taskIndex, ti->taskCount, ti->taskIndex, ti->taskCount);
}
}
inline void
TaskGroup::Sync() {
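// Nothing to wait for: cilk_for in Launch() has an implicit sync and
// does not return until every iteration has finished.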
}
#endif // ISPC_USE_CILK
///////////////////////////////////////////////////////////////////////////
// OpenMP
#ifdef ISPC_USE_OMP
static void
InitTaskSystem() {
// No initialization needed
}
inline void
TaskGroup::Launch(int baseIndex, int count) {
#pragma omp parallel for
for(int i = 0; i < count; i++) {
TaskInfo *ti = GetTaskInfo(baseIndex + i);
// Actually run the task.
int threadIndex = omp_get_thread_num();
int threadCount = omp_get_num_threads();
ti->func(ti->data, threadIndex, threadCount, ti->taskIndex, ti->taskCount);
}
}
inline void
TaskGroup::Sync() {
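// Nothing to wait for: the omp parallel for in Launch() joins all
// threads before returning.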
}
#endif // ISPC_USE_OMP
///////////////////////////////////////////////////////////////////////////
// Intel Threading Building Blocks
#ifdef ISPC_USE_TBB_PARALLEL_FOR
static void
InitTaskSystem() {
// No initialization needed by default
//tbb::task_scheduler_init();
}
inline void
TaskGroup::Launch(int baseIndex, int count) {
tbb::parallel_for(0, count, [=](int i) {
TaskInfo *ti = GetTaskInfo(baseIndex + i);
// Actually run the task.
// TBB does not expose the task -> thread mapping so we pretend it's 1:1
int threadIndex = ti->taskIndex;
int threadCount = ti->taskCount;
ti->func(ti->data, threadIndex, threadCount, ti->taskIndex, ti->taskCount);
});
}
inline void
TaskGroup::Sync() {
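// Nothing to wait for: tbb::parallel_for blocks until all iterations
// have completed.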
}
#endif // ISPC_USE_TBB_PARALLEL_FOR
#ifdef ISPC_USE_TBB_TASK_GROUP
static void
InitTaskSystem() {
// No initialization needed by default
//tbb::task_scheduler_init();
}
inline void
TaskGroup::Launch(int baseIndex, int count) {
for (int i = 0; i < count; i++) {
tbbTaskGroup.run([=]() {
TaskInfo *ti = GetTaskInfo(baseIndex + i);
// TBB does not expose the task -> thread mapping so we pretend it's 1:1
int threadIndex = ti->taskIndex;
int threadCount = ti->taskCount;
ti->func(ti->data, threadIndex, threadCount, ti->taskIndex, ti->taskCount);
});
}
}
inline void
TaskGroup::Sync() {
tbbTaskGroup.wait();
}
#endif // ISPC_USE_TBB_TASK_GROUP
///////////////////////////////////////////////////////////////////////////
#ifndef ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
#define MAX_FREE_TASK_GROUPS 64
static TaskGroup *freeTaskGroups[MAX_FREE_TASK_GROUPS];
@@ -810,13 +1060,6 @@ FreeTaskGroup(TaskGroup *tg) {
///////////////////////////////////////////////////////////////////////////
void
ISPCLaunch(void **taskGroupPtr, void *func, void *data, int count) {
TaskGroup *taskGroup;
@@ -863,3 +1106,250 @@ ISPCAlloc(void **taskGroupPtr, int64_t size, int32_t alignment) {
return taskGroup->AllocMemory(size, alignment);
}
#else // ISPC_USE_PTHREADS_FULLY_SUBSCRIBED
#define MAX_LIVE_TASKS 1024
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Small structure used to hold the data for each task
struct Task {
public:
TaskFuncType func;
void *data;
volatile int32_t taskIndex;
int taskCount;
volatile int numDone;
int liveIndex; // index in live task queue
inline int noMoreWork() { return taskIndex >= taskCount; }
/*! given thread is done working on this task --> decrease num locks */
// inline void lock() { lAtomicAdd(&locks,1); }
// inline void unlock() { lAtomicAdd(&locks,-1); }
inline int nextJob() { return lAtomicAdd(&taskIndex,1); }
inline int numJobs() { return taskCount; }
inline void schedule(int idx) { taskIndex = 0; numDone = 0; liveIndex = idx; }
inline void run(int idx, int threadIdx);
inline void markOneDone() { lAtomicAdd(&numDone,1); }
inline void wait()
{
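// Help-first synchronization: the syncing thread first runs remaining
// jobs itself rather than idling, then spins until the jobs claimed by
// other threads have completed too.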
while (!noMoreWork()) {
int next = nextJob();
if (next < numJobs()) run(next, 0);
}
while (numDone != taskCount) {
#ifndef ISPC_IS_KNC
usleep(1);
#else
_mm_delay_32(8);
#endif
}
}
};
///////////////////////////////////////////////////////////////////////////
class TaskSys {
static int numThreadsRunning;
struct LiveTask
{
volatile int locks; /*!< num locks on this task. gets
initialized to NUM_THREADS+1, then counted
down by every thread that sees this. this
value is only valid when 'active' is set
to true */
volatile int active; /*! workers will spin on this until it
becomes active */
Task *task;
inline void doneWithThis() { lAtomicAdd(&locks,-1); }
LiveTask() : locks(-1), active(0) {} // initializers match declaration order (locks, then active)
};
public:
volatile int nextScheduleIndex; /*! next index in the task queue
where we'll insert a live task */
// inline int inc_begin() { int old = begin; begin = (begin+1)%MAX_TASKS; return old; }
// inline int inc_end() { int old = end; end = (end+1)%MAX_TASKS; return old; }
LiveTask taskQueue[MAX_LIVE_TASKS];
std::stack<Task *> taskMem;
static TaskSys *global;
TaskSys() : nextScheduleIndex(0)
{
TaskSys::global = this;
Task *mem = new Task[MAX_LIVE_TASKS]; //< could actually be more than _live_ tasks
for (int i=0;i<MAX_LIVE_TASKS;i++) {
taskMem.push(mem+i);
}
createThreads();
}
inline Task *allocOne()
{
pthread_mutex_lock(&mutex);
if (taskMem.empty()) {
fprintf(stderr, "Too many live tasks. "
"Change the value of MAX_LIVE_TASKS and recompile.\n");
exit(1);
}
Task *task = taskMem.top();
taskMem.pop();
pthread_mutex_unlock(&mutex);
return task;
}
static inline void init()
{
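// Double-checked initialization: the unlocked test is the fast path;
// the mutex plus re-test below ensures the TaskSys is built exactly once.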
if (global) return;
pthread_mutex_lock(&mutex);
if (global == NULL) global = new TaskSys;
pthread_mutex_unlock(&mutex);
}
void createThreads();
int nThreads;
pthread_t *thread;
void threadFct();
inline void schedule(Task *t)
{
pthread_mutex_lock(&mutex);
int liveIndex = nextScheduleIndex;
nextScheduleIndex = (nextScheduleIndex+1)%MAX_LIVE_TASKS;
if (taskQueue[liveIndex].active) {
fprintf(stderr, "Out of task queue resources. "
"Change the value of MAX_LIVE_TASKS and recompile.\n");
exit(1);
}
taskQueue[liveIndex].task = t;
t->schedule(liveIndex);
taskQueue[liveIndex].locks = numThreadsRunning+1; // num _worker_ threads plus creator
taskQueue[liveIndex].active = true;
pthread_mutex_unlock(&mutex);
}
void sync(Task *task)
{
task->wait();
int liveIndex = task->liveIndex;
while (taskQueue[liveIndex].locks > 1) {
#ifndef ISPC_IS_KNC
usleep(1);
#else
_mm_delay_32(8);
#endif
}
_mm_free(task->data);
pthread_mutex_lock(&mutex);
taskMem.push(task); // recycle task index
taskQueue[liveIndex].active = false;
pthread_mutex_unlock(&mutex);
}
};
void TaskSys::threadFct()
{
int myIndex = 0; //lAtomicAdd(&threadIdx,1);
while (1) {
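// Each worker sweeps the circular task queue: spin until the slot at
// myIndex becomes active, help run its jobs, drop one lock on it, and
// advance to the next slot.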
while (!taskQueue[myIndex].active) {
#ifndef ISPC_IS_KNC
usleep(4);
#else
_mm_delay_32(32);
#endif
continue;
}
Task *mine = taskQueue[myIndex].task;
while (!mine->noMoreWork()) {
int job = mine->nextJob();
if (job >= mine->numJobs()) break;
mine->run(job,myIndex);
}
taskQueue[myIndex].doneWithThis();
myIndex = (myIndex+1)%MAX_LIVE_TASKS;
}
}
inline void Task::run(int idx, int threadIdx) {
(*this->func)(data,threadIdx,TaskSys::global->nThreads,idx,taskCount);
markOneDone();
}
void *_threadFct(void *data) {
((TaskSys*)data)->threadFct();
return NULL;
}
void TaskSys::createThreads()
{
init();
int reserved = 4; // hardware threads left for the OS and the application's own threads
int minid = 2; // lowest logical CPU id that worker threads are pinned to
nThreads = sysconf(_SC_NPROCESSORS_ONLN) - reserved;
if (nThreads < 1) nThreads = 1; // keep at least one worker on small machines
thread = (pthread_t *)malloc(nThreads * sizeof(pthread_t));
numThreadsRunning = 0;
for (int i = 0; i < nThreads; ++i) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 2*1024 * 1024);
int threadID = minid+i;
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(threadID,&cpuset);
int ret = pthread_attr_setaffinity_np(&attr, sizeof(cpuset), &cpuset);
if (ret != 0)
fprintf(stderr, "Warning: unable to set affinity for thread %d\n", i);
int err = pthread_create(&thread[i], &attr, &_threadFct, this);
if (err != 0) {
fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err));
exit(1);
}
++numThreadsRunning; // only count threads that were actually created
}
}
TaskSys * TaskSys::global = NULL;
int TaskSys::numThreadsRunning = 0;
///////////////////////////////////////////////////////////////////////////
void ISPCLaunch(void **taskGroupPtr, void *func, void *data, int count)
{
Task *ti = *(Task**)taskGroupPtr;
ti->func = (TaskFuncType)func;
ti->data = data;
ti->taskIndex = 0;
ti->taskCount = count;
TaskSys::global->schedule(ti);
}
void ISPCSync(void *h)
{
Task *task = (Task *)h;
assert(task);
TaskSys::global->sync(task);
}
void *ISPCAlloc(void **taskGroupPtr, int64_t size, int32_t alignment)
{
TaskSys::init();
Task *task = TaskSys::global->allocOne();
*taskGroupPtr = task;
task->data = _mm_malloc(size,alignment);
return task->data;//*taskGroupPtr;
}
#endif // ISPC_USE_PTHREADS_FULLY_SUBSCRIBED

ispc.h

@@ -51,6 +51,9 @@
#elif defined(__APPLE__)
#define ISPC_IS_APPLE
#endif
#if defined(__KNC__)
#define ISPC_IS_KNC
#endif
#include <stdint.h>
#include <stdlib.h>