comparison data/fft/FFTDataServer.cpp @ 549:388afa99d537

* More careful (I hope!) locking
author Chris Cannam
date Thu, 05 Feb 2009 12:53:19 +0000
parents 1469caaa8e67
children 107d3f3705c9
comparison
equal deleted inserted replaced
548:1469caaa8e67 549:388afa99d537
600 if (m_fillThread) { 600 if (m_fillThread) {
601 m_fillThread->wait(); 601 m_fillThread->wait();
602 delete m_fillThread; 602 delete m_fillThread;
603 } 603 }
604 604
605 MutexLocker locker(&m_writeMutex, 605 // MutexLocker locker(&m_writeMutex,
606 "FFTDataServer::~FFTDataServer::m_writeMutex"); 606 // "FFTDataServer::~FFTDataServer::m_writeMutex");
607
608 QMutexLocker mlocker(&m_fftBuffersLock);
609 QWriteLocker wlocker(&m_cacheVectorLock);
607 610
608 for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) { 611 for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) {
609
610 if (*i) { 612 if (*i) {
611 delete *i; 613 delete *i;
612 } 614 }
613 } 615 }
614 616
636 #ifdef DEBUG_FFT_SERVER 638 #ifdef DEBUG_FFT_SERVER
637 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspend" << std::endl; 639 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspend" << std::endl;
638 #endif 640 #endif
639 Profiler profiler("FFTDataServer::suspend", false); 641 Profiler profiler("FFTDataServer::suspend", false);
640 642
641 MutexLocker locker(&m_writeMutex, 643 QMutexLocker locker(&m_fftBuffersLock);
642 "FFTDataServer::suspend::m_writeMutex");
643 m_suspended = true; 644 m_suspended = true;
644 } 645 }
645 646
646 void 647 void
647 FFTDataServer::suspendWrites() 648 FFTDataServer::suspendWrites()
897 Profiler profiler("FFTDataServer::getMagnitudeAt: filling"); 898 Profiler profiler("FFTDataServer::getMagnitudeAt: filling");
898 #ifdef DEBUG_FFT_SERVER 899 #ifdef DEBUG_FFT_SERVER
899 std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn(" 900 std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn("
900 << x << ")" << std::endl; 901 << x << ")" << std::endl;
901 #endif 902 #endif
902 // hold mutex so that write thread doesn't mess with class 903 fillColumn(x, cache);
903 // member data in fillColumn
904 MutexLocker locker(&m_writeMutex,
905 "FFTDataServer::getMagnitudeAt: m_writeMutex");
906 fillColumn(x, true);
907 } 904 }
908 return cache->getMagnitudeAt(col, y); 905 return cache->getMagnitudeAt(col, y);
909 } 906 }
910 907
911 bool 908 bool
926 if (!cache) return false; 923 if (!cache) return false;
927 924
928 //!!! n.b. can throw 925 //!!! n.b. can throw
929 if (!cache->haveSetColumnAt(col)) { 926 if (!cache->haveSetColumnAt(col)) {
930 Profiler profiler("FFTDataServer::getMagnitudesAt: filling"); 927 Profiler profiler("FFTDataServer::getMagnitudesAt: filling");
931 MutexLocker locker(&m_writeMutex, 928 fillColumn(x, cache);
932 "FFTDataServer::getMagnitudesAt: m_writeMutex");
933 fillColumn(x, true);
934 } 929 }
935 930
936 cache->getMagnitudesAt(col, values, minbin, count, step); 931 cache->getMagnitudesAt(col, values, minbin, count, step);
937 932
938 return true; 933 return true;
950 if (!cache) return 0; 945 if (!cache) return 0;
951 946
952 //!!! n.b. can throw 947 //!!! n.b. can throw
953 if (!cache->haveSetColumnAt(col)) { 948 if (!cache->haveSetColumnAt(col)) {
954 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling"); 949 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling");
955 // hold mutex so that write thread doesn't mess with class 950 fillColumn(x, cache);
956 // member data in fillColumn
957 MutexLocker locker(&m_writeMutex,
958 "FFTDataServer::getNormalizedMagnitudeAt: m_writeMutex");
959 fillColumn(x, true);
960 } 951 }
961 return cache->getNormalizedMagnitudeAt(col, y); 952 return cache->getNormalizedMagnitudeAt(col, y);
962 } 953 }
963 954
964 bool 955 bool
979 if (!cache) return false; 970 if (!cache) return false;
980 971
981 //!!! n.b. can throw 972 //!!! n.b. can throw
982 if (!cache->haveSetColumnAt(col)) { 973 if (!cache->haveSetColumnAt(col)) {
983 Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling"); 974 Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling");
984 MutexLocker locker(&m_writeMutex, 975 fillColumn(x, cache);
985 "FFTDataServer::getNormalizedMagnitudesAt: m_writeMutex");
986 fillColumn(x, true);
987 } 976 }
988 977
989 for (size_t i = 0; i < count; ++i) { 978 for (size_t i = 0; i < count; ++i) {
990 values[i] = cache->getNormalizedMagnitudeAt(col, i * step + minbin); 979 values[i] = cache->getNormalizedMagnitudeAt(col, i * step + minbin);
991 } 980 }
1005 if (!cache) return 0; 994 if (!cache) return 0;
1006 995
1007 //!!! n.b. can throw 996 //!!! n.b. can throw
1008 if (!cache->haveSetColumnAt(col)) { 997 if (!cache->haveSetColumnAt(col)) {
1009 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling"); 998 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling");
1010 // hold mutex so that write thread doesn't mess with class 999 fillColumn(x, cache);
1011 // member data in fillColumn
1012 MutexLocker locker(&m_writeMutex,
1013 "FFTDataServer::getMaximumMagnitudeAt: m_writeMutex");
1014 fillColumn(x, true);
1015 } 1000 }
1016 return cache->getMaximumMagnitudeAt(col); 1001 return cache->getMaximumMagnitudeAt(col);
1017 } 1002 }
1018 1003
1019 float 1004 float
1028 if (!cache) return 0; 1013 if (!cache) return 0;
1029 1014
1030 //!!! n.b. can throw 1015 //!!! n.b. can throw
1031 if (!cache->haveSetColumnAt(col)) { 1016 if (!cache->haveSetColumnAt(col)) {
1032 Profiler profiler("FFTDataServer::getPhaseAt: filling"); 1017 Profiler profiler("FFTDataServer::getPhaseAt: filling");
1033 // hold mutex so that write thread doesn't mess with class 1018 fillColumn(x, cache);
1034 // member data in fillColumn
1035 MutexLocker locker(&m_writeMutex,
1036 "FFTDataServer::getPhaseAt: m_writeMutex");
1037 fillColumn(x, true);
1038 } 1019 }
1039 return cache->getPhaseAt(col, y); 1020 return cache->getPhaseAt(col, y);
1040 } 1021 }
1041 1022
1042 bool 1023 bool
1057 if (!cache) return false; 1038 if (!cache) return false;
1058 1039
1059 //!!! n.b. can throw 1040 //!!! n.b. can throw
1060 if (!cache->haveSetColumnAt(col)) { 1041 if (!cache->haveSetColumnAt(col)) {
1061 Profiler profiler("FFTDataServer::getPhasesAt: filling"); 1042 Profiler profiler("FFTDataServer::getPhasesAt: filling");
1062 MutexLocker locker(&m_writeMutex, 1043 fillColumn(x, cache);
1063 "FFTDataServer::getPhasesAt: m_writeMutex");
1064 fillColumn(x, true);
1065 } 1044 }
1066 1045
1067 for (size_t i = 0; i < count; ++i) { 1046 for (size_t i = 0; i < count; ++i) {
1068 values[i] = cache->getPhaseAt(col, i * step + minbin); 1047 values[i] = cache->getPhaseAt(col, i * step + minbin);
1069 } 1048 }
1095 if (!cache->haveSetColumnAt(col)) { 1074 if (!cache->haveSetColumnAt(col)) {
1096 Profiler profiler("FFTDataServer::getValuesAt: filling"); 1075 Profiler profiler("FFTDataServer::getValuesAt: filling");
1097 #ifdef DEBUG_FFT_SERVER 1076 #ifdef DEBUG_FFT_SERVER
1098 std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl; 1077 std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl;
1099 #endif 1078 #endif
1100 // hold mutex so that write thread doesn't mess with class 1079 fillColumn(x, cache);
1101 // member data in fillColumn
1102 MutexLocker locker(&m_writeMutex,
1103 "FFTDataServer::getValuesAt: m_writeMutex");
1104 fillColumn(x, true);
1105 } 1080 }
1106 1081
1107 cache->getValuesAt(col, y, real, imaginary); 1082 cache->getValuesAt(col, y, real, imaginary);
1108 } 1083 }
1109 1084
1134 //!!! n.b. can throw 1109 //!!! n.b. can throw
1135 return cache->haveSetColumnAt(col); 1110 return cache->haveSetColumnAt(col);
1136 } 1111 }
1137 1112
1138 void 1113 void
1139 FFTDataServer::fillColumn(size_t x, bool lockHeld) 1114 FFTDataServer::fillColumn(size_t x, FFTCacheReader *tester)
1140 { 1115 {
1141 Profiler profiler("FFTDataServer::fillColumn", false); 1116 Profiler profiler("FFTDataServer::fillColumn", false);
1142 1117
1143 if (!m_model->isReady()) { 1118 if (!m_model->isReady()) {
1144 std::cerr << "WARNING: FFTDataServer::fillColumn(" 1119 std::cerr << "WARNING: FFTDataServer::fillColumn("
1145 << x << "): model not yet ready" << std::endl; 1120 << x << "): model not yet ready" << std::endl;
1146 return; 1121 return;
1147 } 1122 }
1148 1123 /*
1149 if (!m_fftInput) { 1124 if (!m_fftInput) {
1150 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " 1125 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): "
1151 << "input has already been completed and discarded?" 1126 << "input has already been completed and discarded?"
1152 << std::endl; 1127 << std::endl;
1153 return; 1128 return;
1154 } 1129 }
1155 1130 */
1156 if (x >= m_width) { 1131 if (x >= m_width) {
1157 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " 1132 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): "
1158 << "x > width (" << x << " > " << m_width << ")" 1133 << "x > width (" << x << " > " << m_width << ")"
1159 << std::endl; 1134 << std::endl;
1160 return; 1135 return;
1177 int startFrame = m_windowIncrement * x; 1152 int startFrame = m_windowIncrement * x;
1178 int endFrame = startFrame + m_windowSize; 1153 int endFrame = startFrame + m_windowSize;
1179 1154
1180 startFrame -= winsize / 2; 1155 startFrame -= winsize / 2;
1181 endFrame -= winsize / 2; 1156 endFrame -= winsize / 2;
1182
1183 for (int i = 0; i < off; ++i) {
1184 m_fftInput[i] = 0.0;
1185 }
1186
1187 for (int i = 0; i < off; ++i) {
1188 m_fftInput[fftsize - i - 1] = 0.0;
1189 }
1190
1191 if (startFrame < 0) {
1192 pfx = -startFrame;
1193 for (int i = 0; i < pfx; ++i) {
1194 m_fftInput[off + i] = 0.0;
1195 }
1196 }
1197 1157
1198 #ifdef DEBUG_FFT_SERVER_FILL 1158 #ifdef DEBUG_FFT_SERVER_FILL
1199 std::cerr << "FFTDataServer::fillColumn: requesting frames " 1159 std::cerr << "FFTDataServer::fillColumn: requesting frames "
1200 << startFrame + pfx << " -> " << endFrame << " ( = " 1160 << startFrame + pfx << " -> " << endFrame << " ( = "
1201 << endFrame - (startFrame + pfx) << ") at index " 1161 << endFrame - (startFrame + pfx) << ") at index "
1202 << off + pfx << " in buffer of size " << m_fftSize 1162 << off + pfx << " in buffer of size " << m_fftSize
1203 << " with window size " << m_windowSize 1163 << " with window size " << m_windowSize
1204 << " from channel " << m_channel << std::endl; 1164 << " from channel " << m_channel << std::endl;
1205 #endif 1165 #endif
1206 1166
1167 QMutexLocker locker(&m_fftBuffersLock);
1168
1169 if (tester) {
1170 // We are being called from a function that wanted to obtain a
1171 // column using an FFTCacheReader. Before calling us, it
1172 // checked whether the column was available already, and the
1173 // reader reported that it wasn't. Now we test again with the
1174 // mutex held, to avoid a race condition in case another
1175 // thread has called fillColumn at the same time.
1176 if (tester->haveSetColumnAt(x & m_cacheWidthMask)) {
1177 return;
1178 }
1179 }
1180
1181 if (!m_fftInput) {
1182 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): "
1183 << "input has already been completed and discarded?"
1184 << std::endl;
1185 return;
1186 }
1187
1188 for (int i = 0; i < off; ++i) {
1189 m_fftInput[i] = 0.0;
1190 }
1191
1192 for (int i = 0; i < off; ++i) {
1193 m_fftInput[fftsize - i - 1] = 0.0;
1194 }
1195
1196 if (startFrame < 0) {
1197 pfx = -startFrame;
1198 for (int i = 0; i < pfx; ++i) {
1199 m_fftInput[off + i] = 0.0;
1200 }
1201 }
1202
1207 int count = 0; 1203 int count = 0;
1208 if (endFrame > startFrame + pfx) count = endFrame - (startFrame + pfx); 1204 if (endFrame > startFrame + pfx) count = endFrame - (startFrame + pfx);
1209 1205
1210 int got = m_model->getData(m_channel, startFrame + pfx, 1206 int got = m_model->getData(m_channel, startFrame + pfx,
1211 count, m_fftInput + off + pfx); 1207 count, m_fftInput + off + pfx);
1232 m_fftInput[i + hs] = temp; 1228 m_fftInput[i + hs] = temp;
1233 } 1229 }
1234 1230
1235 fftf_execute(m_fftPlan); 1231 fftf_execute(m_fftPlan);
1236 1232
1237 // If our cache uses polar storage, it's more friendly for us to
1238 // do the conversion before taking the write mutex
1239
1240 float factor = 0.f; 1233 float factor = 0.f;
1241 1234
1242 if (cache->getStorageType() == FFTCache::Compact || 1235 if (cache->getStorageType() == FFTCache::Compact ||
1243 cache->getStorageType() == FFTCache::Polar) { 1236 cache->getStorageType() == FFTCache::Polar) {
1244 1237
1259 } 1252 }
1260 } 1253 }
1261 1254
1262 Profiler subprof("FFTDataServer::fillColumn: set to cache"); 1255 Profiler subprof("FFTDataServer::fillColumn: set to cache");
1263 1256
1264 { 1257 if (cache->getStorageType() == FFTCache::Compact ||
1265 MutexLocker locker(lockHeld ? 0 : &m_writeMutex, 1258 cache->getStorageType() == FFTCache::Polar) {
1266 "FFTDataServer::fillColumn: m_writeMutex [2]");
1267
1268 if (cache->getStorageType() == FFTCache::Compact ||
1269 cache->getStorageType() == FFTCache::Polar) {
1270 1259
1271 cache->setColumnAt(col, 1260 cache->setColumnAt(col,
1272 m_workbuffer, 1261 m_workbuffer,
1273 m_workbuffer + hs + 1, 1262 m_workbuffer + hs + 1,
1274 factor); 1263 factor);
1275 1264
1276 } else { 1265 } else {
1277 1266
1278 cache->setColumnAt(col, 1267 cache->setColumnAt(col,
1279 m_workbuffer, 1268 m_workbuffer,
1280 m_workbuffer + hs + 1); 1269 m_workbuffer + hs + 1);
1281 }
1282 } 1270 }
1283 1271
1284 if (m_suspended) { 1272 if (m_suspended) {
1285 // std::cerr << "FFTDataServer::fillColumn(" << x << "): calling resume" << std::endl; 1273 // std::cerr << "FFTDataServer::fillColumn(" << x << "): calling resume" << std::endl;
1286 // resume(); 1274 // resume();
1374 1362
1375 if (m_fillFrom > start) { 1363 if (m_fillFrom > start) {
1376 1364
1377 for (size_t f = m_fillFrom; f < end; f += m_server.m_windowIncrement) { 1365 for (size_t f = m_fillFrom; f < end; f += m_server.m_windowIncrement) {
1378 1366
1379 m_server.fillColumn(int((f - start) / m_server.m_windowIncrement), 1367 m_server.fillColumn(int((f - start) / m_server.m_windowIncrement));
1380 false);
1381 1368
1382 if (m_server.m_exiting) return; 1369 if (m_server.m_exiting) return;
1383 1370
1384 while (m_server.m_suspended) { 1371 while (m_server.m_suspended) {
1385 #ifdef DEBUG_FFT_SERVER 1372 #ifdef DEBUG_FFT_SERVER
1386 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl; 1373 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl;
1387 #endif 1374 #endif
1388 { 1375 MutexLocker locker(&m_server.m_fftBuffersLock,
1389 MutexLocker locker(&m_server.m_writeMutex, 1376 "FFTDataServer::run::m_fftBuffersLock [1]");
1390 "FFTDataServer::run::m_writeMutex [1]"); 1377 if (m_server.m_suspended && !m_server.m_exiting) {
1391 m_server.m_condition.wait(&m_server.m_writeMutex, 10000); 1378 m_server.m_condition.wait(&m_server.m_fftBuffersLock, 10000);
1392 } 1379 }
1393 #ifdef DEBUG_FFT_SERVER 1380 #ifdef DEBUG_FFT_SERVER
1394 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): waited" << std::endl; 1381 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): waited" << std::endl;
1395 #endif 1382 #endif
1396 if (m_server.m_exiting) return; 1383 if (m_server.m_exiting) return;
1415 1402
1416 size_t baseCompletion = m_completion; 1403 size_t baseCompletion = m_completion;
1417 1404
1418 for (size_t f = start; f < remainingEnd; f += m_server.m_windowIncrement) { 1405 for (size_t f = start; f < remainingEnd; f += m_server.m_windowIncrement) {
1419 1406
1420 m_server.fillColumn(int((f - start) / m_server.m_windowIncrement), 1407 m_server.fillColumn(int((f - start) / m_server.m_windowIncrement));
1421 false);
1422 1408
1423 if (m_server.m_exiting) return; 1409 if (m_server.m_exiting) return;
1424 1410
1425 while (m_server.m_suspended) { 1411 while (m_server.m_suspended) {
1426 #ifdef DEBUG_FFT_SERVER 1412 #ifdef DEBUG_FFT_SERVER
1427 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl; 1413 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl;
1428 #endif 1414 #endif
1429 { 1415 {
1430 MutexLocker locker(&m_server.m_writeMutex, 1416 MutexLocker locker(&m_server.m_fftBuffersLock,
1431 "FFTDataServer::run::m_writeMutex [2]"); 1417 "FFTDataServer::run::m_fftBuffersLock [2]");
1432 m_server.m_condition.wait(&m_server.m_writeMutex, 10000); 1418 if (m_server.m_suspended && !m_server.m_exiting) {
1419 m_server.m_condition.wait(&m_server.m_fftBuffersLock, 10000);
1420 }
1433 } 1421 }
1434 if (m_server.m_exiting) return; 1422 if (m_server.m_exiting) return;
1435 } 1423 }
1436 1424
1437 if (++counter == updateAt) { 1425 if (++counter == updateAt) {