comparison data/fft/FFTDataServer.cpp @ 537:3cc4b7cd2aa5

* Merge from one-fftdataserver-per-fftmodel branch. This bit of reworking (which is not described very accurately by the title of the branch) turns the MatrixFile object into something that either reads or writes, but not both, and splits the FFT file cache into separate reader and writer implementations. This allows the FFT data server to have a single thread owning the writers and one reader per "customer" thread, and allows all locking to be vastly simplified and concentrated in the data server alone (because none of the classes it makes use of is used in more than one thread at a time). The result is faster and more trustworthy code.
author Chris Cannam
date Tue, 27 Jan 2009 13:25:10 +0000
parents beb51f558e9c
children 87aef350f1dc
comparison
equal deleted inserted replaced
536:beb51f558e9c 537:3cc4b7cd2aa5
13 COPYING included with this distribution for more information. 13 COPYING included with this distribution for more information.
14 */ 14 */
15 15
16 #include "FFTDataServer.h" 16 #include "FFTDataServer.h"
17 17
18 #include "FFTFileCache.h" 18 #include "FFTFileCacheReader.h"
19 #include "FFTFileCacheWriter.h"
19 #include "FFTMemoryCache.h" 20 #include "FFTMemoryCache.h"
20 21
21 #include "model/DenseTimeValueModel.h" 22 #include "model/DenseTimeValueModel.h"
22 23
23 #include "system/System.h" 24 #include "system/System.h"
25 #include "base/StorageAdviser.h" 26 #include "base/StorageAdviser.h"
26 #include "base/Exceptions.h" 27 #include "base/Exceptions.h"
27 #include "base/Profiler.h" 28 #include "base/Profiler.h"
28 #include "base/Thread.h" // for debug mutex locker 29 #include "base/Thread.h" // for debug mutex locker
29 30
30 //#define DEBUG_FFT_SERVER 1 31 #include <QWriteLocker>
32
33 #define DEBUG_FFT_SERVER 1
31 //#define DEBUG_FFT_SERVER_FILL 1 34 //#define DEBUG_FFT_SERVER_FILL 1
32 35
33 #ifdef DEBUG_FFT_SERVER_FILL 36 #ifdef DEBUG_FFT_SERVER_FILL
34 #ifndef DEBUG_FFT_SERVER 37 #ifndef DEBUG_FFT_SERVER
35 #define DEBUG_FFT_SERVER 1 38 #define DEBUG_FFT_SERVER 1
324 if (i->second.first == server) { 327 if (i->second.first == server) {
325 if (i->second.second == 0) { 328 if (i->second.second == 0) {
326 std::cerr << "ERROR: FFTDataServer::releaseInstance(" 329 std::cerr << "ERROR: FFTDataServer::releaseInstance("
327 << server << "): instance not allocated" << std::endl; 330 << server << "): instance not allocated" << std::endl;
328 } else if (--i->second.second == 0) { 331 } else if (--i->second.second == 0) {
332 /*!!!
329 if (server->m_lastUsedCache == -1) { // never used 333 if (server->m_lastUsedCache == -1) { // never used
330 #ifdef DEBUG_FFT_SERVER 334 #ifdef DEBUG_FFT_SERVER
331 std::cerr << "FFTDataServer::releaseInstance: instance " 335 std::cerr << "FFTDataServer::releaseInstance: instance "
332 << server << " has never been used, erasing" 336 << server << " has never been used, erasing"
333 << std::endl; 337 << std::endl;
334 #endif 338 #endif
335 delete server; 339 delete server;
336 m_servers.erase(i); 340 m_servers.erase(i);
337 } else { 341 } else {
342 */
338 #ifdef DEBUG_FFT_SERVER 343 #ifdef DEBUG_FFT_SERVER
339 std::cerr << "FFTDataServer::releaseInstance: instance " 344 std::cerr << "FFTDataServer::releaseInstance: instance "
340 << server << " no longer in use, marking for possible collection" 345 << server << " no longer in use, marking for possible collection"
341 << std::endl; 346 << std::endl;
342 #endif 347 #endif
351 } 356 }
352 } 357 }
353 if (!found) m_releasedServers.push_back(server); 358 if (!found) m_releasedServers.push_back(server);
354 server->suspend(); 359 server->suspend();
355 purgeLimbo(); 360 purgeLimbo();
356 } 361 //!!! }
357 } else { 362 } else {
358 #ifdef DEBUG_FFT_SERVER 363 #ifdef DEBUG_FFT_SERVER
359 std::cerr << "FFTDataServer::releaseInstance: instance " 364 std::cerr << "FFTDataServer::releaseInstance: instance "
360 << server << " now has refcount " << i->second.second 365 << server << " now has refcount " << i->second.second
361 << std::endl; 366 << std::endl;
495 m_width(0), 500 m_width(0),
496 m_height(0), 501 m_height(0),
497 m_cacheWidth(0), 502 m_cacheWidth(0),
498 m_cacheWidthPower(0), 503 m_cacheWidthPower(0),
499 m_cacheWidthMask(0), 504 m_cacheWidthMask(0),
500 m_lastUsedCache(-1),
501 m_criteria(criteria), 505 m_criteria(criteria),
502 m_fftInput(0), 506 m_fftInput(0),
503 m_exiting(false), 507 m_exiting(false),
504 m_suspended(true), //!!! or false? 508 m_suspended(true), //!!! or false?
505 m_fillThread(0) 509 m_fillThread(0)
635 Profiler profiler("FFTDataServer::suspend", false); 639 Profiler profiler("FFTDataServer::suspend", false);
636 640
637 MutexLocker locker(&m_writeMutex, 641 MutexLocker locker(&m_writeMutex,
638 "FFTDataServer::suspend::m_writeMutex"); 642 "FFTDataServer::suspend::m_writeMutex");
639 m_suspended = true; 643 m_suspended = true;
640 for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) {
641 if (*i) (*i)->suspend();
642 }
643 } 644 }
644 645
645 void 646 void
646 FFTDataServer::suspendWrites() 647 FFTDataServer::suspendWrites()
647 { 648 {
719 720
720 memoryCache = false; 721 memoryCache = false;
721 722
722 if ((recommendation & StorageAdviser::UseMemory) || 723 if ((recommendation & StorageAdviser::UseMemory) ||
723 (recommendation & StorageAdviser::PreferMemory)) { 724 (recommendation & StorageAdviser::PreferMemory)) {
724 memoryCache = true; 725 //!!! memoryCache = true;
725 } 726 }
726 727
727 compactCache = canCompact && 728 compactCache = canCompact &&
728 (recommendation & StorageAdviser::ConserveSpace); 729 (recommendation & StorageAdviser::ConserveSpace);
729 730
732 733
733 std::cerr << "Width " << w << " of " << m_width << ", height " << h << ", size " << w * h << std::endl; 734 std::cerr << "Width " << w << " of " << m_width << ", height " << h << ", size " << w * h << std::endl;
734 #endif 735 #endif
735 } 736 }
736 737
737 FFTCache * 738 bool
738 FFTDataServer::getCacheAux(size_t c) 739 FFTDataServer::makeCache(int c)
739 { 740 {
740 Profiler profiler("FFTDataServer::getCacheAux", false); 741 QWriteLocker locker(&m_cacheVectorLock);
741 #ifdef DEBUG_FFT_SERVER
742 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "])::getCacheAux" << std::endl;
743 #endif
744
745 MutexLocker locker(&m_writeMutex,
746 "FFTDataServer::getCacheAux::m_writeMutex");
747
748 if (m_lastUsedCache == -1) {
749 m_fillThread->start();
750 }
751
752 if (int(c) != m_lastUsedCache) {
753
754 #ifdef DEBUG_FFT_SERVER
755 std::cerr << "switch from " << m_lastUsedCache << " to " << c << std::endl;
756 #endif
757
758 for (IntQueue::iterator i = m_dormantCaches.begin();
759 i != m_dormantCaches.end(); ++i) {
760 if (*i == int(c)) {
761 m_dormantCaches.erase(i);
762 break;
763 }
764 }
765
766 if (m_lastUsedCache >= 0) {
767 bool inDormant = false;
768 for (size_t i = 0; i < m_dormantCaches.size(); ++i) {
769 if (m_dormantCaches[i] == m_lastUsedCache) {
770 inDormant = true;
771 break;
772 }
773 }
774 if (!inDormant) {
775 m_dormantCaches.push_back(m_lastUsedCache);
776 }
777 while (m_dormantCaches.size() > 4) {
778 int dc = m_dormantCaches.front();
779 m_dormantCaches.pop_front();
780 m_caches[dc]->suspend();
781 }
782 }
783 }
784 742
785 if (m_caches[c]) { 743 if (m_caches[c]) {
786 m_lastUsedCache = c; 744 // someone else must have created the cache between our
787 return m_caches[c]; 745 // testing for it and taking the write lock
788 } 746 return true;
747 }
748
749 CacheBlock *cb = new CacheBlock;
789 750
790 QString name = QString("%1-%2").arg(m_fileBaseName).arg(c); 751 QString name = QString("%1-%2").arg(m_fileBaseName).arg(c);
791
792 FFTCache *cache = 0;
793 752
794 size_t width = m_cacheWidth; 753 size_t width = m_cacheWidth;
795 if (c * m_cacheWidth + width > m_width) { 754 if (c * m_cacheWidth + width > m_width) {
796 width = m_width - c * m_cacheWidth; 755 width = m_width - c * m_cacheWidth;
797 } 756 }
799 bool memoryCache = false; 758 bool memoryCache = false;
800 bool compactCache = false; 759 bool compactCache = false;
801 760
802 getStorageAdvice(width, m_height, memoryCache, compactCache); 761 getStorageAdvice(width, m_height, memoryCache, compactCache);
803 762
804 try { 763 bool success = false;
805 764
806 if (memoryCache) { 765 if (memoryCache) {
807 766
808 cache = new FFTMemoryCache 767 try {
809 (compactCache ? FFTMemoryCache::Compact : 768
810 m_polar ? FFTMemoryCache::Polar : 769 cb->memoryCache = new FFTMemoryCache
811 FFTMemoryCache::Rectangular); 770 (compactCache ? FFTCache::Compact :
812 771 m_polar ? FFTCache::Polar :
813 } else { 772 FFTCache::Rectangular,
814 773 width, m_height);
815 cache = new FFTFileCache 774
816 (name, 775 success = true;
817 MatrixFile::ReadWrite, 776
818 compactCache ? FFTFileCache::Compact : 777 } catch (std::bad_alloc) {
819 m_polar ? FFTFileCache::Polar : 778
820 FFTFileCache::Rectangular); 779 delete cb->memoryCache;
821 } 780 cb->memoryCache = 0;
822
823 cache->resize(width, m_height);
824 cache->reset();
825
826 } catch (std::bad_alloc) {
827
828 delete cache;
829 cache = 0;
830
831 if (memoryCache) {
832 781
833 std::cerr << "WARNING: Memory allocation failed when resizing" 782 std::cerr << "WARNING: Memory allocation failed when creating"
834 << " FFT memory cache no. " << c << " to " << width 783 << " FFT memory cache no. " << c << " of " << width
835 << "x" << m_height << " (of total width " << m_width 784 << "x" << m_height << " (of total width " << m_width
836 << "): falling back to disc cache" << std::endl; 785 << "): falling back to disc cache" << std::endl;
837 786
838 try { 787 memoryCache = false;
839 788 }
840 purgeLimbo(0); 789 }
841 790
842 cache = new FFTFileCache(name, 791 if (!memoryCache) {
843 MatrixFile::ReadWrite, 792
844 FFTFileCache::Compact); 793 try {
845 794
846 cache->resize(width, m_height); 795 cb->fileCacheWriter = new FFTFileCacheWriter
847 cache->reset(); 796 (name,
848 797 compactCache ? FFTCache::Compact :
849 } catch (std::bad_alloc) { 798 m_polar ? FFTCache::Polar :
850 799 FFTCache::Rectangular,
851 delete cache; 800 width, m_height);
852 cache = 0; 801
853 } 802 success = true;
854 } 803
855 804 } catch (std::exception e) {
856 if (!cache) { 805
857 std::cerr << "ERROR: Memory allocation failed when resizing" 806 delete cb->fileCacheWriter;
858 << " FFT file cache no. " << c << " to " << width 807 cb->fileCacheWriter = 0;
859 << "x" << m_height << " (of total width " << m_width 808
860 << "): abandoning this cache" << std::endl; 809 std::cerr << "ERROR: Failed to construct disc cache for FFT data: "
861 810 << e.what() << std::endl;
862 throw AllocationFailed("Failed to create or resize an FFT model slice"); 811 }
863 } 812 }
864 } 813
865 814 m_caches[c] = cb;
866 m_caches[c] = cache; 815
867 m_lastUsedCache = c; 816 return success;
868 return cache; 817 }
869 } 818
870 819 bool
820 FFTDataServer::makeCacheReader(int c)
821 {
822 // preconditions: m_caches[c] exists and contains a file writer;
823 // m_cacheVectorLock is not locked by this thread
824 #ifdef DEBUG_FFT_SERVER
825 std::cerr << "FFTDataServer::makeCacheReader(" << c << ")" << std::endl;
826 #endif
827
828 QThread *me = QThread::currentThread();
829 QWriteLocker locker(&m_cacheVectorLock);
830 CacheBlock *cb(m_caches.at(c));
831 if (!cb || !cb->fileCacheWriter) return false;
832
833 try {
834
835 cb->fileCacheReader[me] = new FFTFileCacheReader(cb->fileCacheWriter);
836
837 } catch (std::exception e) {
838
839 delete cb->fileCacheReader[me];
840 cb->fileCacheReader.erase(me);
841
842 std::cerr << "ERROR: Failed to construct disc cache reader for FFT data: "
843 << e.what() << std::endl;
844 return false;
845 }
846
847 // erase a reader that looks like it is no longer going to be
848 // used by this thread for a while (leaving alone the current
849 // and previous cache readers)
850 int deleteCandidate = c - 2;
851 if (deleteCandidate < 0) deleteCandidate = c + 2;
852 if (deleteCandidate >= m_caches.size()) {
853 return true;
854 }
855
856 cb = m_caches.at(deleteCandidate);
857 if (cb && cb->fileCacheReader.find(me) != cb->fileCacheReader.end()) {
858 #ifdef DEBUG_FFT_SERVER
859 std::cerr << "FFTDataServer::makeCacheReader: Deleting probably unpopular reader " << deleteCandidate << " for this thread (as I create reader " << c << ")" << std::endl;
860 #endif
861 delete cb->fileCacheReader[me];
862 cb->fileCacheReader.erase(me);
863 }
864
865 return true;
866 }
867
871 float 868 float
872 FFTDataServer::getMagnitudeAt(size_t x, size_t y) 869 FFTDataServer::getMagnitudeAt(size_t x, size_t y)
873 { 870 {
874 Profiler profiler("FFTDataServer::getMagnitudeAt", false); 871 Profiler profiler("FFTDataServer::getMagnitudeAt", false);
875 872
876 if (x >= m_width || y >= m_height) return 0; 873 if (x >= m_width || y >= m_height) return 0;
877 874
878 size_t col; 875 size_t col;
879 FFTCache *cache = getCache(x, col); 876 FFTCacheReader *cache = getCacheReader(x, col);
880 if (!cache) return 0; 877 if (!cache) return 0;
881 878
879 //!!! n.b. can throw
882 if (!cache->haveSetColumnAt(col)) { 880 if (!cache->haveSetColumnAt(col)) {
883 Profiler profiler("FFTDataServer::getMagnitudeAt: filling"); 881 Profiler profiler("FFTDataServer::getMagnitudeAt: filling");
884 #ifdef DEBUG_FFT_SERVER 882 #ifdef DEBUG_FFT_SERVER
885 std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn(" 883 std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn("
886 << x << ")" << std::endl; 884 << x << ")" << std::endl;
906 else if (minbin + count * step > m_height) { 904 else if (minbin + count * step > m_height) {
907 count = (m_height - minbin) / step; 905 count = (m_height - minbin) / step;
908 } 906 }
909 907
910 size_t col; 908 size_t col;
911 FFTCache *cache = getCache(x, col); 909 FFTCacheReader *cache = getCacheReader(x, col);
912 if (!cache) return false; 910 if (!cache) return false;
913 911
912 //!!! n.b. can throw
914 if (!cache->haveSetColumnAt(col)) { 913 if (!cache->haveSetColumnAt(col)) {
915 Profiler profiler("FFTDataServer::getMagnitudesAt: filling"); 914 Profiler profiler("FFTDataServer::getMagnitudesAt: filling");
916 MutexLocker locker(&m_writeMutex, 915 MutexLocker locker(&m_writeMutex,
917 "FFTDataServer::getMagnitudesAt: m_writeMutex"); 916 "FFTDataServer::getMagnitudesAt: m_writeMutex");
918 fillColumn(x, true); 917 fillColumn(x, true);
929 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt", false); 928 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt", false);
930 929
931 if (x >= m_width || y >= m_height) return 0; 930 if (x >= m_width || y >= m_height) return 0;
932 931
933 size_t col; 932 size_t col;
934 FFTCache *cache = getCache(x, col); 933 FFTCacheReader *cache = getCacheReader(x, col);
935 if (!cache) return 0; 934 if (!cache) return 0;
936 935
936 //!!! n.b. can throw
937 if (!cache->haveSetColumnAt(col)) { 937 if (!cache->haveSetColumnAt(col)) {
938 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling"); 938 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling");
939 // hold mutex so that write thread doesn't mess with class 939 // hold mutex so that write thread doesn't mess with class
940 // member data in fillColumn 940 // member data in fillColumn
941 MutexLocker locker(&m_writeMutex, 941 MutexLocker locker(&m_writeMutex,
957 else if (minbin + count * step > m_height) { 957 else if (minbin + count * step > m_height) {
958 count = (m_height - minbin) / step; 958 count = (m_height - minbin) / step;
959 } 959 }
960 960
961 size_t col; 961 size_t col;
962 FFTCache *cache = getCache(x, col); 962 FFTCacheReader *cache = getCacheReader(x, col);
963 if (!cache) return false; 963 if (!cache) return false;
964 964
965 //!!! n.b. can throw
965 if (!cache->haveSetColumnAt(col)) { 966 if (!cache->haveSetColumnAt(col)) {
966 Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling"); 967 Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling");
967 MutexLocker locker(&m_writeMutex, 968 MutexLocker locker(&m_writeMutex,
968 "FFTDataServer::getNormalizedMagnitudesAt: m_writeMutex"); 969 "FFTDataServer::getNormalizedMagnitudesAt: m_writeMutex");
969 fillColumn(x, true); 970 fillColumn(x, true);
982 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt", false); 983 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt", false);
983 984
984 if (x >= m_width) return 0; 985 if (x >= m_width) return 0;
985 986
986 size_t col; 987 size_t col;
987 FFTCache *cache = getCache(x, col); 988 FFTCacheReader *cache = getCacheReader(x, col);
988 if (!cache) return 0; 989 if (!cache) return 0;
989 990
991 //!!! n.b. can throw
990 if (!cache->haveSetColumnAt(col)) { 992 if (!cache->haveSetColumnAt(col)) {
991 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling"); 993 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling");
992 // hold mutex so that write thread doesn't mess with class 994 // hold mutex so that write thread doesn't mess with class
993 // member data in fillColumn 995 // member data in fillColumn
994 MutexLocker locker(&m_writeMutex, 996 MutexLocker locker(&m_writeMutex,
1004 Profiler profiler("FFTDataServer::getPhaseAt", false); 1006 Profiler profiler("FFTDataServer::getPhaseAt", false);
1005 1007
1006 if (x >= m_width || y >= m_height) return 0; 1008 if (x >= m_width || y >= m_height) return 0;
1007 1009
1008 size_t col; 1010 size_t col;
1009 FFTCache *cache = getCache(x, col); 1011 FFTCacheReader *cache = getCacheReader(x, col);
1010 if (!cache) return 0; 1012 if (!cache) return 0;
1011 1013
1014 //!!! n.b. can throw
1012 if (!cache->haveSetColumnAt(col)) { 1015 if (!cache->haveSetColumnAt(col)) {
1013 Profiler profiler("FFTDataServer::getPhaseAt: filling"); 1016 Profiler profiler("FFTDataServer::getPhaseAt: filling");
1014 // hold mutex so that write thread doesn't mess with class 1017 // hold mutex so that write thread doesn't mess with class
1015 // member data in fillColumn 1018 // member data in fillColumn
1016 MutexLocker locker(&m_writeMutex, 1019 MutexLocker locker(&m_writeMutex,
1032 else if (minbin + count * step > m_height) { 1035 else if (minbin + count * step > m_height) {
1033 count = (m_height - minbin) / step; 1036 count = (m_height - minbin) / step;
1034 } 1037 }
1035 1038
1036 size_t col; 1039 size_t col;
1037 FFTCache *cache = getCache(x, col); 1040 FFTCacheReader *cache = getCacheReader(x, col);
1038 if (!cache) return false; 1041 if (!cache) return false;
1039 1042
1043 //!!! n.b. can throw
1040 if (!cache->haveSetColumnAt(col)) { 1044 if (!cache->haveSetColumnAt(col)) {
1041 Profiler profiler("FFTDataServer::getPhasesAt: filling"); 1045 Profiler profiler("FFTDataServer::getPhasesAt: filling");
1042 MutexLocker locker(&m_writeMutex, 1046 MutexLocker locker(&m_writeMutex,
1043 "FFTDataServer::getPhasesAt: m_writeMutex"); 1047 "FFTDataServer::getPhasesAt: m_writeMutex");
1044 fillColumn(x, true); 1048 fillColumn(x, true);
1061 imaginary = 0; 1065 imaginary = 0;
1062 return; 1066 return;
1063 } 1067 }
1064 1068
1065 size_t col; 1069 size_t col;
1066 FFTCache *cache = getCache(x, col); 1070 FFTCacheReader *cache = getCacheReader(x, col);
1067 1071
1068 if (!cache) { 1072 if (!cache) {
1069 real = 0; 1073 real = 0;
1070 imaginary = 0; 1074 imaginary = 0;
1071 return; 1075 return;
1072 } 1076 }
1073 1077
1078 //!!! n.b. can throw
1074 if (!cache->haveSetColumnAt(col)) { 1079 if (!cache->haveSetColumnAt(col)) {
1075 Profiler profiler("FFTDataServer::getValuesAt: filling"); 1080 Profiler profiler("FFTDataServer::getValuesAt: filling");
1076 #ifdef DEBUG_FFT_SERVER 1081 #ifdef DEBUG_FFT_SERVER
1077 std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl; 1082 std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl;
1078 #endif 1083 #endif
1092 Profiler profiler("FFTDataServer::isColumnReady", false); 1097 Profiler profiler("FFTDataServer::isColumnReady", false);
1093 1098
1094 if (x >= m_width) return true; 1099 if (x >= m_width) return true;
1095 1100
1096 if (!haveCache(x)) { 1101 if (!haveCache(x)) {
1102 /*!!!
1097 if (m_lastUsedCache == -1) { 1103 if (m_lastUsedCache == -1) {
1098 if (m_suspended) { 1104 if (m_suspended) {
1099 std::cerr << "FFTDataServer::isColumnReady(" << x << "): no cache, calling resume" << std::endl; 1105 std::cerr << "FFTDataServer::isColumnReady(" << x << "): no cache, calling resume" << std::endl;
1100 resume(); 1106 resume();
1101 } 1107 }
1102 m_fillThread->start(); 1108 m_fillThread->start();
1103 } 1109 }
1110 */
1104 return false; 1111 return false;
1105 } 1112 }
1106 1113
1107 size_t col; 1114 size_t col;
1108 FFTCache *cache = getCache(x, col); 1115 FFTCacheReader *cache = getCacheReader(x, col);
1109 if (!cache) return true; 1116 if (!cache) return true;
1110 1117
1118 //!!! n.b. can throw
1111 return cache->haveSetColumnAt(col); 1119 return cache->haveSetColumnAt(col);
1112 } 1120 }
1113 1121
1114 void 1122 void
1115 FFTDataServer::fillColumn(size_t x, bool lockHeld) 1123 FFTDataServer::fillColumn(size_t x, bool lockHeld)
1131 1139
1132 if (x >= m_width) { 1140 if (x >= m_width) {
1133 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " 1141 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): "
1134 << "x > width (" << x << " > " << m_width << ")" 1142 << "x > width (" << x << " > " << m_width << ")"
1135 << std::endl; 1143 << std::endl;
1136 // abort(); //!!!
1137 return; 1144 return;
1138 } 1145 }
1139 1146
1140 size_t col; 1147 size_t col;
1141 #ifdef DEBUG_FFT_SERVER_FILL 1148 #ifdef DEBUG_FFT_SERVER_FILL
1142 std::cout << "FFTDataServer::fillColumn(" << x << ")" << std::endl; 1149 std::cout << "FFTDataServer::fillColumn(" << x << ")" << std::endl;
1143 #endif 1150 #endif
1144 FFTCache *cache = getCache(x, col); 1151 FFTCacheWriter *cache = getCacheWriter(x, col);
1145 if (!cache) return; 1152 if (!cache) return;
1146
1147 {
1148 MutexLocker locker(lockHeld ? 0 : &m_writeMutex,
1149 "FFTDataServer::fillColumn::m_writeMutex [1]");
1150
1151 if (cache->haveSetColumnAt(col)) return;
1152 }
1153 1153
1154 int winsize = m_windowSize; 1154 int winsize = m_windowSize;
1155 int fftsize = m_fftSize; 1155 int fftsize = m_fftSize;
1156 int hs = fftsize/2; 1156 int hs = fftsize/2;
1157 1157
1268 if (m_suspended) { 1268 if (m_suspended) {
1269 // std::cerr << "FFTDataServer::fillColumn(" << x << "): calling resume" << std::endl; 1269 // std::cerr << "FFTDataServer::fillColumn(" << x << "): calling resume" << std::endl;
1270 // resume(); 1270 // resume();
1271 } 1271 }
1272 } 1272 }
1273
1274 void
1275 FFTDataServer::fillComplete()
1276 {
1277 for (int i = 0; i < int(m_caches.size()); ++i) {
1278 if (m_caches[i]->memoryCache) {
1279 m_caches[i]->memoryCache->allColumnsWritten();
1280 }
1281 if (m_caches[i]->fileCacheWriter) {
1282 m_caches[i]->fileCacheWriter->allColumnsWritten();
1283 }
1284 }
1285 }
1273 1286
1274 size_t 1287 size_t
1275 FFTDataServer::getFillCompletion() const 1288 FFTDataServer::getFillCompletion() const
1276 { 1289 {
1277 if (m_fillThread) return m_fillThread->getCompletion(); 1290 if (m_fillThread) return m_fillThread->getCompletion();
1416 if (updateAt > maxUpdateAt) updateAt = maxUpdateAt; 1429 if (updateAt > maxUpdateAt) updateAt = maxUpdateAt;
1417 } 1430 }
1418 } 1431 }
1419 } 1432 }
1420 1433
1434 m_server.fillComplete();
1421 m_completion = 100; 1435 m_completion = 100;
1422 m_extent = end; 1436 m_extent = end;
1423 } 1437
1424 1438 #ifdef DEBUG_FFT_SERVER
1439 std::cerr << "FFTDataServer::FillThread::run exiting" << std::endl;
1440 #endif
1441 }
1442