annotate src/fftw-3.3.5/threads/threads.c @ 83:ae30d91d2ffe

Replace these with versions built using an older toolset (so as to avoid ABI incompatibilities when linking on Ubuntu 14.04 for packaging purposes)
author Chris Cannam
date Fri, 07 Feb 2020 11:51:13 +0000
parents 2cd0e3b3e1fd
children
rev   line source
Chris@42 1 /*
Chris@42 2 * Copyright (c) 2003, 2007-14 Matteo Frigo
Chris@42 3 * Copyright (c) 2003, 2007-14 Massachusetts Institute of Technology
Chris@42 4 *
Chris@42 5 * This program is free software; you can redistribute it and/or modify
Chris@42 6 * it under the terms of the GNU General Public License as published by
Chris@42 7 * the Free Software Foundation; either version 2 of the License, or
Chris@42 8 * (at your option) any later version.
Chris@42 9 *
Chris@42 10 * This program is distributed in the hope that it will be useful,
Chris@42 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Chris@42 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Chris@42 13 * GNU General Public License for more details.
Chris@42 14 *
Chris@42 15 * You should have received a copy of the GNU General Public License
Chris@42 16 * along with this program; if not, write to the Free Software
Chris@42 17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
Chris@42 18 *
Chris@42 19 */
Chris@42 20
Chris@42 21 /* threads.c: Portable thread spawning for loops, via the X(spawn_loop)
Chris@42 22 function. The first portion of this file is a set of macros to
Chris@42 23 spawn and join threads on various systems. */
Chris@42 24
Chris@42 25 #include "threads.h"
Chris@42 26 #include "api.h"
Chris@42 27
Chris@42 28 #if defined(USING_POSIX_THREADS)
Chris@42 29
Chris@42 30 #include <pthread.h>
Chris@42 31
Chris@42 32 #ifdef HAVE_UNISTD_H
Chris@42 33 # include <unistd.h>
Chris@42 34 #endif
Chris@42 35
Chris@42 36 /* implementation of semaphores and mutexes: */
Chris@42 37 #if (defined(_POSIX_SEMAPHORES) && (_POSIX_SEMAPHORES >= 200112L))
Chris@42 38
Chris@42 39 /* If optional POSIX semaphores are supported, use them to
Chris@42 40 implement both semaphores and mutexes. */
Chris@42 41 # include <semaphore.h>
Chris@42 42 # include <errno.h>
Chris@42 43
/* Counting semaphore: a direct alias for the POSIX sem_t. */
typedef sem_t os_sem_t;

/* Set up *s with an initial count of 0.  The pshared argument of 0
   keeps the semaphore private to the threads of this process. */
static void os_sem_init(os_sem_t *s)
{
     sem_init(s, 0, 0);
}

/* Tear down a semaphore previously set up by os_sem_init(). */
static void os_sem_destroy(os_sem_t *s)
{
     sem_destroy(s);
}
Chris@42 48
Chris@42 49 static void os_sem_down(os_sem_t *s)
Chris@42 50 {
Chris@42 51 int err;
Chris@42 52 do {
Chris@42 53 err = sem_wait(s);
Chris@42 54 } while (err == -1 && errno == EINTR);
Chris@42 55 CK(err == 0);
Chris@42 56 }
Chris@42 57
Chris@42 58 static void os_sem_up(os_sem_t *s) { sem_post(s); }
Chris@42 59
/*
 * Mutexes are built on sem_t too: a semaphore whose count starts at 1.
 *
 * Rationale recorded by the original author: mysterious hangs were
 * seen with glibc-2.7 and linux-2.6.22 when using pthread_mutex_t,
 * but none with sem_t or with linux >= 2.6.24.  For lack of better
 * information, sem_t looks like the safest choice.
 */
typedef sem_t os_mutex_t;

/* Create the mutex in the unlocked state (initial count of 1). */
static void os_mutex_init(os_mutex_t *s)
{
     sem_init(s, 0, 1);
}

