comparison src/fftw-3.3.5/threads/threads.c @ 127:7867fa7e1b6b

Current fftw source
author Chris Cannam <cannam@all-day-breakfast.com>
date Tue, 18 Oct 2016 13:40:26 +0100
parents
children
comparison
equal deleted inserted replaced
126:4a7071416412 127:7867fa7e1b6b
1 /*
2 * Copyright (c) 2003, 2007-14 Matteo Frigo
3 * Copyright (c) 2003, 2007-14 Massachusetts Institute of Technology
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 *
19 */
20
21 /* threads.c: Portable thread spawning for loops, via the X(spawn_loop)
22 function. The first portion of this file is a set of macros to
23 spawn and join threads on various systems. */
24
25 #include "threads.h"
26 #include "api.h"
27
28 #if defined(USING_POSIX_THREADS)
29
30 #include <pthread.h>
31
32 #ifdef HAVE_UNISTD_H
33 # include <unistd.h>
34 #endif
35
36 /* implementation of semaphores and mutexes: */
37 #if (defined(_POSIX_SEMAPHORES) && (_POSIX_SEMAPHORES >= 200112L))
38
39 /* If optional POSIX semaphores are supported, use them to
40 implement both semaphores and mutexes. */
41 # include <semaphore.h>
42 # include <errno.h>
43
/* Counting semaphore, implemented directly by a POSIX sem_t. */
typedef sem_t os_sem_t;

/* Initialize *s with pshared = 0 and an initial count of 0. */
static void os_sem_init(os_sem_t *s)
{
     sem_init(s, 0, 0);
}

/* Destroy the semaphore *s. */
static void os_sem_destroy(os_sem_t *s)
{
     sem_destroy(s);
}
48
49 static void os_sem_down(os_sem_t *s)
50 {
51 int err;
52 do {
53 err = sem_wait(s);
54 } while (err == -1 && errno == EINTR);
55 CK(err == 0);
56 }
57
58 static void os_sem_up(os_sem_t *s) { sem_post(s); }
59
60 /*
61 The reason why we use sem_t to implement mutexes is that I have
62 seen mysterious hangs with glibc-2.7 and linux-2.6.22 when using
63 pthread_mutex_t, but no hangs with sem_t or with linux >=
64 2.6.24. For lack of better information, sem_t looks like the
65 safest choice.
66 */
/* A mutex is a semaphore whose count starts at 1. */
typedef sem_t os_mutex_t;

/* Initialize *s in the unlocked state (count 1, pshared = 0). */
static void os_mutex_init(os_mutex_t *s)
{
     sem_init(s, 0, 1);
}

/* The remaining mutex operations are the semaphore operations. */
#define os_mutex_destroy os_sem_destroy
#define os_mutex_lock os_sem_down
#define os_mutex_unlock os_sem_up
72
73 #else
74
75 /* If optional POSIX semaphores are not defined, use pthread
76 mutexes for mutexes, and simulate semaphores with condition
77 variables */
/* Mutex implemented directly by a pthread mutex. */
typedef pthread_mutex_t os_mutex_t;

/* Initialize *s with the default mutex attributes. */
static void os_mutex_init(os_mutex_t *s)
{
     pthread_mutex_init(s, NULL);
}
84
85 static void os_mutex_destroy(os_mutex_t *s) { pthread_mutex_destroy(s); }
86 static void os_mutex_lock(os_mutex_t *s) { pthread_mutex_lock(s); }
87 static void os_mutex_unlock(os_mutex_t *s) { pthread_mutex_unlock(s); }
88
/* Counting semaphore emulated with a mutex and a condition variable. */
typedef struct {
     pthread_mutex_t m; /* held around every access to x */
     pthread_cond_t c;  /* signaled after x has been incremented */
     volatile int x;    /* current count of the semaphore */
} os_sem_t;
94
95 static void os_sem_init(os_sem_t *s)
96 {
97 pthread_mutex_init(&s->m, (pthread_mutexattr_t *)0);
98 pthread_cond_init(&s->c, (pthread_condattr_t *)0);
99
100 /* wrap initialization in lock to exploit the release
101 semantics of pthread_mutex_unlock() */
102 pthread_mutex_lock(&s->m);
103 s->x = 0;
104 pthread_mutex_unlock(&s->m);
105 }
106
107 static void os_sem_destroy(os_sem_t *s)
108 {
109 pthread_mutex_destroy(&s->m);
110 pthread_cond_destroy(&s->c);
111 }
112
113 static void os_sem_down(os_sem_t *s)
114 {
115 pthread_mutex_lock(&s->m);
116 while (s->x <= 0)
117 pthread_cond_wait(&s->c, &s->m);
118 --s->x;
119 pthread_mutex_unlock(&s->m);
120 }
121
122 static void os_sem_up(os_sem_t *s)
123 {
124 pthread_mutex_lock(&s->m);
125 ++s->x;
126 pthread_cond_signal(&s->c);
127 pthread_mutex_unlock(&s->m);
128 }
129
130 #endif
131
/* Return type of a thread entry point on this platform. */
#define FFTW_WORKER void *

