comparison src/fftw-3.3.3/threads/threads.c @ 10:37bf6b4a2645

Add FFTW3
author Chris Cannam
date Wed, 20 Mar 2013 15:35:50 +0000
parents
children
comparison
equal deleted inserted replaced
9:c0fb53affa76 10:37bf6b4a2645
1 /*
2 * Copyright (c) 2003, 2007-11 Matteo Frigo
3 * Copyright (c) 2003, 2007-11 Massachusetts Institute of Technology
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 */
20
21 /* threads.c: Portable thread spawning for loops, via the X(spawn_loop)
22 function. The first portion of this file is a set of macros to
23 spawn and join threads on various systems. */
24
25 #include "threads.h"
26
27 #if defined(USING_POSIX_THREADS)
28
29 #include <pthread.h>
30
31 #ifdef HAVE_UNISTD_H
32 # include <unistd.h>
33 #endif
34
35 /* implementation of semaphores and mutexes: */
36 #if (defined(_POSIX_SEMAPHORES) && (_POSIX_SEMAPHORES >= 200112L))
37
38 /* If optional POSIX semaphores are supported, use them to
39 implement both semaphores and mutexes. */
40 # include <semaphore.h>
41 # include <errno.h>
42
/* With POSIX semaphores available, os_sem_t is the native sem_t. */
typedef sem_t os_sem_t;

/* Set up *s as a semaphore (pshared argument 0) with an initial
   count of zero. */
static void os_sem_init(os_sem_t *s)
{
     sem_init(s, 0, 0);
}

/* Tear down a semaphore previously set up by os_sem_init(). */
static void os_sem_destroy(os_sem_t *s)
{
     sem_destroy(s);
}
47
48 static void os_sem_down(os_sem_t *s)
49 {
50 int err;
51 do {
52 err = sem_wait(s);
53 } while (err == -1 && errno == EINTR);
54 CK(err == 0);
55 }
56
57 static void os_sem_up(os_sem_t *s) { sem_post(s); }
58
59 /*
60 The reason why we use sem_t to implement mutexes is that I have
61 seen mysterious hangs with glibc-2.7 and linux-2.6.22 when using
62 pthread_mutex_t, but no hangs with sem_t or with linux >=
63 2.6.24. For lack of better information, sem_t looks like the
64 safest choice.
65 */
/* A mutex is a semaphore whose count starts at 1 (i.e. unlocked). */
typedef sem_t os_mutex_t;

static void os_mutex_init(os_mutex_t *s)
{
     sem_init(s, 0, 1);
}

/* The remaining mutex operations are the semaphore operations. */
#define os_mutex_destroy os_sem_destroy
#define os_mutex_lock os_sem_down
#define os_mutex_unlock os_sem_up
71
72 #else
73
74 /* If optional POSIX semaphores are not defined, use pthread
75 mutexes for mutexes, and simulate semaphores with condition
76 variables */
/* Without POSIX semaphores, a mutex is a plain pthread mutex. */
typedef pthread_mutex_t os_mutex_t;

/* Initialize *s with the default mutex attributes. */
static void os_mutex_init(os_mutex_t *s)
{
     const pthread_mutexattr_t *default_attr = (pthread_mutexattr_t *)0;

     pthread_mutex_init(s, default_attr);
}

/* Release the resources held by *s. */
static void os_mutex_destroy(os_mutex_t *s)
{
     pthread_mutex_destroy(s);
}

/* Acquire *s, blocking until it is available. */
static void os_mutex_lock(os_mutex_t *s)
{
     pthread_mutex_lock(s);
}

/* Release *s. */
static void os_mutex_unlock(os_mutex_t *s)
{
     pthread_mutex_unlock(s);
}
87
/* Counting semaphore emulated with a mutex, a condition variable and
   a counter. */
typedef struct {
     pthread_mutex_t m;  /* protects x */
     pthread_cond_t c;   /* signalled whenever x is incremented */
     volatile int x;     /* current semaphore count */
} os_sem_t;