/* The remaining mutex operations are the semaphore operations under
   other names: lock == down, unlock == up. */
#define os_mutex_destroy os_sem_destroy
#define os_mutex_lock os_sem_down
#define os_mutex_unlock os_sem_up
Chris@42 72
Chris@42 73 #else
Chris@42 74
Chris@42 75 /* If optional POSIX semaphores are not defined, use pthread
Chris@42 76 mutexes for mutexes, and simulate semaphores with condition
Chris@42 77 variables */
/* Mutex: a plain pthread mutex. */
typedef pthread_mutex_t os_mutex_t;

/* Initialize *s with the default mutex attributes (null attr pointer). */
static void os_mutex_init(os_mutex_t *s)
{
     pthread_mutex_init(s, NULL);
}

/* Destroy a mutex set up by os_mutex_init(). */
static void os_mutex_destroy(os_mutex_t *s)
{
     pthread_mutex_destroy(s);
}

/* Acquire the mutex, blocking until it is available. */
static void os_mutex_lock(os_mutex_t *s)
{
     pthread_mutex_lock(s);
}

/* Release the mutex. */
static void os_mutex_unlock(os_mutex_t *s)
{
     pthread_mutex_unlock(s);
}
Chris@42 88
/* Counting semaphore emulated with a mutex / condition-variable pair. */
typedef struct {
     pthread_mutex_t m;  /* held around every access to x */
     pthread_cond_t c;   /* signalled each time x is incremented */
     volatile int x;     /* current count */
} os_sem_t;

/* Set up the mutex and condition variable with default attributes and
   start the count at 0. */
static void os_sem_init(os_sem_t *s)
{
     pthread_mutex_init(&s->m, NULL);
     pthread_cond_init(&s->c, NULL);

     /* the store is done under the lock on purpose, to exploit the
        release semantics of pthread_mutex_unlock() */
     pthread_mutex_lock(&s->m);
     s->x = 0;
     pthread_mutex_unlock(&s->m);
}

/* Destroy the mutex and the condition variable of *s. */
static void os_sem_destroy(os_sem_t *s)
{
     pthread_mutex_destroy(&s->m);
     pthread_cond_destroy(&s->c);
}

/* Wait until the count is positive, then take one unit from it. */
static void os_sem_down(os_sem_t *s)
{
     pthread_mutex_lock(&s->m);

     /* re-test the count every time the wait returns */
     while (!(s->x > 0))
          pthread_cond_wait(&s->c, &s->m);
     s->x -= 1;

     pthread_mutex_unlock(&s->m);
}

/* Add one unit to the count and signal the condition variable. */
static void os_sem_up(os_sem_t *s)
{
     pthread_mutex_lock(&s->m);

     s->x += 1;
     pthread_cond_signal(&s->c);

     pthread_mutex_unlock(&s->m);
}
Chris@42 129
Chris@42 130 #endif
Chris@42 131
/* Return type of a thread entry point on this platform. */
#define FFTW_WORKER void *

/*
 * Start a new thread running worker(arg).
 *
 * The thread is requested with system contention scope and is created
 * detached, so it is never joined.  The results of the attribute
 * setters are deliberately not checked.
 *
 * Failure of pthread_create() itself is fatal (CK).  The callers in
 * this file hand work to the new worker and then block on its `done`
 * semaphore, so carrying on without a running thread would hang
 * forever instead of reporting the problem.
 */
static void os_create_thread(FFTW_WORKER (*worker)(void *arg),
                             void *arg)
{
     pthread_attr_t attr;
     pthread_t tid;
     int err;

     pthread_attr_init(&attr);
     pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

     err = pthread_create(&tid, &attr, worker, arg);

     /* the attribute object is no longer needed, whatever the outcome */
     pthread_attr_destroy(&attr);

     CK(err == 0);
}
Chris@42 147
/* Terminate the calling thread, with a null exit value. */
static void os_destroy_thread(void)
{
     pthread_exit(NULL);
}
Chris@42 152
/* Statically-initializable mutexes: pthread mutexes can be set up at
   compile time with PTHREAD_MUTEX_INITIALIZER, so use them directly. */
