Mercurial > hg > svcore
comparison data/fft/FFTDataServer.cpp @ 549:388afa99d537
* More careful (I hope!) locking
author | Chris Cannam |
---|---|
date | Thu, 05 Feb 2009 12:53:19 +0000 |
parents | 1469caaa8e67 |
children | 107d3f3705c9 |
comparison
equal
deleted
inserted
replaced
548:1469caaa8e67 | 549:388afa99d537 |
---|---|
600 if (m_fillThread) { | 600 if (m_fillThread) { |
601 m_fillThread->wait(); | 601 m_fillThread->wait(); |
602 delete m_fillThread; | 602 delete m_fillThread; |
603 } | 603 } |
604 | 604 |
605 MutexLocker locker(&m_writeMutex, | 605 // MutexLocker locker(&m_writeMutex, |
606 "FFTDataServer::~FFTDataServer::m_writeMutex"); | 606 // "FFTDataServer::~FFTDataServer::m_writeMutex"); |
607 | |
608 QMutexLocker mlocker(&m_fftBuffersLock); | |
609 QWriteLocker wlocker(&m_cacheVectorLock); | |
607 | 610 |
608 for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) { | 611 for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) { |
609 | |
610 if (*i) { | 612 if (*i) { |
611 delete *i; | 613 delete *i; |
612 } | 614 } |
613 } | 615 } |
614 | 616 |
636 #ifdef DEBUG_FFT_SERVER | 638 #ifdef DEBUG_FFT_SERVER |
637 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspend" << std::endl; | 639 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspend" << std::endl; |
638 #endif | 640 #endif |
639 Profiler profiler("FFTDataServer::suspend", false); | 641 Profiler profiler("FFTDataServer::suspend", false); |
640 | 642 |
641 MutexLocker locker(&m_writeMutex, | 643 QMutexLocker locker(&m_fftBuffersLock); |
642 "FFTDataServer::suspend::m_writeMutex"); | |
643 m_suspended = true; | 644 m_suspended = true; |
644 } | 645 } |
645 | 646 |
646 void | 647 void |
647 FFTDataServer::suspendWrites() | 648 FFTDataServer::suspendWrites() |
897 Profiler profiler("FFTDataServer::getMagnitudeAt: filling"); | 898 Profiler profiler("FFTDataServer::getMagnitudeAt: filling"); |
898 #ifdef DEBUG_FFT_SERVER | 899 #ifdef DEBUG_FFT_SERVER |
899 std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn(" | 900 std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn(" |
900 << x << ")" << std::endl; | 901 << x << ")" << std::endl; |
901 #endif | 902 #endif |
902 // hold mutex so that write thread doesn't mess with class | 903 fillColumn(x, cache); |
903 // member data in fillColumn | |
904 MutexLocker locker(&m_writeMutex, | |
905 "FFTDataServer::getMagnitudeAt: m_writeMutex"); | |
906 fillColumn(x, true); | |
907 } | 904 } |
908 return cache->getMagnitudeAt(col, y); | 905 return cache->getMagnitudeAt(col, y); |
909 } | 906 } |
910 | 907 |
911 bool | 908 bool |
926 if (!cache) return false; | 923 if (!cache) return false; |
927 | 924 |
928 //!!! n.b. can throw | 925 //!!! n.b. can throw |
929 if (!cache->haveSetColumnAt(col)) { | 926 if (!cache->haveSetColumnAt(col)) { |
930 Profiler profiler("FFTDataServer::getMagnitudesAt: filling"); | 927 Profiler profiler("FFTDataServer::getMagnitudesAt: filling"); |
931 MutexLocker locker(&m_writeMutex, | 928 fillColumn(x, cache); |
932 "FFTDataServer::getMagnitudesAt: m_writeMutex"); | |
933 fillColumn(x, true); | |
934 } | 929 } |
935 | 930 |
936 cache->getMagnitudesAt(col, values, minbin, count, step); | 931 cache->getMagnitudesAt(col, values, minbin, count, step); |
937 | 932 |
938 return true; | 933 return true; |
950 if (!cache) return 0; | 945 if (!cache) return 0; |
951 | 946 |
952 //!!! n.b. can throw | 947 //!!! n.b. can throw |
953 if (!cache->haveSetColumnAt(col)) { | 948 if (!cache->haveSetColumnAt(col)) { |
954 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling"); | 949 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling"); |
955 // hold mutex so that write thread doesn't mess with class | 950 fillColumn(x, cache); |
956 // member data in fillColumn | |
957 MutexLocker locker(&m_writeMutex, | |
958 "FFTDataServer::getNormalizedMagnitudeAt: m_writeMutex"); | |
959 fillColumn(x, true); | |
960 } | 951 } |
961 return cache->getNormalizedMagnitudeAt(col, y); | 952 return cache->getNormalizedMagnitudeAt(col, y); |
962 } | 953 } |
963 | 954 |
964 bool | 955 bool |
979 if (!cache) return false; | 970 if (!cache) return false; |
980 | 971 |
981 //!!! n.b. can throw | 972 //!!! n.b. can throw |
982 if (!cache->haveSetColumnAt(col)) { | 973 if (!cache->haveSetColumnAt(col)) { |
983 Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling"); | 974 Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling"); |
984 MutexLocker locker(&m_writeMutex, | 975 fillColumn(x, cache); |
985 "FFTDataServer::getNormalizedMagnitudesAt: m_writeMutex"); | |
986 fillColumn(x, true); | |
987 } | 976 } |
988 | 977 |
989 for (size_t i = 0; i < count; ++i) { | 978 for (size_t i = 0; i < count; ++i) { |
990 values[i] = cache->getNormalizedMagnitudeAt(col, i * step + minbin); | 979 values[i] = cache->getNormalizedMagnitudeAt(col, i * step + minbin); |
991 } | 980 } |
1005 if (!cache) return 0; | 994 if (!cache) return 0; |
1006 | 995 |
1007 //!!! n.b. can throw | 996 //!!! n.b. can throw |
1008 if (!cache->haveSetColumnAt(col)) { | 997 if (!cache->haveSetColumnAt(col)) { |
1009 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling"); | 998 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling"); |
1010 // hold mutex so that write thread doesn't mess with class | 999 fillColumn(x, cache); |
1011 // member data in fillColumn | |
1012 MutexLocker locker(&m_writeMutex, | |
1013 "FFTDataServer::getMaximumMagnitudeAt: m_writeMutex"); | |
1014 fillColumn(x, true); | |
1015 } | 1000 } |
1016 return cache->getMaximumMagnitudeAt(col); | 1001 return cache->getMaximumMagnitudeAt(col); |
1017 } | 1002 } |
1018 | 1003 |
1019 float | 1004 float |
1028 if (!cache) return 0; | 1013 if (!cache) return 0; |
1029 | 1014 |
1030 //!!! n.b. can throw | 1015 //!!! n.b. can throw |
1031 if (!cache->haveSetColumnAt(col)) { | 1016 if (!cache->haveSetColumnAt(col)) { |
1032 Profiler profiler("FFTDataServer::getPhaseAt: filling"); | 1017 Profiler profiler("FFTDataServer::getPhaseAt: filling"); |
1033 // hold mutex so that write thread doesn't mess with class | 1018 fillColumn(x, cache); |
1034 // member data in fillColumn | |
1035 MutexLocker locker(&m_writeMutex, | |
1036 "FFTDataServer::getPhaseAt: m_writeMutex"); | |
1037 fillColumn(x, true); | |
1038 } | 1019 } |
1039 return cache->getPhaseAt(col, y); | 1020 return cache->getPhaseAt(col, y); |
1040 } | 1021 } |
1041 | 1022 |
1042 bool | 1023 bool |
1057 if (!cache) return false; | 1038 if (!cache) return false; |
1058 | 1039 |
1059 //!!! n.b. can throw | 1040 //!!! n.b. can throw |
1060 if (!cache->haveSetColumnAt(col)) { | 1041 if (!cache->haveSetColumnAt(col)) { |
1061 Profiler profiler("FFTDataServer::getPhasesAt: filling"); | 1042 Profiler profiler("FFTDataServer::getPhasesAt: filling"); |
1062 MutexLocker locker(&m_writeMutex, | 1043 fillColumn(x, cache); |
1063 "FFTDataServer::getPhasesAt: m_writeMutex"); | |
1064 fillColumn(x, true); | |
1065 } | 1044 } |
1066 | 1045 |
1067 for (size_t i = 0; i < count; ++i) { | 1046 for (size_t i = 0; i < count; ++i) { |
1068 values[i] = cache->getPhaseAt(col, i * step + minbin); | 1047 values[i] = cache->getPhaseAt(col, i * step + minbin); |
1069 } | 1048 } |
1095 if (!cache->haveSetColumnAt(col)) { | 1074 if (!cache->haveSetColumnAt(col)) { |
1096 Profiler profiler("FFTDataServer::getValuesAt: filling"); | 1075 Profiler profiler("FFTDataServer::getValuesAt: filling"); |
1097 #ifdef DEBUG_FFT_SERVER | 1076 #ifdef DEBUG_FFT_SERVER |
1098 std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl; | 1077 std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl; |
1099 #endif | 1078 #endif |
1100 // hold mutex so that write thread doesn't mess with class | 1079 fillColumn(x, cache); |
1101 // member data in fillColumn | |
1102 MutexLocker locker(&m_writeMutex, | |
1103 "FFTDataServer::getValuesAt: m_writeMutex"); | |
1104 fillColumn(x, true); | |
1105 } | 1080 } |
1106 | 1081 |
1107 cache->getValuesAt(col, y, real, imaginary); | 1082 cache->getValuesAt(col, y, real, imaginary); |
1108 } | 1083 } |
1109 | 1084 |
1134 //!!! n.b. can throw | 1109 //!!! n.b. can throw |
1135 return cache->haveSetColumnAt(col); | 1110 return cache->haveSetColumnAt(col); |
1136 } | 1111 } |
1137 | 1112 |
1138 void | 1113 void |
1139 FFTDataServer::fillColumn(size_t x, bool lockHeld) | 1114 FFTDataServer::fillColumn(size_t x, FFTCacheReader *tester) |
1140 { | 1115 { |
1141 Profiler profiler("FFTDataServer::fillColumn", false); | 1116 Profiler profiler("FFTDataServer::fillColumn", false); |
1142 | 1117 |
1143 if (!m_model->isReady()) { | 1118 if (!m_model->isReady()) { |
1144 std::cerr << "WARNING: FFTDataServer::fillColumn(" | 1119 std::cerr << "WARNING: FFTDataServer::fillColumn(" |
1145 << x << "): model not yet ready" << std::endl; | 1120 << x << "): model not yet ready" << std::endl; |
1146 return; | 1121 return; |
1147 } | 1122 } |
1148 | 1123 /* |
1149 if (!m_fftInput) { | 1124 if (!m_fftInput) { |
1150 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " | 1125 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " |
1151 << "input has already been completed and discarded?" | 1126 << "input has already been completed and discarded?" |
1152 << std::endl; | 1127 << std::endl; |
1153 return; | 1128 return; |
1154 } | 1129 } |
1155 | 1130 */ |
1156 if (x >= m_width) { | 1131 if (x >= m_width) { |
1157 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " | 1132 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " |
1158 << "x > width (" << x << " > " << m_width << ")" | 1133 << "x > width (" << x << " > " << m_width << ")" |
1159 << std::endl; | 1134 << std::endl; |
1160 return; | 1135 return; |
1177 int startFrame = m_windowIncrement * x; | 1152 int startFrame = m_windowIncrement * x; |
1178 int endFrame = startFrame + m_windowSize; | 1153 int endFrame = startFrame + m_windowSize; |
1179 | 1154 |
1180 startFrame -= winsize / 2; | 1155 startFrame -= winsize / 2; |
1181 endFrame -= winsize / 2; | 1156 endFrame -= winsize / 2; |
1182 | |
1183 for (int i = 0; i < off; ++i) { | |
1184 m_fftInput[i] = 0.0; | |
1185 } | |
1186 | |
1187 for (int i = 0; i < off; ++i) { | |
1188 m_fftInput[fftsize - i - 1] = 0.0; | |
1189 } | |
1190 | |
1191 if (startFrame < 0) { | |
1192 pfx = -startFrame; | |
1193 for (int i = 0; i < pfx; ++i) { | |
1194 m_fftInput[off + i] = 0.0; | |
1195 } | |
1196 } | |
1197 | 1157 |
1198 #ifdef DEBUG_FFT_SERVER_FILL | 1158 #ifdef DEBUG_FFT_SERVER_FILL |
1199 std::cerr << "FFTDataServer::fillColumn: requesting frames " | 1159 std::cerr << "FFTDataServer::fillColumn: requesting frames " |
1200 << startFrame + pfx << " -> " << endFrame << " ( = " | 1160 << startFrame + pfx << " -> " << endFrame << " ( = " |
1201 << endFrame - (startFrame + pfx) << ") at index " | 1161 << endFrame - (startFrame + pfx) << ") at index " |
1202 << off + pfx << " in buffer of size " << m_fftSize | 1162 << off + pfx << " in buffer of size " << m_fftSize |
1203 << " with window size " << m_windowSize | 1163 << " with window size " << m_windowSize |
1204 << " from channel " << m_channel << std::endl; | 1164 << " from channel " << m_channel << std::endl; |
1205 #endif | 1165 #endif |
1206 | 1166 |
1167 QMutexLocker locker(&m_fftBuffersLock); | |
1168 | |
1169 if (tester) { | |
1170 // We are being called from a function that wanted to obtain a | |
1171 // column using an FFTCacheReader. Before calling us, it | |
1172 // checked whether the column was available already, and the | |
1173 // reader reported that it wasn't. Now we test again with the | |
1174 // mutex held, to avoid a race condition in case another | |
1175 // thread has called fillColumn at the same time. | |
1176 if (tester->haveSetColumnAt(x & m_cacheWidthMask)) { | |
1177 return; | |
1178 } | |
1179 } | |
1180 | |
1181 if (!m_fftInput) { | |
1182 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " | |
1183 << "input has already been completed and discarded?" | |
1184 << std::endl; | |
1185 return; | |
1186 } | |
1187 | |
1188 for (int i = 0; i < off; ++i) { | |
1189 m_fftInput[i] = 0.0; | |
1190 } | |
1191 | |
1192 for (int i = 0; i < off; ++i) { | |
1193 m_fftInput[fftsize - i - 1] = 0.0; | |
1194 } | |
1195 | |
1196 if (startFrame < 0) { | |
1197 pfx = -startFrame; | |
1198 for (int i = 0; i < pfx; ++i) { | |
1199 m_fftInput[off + i] = 0.0; | |
1200 } | |
1201 } | |
1202 | |
1207 int count = 0; | 1203 int count = 0; |
1208 if (endFrame > startFrame + pfx) count = endFrame - (startFrame + pfx); | 1204 if (endFrame > startFrame + pfx) count = endFrame - (startFrame + pfx); |
1209 | 1205 |
1210 int got = m_model->getData(m_channel, startFrame + pfx, | 1206 int got = m_model->getData(m_channel, startFrame + pfx, |
1211 count, m_fftInput + off + pfx); | 1207 count, m_fftInput + off + pfx); |
1232 m_fftInput[i + hs] = temp; | 1228 m_fftInput[i + hs] = temp; |
1233 } | 1229 } |
1234 | 1230 |
1235 fftf_execute(m_fftPlan); | 1231 fftf_execute(m_fftPlan); |
1236 | 1232 |
1237 // If our cache uses polar storage, it's more friendly for us to | |
1238 // do the conversion before taking the write mutex | |
1239 | |
1240 float factor = 0.f; | 1233 float factor = 0.f; |
1241 | 1234 |
1242 if (cache->getStorageType() == FFTCache::Compact || | 1235 if (cache->getStorageType() == FFTCache::Compact || |
1243 cache->getStorageType() == FFTCache::Polar) { | 1236 cache->getStorageType() == FFTCache::Polar) { |
1244 | 1237 |
1259 } | 1252 } |
1260 } | 1253 } |
1261 | 1254 |
1262 Profiler subprof("FFTDataServer::fillColumn: set to cache"); | 1255 Profiler subprof("FFTDataServer::fillColumn: set to cache"); |
1263 | 1256 |
1264 { | 1257 if (cache->getStorageType() == FFTCache::Compact || |
1265 MutexLocker locker(lockHeld ? 0 : &m_writeMutex, | 1258 cache->getStorageType() == FFTCache::Polar) { |
1266 "FFTDataServer::fillColumn: m_writeMutex [2]"); | |
1267 | |
1268 if (cache->getStorageType() == FFTCache::Compact || | |
1269 cache->getStorageType() == FFTCache::Polar) { | |
1270 | 1259 |
1271 cache->setColumnAt(col, | 1260 cache->setColumnAt(col, |
1272 m_workbuffer, | 1261 m_workbuffer, |
1273 m_workbuffer + hs + 1, | 1262 m_workbuffer + hs + 1, |
1274 factor); | 1263 factor); |
1275 | 1264 |
1276 } else { | 1265 } else { |
1277 | 1266 |
1278 cache->setColumnAt(col, | 1267 cache->setColumnAt(col, |
1279 m_workbuffer, | 1268 m_workbuffer, |
1280 m_workbuffer + hs + 1); | 1269 m_workbuffer + hs + 1); |
1281 } | |
1282 } | 1270 } |
1283 | 1271 |
1284 if (m_suspended) { | 1272 if (m_suspended) { |
1285 // std::cerr << "FFTDataServer::fillColumn(" << x << "): calling resume" << std::endl; | 1273 // std::cerr << "FFTDataServer::fillColumn(" << x << "): calling resume" << std::endl; |
1286 // resume(); | 1274 // resume(); |
1374 | 1362 |
1375 if (m_fillFrom > start) { | 1363 if (m_fillFrom > start) { |
1376 | 1364 |
1377 for (size_t f = m_fillFrom; f < end; f += m_server.m_windowIncrement) { | 1365 for (size_t f = m_fillFrom; f < end; f += m_server.m_windowIncrement) { |
1378 | 1366 |
1379 m_server.fillColumn(int((f - start) / m_server.m_windowIncrement), | 1367 m_server.fillColumn(int((f - start) / m_server.m_windowIncrement)); |
1380 false); | |
1381 | 1368 |
1382 if (m_server.m_exiting) return; | 1369 if (m_server.m_exiting) return; |
1383 | 1370 |
1384 while (m_server.m_suspended) { | 1371 while (m_server.m_suspended) { |
1385 #ifdef DEBUG_FFT_SERVER | 1372 #ifdef DEBUG_FFT_SERVER |
1386 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl; | 1373 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl; |
1387 #endif | 1374 #endif |
1388 { | 1375 MutexLocker locker(&m_server.m_fftBuffersLock, |
1389 MutexLocker locker(&m_server.m_writeMutex, | 1376 "FFTDataServer::run::m_fftBuffersLock [1]"); |
1390 "FFTDataServer::run::m_writeMutex [1]"); | 1377 if (m_server.m_suspended && !m_server.m_exiting) { |
1391 m_server.m_condition.wait(&m_server.m_writeMutex, 10000); | 1378 m_server.m_condition.wait(&m_server.m_fftBuffersLock, 10000); |
1392 } | 1379 } |
1393 #ifdef DEBUG_FFT_SERVER | 1380 #ifdef DEBUG_FFT_SERVER |
1394 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): waited" << std::endl; | 1381 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): waited" << std::endl; |
1395 #endif | 1382 #endif |
1396 if (m_server.m_exiting) return; | 1383 if (m_server.m_exiting) return; |
1415 | 1402 |
1416 size_t baseCompletion = m_completion; | 1403 size_t baseCompletion = m_completion; |
1417 | 1404 |
1418 for (size_t f = start; f < remainingEnd; f += m_server.m_windowIncrement) { | 1405 for (size_t f = start; f < remainingEnd; f += m_server.m_windowIncrement) { |
1419 | 1406 |
1420 m_server.fillColumn(int((f - start) / m_server.m_windowIncrement), | 1407 m_server.fillColumn(int((f - start) / m_server.m_windowIncrement)); |
1421 false); | |
1422 | 1408 |
1423 if (m_server.m_exiting) return; | 1409 if (m_server.m_exiting) return; |
1424 | 1410 |
1425 while (m_server.m_suspended) { | 1411 while (m_server.m_suspended) { |
1426 #ifdef DEBUG_FFT_SERVER | 1412 #ifdef DEBUG_FFT_SERVER |
1427 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl; | 1413 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "]): suspended, waiting..." << std::endl; |
1428 #endif | 1414 #endif |
1429 { | 1415 { |
1430 MutexLocker locker(&m_server.m_writeMutex, | 1416 MutexLocker locker(&m_server.m_fftBuffersLock, |
1431 "FFTDataServer::run::m_writeMutex [2]"); | 1417 "FFTDataServer::run::m_fftBuffersLock [2]"); |
1432 m_server.m_condition.wait(&m_server.m_writeMutex, 10000); | 1418 if (m_server.m_suspended && !m_server.m_exiting) { |
1419 m_server.m_condition.wait(&m_server.m_fftBuffersLock, 10000); | |
1420 } | |
1433 } | 1421 } |
1434 if (m_server.m_exiting) return; | 1422 if (m_server.m_exiting) return; |
1435 } | 1423 } |
1436 | 1424 |
1437 if (++counter == updateAt) { | 1425 if (++counter == updateAt) { |