/* Initialize *s with a count of zero. */
static void os_sem_init(os_sem_t *s)
{
     const pthread_mutexattr_t *mutex_attr = (pthread_mutexattr_t *)0;
     const pthread_condattr_t *cond_attr = (pthread_condattr_t *)0;

     pthread_mutex_init(&s->m, mutex_attr);
     pthread_cond_init(&s->c, cond_attr);

     /* the counter is cleared while holding the lock so that the
        release semantics of pthread_mutex_unlock() apply to it */
     pthread_mutex_lock(&s->m);
     s->x = 0;
     pthread_mutex_unlock(&s->m);
}
105
106 static void os_sem_destroy(os_sem_t *s)
107 {
108 pthread_mutex_destroy(&s->m);
109 pthread_cond_destroy(&s->c);
110 }
111
112 static void os_sem_down(os_sem_t *s)
113 {
114 pthread_mutex_lock(&s->m);
115 while (s->x <= 0)
116 pthread_cond_wait(&s->c, &s->m);
117 --s->x;
118 pthread_mutex_unlock(&s->m);
119 }
120
121 static void os_sem_up(os_sem_t *s)
122 {
123 pthread_mutex_lock(&s->m);
124 ++s->x;
125 pthread_cond_signal(&s->c);
126 pthread_mutex_unlock(&s->m);
127 }
128
129 #endif
130
131 #define FFTW_WORKER void *
132
133 static void os_create_thread(FFTW_WORKER (*worker)(void *arg),
134 void *arg)
135 {
136 pthread_attr_t attr;
137 pthread_t tid;
138
139 pthread_attr_init(&attr);
140 pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
141 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
142
143 pthread_create(&tid, &attr, worker, (void *)arg);
144 pthread_attr_destroy(&attr);
145 }
146
/* Terminate the calling thread with a null exit value. */
static void os_destroy_thread(void)
{
     void *const exit_value = (void *)0;

     pthread_exit(exit_value);
}
151
152 #elif defined(__WIN32__) || defined(_WIN32) || defined(_WINDOWS)
153 /* hack: windef.h defines INT for its own purposes and this causes
154 a conflict with our own INT in ifftw.h. Divert the windows
155 definition into another name unlikely to cause a conflict */
156 #define INT magnus_ab_INTegro_seclorum_nascitur_ordo
157 #include <windows.h>
158 #include <process.h>
159 #undef INT
160
161 typedef HANDLE os_mutex_t;
162
163 static void os_mutex_init(os_mutex_t *s)
164 {
165 *s = CreateMutex(NULL, FALSE, NULL);
166 }
167
168 static void os_mutex_destroy(os_mutex_t *s)
169 {
170 CloseHandle(*s);
171 }
172
173 static void os_mutex_lock(os_mutex_t *s)
174 {
175 WaitForSingleObject(*s, INFINITE);
176 }
177
178 static void os_mutex_unlock(os_mutex_t *s)
179 {
180 ReleaseMutex(*s);
181 }
182
183 typedef HANDLE os_sem_t;
184
185 static void os_sem_init(os_sem_t *s)
186 {
187 *s = CreateSemaphore(NULL, 0, 0x7FFFFFFFL, NULL);
188 }
189
190 static void os_sem_destroy(os_sem_t *s)
191 {
192 CloseHandle(*s);
193 }
194
195 static void os_sem_down(os_sem_t *s)
196 {
197 WaitForSingleObject(*s, INFINITE);
198 }
199
200 static void os_sem_up(os_sem_t *s)
201 {
202 ReleaseSemaphore(*s, 1, NULL);
203 }
204
205 #define FFTW_WORKER unsigned __stdcall
206 typedef unsigned (__stdcall *winthread_start) (void *);
207
208 static void os_create_thread(winthread_start worker,
209 void *arg)
210 {
211 _beginthreadex((void *)NULL, /* security attrib */
212 0, /* stack size */
213 worker, /* start address */
214 arg, /* parameters */
215 0, /* creation flags */
216 (unsigned *)NULL); /* tid */
217 }
218
/* Terminate the calling thread with exit code 0. */
static void os_destroy_thread(void)
{
     const unsigned exit_code = 0;

     _endthreadex(exit_code);
}
223
224
225 #else
226 #error "No threading layer defined"
227 #endif
228
229 /************************************************************************/
230
231 /* Main code: */
232 struct worker {
233 os_sem_t ready;
234 os_sem_t done;
235 struct work *w;
236 struct worker *cdr;
237 };
238
239 static struct worker *make_worker(void)
240 {
241 struct worker *q = (struct worker *)MALLOC(sizeof(*q), OTHER);
242 os_sem_init(&q->ready);
243 os_sem_init(&q->done);
244 return q;
245 }
246
247 static void unmake_worker(struct worker *q)
248 {
249 os_sem_destroy(&q->done);
250 os_sem_destroy(&q->ready);
251 X(ifree)(q);
252 }
253
254 struct work {
255 spawn_function proc;
256 spawn_data d;
257 struct worker *q; /* the worker responsible for performing this work */
258 };
259
260 static os_mutex_t queue_lock;
261 static os_sem_t termination_semaphore;
262
263 static struct worker *worker_queue;
264 #define WITH_QUEUE_LOCK(what) \
265 { \
266 os_mutex_lock(&queue_lock); \
267 what; \
268 os_mutex_unlock(&queue_lock); \
269 }
270
271 static FFTW_WORKER worker(void *arg)
272 {
273 struct worker *ego = (struct worker *)arg;
274 struct work *w;
275
276 for (;;) {
277 /* wait until work becomes available */
278 os_sem_down(&ego->ready);
279
280 w = ego->w;
281
282 /* !w->proc ==> terminate worker */
283 if (!w->proc) break;
284
285 /* do the work */
286 w->proc(&w->d);
287
288 /* signal that work is done */
289 os_sem_up(&ego->done);
290 }
291
292 /* termination protocol */
293 os_sem_up(&termination_semaphore);
294
295 os_destroy_thread();
296 /* UNREACHABLE */
297 return 0;
298 }
299
300 static void enqueue(struct worker *q)
301 {
302 WITH_QUEUE_LOCK({
303 q->cdr = worker_queue;
304 worker_queue = q;
305 });
306 }
307
308 static struct worker *dequeue(void)
309 {
310 struct worker *q;
311
312 WITH_QUEUE_LOCK({
313 q = worker_queue;
314 if (q)
315 worker_queue = q->cdr;
316 });
317
318 if (!q) {
319 /* no worker is available. Create one */
320 q = make_worker();
321 os_create_thread(worker, q);
322 }
323
324 return q;
325 }
326
327
328 static void kill_workforce(void)
329 {
330 struct work w;
331
332 w.proc = 0;
333
334 THREAD_ON; /* needed for debugging mode: since make_worker
335 is called from dequeue which is only called in
336 thread_on mode, we need to unmake_worker in thread_on. */
337 WITH_QUEUE_LOCK({
338 /* tell all workers that they must terminate.
339
340 Because workers enqueue themselves before signaling the
341 completion of the work, all workers belong to the worker queue
342 if we get here. Also, all workers are waiting at
343 os_sem_down(ready), so we can hold the queue lock without
344 deadlocking */
345 while (worker_queue) {
346 struct worker *q = worker_queue;
347 worker_queue = q->cdr;
348 q->w = &w;
349 os_sem_up(&q->ready);
350 os_sem_down(&termination_semaphore);
351 unmake_worker(q);
352 }
353 });
354 THREAD_OFF;
355 }
356
357 int X(ithreads_init)(void)
358 {
359 os_mutex_init(&queue_lock);
360 os_sem_init(&termination_semaphore);
361
362 WITH_QUEUE_LOCK({
363 worker_queue = 0;
364 })
365
366 return 0; /* no error */
367 }
368
369 /* Distribute a loop from 0 to loopmax-1 over nthreads threads.
370 proc(d) is called to execute a block of iterations from d->min
371 to d->max-1. d->thr_num indicate the number of the thread
372 that is executing proc (from 0 to nthreads-1), and d->data is
373 the same as the data parameter passed to X(spawn_loop).
374
375 This function returns only after all the threads have completed. */
376 void X(spawn_loop)(int loopmax, int nthr, spawn_function proc, void *data)
377 {
378 int block_size;
379 struct work *r;
380 int i;
381
382 A(loopmax >= 0);
383 A(nthr > 0);
384 A(proc);
385
386 if (!loopmax) return;
387
388 /* Choose the block size and number of threads in order to (1)
389 minimize the critical path and (2) use the fewest threads that
390 achieve the same critical path (to minimize overhead).
391 e.g. if loopmax is 5 and nthr is 4, we should use only 3
392 threads with block sizes of 2, 2, and 1. */
393 block_size = (loopmax + nthr - 1) / nthr;
394 nthr = (loopmax + block_size - 1) / block_size;
395
396 THREAD_ON; /* prevent debugging mode from failing under threads */
397 STACK_MALLOC(struct work *, r, sizeof(struct work) * nthr);
398
399 /* distribute work: */
400 for (i = 0; i < nthr; ++i) {
401 struct work *w = &r[i];
402 spawn_data *d = &w->d;
403
404 d->max = (d->min = i * block_size) + block_size;
405 if (d->max > loopmax)
406 d->max = loopmax;
407 d->thr_num = i;
408 d->data = data;
409 w->proc = proc;
410
411 if (i == nthr - 1) {
412 /* do the work ourselves */
413 proc(d);
414 } else {
415 /* assign a worker to W */
416 w->q = dequeue();
417
418 /* tell worker w->q to do it */
419 w->q->w = w; /* Dirac could have written this */
420 os_sem_up(&w->q->ready);
421 }
422 }
423
424 for (i = 0; i < nthr - 1; ++i) {
425 struct work *w = &r[i];
426 os_sem_down(&w->q->done);
427 enqueue(w->q);
428 }
429
430 STACK_FREE(r);
431 THREAD_OFF; /* prevent debugging mode from failing under threads */
432 }
433
434 void X(threads_cleanup)(void)
435 {
436 kill_workforce();
437 os_mutex_destroy(&queue_lock);
438 os_sem_destroy(&termination_semaphore);
439 }