typedef pthread_mutex_t os_static_mutex_t;
#define OS_STATIC_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER

/* Acquire a statically-initialized mutex. */
static void os_static_mutex_lock(os_static_mutex_t *s)
{
     pthread_mutex_lock(s);
}

/* Release a statically-initialized mutex. */
static void os_static_mutex_unlock(os_static_mutex_t *s)
{
     pthread_mutex_unlock(s);
}
Chris@42 158
Chris@42 159 #elif defined(__WIN32__) || defined(_WIN32) || defined(_WINDOWS)
Chris@42 160 /* hack: windef.h defines INT for its own purposes and this causes
Chris@42 161 a conflict with our own INT in ifftw.h. Divert the windows
Chris@42 162 definition into another name unlikely to cause a conflict */
Chris@42 163 #define INT magnus_ab_INTegro_seclorum_nascitur_ordo
Chris@42 164 #include <windows.h>
Chris@42 165 #include <process.h>
Chris@42 166 #undef INT
Chris@42 167
Chris@42 168 typedef HANDLE os_mutex_t;
Chris@42 169
Chris@42 170 static void os_mutex_init(os_mutex_t *s)
Chris@42 171 {
Chris@42 172 *s = CreateMutex(NULL, FALSE, NULL);
Chris@42 173 }
Chris@42 174
Chris@42 175 static void os_mutex_destroy(os_mutex_t *s)
Chris@42 176 {
Chris@42 177 CloseHandle(*s);
Chris@42 178 }
Chris@42 179
Chris@42 180 static void os_mutex_lock(os_mutex_t *s)
Chris@42 181 {
Chris@42 182 WaitForSingleObject(*s, INFINITE);
Chris@42 183 }
Chris@42 184
Chris@42 185 static void os_mutex_unlock(os_mutex_t *s)
Chris@42 186 {
Chris@42 187 ReleaseMutex(*s);
Chris@42 188 }
Chris@42 189
Chris@42 190 typedef HANDLE os_sem_t;
Chris@42 191
Chris@42 192 static void os_sem_init(os_sem_t *s)
Chris@42 193 {
Chris@42 194 *s = CreateSemaphore(NULL, 0, 0x7FFFFFFFL, NULL);
Chris@42 195 }
Chris@42 196
Chris@42 197 static void os_sem_destroy(os_sem_t *s)
Chris@42 198 {
Chris@42 199 CloseHandle(*s);
Chris@42 200 }
Chris@42 201
Chris@42 202 static void os_sem_down(os_sem_t *s)
Chris@42 203 {
Chris@42 204 WaitForSingleObject(*s, INFINITE);
Chris@42 205 }
Chris@42 206
Chris@42 207 static void os_sem_up(os_sem_t *s)
Chris@42 208 {
Chris@42 209 ReleaseSemaphore(*s, 1, NULL);
Chris@42 210 }
Chris@42 211
Chris@42 212 #define FFTW_WORKER unsigned __stdcall
Chris@42 213 typedef unsigned (__stdcall *winthread_start) (void *);
Chris@42 214
Chris@42 215 static void os_create_thread(winthread_start worker,
Chris@42 216 void *arg)
Chris@42 217 {
Chris@42 218 _beginthreadex((void *)NULL, /* security attrib */
Chris@42 219 0, /* stack size */
Chris@42 220 worker, /* start address */
Chris@42 221 arg, /* parameters */
Chris@42 222 0, /* creation flags */
Chris@42 223 (unsigned *)NULL); /* tid */
Chris@42 224 }
Chris@42 225
/* Terminate the calling thread, with an exit code of 0. */
static void os_destroy_thread(void)
{
     _endthreadex(0);
}
Chris@42 230
Chris@42 231 /* windows does not have statically-initialized mutexes---fake a
Chris@42 232 spinlock */
Chris@42 233 typedef volatile LONG os_static_mutex_t;
Chris@42 234 #define OS_STATIC_MUTEX_INITIALIZER 0
Chris@42 235 static void os_static_mutex_lock(os_static_mutex_t *s)
Chris@42 236 {
Chris@42 237 while (InterlockedExchange(s, 1) == 1) {
Chris@42 238 YieldProcessor();
Chris@42 239 Sleep(0);
Chris@42 240 }
Chris@42 241 }
Chris@42 242 static void os_static_mutex_unlock(os_static_mutex_t *s)
Chris@42 243 {
Chris@42 244 LONG old = InterlockedExchange(s, 0);
Chris@42 245 A(old == 1);
Chris@42 246 }
Chris@42 247 #else
Chris@42 248 #error "No threading layer defined"
Chris@42 249 #endif
Chris@42 250
Chris@42 251 /************************************************************************/
Chris@42 252
Chris@42 253 /* Main code: */
Chris@42 254 struct worker {
Chris@42 255 os_sem_t ready;
Chris@42 256 os_sem_t done;
Chris@42 257 struct work *w;
Chris@42 258 struct worker *cdr;
Chris@42 259 };
Chris@42 260
Chris@42 261 static struct worker *make_worker(void)
Chris@42 262 {
Chris@42 263 struct worker *q = (struct worker *)MALLOC(sizeof(*q), OTHER);
Chris@42 264 os_sem_init(&q->ready);
Chris@42 265 os_sem_init(&q->done);
Chris@42 266 return q;
Chris@42 267 }
Chris@42 268
Chris@42 269 static void unmake_worker(struct worker *q)
Chris@42 270 {
Chris@42 271 os_sem_destroy(&q->done);
Chris@42 272 os_sem_destroy(&q->ready);
Chris@42 273 X(ifree)(q);
Chris@42 274 }
Chris@42 275
Chris@42 276 struct work {
Chris@42 277 spawn_function proc;
Chris@42 278 spawn_data d;
Chris@42 279 struct worker *q; /* the worker responsible for performing this work */
Chris@42 280 };
Chris@42 281
Chris@42 282 static os_mutex_t queue_lock;
Chris@42 283 static os_sem_t termination_semaphore;
Chris@42 284
Chris@42 285 static struct worker *worker_queue;
Chris@42 286 #define WITH_QUEUE_LOCK(what) \
Chris@42 287 { \
Chris@42 288 os_mutex_lock(&queue_lock); \
Chris@42 289 what; \
Chris@42 290 os_mutex_unlock(&queue_lock); \
Chris@42 291 }
Chris@42 292
Chris@42 293 static FFTW_WORKER worker(void *arg)
Chris@42 294 {
Chris@42 295 struct worker *ego = (struct worker *)arg;
Chris@42 296 struct work *w;
Chris@42 297
Chris@42 298 for (;;) {
Chris@42 299 /* wait until work becomes available */
Chris@42 300 os_sem_down(&ego->ready);
Chris@42 301
Chris@42 302 w = ego->w;
Chris@42 303
Chris@42 304 /* !w->proc ==> terminate worker */
Chris@42 305 if (!w->proc) break;
Chris@42 306
Chris@42 307 /* do the work */
Chris@42 308 w->proc(&w->d);
Chris@42 309
Chris@42 310 /* signal that work is done */
Chris@42 311 os_sem_up(&ego->done);
Chris@42 312 }
Chris@42 313
Chris@42 314 /* termination protocol */
Chris@42 315 os_sem_up(&termination_semaphore);
Chris@42 316
Chris@42 317 os_destroy_thread();
Chris@42 318 /* UNREACHABLE */
Chris@42 319 return 0;
Chris@42 320 }
Chris@42 321
Chris@42 322 static void enqueue(struct worker *q)
Chris@42 323 {
Chris@42 324 WITH_QUEUE_LOCK({
Chris@42 325 q->cdr = worker_queue;
Chris@42 326 worker_queue = q;
Chris@42 327 });
Chris@42 328 }
Chris@42 329
Chris@42 330 static struct worker *dequeue(void)
Chris@42 331 {
Chris@42 332 struct worker *q;
Chris@42 333
Chris@42 334 WITH_QUEUE_LOCK({
Chris@42 335 q = worker_queue;
Chris@42 336 if (q)
Chris@42 337 worker_queue = q->cdr;
Chris@42 338 });
Chris@42 339
Chris@42 340 if (!q) {
Chris@42 341 /* no worker is available. Create one */
Chris@42 342 q = make_worker();
Chris@42 343 os_create_thread(worker, q);
Chris@42 344 }
Chris@42 345
Chris@42 346 return q;
Chris@42 347 }
Chris@42 348
Chris@42 349
Chris@42 350 static void kill_workforce(void)
Chris@42 351 {
Chris@42 352 struct work w;
Chris@42 353
Chris@42 354 w.proc = 0;
Chris@42 355
Chris@42 356 THREAD_ON; /* needed for debugging mode: since make_worker
Chris@42 357 is called from dequeue which is only called in
Chris@42 358 thread_on mode, we need to unmake_worker in thread_on. */
Chris@42 359 WITH_QUEUE_LOCK({
Chris@42 360 /* tell all workers that they must terminate.
Chris@42 361
Chris@42 362 Because workers enqueue themselves before signaling the
Chris@42 363 completion of the work, all workers belong to the worker queue
Chris@42 364 if we get here. Also, all workers are waiting at
Chris@42 365 os_sem_down(ready), so we can hold the queue lock without
Chris@42 366 deadlocking */
Chris@42 367 while (worker_queue) {
Chris@42 368 struct worker *q = worker_queue;
Chris@42 369 worker_queue = q->cdr;
Chris@42 370 q->w = &w;
Chris@42 371 os_sem_up(&q->ready);
Chris@42 372 os_sem_down(&termination_semaphore);
Chris@42 373 unmake_worker(q);
Chris@42 374 }
Chris@42 375 });
Chris@42 376 THREAD_OFF;
Chris@42 377 }
Chris@42 378
Chris@42 379 static os_static_mutex_t initialization_mutex = OS_STATIC_MUTEX_INITIALIZER;
Chris@42 380
Chris@42 381 int X(ithreads_init)(void)
Chris@42 382 {
Chris@42 383 os_static_mutex_lock(&initialization_mutex); {
Chris@42 384 os_mutex_init(&queue_lock);
Chris@42 385 os_sem_init(&termination_semaphore);
Chris@42 386
Chris@42 387 WITH_QUEUE_LOCK({
Chris@42 388 worker_queue = 0;
Chris@42 389 });
Chris@42 390 } os_static_mutex_unlock(&initialization_mutex);
Chris@42 391
Chris@42 392 return 0; /* no error */
Chris@42 393 }
Chris@42 394
Chris@42 395 /* Distribute a loop from 0 to loopmax-1 over nthreads threads.
Chris@42 396 proc(d) is called to execute a block of iterations from d->min
Chris@42 397 to d->max-1. d->thr_num indicate the number of the thread
Chris@42 398 that is executing proc (from 0 to nthreads-1), and d->data is
Chris@42 399 the same as the data parameter passed to X(spawn_loop).
Chris@42 400
Chris@42 401 This function returns only after all the threads have completed. */
Chris@42 402 void X(spawn_loop)(int loopmax, int nthr, spawn_function proc, void *data)
Chris@42 403 {
Chris@42 404 int block_size;
Chris@42 405 struct work *r;
Chris@42 406 int i;
Chris@42 407
Chris@42 408 A(loopmax >= 0);
Chris@42 409 A(nthr > 0);
Chris@42 410 A(proc);
Chris@42 411
Chris@42 412 if (!loopmax) return;
Chris@42 413
Chris@42 414 /* Choose the block size and number of threads in order to (1)
Chris@42 415 minimize the critical path and (2) use the fewest threads that
Chris@42 416 achieve the same critical path (to minimize overhead).
Chris@42 417 e.g. if loopmax is 5 and nthr is 4, we should use only 3
Chris@42 418 threads with block sizes of 2, 2, and 1. */
Chris@42 419 block_size = (loopmax + nthr - 1) / nthr;
Chris@42 420 nthr = (loopmax + block_size - 1) / block_size;
Chris@42 421
Chris@42 422 THREAD_ON; /* prevent debugging mode from failing under threads */
Chris@42 423 STACK_MALLOC(struct work *, r, sizeof(struct work) * nthr);
Chris@42 424
Chris@42 425 /* distribute work: */
Chris@42 426 for (i = 0; i < nthr; ++i) {
Chris@42 427 struct work *w = &r[i];
Chris@42 428 spawn_data *d = &w->d;
Chris@42 429
Chris@42 430 d->max = (d->min = i * block_size) + block_size;
Chris@42 431 if (d->max > loopmax)
Chris@42 432 d->max = loopmax;
Chris@42 433 d->thr_num = i;
Chris@42 434 d->data = data;
Chris@42 435 w->proc = proc;
Chris@42 436
Chris@42 437 if (i == nthr - 1) {
Chris@42 438 /* do the work ourselves */
Chris@42 439 proc(d);
Chris@42 440 } else {
Chris@42 441 /* assign a worker to W */
Chris@42 442 w->q = dequeue();
Chris@42 443
Chris@42 444 /* tell worker w->q to do it */
Chris@42 445 w->q->w = w; /* Dirac could have written this */
Chris@42 446 os_sem_up(&w->q->ready);
Chris@42 447 }
Chris@42 448 }
Chris@42 449
Chris@42 450 for (i = 0; i < nthr - 1; ++i) {
Chris@42 451 struct work *w = &r[i];
Chris@42 452 os_sem_down(&w->q->done);
Chris@42 453 enqueue(w->q);
Chris@42 454 }
Chris@42 455
Chris@42 456 STACK_FREE(r);
Chris@42 457 THREAD_OFF; /* prevent debugging mode from failing under threads */
Chris@42 458 }
Chris@42 459
Chris@42 460 void X(threads_cleanup)(void)
Chris@42 461 {
Chris@42 462 kill_workforce();
Chris@42 463 os_mutex_destroy(&queue_lock);
Chris@42 464 os_sem_destroy(&termination_semaphore);
Chris@42 465 }
Chris@42 466
Chris@42 467 static os_static_mutex_t install_planner_hooks_mutex = OS_STATIC_MUTEX_INITIALIZER;
Chris@42 468 static os_mutex_t planner_mutex;
Chris@42 469 static int planner_hooks_installed = 0;
Chris@42 470
Chris@42 471 static void lock_planner_mutex(void)
Chris@42 472 {
Chris@42 473 os_mutex_lock(&planner_mutex);
Chris@42 474 }
Chris@42 475
Chris@42 476 static void unlock_planner_mutex(void)
Chris@42 477 {
Chris@42 478 os_mutex_unlock(&planner_mutex);
Chris@42 479 }
Chris@42 480
Chris@42 481 void X(threads_register_planner_hooks)(void)
Chris@42 482 {
Chris@42 483 os_static_mutex_lock(&install_planner_hooks_mutex); {
Chris@42 484 if (!planner_hooks_installed) {
Chris@42 485 os_mutex_init(&planner_mutex);
Chris@42 486 X(set_planner_hooks)(lock_planner_mutex, unlock_planner_mutex);
Chris@42 487 planner_hooks_installed = 1;
Chris@42 488 }
Chris@42 489 } os_static_mutex_unlock(&install_planner_hooks_mutex);
Chris@42 490 }