Added updated task launch implementation that now tracks task groups.

Within each function that launches tasks, we now can easily track which
tasks that function launched, so that the sync at the end of the function
can just sync on the tasks launched by that function (not all tasks
launched by all functions.)

Implementing this led to a rework of the task system API that ispc generates
code to call; the example task systems in examples/tasksys.cpp have been
updated to conform to this API.  (The updated API is also documented in
the ispc user's guide.)

As part of this, "launch[n]" syntax was added to launch a number of tasks
in a single launch statement, rather than requiring a loop over 'n' to
launch n tasks.

This commit thus fixes issue #84 (enhancement to launch multiple tasks from
a single launch statement) as well as issue #105 (recursive task launches
were broken).
This commit is contained in:
Matt Pharr
2011-09-30 11:20:53 -07:00
parent 5ee4d7fce8
commit cb7976bbf6
43 changed files with 1309 additions and 1043 deletions

View File

@@ -1,14 +1,8 @@
ARCH = $(shell uname)
TASK_CXX=../tasks_pthreads.cpp
TASK_CXX=../tasksys.cpp
TASK_LIB=-lpthread
ifeq ($(ARCH), Darwin)
TASK_CXX=../tasks_gcd.cpp
TASK_LIB=
endif
TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o)))
CXX=g++

View File

@@ -323,16 +323,13 @@ export void ao_ispc(uniform int w, uniform int h, uniform int nsubsamples,
}
static void task ao_task(uniform int y0, uniform int y1, uniform int width,
uniform int height, uniform int nsubsamples,
uniform float image[]) {
ao_scanlines(y0, y1, width, height, nsubsamples, image);
static void task ao_task(uniform int width, uniform int height,
uniform int nsubsamples, uniform float image[]) {
ao_scanlines(taskIndex, taskIndex+1, width, height, nsubsamples, image);
}
export void ao_ispc_tasks(uniform int w, uniform int h, uniform int nsubsamples,
uniform float image[]) {
uniform int dy = 1;
for (uniform int y = 0; y < h; y += dy)
launch < ao_task(y, y+dy, w, h, nsubsamples, image) >;
launch[h] < ao_task(w, h, nsubsamples, image) >;
}

2
examples/aobench/aobench.vcxproj Executable file → Normal file
View File

@@ -21,7 +21,7 @@
<ItemGroup>
<ClCompile Include="ao.cpp" />
<ClCompile Include="ao_serial.cpp" />
<ClCompile Include="../tasks_concrt.cpp" />
<ClCompile Include="../tasksys.cpp" />
</ItemGroup>
<ItemGroup>
<CustomBuild Include="ao.ispc">

View File

0
examples/mandelbrot/mandelbrot.vcxproj Executable file → Normal file
View File

View File

@@ -1,14 +1,8 @@
ARCH = $(shell uname)
TASK_CXX=../tasks_pthreads.cpp
TASK_CXX=../tasksys.cpp
TASK_LIB=-lpthread
ifeq ($(ARCH), Darwin)
TASK_CXX=../tasks_gcd.cpp
TASK_LIB=
endif
TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o)))
CXX=g++

View File