/* Start a detached, system-scope thread that runs worker(arg).
   The thread id is not kept and no return value is checked. */
static void os_create_thread(FFTW_WORKER (*worker)(void *arg),
			     void *arg)
{
     pthread_t thread_id;
     pthread_attr_t attributes;

     pthread_attr_init(&attributes);
     pthread_attr_setscope(&attributes, PTHREAD_SCOPE_SYSTEM);
     pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED);

     pthread_create(&thread_id, &attributes, worker, arg);

     /* the attribute object is only needed while creating the thread */
     pthread_attr_destroy(&attributes);
}
147
/* Terminate the calling thread with a null exit value. */
static void os_destroy_thread(void)
{
     pthread_exit(NULL);
}
152
153 /* support for static mutexes */
/* A statically-initializable mutex is a plain pthread mutex. */
typedef pthread_mutex_t os_static_mutex_t;
#define OS_STATIC_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER

/* Acquire the static mutex *s. */
static void os_static_mutex_lock(os_static_mutex_t *s)
{
     pthread_mutex_lock(s);
}

/* Release the static mutex *s. */
static void os_static_mutex_unlock(os_static_mutex_t *s)
{
     pthread_mutex_unlock(s);
}
158
159 #elif defined(__WIN32__) || defined(_WIN32) || defined(_WINDOWS)
160 /* hack: windef.h defines INT for its own purposes and this causes
161 a conflict with our own INT in ifftw.h. Divert the windows
162 definition into another name unlikely to cause a conflict */
163 #define INT magnus_ab_INTegro_seclorum_nascitur_ordo
164 #include <windows.h>
165 #include <process.h>
166 #undef INT
167
/* Mutex implemented by a Win32 mutex object. */
typedef HANDLE os_mutex_t;

/* Create an unnamed mutex, not initially owned, and store its
   handle in *s. */
static void os_mutex_init(os_mutex_t *s)
{
     HANDLE h = CreateMutex(NULL, FALSE, NULL);

     *s = h;
}

/* Close the handle stored in *s. */
static void os_mutex_destroy(os_mutex_t *s)
{
     HANDLE h = *s;

     CloseHandle(h);
}

/* Acquire the mutex, waiting without a timeout. */
static void os_mutex_lock(os_mutex_t *s)
{
     HANDLE h = *s;

     WaitForSingleObject(h, INFINITE);
}

/* Release the mutex. */
static void os_mutex_unlock(os_mutex_t *s)
{
     HANDLE h = *s;

     ReleaseMutex(h);
}
189
/* Counting semaphore implemented by a Win32 semaphore object. */
typedef HANDLE os_sem_t;

/* Create an unnamed semaphore with initial count 0 and maximum
   count 0x7FFFFFFF, and store its handle in *s. */
static void os_sem_init(os_sem_t *s)
{
     HANDLE h = CreateSemaphore(NULL, 0, 0x7FFFFFFFL, NULL);

     *s = h;
}

/* Close the handle stored in *s. */
static void os_sem_destroy(os_sem_t *s)
{
     HANDLE h = *s;

     CloseHandle(h);
}

/* Decrement the semaphore, waiting without a timeout. */
static void os_sem_down(os_sem_t *s)
{
     HANDLE h = *s;

     WaitForSingleObject(h, INFINITE);
}

/* Increment the semaphore count by one. */
static void os_sem_up(os_sem_t *s)
{
     HANDLE h = *s;

     ReleaseSemaphore(h, 1, NULL);
}
211
/* Return type (with calling convention) of a thread entry point. */
#define FFTW_WORKER unsigned __stdcall
typedef unsigned (__stdcall *winthread_start) (void *);

/* Start a new thread that runs worker(arg).

   _beginthreadex() returns a handle to the new thread that the
   caller owns; _endthreadex() does not close it.  Nothing in this
   file ever waits on a thread handle (worker termination is
   reported through termination_semaphore), so the handle is closed
   right away instead of being leaked once per worker.  Closing the
   handle does not affect the running thread.

   A return value of 0 means the thread could not be created; in
   that case there is no handle to close. */
static void os_create_thread(winthread_start worker,
			     void *arg)
{
     uintptr_t thread_handle;

     thread_handle = _beginthreadex((void *)NULL, /* security attrib */
				    0,             /* stack size */
				    worker,        /* start address */
				    arg,           /* parameters */
				    0,             /* creation flags */
				    (unsigned *)NULL); /* tid */

     if (thread_handle != 0)
	  CloseHandle((HANDLE)thread_handle);
}
225
/* Terminate the calling thread with exit code 0. */
static void os_destroy_thread(void)
{
     const unsigned exit_code = 0;

     _endthreadex(exit_code);
}
230
231 /* windows does not have statically-initialized mutexes---fake a
232 spinlock */
/* Spinlock word: 0 when free, 1 when held. */
typedef volatile LONG os_static_mutex_t;
#define OS_STATIC_MUTEX_INITIALIZER 0

/* Spin until this thread is the one that changes *s from 0 to 1.
   Between attempts, yield the processor and then the time slice. */
static void os_static_mutex_lock(os_static_mutex_t *s)
{
     for (;;) {
	  /* a previous value other than 1 means we now hold the lock */
	  if (InterlockedExchange(s, 1) != 1)
	       break;

	  YieldProcessor();
	  Sleep(0);
     }
}
/* Release the spinlock by storing 0; the previous value is
   asserted to have been 1 (i.e. the lock was held). */