@@ -101,7 +101,7 @@ ensureTargetISAIsSupported() {
}
static void usage() {
fprintf(stderr, "usage: mandelbrot [--scale=<factor]\n");
fprintf(stderr, "usage: mandelbrot [--scale=<factor>]\n");
exit(1);
}
@@ -143,6 +143,9 @@ int main(int argc, char *argv[]) {
//
double minISPC = 1e30;
for (int i = 0; i < 3; ++i) {
// Clear out the buffer
for (unsigned int i = 0; i < width * height; ++i)
buf[i] = 0;
reset_and_start_timer();
mandelbrot_ispc(x0, y0, x1, y1, width, height, maxIterations, buf);
double dt = get_elapsed_mcycles();
@@ -152,9 +155,6 @@ int main(int argc, char *argv[]) {
printf("[mandelbrot ispc+tasks]:\t[%.3f] million cycles\n", minISPC);
writePPM(buf, width, height, "mandelbrot-ispc.ppm");
// Clear out the buffer
for (unsigned int i = 0; i < width * height; ++i)
buf[i] = 0;
//
// And run the serial implementation 3 times, again reporting the
@@ -162,6 +162,9 @@ int main(int argc, char *argv[]) {
//
double minSerial = 1e30;
for (int i = 0; i < 3; ++i) {
// Clear out the buffer
for (unsigned int i = 0; i < width * height; ++i)
buf[i] = 0;
reset_and_start_timer();
mandelbrot_serial(x0, y0, x1, y1, width, height, maxIterations, buf);
double dt = get_elapsed_mcycles();

View File

@@ -53,11 +53,14 @@ mandel(float c_re, float c_im, int count) {
[ystart,yend).
*/
task void
mandelbrot_scanlines(uniform int ystart, uniform int yend,
mandelbrot_scanlines(uniform int ybase, uniform int span,
uniform float x0, uniform float dx,
uniform float y0, uniform float dy,
uniform int width, uniform int maxIterations,
reference uniform int output[]) {
uniform int ystart = ybase + taskIndex * span;
uniform int yend = ystart + span;
for (uniform int j = ystart; j < yend; ++j) {
for (uniform int i = 0; i < width; i += programCount) {
float x = x0 + (programIndex + i) * dx;
@@ -70,6 +73,20 @@ mandelbrot_scanlines(uniform int ystart, uniform int yend,
}
task void
mandelbrot_chunk(uniform float x0, uniform float dx,
uniform float y0, uniform float dy,
uniform int width, uniform int height,
uniform int maxIterations, reference uniform int output[]) {
uniform int ystart = taskIndex * (height/taskCount);
uniform int yend = (taskIndex+1) * (height/taskCount);
uniform int span = 1;
launch[(yend-ystart)/span] < mandelbrot_scanlines(ystart, span, x0, dx, y0, dy,
width, maxIterations, output) >;
}
export void
mandelbrot_ispc(uniform float x0, uniform float y0,
uniform float x1, uniform float y1,
@@ -78,9 +95,6 @@ mandelbrot_ispc(uniform float x0, uniform float y0,
uniform float dx = (x1 - x0) / width;
uniform float dy = (y1 - y0) / height;
/* Launch task to compute results for spans of 'span' scanlines. */
uniform int span = 2;
for (uniform int j = 0; j < height; j += span)
launch < mandelbrot_scanlines(j, j+span, x0, dx, y0, dy, width,
maxIterations, output) >;
launch[32] < mandelbrot_chunk(x0, dx, y0, dy, width, height,
maxIterations, output) >;
}

2
examples/mandelbrot_tasks/mandelbrot_tasks.vcxproj Executable file → Normal file
View File

@@ -143,7 +143,7 @@
<ItemGroup>
<ClCompile Include="mandelbrot.cpp" />
<ClCompile Include="mandelbrot_serial.cpp" />
<ClCompile Include="../tasks_concrt.cpp" />
<ClCompile Include="../tasksys.cpp" />
</ItemGroup>
<ItemGroup>
<CustomBuild Include="mandelbrot.ispc">

2
examples/noise/noise.vcxproj Executable file → Normal file
View File

@@ -164,4 +164,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

0
examples/options/options.vcxproj Executable file → Normal file
View File

View File

@@ -1,14 +1,8 @@
ARCH = $(shell uname)
TASK_CXX=../tasks_pthreads.cpp
TASK_CXX=../tasksys.cpp
TASK_LIB=-lpthread
ifeq ($(ARCH), Darwin)
TASK_CXX=../tasks_gcd.cpp
TASK_LIB=
endif
TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o)))
CXX=g++

View File

@@ -283,8 +283,7 @@ export void raytrace_ispc(uniform int width, uniform int height,
}
task void raytrace_tile_task(uniform int x0, uniform int x1,
uniform int y0, uniform int y1,
task void raytrace_tile_task(uniform int y0, uniform int y1,
uniform int width, uniform int height,
uniform int baseWidth, uniform int baseHeight,
const uniform float raster2camera[4][4],
@@ -292,6 +291,12 @@ task void raytrace_tile_task(uniform int x0, uniform int x1,
uniform float image[], uniform int id[],
const LinearBVHNode nodes[],
const Triangle triangles[]) {
uniform int dx = 16; // must match dx below
uniform int xTasks = (width + (dx-1)) / dx;
uniform int x0 = (taskIndex % xTasks) * dx;
uniform int x1 = x0 + dx;
x1 = min(x1, width);
raytrace_tile(x0, x1, y0, y1, width, height, baseWidth, baseHeight,
raster2camera, camera2world, image,
id, nodes, triangles);
@@ -306,13 +311,11 @@ export void raytrace_ispc_tasks(uniform int width, uniform int height,
const LinearBVHNode nodes[],
const Triangle triangles[]) {
uniform int dx = 16, dy = 16;
uniform int nTasks = (width + (dx-1)) / dx;
for (uniform int y = 0; y < height; y += dy) {
uniform int y1 = min(y + dy, height);
for (uniform int x = 0; x < width; x += dx) {
uniform int x1 = min(x + dx, width);
launch < raytrace_tile_task(x, x1, y, y1, width, height, baseWidth,
baseHeight, raster2camera, camera2world,
image, id, nodes, triangles) >;
}
launch[nTasks] < raytrace_tile_task(y, y1, width, height, baseWidth,
baseHeight, raster2camera, camera2world,
image, id, nodes, triangles) >;
}
}

2
examples/rt/rt.vcxproj Executable file → Normal file
View File

@@ -164,7 +164,7 @@ ispc -O2 %(Filename).ispc -o %(Filename).obj -h %(Filename)_ispc.h
<ItemGroup>
<ClCompile Include="rt.cpp" />
<ClCompile Include="rt_serial.cpp" />
<ClCompile Include="../tasks_concrt.cpp" />
<ClCompile Include="../tasksys.cpp" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">

4
examples/simple/simple.vcxproj Executable file → Normal file
View File

@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
@@ -161,4 +161,4 @@ ispc -O2 %(Filename).ispc -o %(Filename).obj -h %(Filename)_ispc.h
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@@ -1,14 +1,8 @@
ARCH = $(shell uname)
TASK_CXX=../tasks_pthreads.cpp
TASK_CXX=../tasksys.cpp
TASK_LIB=-lpthread
ifeq ($(ARCH), Darwin)
TASK_CXX=../tasks_gcd.cpp
TASK_LIB=
endif
TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o)))
CXX=g++

2
examples/stencil/stencil.vcxproj Executable file → Normal file
View File

@@ -164,7 +164,7 @@ ispc -O2 %(Filename).ispc -o %(Filename).obj -h %(Filename)_ispc.h
<ItemGroup>
<ClCompile Include="stencil.cpp" />
<ClCompile Include="stencil_serial.cpp" />
<ClCompile Include="../tasks_concrt.cpp" />
<ClCompile Include="../tasksys.cpp" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">

View File

@@ -1,180 +0,0 @@
/*
Copyright (c) 2011, Intel Corporation
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Intel Corporation nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef TASKINFO_H
#define TASKINFO_H 1
#ifdef _MSC_VER
#define ISPC_IS_WINDOWS
#elif defined(__linux__)
#define ISPC_IS_LINUX
#elif defined(__APPLE__)
#define ISPC_IS_APPLE
#endif
#ifdef ISPC_IS_WINDOWS
#define NOMINMAX
#include <windows.h>
#include <concrt.h>
using namespace Concurrency;
#endif // ISPC_IS_WINDOWS
#if (__SIZEOF_POINTER__ == 4) || defined(__i386__) || defined(_WIN32)
#define ISPC_POINTER_BYTES 4
#elif (__SIZEOF_POINTER__ == 8) || defined(__x86_64__) || defined(__amd64__) || defined(_WIN64)
#define ISPC_POINTER_BYTES 8
#else
#error "Pointer size unknown!"
#endif // __SIZEOF_POINTER__
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
typedef struct TaskInfo {
void *func;
void *data;
#if defined(ISPC_IS_WINDOWS)
event taskEvent;
#endif
} TaskInfo;
#ifndef ISPC_IS_WINDOWS
static int32_t
lAtomicCompareAndSwap32(volatile int32_t *v, int32_t newValue, int32_t oldValue) {
int32_t result;
__asm__ __volatile__("lock\ncmpxchgl %2,%1"
: "=a"(result), "=m"(*v)
: "q"(newValue), "0"(oldValue)
: "memory");
__asm__ __volatile__("mfence":::"memory");
return result;
}
#endif // !ISPC_IS_WINDOWS
static void *
lAtomicCompareAndSwapPointer(void **v, void *newValue, void *oldValue) {
#ifdef ISPC_IS_WINDOWS
return InterlockedCompareExchangePointer(v, newValue, oldValue);
#else
void *result;
#if (ISPC_POINTER_BYTES == 4)
__asm__ __volatile__("lock\ncmpxchgd %2,%1"
: "=a"(result), "=m"(*v)
: "q"(newValue), "0"(oldValue)
: "memory");
#else
__asm__ __volatile__("lock\ncmpxchgq %2,%1"
: "=a"(result), "=m"(*v)
: "q"(newValue), "0"(oldValue)
: "memory");
#endif // ISPC_POINTER_BYTES
__asm__ __volatile__("mfence":::"memory");
return result;
#endif // ISPC_IS_WINDOWS
}
#ifndef ISPC_IS_WINDOWS
static int32_t
lAtomicAdd32(volatile int32_t *v, int32_t delta) {
// Do atomic add with gcc x86 inline assembly
int32_t origValue;
__asm__ __volatile__("lock\n"
"xaddl %0,%1"
: "=r"(origValue), "=m"(*v) : "0"(delta)
: "memory");
return origValue;
}
#endif
#define LOG_TASK_QUEUE_CHUNK_SIZE 13
#define MAX_TASK_QUEUE_CHUNKS 1024
#define TASK_QUEUE_CHUNK_SIZE (1<<LOG_TASK_QUEUE_CHUNK_SIZE)
#define MAX_LAUNCHED_TASKS (MAX_TASK_QUEUE_CHUNKS * TASK_QUEUE_CHUNK_SIZE)
typedef void (*TaskFuncType)(void *, int, int);
#ifdef ISPC_IS_WINDOWS
static volatile LONG nextTaskInfoCoordinate;
#else
static volatile int nextTaskInfoCoordinate;
#endif
static TaskInfo *taskInfo[MAX_TASK_QUEUE_CHUNKS];
static inline void
lInitTaskInfo() {
taskInfo[0] = new TaskInfo[TASK_QUEUE_CHUNK_SIZE];
}
static inline TaskInfo *
lGetTaskInfo() {
#ifdef ISPC_IS_WINDOWS
int myCoord = InterlockedExchangeAdd(&nextTaskInfoCoordinate, 1);
#else
int myCoord = lAtomicAdd32(&nextTaskInfoCoordinate, 1);
#endif
int index = (myCoord >> LOG_TASK_QUEUE_CHUNK_SIZE);
int offset = myCoord & (TASK_QUEUE_CHUNK_SIZE-1);
if (index == MAX_TASK_QUEUE_CHUNKS) {
fprintf(stderr, "A total of %d tasks have been launched--the simple "
"built-in task system can handle no more. Exiting.", myCoord);
exit(1);
}
if (taskInfo[index] == NULL) {
TaskInfo *newChunk = new TaskInfo[TASK_QUEUE_CHUNK_SIZE];
if (lAtomicCompareAndSwapPointer((void **)&taskInfo[index], newChunk,
NULL) != NULL) {
// failure--someone else got it, but that's cool
assert(taskInfo[index] != NULL);
free(newChunk);
}
}
return &taskInfo[index][offset];
}
static inline void
lResetTaskInfo() {
nextTaskInfoCoordinate = 0;
}
#endif // TASKINFO_H

View File

@@ -1,104 +0,0 @@
/*
Copyright (c) 2011, Intel Corporation
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Intel Corporation nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "taskinfo.h"
/* Simple task system implementation for ispc based on Microsoft's
Concurrency Runtime. */
#include <windows.h>
#include <concrt.h>
using namespace Concurrency;
#include <stdint.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
// ispc expects these functions to have C linkage / not be mangled
extern "C" {
void ISPCLaunch(void *f, void *data);
void ISPCSync();
void *ISPCMalloc(int64_t size, int32_t alignment);
void ISPCFree(void *ptr);
}
void __cdecl
lRunTask(LPVOID param) {
TaskInfo *ti = (TaskInfo *)param;
// Actually run the task.
// FIXME: like the GCD implementation for OS X, this is passing bogus
// values for the threadIndex and threadCount builtins, which in turn
// will cause bugs in code that uses those.
int threadIndex = 0;
int threadCount = 1;
TaskFuncType func = (TaskFuncType)ti->func;
func(ti->data, threadIndex, threadCount);
// Signal the event that this task is done
ti->taskEvent.set();
}
void
ISPCLaunch(void *func, void *data) {
TaskInfo *ti = lGetTaskInfo();
ti->func = (TaskFuncType)func;
ti->data = data;
ti->taskEvent.reset();
CurrentScheduler::ScheduleTask(lRunTask, ti);
}
void ISPCSync() {
for (int i = 0; i < nextTaskInfoCoordinate; ++i) {
int index = (i >> LOG_TASK_QUEUE_CHUNK_SIZE);
int offset = i & (TASK_QUEUE_CHUNK_SIZE-1);
taskInfo[index][offset].taskEvent.wait();
taskInfo[index][offset].taskEvent.reset();
}
lResetTaskInfo();
}
void *ISPCMalloc(int64_t size, int32_t alignment) {
return _aligned_malloc(size, alignment);
}
void ISPCFree(void *ptr) {
_aligned_free(ptr);
}

View File

@@ -1,126 +0,0 @@
/*
Copyright (c) 2011, Intel Corporation
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Intel Corporation nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "taskinfo.h"
#if defined(_WIN32) || defined(_WIN64)
#define ISPC_IS_WINDOWS
#elif defined(__linux__)
#define ISPC_IS_LINUX
#elif defined(__APPLE__)
#define ISPC_IS_APPLE
#endif
/* A simple task system for ispc programs based on Apple's Grand Central
Dispatch. */
#include <dispatch/dispatch.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
static int initialized = 0;
static volatile int32_t lock = 0;
static dispatch_queue_t gcdQueue;
static dispatch_group_t gcdGroup;
// ispc expects these functions to have C linkage / not be mangled
extern "C" {
void ISPCLaunch(void *f, void *data);
void ISPCSync();
void *ISPCMalloc(int64_t size, int32_t alignment);
void ISPCFree(void *ptr);
}
static void
lRunTask(void *ti) {
TaskInfo *taskInfo = (TaskInfo *)ti;
// FIXME: these are bogus values; may cause bugs in code that depends
// on them having unique values in different threads.
int threadIndex = 0;
int threadCount = 1;
TaskFuncType func = (TaskFuncType)(taskInfo->func);
// Actually run the task
func(taskInfo->data, threadIndex, threadCount);
}
void ISPCLaunch(void *func, void *data) {
if (!initialized) {
while (1) {
if (lAtomicCompareAndSwap32(&lock, 1, 0) == 0) {
if (!initialized) {
gcdQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
gcdGroup = dispatch_group_create();
lInitTaskInfo();
__asm__ __volatile__("mfence":::"memory");
initialized = 1;
}
lock = 0;
break;
}
}
}
TaskInfo *ti = lGetTaskInfo();
ti->func = func;
ti->data = data;
dispatch_group_async_f(gcdGroup, gcdQueue, ti, lRunTask);
}
void ISPCSync() {
if (!initialized)
return;
// Wait for all of the tasks in the group to complete before returning
dispatch_group_wait(gcdGroup, DISPATCH_TIME_FOREVER);
lResetTaskInfo();
}
void *ISPCMalloc(int64_t size, int32_t alignment) {
void *mem = malloc(size + (alignment-1) + sizeof(void*));
char *amem = ((char*)mem) + sizeof(void*);
amem = amem + uint32_t(alignment - (reinterpret_cast<uint64_t>(amem) &
(alignment - 1)));
((void**)amem)[-1] = mem;
return amem;
}
void ISPCFree(void *ptr) {
free(((void**)ptr)[-1]);
}

View File

@@ -1,339 +0,0 @@
/*
Copyright (c) 2011, Intel Corporation
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Intel Corporation nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#if defined(_WIN32) || defined(_WIN64)
#define ISPC_IS_WINDOWS
#elif defined(__linux__)
#define ISPC_IS_LINUX
#elif defined(__APPLE__)
#define ISPC_IS_APPLE
#endif
#include "taskinfo.h"
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <sys/sysctl.h>
#include <stdint.h>
#include <stdlib.h>
#include <errno.h>
#ifdef ISPC_IS_LINUX
#include <malloc.h>
#endif
static int initialized = 0;
static volatile int32_t lock = 0;
static int nThreads;
static pthread_t *threads;
static pthread_mutex_t taskQueueMutex;
static int nextTaskToRun;
static sem_t *workerSemaphore;
static uint32_t numUnfinishedTasks;
static pthread_mutex_t tasksRunningConditionMutex;
static pthread_cond_t tasksRunningCondition;
// ispc expects these functions to have C linkage / not be mangled
extern "C" {
void ISPCLaunch(void *f, void *data);
void ISPCSync();
void *ISPCMalloc(int64_t size, int32_t alignment);
void ISPCFree(void *ptr);
}
static void *lTaskEntry(void *arg);
/** Figure out how many CPU cores there are in the system
*/
static int
lNumCPUCores() {
return sysconf(_SC_NPROCESSORS_ONLN);
}
static void
lTasksInit() {
nThreads = lNumCPUCores();
threads = (pthread_t *)malloc(nThreads * sizeof(pthread_t));
int err;
if ((err = pthread_mutex_init(&taskQueueMutex, NULL)) != 0) {
fprintf(stderr, "Error creating mutex: %s\n", strerror(err));
exit(1);
}
char name[32];
sprintf(name, "ispc_task.%d", (int)getpid());
workerSemaphore = sem_open(name, O_CREAT, S_IRUSR|S_IWUSR, 0);
if (!workerSemaphore) {
fprintf(stderr, "Error creating semaphore: %s\n", strerror(err));
exit(1);
}
if ((err = pthread_cond_init(&tasksRunningCondition, NULL)) != 0) {
fprintf(stderr, "Error creating condition variable: %s\n", strerror(err));
exit(1);
}
if ((err = pthread_mutex_init(&tasksRunningConditionMutex, NULL)) != 0) {
fprintf(stderr, "Error creating mutex: %s\n", strerror(err));
exit(1);
}
for (int i = 0; i < nThreads; ++i) {
err = pthread_create(&threads[i], NULL, &lTaskEntry, (void *)(i));
if (err != 0) {
fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err));
exit(1);
}
}
}
void
ISPCLaunch(void *f, void *d) {
int err;
if (!initialized) {
while (1) {
if (lAtomicCompareAndSwap32(&lock, 1, 0) == 0) {
if (!initialized) {
lTasksInit();
__asm__ __volatile__("mfence":::"memory");
initialized = 1;
}
lock = 0;
break;
}
}
}
//
// Acquire mutex, add task
//
if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
// Need a mutex here to ensure we get this filled in before a worker
// grabs it and starts running...
TaskInfo *ti = lGetTaskInfo();
ti->func = f;
ti->data = d;
if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err));
exit(1);
}
//
// Update count of number of tasks left to run
//
if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
// FIXME: is this redundant with nextTaskInfoCoordinate?
++numUnfinishedTasks;
if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
//
// Post to the worker semaphore to wake up worker threads that are
// sleeping waiting for tasks to show up
//
if ((err = sem_post(workerSemaphore)) != 0) {
fprintf(stderr, "Error from sem_post: %s\n", strerror(err));
exit(1);
}
}
static void *
lTaskEntry(void *arg) {
int threadIndex = (int)((int64_t)arg);
int threadCount = nThreads;
TaskFuncType func;
while (1) {
int err;
if ((err = sem_wait(workerSemaphore)) != 0) {
fprintf(stderr, "Error from sem_wait: %s\n", strerror(err));
exit(1);
}
//
// Acquire mutex, get task
//
if ((err = pthread_mutex_lock(&taskQueueMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
if (nextTaskToRun == nextTaskInfoCoordinate) {
//
// Task queue is empty, go back and wait on the semaphore
//
if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err));
exit(1);
}
continue;
}
int runCoord = nextTaskToRun++;
int index = (runCoord >> LOG_TASK_QUEUE_CHUNK_SIZE);
int offset = runCoord & (TASK_QUEUE_CHUNK_SIZE-1);
TaskInfo *myTask = &taskInfo[index][offset];
if ((err = pthread_mutex_unlock(&taskQueueMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err));
exit(1);
}
//
// Do work for _myTask_
//
func = (TaskFuncType)myTask->func;
func(myTask->data, threadIndex, threadCount);
//
// Decrement the number of unfinished tasks counter
//
if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
// FIXME: can this be a comparison of (nextTaskToRun == nextTaskInfoCoordinate)?
// (I don't think so--think there is a race...)
int unfinished = --numUnfinishedTasks;
if (unfinished == 0) {
//
// Signal the "no more tasks are running" condition if all of
// them are done.
//
int err;
if ((err = pthread_cond_signal(&tasksRunningCondition)) != 0) {
fprintf(stderr, "Error from pthread_cond_signal: %s\n", strerror(err));
exit(1);
}
}
if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
}
pthread_exit(NULL);
return 0;
}
void ISPCSync() {
int err;
if ((err = pthread_mutex_lock(&tasksRunningConditionMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
// As long as there are tasks running, wait on the condition variable;
// doing so causes this thread to go to sleep until someone signals on
// the tasksRunningCondition condition variable.
while (numUnfinishedTasks > 0) {
if ((err = pthread_cond_wait(&tasksRunningCondition,
&tasksRunningConditionMutex)) != 0) {
fprintf(stderr, "Error from pthread_cond_wait: %s\n", strerror(err));
exit(1);
}
}
lResetTaskInfo();
nextTaskToRun = 0;
// We acquire ownership of the condition variable mutex when the above
// pthread_cond_wait returns.
// FIXME: is there a lurking issue here if numUnfinishedTasks gets back
// to zero by the time we get to ISPCSync() and thence we're trying to
// unlock a mutex we don't have a lock on?
if ((err = pthread_mutex_unlock(&tasksRunningConditionMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
}
void *ISPCMalloc(int64_t size, int32_t alignment) {
#ifdef ISPC_IS_WINDOWS
return _aligned_malloc(size, alignment);
#endif
#ifdef ISPC_IS_LINUX
return memalign(alignment, size);
#endif
#ifdef ISPC_IS_APPLE
void *mem = malloc(size + (alignment-1) + sizeof(void*));
char *amem = ((char*)mem) + sizeof(void*);
amem = amem + uint32_t(alignment - (reinterpret_cast<uint64_t>(amem) &
(alignment - 1)));
((void**)amem)[-1] = mem;
return amem;
#endif
}
void ISPCFree(void *ptr) {
#ifdef ISPC_IS_WINDOWS
_aligned_free(ptr);
#endif
#ifdef ISPC_IS_LINUX
free(ptr);
#endif
#ifdef ISPC_IS_APPLE
free(((void**)ptr)[-1]);
#endif
}

868
examples/tasksys.cpp Normal file
View File

@@ -0,0 +1,868 @@
/*
Copyright (c) 2011, Intel Corporation
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of Intel Corporation nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
This file implements simple task systems that provide the three
entrypoints used by ispc-generated to code to handle 'launch' and 'sync'
statements in ispc programs. See the section "Task Parallelism: Language
Syntax" in the ispc documentation for information about using task
parallelism in ispc programs, and see the section "Task Parallelism:
Runtime Requirements" for information about the task-related entrypoints
that are implemented here.
There are three task systems in this file: one built using Microsoft's
Concurrency Runtime, one built with Apple's Grand Central Dispatch, and
one built on top of bare pthreads.
*/
#if defined(_WIN32) || defined(_WIN64)
#define ISPC_IS_WINDOWS
#define ISPC_USE_CONCRT
#elif defined(__linux__)
#define ISPC_IS_LINUX
#define ISPC_USE_PTHREADS
#elif defined(__APPLE__)
#define ISPC_IS_APPLE
// pthreads is noticably more efficient than GCD on OSX
#define ISPC_USE_PTHREADS
//#define ISPC_USE_GCD
#endif
#define DBG(x)
#ifdef ISPC_IS_WINDOWS
#define NOMINMAX
#include <windows.h>
#endif // ISPC_IS_WINDOWS
#ifdef ISPC_USE_CONCRT
#include <concrt.h>
using namespace Concurrency;
#endif // ISPC_USE_CONCRT
#ifdef ISPC_USE_GCD
#include <dispatch/dispatch.h>
#include <pthread.h>
#endif // ISPC_USE_GCD
#ifdef ISPC_USE_PTHREADS
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <sys/sysctl.h>
#include <vector>
#include <algorithm>
#endif // ISPC_USE_PTHREADS
#ifdef ISPC_IS_LINUX
#include <malloc.h>
#endif // ISPC_IS_LINUX
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <algorithm>
// Signature of ispc-generated 'task' functions
typedef void (*TaskFuncType)(void *data, int threadIndex, int threadCount,
int taskIndex, int taskCount);
// Small structure used to hold the data for each task
struct TaskInfo {
TaskFuncType func;
void *data;
int taskIndex, taskCount;
#if defined(ISPC_IS_WINDOWS)
event taskEvent;
#endif
};
///////////////////////////////////////////////////////////////////////////
// TaskGroupBase
#define LOG_TASK_QUEUE_CHUNK_SIZE 12
#define MAX_TASK_QUEUE_CHUNKS 8
#define TASK_QUEUE_CHUNK_SIZE (1<<LOG_TASK_QUEUE_CHUNK_SIZE)
#define MAX_LAUNCHED_TASKS (MAX_TASK_QUEUE_CHUNKS * TASK_QUEUE_CHUNK_SIZE)
#define NUM_MEM_BUFFERS 16
class TaskGroup;
/** The TaskGroupBase structure provides common functionality for "task
groups"; a task group is the set of tasks launched from within a single
ispc function. When the function is ready to return, it waits for all
of the tasks in its task group to finish before it actually returns.
*/
class TaskGroupBase {
public:
void Reset();
int AllocTaskInfo(int count);
TaskInfo *GetTaskInfo(int index);
void *AllocMemory(int64_t size, int32_t alignment);
protected:
TaskGroupBase();
~TaskGroupBase();
int nextTaskInfoIndex;
private:
/* We allocate blocks of TASK_QUEUE_CHUNK_SIZE TaskInfo structures as
needed by the calling function. We hold up to MAX_TASK_QUEUE_CHUNKS
of these (and then exit at runtime if more than this many tasks are
launched.)
*/
TaskInfo *taskInfo[MAX_TASK_QUEUE_CHUNKS];
/* We also allocate chunks of memory to service ISPCAlloc() calls. The
memBuffers[] array holds pointers to this memory. The first element
of this array is initialized to point to mem and then any subsequent
elements required are initialized with dynamic allocation.
*/
int curMemBuffer, curMemBufferOffset;
int memBufferSize[NUM_MEM_BUFFERS];
char *memBuffers[NUM_MEM_BUFFERS];
char mem[256];
};
inline TaskGroupBase::TaskGroupBase() {
nextTaskInfoIndex = 0;
curMemBuffer = 0;
curMemBufferOffset = 0;
memBuffers[0] = mem;
memBufferSize[0] = sizeof(mem) / sizeof(mem[0]);
for (int i = 1; i < NUM_MEM_BUFFERS; ++i) {
memBuffers[i] = NULL;
memBufferSize[i] = 0;
}
for (int i = 0; i < MAX_TASK_QUEUE_CHUNKS; ++i)
taskInfo[i] = NULL;
}
inline TaskGroupBase::~TaskGroupBase() {
// Note: don't delete memBuffers[0], since it points to the start of
// the "mem" member!
for (int i = 1; i < NUM_MEM_BUFFERS; ++i)
delete[] memBuffers[i];
}
inline void
TaskGroupBase::Reset() {
nextTaskInfoIndex = 0;
curMemBuffer = 0;
curMemBufferOffset = 0;
}
inline int
TaskGroupBase::AllocTaskInfo(int count) {
int ret = nextTaskInfoIndex;
nextTaskInfoIndex += count;
return ret;
}
inline TaskInfo *
TaskGroupBase::GetTaskInfo(int index) {
int chunk = (index >> LOG_TASK_QUEUE_CHUNK_SIZE);
int offset = index & (TASK_QUEUE_CHUNK_SIZE-1);
if (chunk == MAX_TASK_QUEUE_CHUNKS) {
fprintf(stderr, "A total of %d tasks have been launched from the "
"current function--the simple built-in task system can handle "
"no more. You can increase the values of TASK_QUEUE_CHUNK_SIZE "
"and LOG_TASK_QUEUE_CHUNK_SIZE to work around this limitation. "
"Sorry! Exiting.\n", index);
exit(1);
}
if (taskInfo[chunk] == NULL)
taskInfo[chunk] = new TaskInfo[TASK_QUEUE_CHUNK_SIZE];
return &taskInfo[chunk][offset];
}
inline void *
TaskGroupBase::AllocMemory(int64_t size, int32_t alignment) {
char *basePtr = memBuffers[curMemBuffer];
int64_t iptr = (int64_t)(basePtr + curMemBufferOffset);
iptr = (iptr + (alignment-1)) & ~(alignment-1);
int newOffset = int(iptr + size - (int64_t)basePtr);
if (newOffset < memBufferSize[curMemBuffer]) {
curMemBufferOffset = newOffset;
return (char *)iptr;
}
++curMemBuffer;
curMemBufferOffset = 0;
assert(curMemBuffer < NUM_MEM_BUFFERS);
int allocSize = 1 << (12 + curMemBuffer);
allocSize = std::max(int(size+alignment), allocSize);
char *newBuf = new char[allocSize];
memBufferSize[curMemBuffer] = allocSize;
memBuffers[curMemBuffer] = newBuf;
return AllocMemory(size, alignment);
}
///////////////////////////////////////////////////////////////////////////
// Atomics and the like
#ifndef ISPC_IS_WINDOWS
static inline void
lMemFence() {
__asm__ __volatile__("mfence":::"memory");
}
#endif // !ISPC_IS_WINDOWS
#if (__SIZEOF_POINTER__ == 4) || defined(__i386__) || defined(_WIN32)
#define ISPC_POINTER_BYTES 4
#elif (__SIZEOF_POINTER__ == 8) || defined(__x86_64__) || defined(__amd64__) || defined(_WIN64)
#define ISPC_POINTER_BYTES 8
#else
#error "Pointer size unknown!"
#endif // __SIZEOF_POINTER__
static void *
lAtomicCompareAndSwapPointer(void **v, void *newValue, void *oldValue) {
#ifdef ISPC_IS_WINDOWS
return InterlockedCompareExchangePointer(v, newValue, oldValue);
#else
void *result;
#if (ISPC_POINTER_BYTES == 4)
__asm__ __volatile__("lock\ncmpxchgd %2,%1"
: "=a"(result), "=m"(*v)
: "q"(newValue), "0"(oldValue)
: "memory");
#else
__asm__ __volatile__("lock\ncmpxchgq %2,%1"
: "=a"(result), "=m"(*v)
: "q"(newValue), "0"(oldValue)
: "memory");
#endif // ISPC_POINTER_BYTES
lMemFence();
return result;
#endif // ISPC_IS_WINDOWS
}
#ifndef ISPC_IS_WINDOWS
static int32_t
lAtomicCompareAndSwap32(volatile int32_t *v, int32_t newValue, int32_t oldValue) {
int32_t result;
__asm__ __volatile__("lock\ncmpxchgl %2,%1"
: "=a"(result), "=m"(*v)
: "q"(newValue), "0"(oldValue)
: "memory");
lMemFence();
return result;
}
#endif // !ISPC_IS_WINDOWS
///////////////////////////////////////////////////////////////////////////
#ifdef ISPC_USE_CONCRT
// With ConcRT, we don't need to extend TaskGroupBase at all.
class TaskGroup : public TaskGroupBase {
public:
void Launch(int baseIndex, int count);
void Sync();
};
#endif // ISPC_USE_CONCRT
#ifdef ISPC_USE_GCD
/* With Grand Central Dispatch, we associate a GCD dispatch group with each
task group. (We'll later wait on this dispatch group when we need to
wait on all of the tasks in the group to finish.)
*/
class TaskGroup : public TaskGroupBase {
public:
TaskGroup() {
gcdGroup = dispatch_group_create();
}
void Launch(int baseIndex, int count);
void Sync();
private:
dispatch_group_t gcdGroup;
};
#endif // ISPC_USE_GCD
#ifdef ISPC_USE_PTHREADS
static void *lTaskEntry(void *arg);
class TaskGroup : public TaskGroupBase {
public:
TaskGroup() {
numUnfinishedTasks = 0;
waitingTasks.reserve(128);
inActiveList = false;
}
void Reset() {
TaskGroupBase::Reset();
numUnfinishedTasks = 0;
assert(inActiveList == false);
lMemFence();
}
void Launch(int baseIndex, int count);
void Sync();
private:
friend void *lTaskEntry(void *arg);
int32_t numUnfinishedTasks;
int32_t pad[3];
std::vector<int> waitingTasks;
bool inActiveList;
};
#endif // ISPC_USE_PTHREADS
///////////////////////////////////////////////////////////////////////////
// Grand Central Dispatch
#ifdef ISPC_USE_GCD
/* A simple task system for ispc programs based on Apple's Grand Central
Dispatch. */
static dispatch_queue_t gcdQueue;
static volatile int32_t lock = 0;
static void
InitTaskSystem() {
if (gcdQueue != NULL)
return;
while (1) {
if (lAtomicCompareAndSwap32(&lock, 1, 0) == 0) {
if (gcdQueue == NULL) {
gcdQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
assert(gcdQueue != NULL);
lMemFence();
}
lock = 0;
break;
}
}
}
static void
lRunTask(void *ti) {
TaskInfo *taskInfo = (TaskInfo *)ti;
// FIXME: these are bogus values; may cause bugs in code that depends
// on them having unique values in different threads.
int threadIndex = 0;
int threadCount = 1;
// Actually run the task
taskInfo->func(taskInfo->data, threadIndex, threadCount,
taskInfo->taskIndex, taskInfo->taskCount);
}
inline void
TaskGroup::Launch(int baseIndex, int count) {
for (int i = 0; i < count; ++i) {
TaskInfo *ti = GetTaskInfo(baseIndex + i);
dispatch_group_async_f(gcdGroup, gcdQueue, ti, lRunTask);
}
}
inline void
TaskGroup::Sync() {
dispatch_group_wait(gcdGroup, DISPATCH_TIME_FOREVER);
}
#endif // ISPC_USE_GCD
///////////////////////////////////////////////////////////////////////////
// Concurrency Runtime
#ifdef ISPC_USE_CONCRT
static void
InitTaskSystem() {
// No initialization needed
}
static void __cdecl
lRunTask(LPVOID param) {
TaskInfo *ti = (TaskInfo *)param;
// Actually run the task.
// FIXME: like the GCD implementation for OS X, this is passing bogus
// values for the threadIndex and threadCount builtins, which in turn
// will cause bugs in code that uses those.
int threadIndex = 0;
int threadCount = 1;
ti->func(ti->data, threadIndex, threadCount, ti->taskIndex, ti->taskCount);
// Signal the event that this task is done
ti->taskEvent.set();
}
inline void
TaskGroup::Launch(int baseIndex, int count) {
for (int i = 0; i < count; ++i)
CurrentScheduler::ScheduleTask(lRunTask, GetTaskInfo(baseIndex + i));
}
inline void
TaskGroup::Sync() {
for (int i = 0; i < nextTaskInfoIndex; ++i) {
TaskInfo *ti = GetTaskInfo(i);
ti->taskEvent.wait();
ti->taskEvent.reset();
}
}
#endif // ISPC_USE_CONCRT
///////////////////////////////////////////////////////////////////////////
// pthreads
#ifdef ISPC_USE_PTHREADS
static volatile int32_t lock = 0;
static int nThreads;
static pthread_t *threads = NULL;
static pthread_mutex_t taskSysMutex;
static std::vector<TaskGroup *> activeTaskGroups;
static sem_t *workerSemaphore;
static inline int32_t
lAtomicAdd(int32_t *v, int32_t delta) {
int32_t origValue;
__asm__ __volatile__("lock\n"
"xaddl %0,%1"
: "=r"(origValue), "=m"(*v) : "0"(delta)
: "memory");
return origValue;
}
static void *
lTaskEntry(void *arg) {
int threadIndex = (int)((int64_t)arg);
int threadCount = nThreads;
while (1) {
int err;
//
// Wait on the semaphore until we're woken up due to the arrival of
// more work.
//
if ((err = sem_wait(workerSemaphore)) != 0) {
fprintf(stderr, "Error from sem_wait: %s\n", strerror(err));
exit(1);
}
//
// Acquire the mutex
//
if ((err = pthread_mutex_lock(&taskSysMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
if (activeTaskGroups.size() == 0) {
//
// Task queue is empty, go back and wait on the semaphore
//
if ((err = pthread_mutex_unlock(&taskSysMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err));
exit(1);
}
continue;
}
//
// Get the last task group on the active list and the last task
// from its waiting tasks list.
//
TaskGroup *tg = activeTaskGroups.back();
assert(tg->waitingTasks.size() > 0);
int taskNumber = tg->waitingTasks.back();
tg->waitingTasks.pop_back();
if (tg->waitingTasks.size() == 0) {
// We just took the last task from this task group, so remove
// it from the active list.
activeTaskGroups.pop_back();
tg->inActiveList = false;
}
if ((err = pthread_mutex_unlock(&taskSysMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err));
exit(1);
}
//
// And now actually run the task
//
DBG(fprintf(stderr, "running task %d from group %p\n", taskNumber, tg));
TaskInfo *myTask = tg->GetTaskInfo(taskNumber);
myTask->func(myTask->data, threadIndex, threadCount, myTask->taskIndex,
myTask->taskCount);
//
// Decrement the "number of unfinished tasks" counter in the task
// group.
//
lMemFence();
lAtomicAdd(&tg->numUnfinishedTasks, -1);
}
pthread_exit(NULL);
return 0;
}
static void
InitTaskSystem() {
if (threads == NULL) {
while (1) {
if (lAtomicCompareAndSwap32(&lock, 1, 0) == 0) {
if (threads == NULL) {
// We launch one fewer thread than there are cores,
// since the main thread here will also grab jobs from
// the task queue itself.
nThreads = sysconf(_SC_NPROCESSORS_ONLN) - 1;
int err;
if ((err = pthread_mutex_init(&taskSysMutex, NULL)) != 0) {
fprintf(stderr, "Error creating mutex: %s\n", strerror(err));
exit(1);
}
char name[32];
sprintf(name, "ispc_task.%d", (int)getpid());
workerSemaphore = sem_open(name, O_CREAT, S_IRUSR|S_IWUSR, 0);
if (!workerSemaphore) {
fprintf(stderr, "Error creating semaphore: %s\n", strerror(err));
exit(1);
}
threads = (pthread_t *)malloc(nThreads * sizeof(pthread_t));
for (int i = 0; i < nThreads; ++i) {
err = pthread_create(&threads[i], NULL, &lTaskEntry, (void *)(i));
if (err != 0) {
fprintf(stderr, "Error creating pthread %d: %s\n", i, strerror(err));
exit(1);
}
}
activeTaskGroups.reserve(64);
}
// Make sure all of the above goes to memory before we
// clear the lock.
lMemFence();
lock = 0;
break;
}
}
}
}
inline void
TaskGroup::Launch(int baseCoord, int count) {
//
// Acquire mutex, add task
//
int err;
if ((err = pthread_mutex_lock(&taskSysMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
// Add the corresponding set of tasks to the waiting-to-be-run list for
// this task group.
//
// FIXME: it's a little ugly to hold a global mutex for this when we
// only need to make sure no one else is accessing this task group's
// waitingTasks list. (But a small experiment in switching to a
// per-TaskGroup mutex showed worse performance!)
for (int i = 0; i < count; ++i)
waitingTasks.push_back(baseCoord + i);
// Add the task group to the global active list if it isn't there
// already.
if (inActiveList == false) {
activeTaskGroups.push_back(this);
inActiveList = true;
}
if ((err = pthread_mutex_unlock(&taskSysMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err));
exit(1);
}
//
// Update the count of the number of tasks left to run in this task
// group.
//
lMemFence();
lAtomicAdd(&numUnfinishedTasks, count);
//
// Post to the worker semaphore to wake up worker threads that are
// sleeping waiting for tasks to show up
//
for (int i = 0; i < count; ++i)
if ((err = sem_post(workerSemaphore)) != 0) {
fprintf(stderr, "Error from sem_post: %s\n", strerror(err));
exit(1);
}
}
inline void
TaskGroup::Sync() {
DBG(fprintf(stderr, "syncing %p - %d unfinished\n", tg, numUnfinishedTasks));
while (numUnfinishedTasks > 0) {
// All of the tasks in this group aren't finished yet. We'll try
// to help out here since we don't have anything else to do...
DBG(fprintf(stderr, "while syncing %p - %d unfinished\n", tg,
numUnfinishedTasks));
//
// Acquire the global task system mutex to grab a task to work on
//
int err;
if ((err = pthread_mutex_lock(&taskSysMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_lock: %s\n", strerror(err));
exit(1);
}
TaskInfo *myTask = NULL;
TaskGroup *runtg = this;
if (waitingTasks.size() > 0) {
int taskNumber = waitingTasks.back();
waitingTasks.pop_back();
if (waitingTasks.size() == 0) {
// There's nothing left to start running from this group,
// so remove it from the active task list.
activeTaskGroups.erase(std::find(activeTaskGroups.begin(),
activeTaskGroups.end(), this));
inActiveList = false;
}
myTask = GetTaskInfo(taskNumber);
DBG(fprintf(stderr, "running task %d from group %p in sync\n", taskNumber, tg));
}
else {
// Other threads are already working on all of the tasks in
// this group, so we can't help out by running one ourself.
// We'll try to run one from another group to make ourselves
// useful here.
if (activeTaskGroups.size() == 0) {
// No active task groups left--there's nothing for us to do.
if ((err = pthread_mutex_unlock(&taskSysMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err));
exit(1);
}
// FIXME: We basically end up busy-waiting here, which is
// extra wasteful in a world with hyperthreading. It would
// be much better to put this thread to sleep on a
// condition variable that was signaled when the last task
// in this group was finished.
sleep(0);
continue;
}
// Get a task to run from another task group.
runtg = activeTaskGroups.back();
assert(runtg->waitingTasks.size() > 0);
int taskNumber = runtg->waitingTasks.back();
runtg->waitingTasks.pop_back();
if (runtg->waitingTasks.size() == 0) {
// There's left to start running from this group, so remove
// it from the active task list.
activeTaskGroups.pop_back();
runtg->inActiveList = false;
}
myTask = runtg->GetTaskInfo(taskNumber);
DBG(fprintf(stderr, "running task %d from other group %p in sync\n",
taskNumber, runtg));
}
if ((err = pthread_mutex_unlock(&taskSysMutex)) != 0) {
fprintf(stderr, "Error from pthread_mutex_unlock: %s\n", strerror(err));
exit(1);
}
//
// Do work for _myTask_
//
// FIXME: bogus values for thread index/thread count here as well..
myTask->func(myTask->data, 0, 1, myTask->taskIndex, myTask->taskCount);
//
// Decrement the number of unfinished tasks counter
//
lMemFence();
lAtomicAdd(&runtg->numUnfinishedTasks, -1);
}
DBG(fprintf(stderr, "sync for %p done!n", tg));
}
#endif // ISPC_USE_PTHREADS
///////////////////////////////////////////////////////////////////////////
#define MAX_FREE_TASK_GROUPS 64
static TaskGroup *freeTaskGroups[MAX_FREE_TASK_GROUPS];
static inline TaskGroup *
AllocTaskGroup() {
for (int i = 0; i < MAX_FREE_TASK_GROUPS; ++i) {
TaskGroup *tg = freeTaskGroups[i];
if (tg != NULL) {
void *ptr = lAtomicCompareAndSwapPointer((void **)(&freeTaskGroups[i]), NULL, tg);
if (ptr != NULL) {
assert(ptr == tg);
return (TaskGroup *)ptr;
}
}
}
return new TaskGroup;
}
static inline void
FreeTaskGroup(TaskGroup *tg) {
tg->Reset();
for (int i = 0; i < MAX_FREE_TASK_GROUPS; ++i) {
if (freeTaskGroups[i] == NULL) {
void *ptr = lAtomicCompareAndSwapPointer((void **)&freeTaskGroups[i], tg, NULL);
if (ptr == NULL)
return;
}
}
delete tg;
}
///////////////////////////////////////////////////////////////////////////
// ispc expects these functions to have C linkage / not be mangled
extern "C" {
void ISPCLaunch(void **handlePtr, void *f, void *data, int count);
void *ISPCAlloc(void **handlePtr, int64_t size, int32_t alignment);
void ISPCSync(void *handle);
}
void
ISPCLaunch(void **taskGroupPtr, void *func, void *data, int count) {
TaskGroup *taskGroup;
if (*taskGroupPtr == NULL) {
InitTaskSystem();
taskGroup = AllocTaskGroup();
*taskGroupPtr = taskGroup;
}
else
taskGroup = (TaskGroup *)(*taskGroupPtr);
int baseIndex = taskGroup->AllocTaskInfo(count);
for (int i = 0; i < count; ++i) {
TaskInfo *ti = taskGroup->GetTaskInfo(baseIndex+i);
ti->func = (TaskFuncType)func;
ti->data = data;
ti->taskIndex = i;
ti->taskCount = count;
}
taskGroup->Launch(baseIndex, count);
}
void
ISPCSync(void *h) {
TaskGroup *taskGroup = (TaskGroup *)h;
if (taskGroup != NULL) {
taskGroup->Sync();
FreeTaskGroup(taskGroup);
}
}
void *
ISPCAlloc(void **taskGroupPtr, int64_t size, int32_t alignment) {
TaskGroup *taskGroup;
if (*taskGroupPtr == NULL) {
InitTaskSystem();
taskGroup = AllocTaskGroup();
*taskGroupPtr = taskGroup;
}
else
taskGroup = (TaskGroup *)(*taskGroupPtr);
return taskGroup->AllocMemory(size, alignment);
}

View File

@@ -1,14 +1,8 @@
ARCH = $(shell uname)
TASK_CXX=../tasks_pthreads.cpp
TASK_CXX=../tasksys.cpp
TASK_LIB=-lpthread
ifeq ($(ARCH), Darwin)
TASK_CXX=../tasks_gcd.cpp
TASK_LIB=
endif
TASK_OBJ=$(addprefix objs/, $(subst ../,, $(TASK_CXX:.cpp=.o)))
CXX=g++

View File

@@ -343,11 +343,20 @@ volume_tile(uniform int x0, uniform int y0, uniform int x1,
task void
volume_task(uniform int x0, uniform int y0, uniform int x1,
uniform int y1, uniform float density[], uniform int nVoxels[3],
volume_task(uniform float density[], uniform int nVoxels[3],
const uniform float raster2camera[4][4],
const uniform float camera2world[4][4],
uniform int width, uniform int height, uniform float image[]) {
uniform int dx = 8, dy = 8; // must match value in volume_ispc_tasks
uniform int xbuckets = (width + (dx-1)) / dx;
uniform int ybuckets = (height + (dy-1)) / dy;
uniform int x0 = (taskIndex % xbuckets) * dx;
uniform int y0 = (taskIndex / ybuckets) * dy;
uniform int x1 = x0 + dx, y1 = y0 + dy;
x1 = min(x1, width);
y1 = min(y1, height);
volume_tile(x0, y0, x1, y1, density, nVoxels, raster2camera,
camera2world, width, height, image);
}
@@ -370,9 +379,7 @@ volume_ispc_tasks(uniform float density[], uniform int nVoxels[3],
uniform int width, uniform int height, uniform float image[]) {
// Launch tasks to work on (dx,dy)-sized tiles of the image
uniform int dx = 8, dy = 8;
for (uniform int y = 0; y < height; y += dy)
for (uniform int x = 0; x < width; x += dx)
launch < volume_task(x, y, x+dx, y+dy, density, nVoxels,
raster2camera, camera2world, width, height,
image) >;
uniform int nTasks = ((width+(dx-1))/dx) * ((height+(dy-1))/dy);
launch[nTasks] < volume_task(density, nVoxels, raster2camera, camera2world,
width, height, image) >;
}

2
examples/volume_rendering/volume.vcxproj Executable file → Normal file
View File

@@ -143,7 +143,7 @@
<ItemGroup>
<ClCompile Include="volume.cpp" />
<ClCompile Include="volume_serial.cpp" />
<ClCompile Include="../tasks_concrt.cpp" />
<ClCompile Include="../tasksys.cpp" />
</ItemGroup>
<ItemGroup>
<CustomBuild Include="volume.ispc">