static void os_static_mutex_unlock(os_static_mutex_t *s)
{
     LONG old;

     old = InterlockedExchange(s, 0);
     A(old == 1);
}
247 #else
248 #error "No threading layer defined"
249 #endif
250
251 /************************************************************************/
252
253 /* Main code: */
254 struct worker {
255 os_sem_t ready;
256 os_sem_t done;
257 struct work *w;
258 struct worker *cdr;
259 };
260
261 static struct worker *make_worker(void)
262 {
263 struct worker *q = (struct worker *)MALLOC(sizeof(*q), OTHER);
264 os_sem_init(&q->ready);
265 os_sem_init(&q->done);
266 return q;
267 }
268
269 static void unmake_worker(struct worker *q)
270 {
271 os_sem_destroy(&q->done);
272 os_sem_destroy(&q->ready);
273 X(ifree)(q);
274 }
275
276 struct work {
277 spawn_function proc;
278 spawn_data d;
279 struct worker *q; /* the worker responsible for performing this work */
280 };
281
282 static os_mutex_t queue_lock;
283 static os_sem_t termination_semaphore;
284
285 static struct worker *worker_queue;
286 #define WITH_QUEUE_LOCK(what) \
287 { \
288 os_mutex_lock(&queue_lock); \
289 what; \
290 os_mutex_unlock(&queue_lock); \
291 }
292
293 static FFTW_WORKER worker(void *arg)
294 {
295 struct worker *ego = (struct worker *)arg;
296 struct work *w;
297
298 for (;;) {
299 /* wait until work becomes available */
300 os_sem_down(&ego->ready);
301
302 w = ego->w;
303
304 /* !w->proc ==> terminate worker */
305 if (!w->proc) break;
306
307 /* do the work */
308 w->proc(&w->d);
309
310 /* signal that work is done */
311 os_sem_up(&ego->done);
312 }
313
314 /* termination protocol */
315 os_sem_up(&termination_semaphore);
316
317 os_destroy_thread();
318 /* UNREACHABLE */
319 return 0;
320 }
321
322 static void enqueue(struct worker *q)
323 {
324 WITH_QUEUE_LOCK({
325 q->cdr = worker_queue;
326 worker_queue = q;
327 });
328 }
329
330 static struct worker *dequeue(void)
331 {
332 struct worker *q;
333
334 WITH_QUEUE_LOCK({
335 q = worker_queue;
336 if (q)
337 worker_queue = q->cdr;
338 });
339
340 if (!q) {
341 /* no worker is available. Create one */
342 q = make_worker();
343 os_create_thread(worker, q);
344 }
345
346 return q;
347 }
348
349
350 static void kill_workforce(void)
351 {
352 struct work w;
353
354 w.proc = 0;
355
356 THREAD_ON; /* needed for debugging mode: since make_worker
357 is called from dequeue which is only called in
358 thread_on mode, we need to unmake_worker in thread_on. */
359 WITH_QUEUE_LOCK({
360 /* tell all workers that they must terminate.
361
362 Because workers enqueue themselves before signaling the
363 completion of the work, all workers belong to the worker queue
364 if we get here. Also, all workers are waiting at
365 os_sem_down(ready), so we can hold the queue lock without
366 deadlocking */
367 while (worker_queue) {
368 struct worker *q = worker_queue;
369 worker_queue = q->cdr;
370 q->w = &w;
371 os_sem_up(&q->ready);
372 os_sem_down(&termination_semaphore);
373 unmake_worker(q);
374 }
375 });
376 THREAD_OFF;
377 }
378
379 static os_static_mutex_t initialization_mutex = OS_STATIC_MUTEX_INITIALIZER;
380
381 int X(ithreads_init)(void)
382 {
383 os_static_mutex_lock(&initialization_mutex); {
384 os_mutex_init(&queue_lock);
385 os_sem_init(&termination_semaphore);
386
387 WITH_QUEUE_LOCK({
388 worker_queue = 0;
389 });
390 } os_static_mutex_unlock(&initialization_mutex);
391
392 return 0; /* no error */
393 }
394
395 /* Distribute a loop from 0 to loopmax-1 over nthreads threads.
396 proc(d) is called to execute a block of iterations from d->min
397 to d->max-1. d->thr_num indicate the number of the thread
398 that is executing proc (from 0 to nthreads-1), and d->data is
399 the same as the data parameter passed to X(spawn_loop).
400
401 This function returns only after all the threads have completed. */
402 void X(spawn_loop)(int loopmax, int nthr, spawn_function proc, void *data)
403 {
404 int block_size;
405 struct work *r;
406 int i;
407
408 A(loopmax >= 0);
409 A(nthr > 0);
410 A(proc);
411
412 if (!loopmax) return;
413
414 /* Choose the block size and number of threads in order to (1)
415 minimize the critical path and (2) use the fewest threads that
416 achieve the same critical path (to minimize overhead).
417 e.g. if loopmax is 5 and nthr is 4, we should use only 3
418 threads with block sizes of 2, 2, and 1. */
419 block_size = (loopmax + nthr - 1) / nthr;
420 nthr = (loopmax + block_size - 1) / block_size;
421
422 THREAD_ON; /* prevent debugging mode from failing under threads */
423 STACK_MALLOC(struct work *, r, sizeof(struct work) * nthr);
424
425 /* distribute work: */
426 for (i = 0; i < nthr; ++i) {
427 struct work *w = &r[i];
428 spawn_data *d = &w->d;
429
430 d->max = (d->min = i * block_size) + block_size;
431 if (d->max > loopmax)
432 d->max = loopmax;
433 d->thr_num = i;
434 d->data = data;
435 w->proc = proc;
436
437 if (i == nthr - 1) {
438 /* do the work ourselves */
439 proc(d);
440 } else {
441 /* assign a worker to W */
442 w->q = dequeue();
443
444 /* tell worker w->q to do it */
445 w->q->w = w; /* Dirac could have written this */
446 os_sem_up(&w->q->ready);
447 }
448 }
449
450 for (i = 0; i < nthr - 1; ++i) {
451 struct work *w = &r[i];
452 os_sem_down(&w->q->done);
453 enqueue(w->q);
454 }
455
456 STACK_FREE(r);
457 THREAD_OFF; /* prevent debugging mode from failing under threads */
458 }
459
460 void X(threads_cleanup)(void)
461 {
462 kill_workforce();
463 os_mutex_destroy(&queue_lock);
464 os_sem_destroy(&termination_semaphore);
465 }
466
467 static os_static_mutex_t install_planner_hooks_mutex = OS_STATIC_MUTEX_INITIALIZER;
468 static os_mutex_t planner_mutex;
469 static int planner_hooks_installed = 0;
470
471 static void lock_planner_mutex(void)
472 {
473 os_mutex_lock(&planner_mutex);
474 }
475
476 static void unlock_planner_mutex(void)
477 {
478 os_mutex_unlock(&planner_mutex);
479 }
480
481 void X(threads_register_planner_hooks)(void)
482 {
483 os_static_mutex_lock(&install_planner_hooks_mutex); {
484 if (!planner_hooks_installed) {
485 os_mutex_init(&planner_mutex);
486 X(set_planner_hooks)(lock_planner_mutex, unlock_planner_mutex);
487 planner_hooks_installed = 1;
488 }
489 } os_static_mutex_unlock(&install_planner_hooks_mutex);
490 }