Mercurial > hg > svcore
comparison data/fft/FFTDataServer.cpp @ 537:3cc4b7cd2aa5
* Merge from one-fftdataserver-per-fftmodel branch. This bit of
reworking (which is not described very accurately by the title of
the branch) turns the MatrixFile object into something that either
reads or writes, but not both, and splits the FFT file cache into
separate reader and writer implementations. This allows the
FFT data server to have a single thread owning writers and one reader
per "customer" thread, and for all locking to be vastly simplified
and concentrated in the data server alone (because none of the
classes it makes use of is used in more than one thread at a time).
The result is faster and more trustworthy code.
author | Chris Cannam |
---|---|
date | Tue, 27 Jan 2009 13:25:10 +0000 |
parents | beb51f558e9c |
children | 87aef350f1dc |
comparison
equal
deleted
inserted
replaced
536:beb51f558e9c | 537:3cc4b7cd2aa5 |
---|---|
13 COPYING included with this distribution for more information. | 13 COPYING included with this distribution for more information. |
14 */ | 14 */ |
15 | 15 |
16 #include "FFTDataServer.h" | 16 #include "FFTDataServer.h" |
17 | 17 |
18 #include "FFTFileCache.h" | 18 #include "FFTFileCacheReader.h" |
19 #include "FFTFileCacheWriter.h" | |
19 #include "FFTMemoryCache.h" | 20 #include "FFTMemoryCache.h" |
20 | 21 |
21 #include "model/DenseTimeValueModel.h" | 22 #include "model/DenseTimeValueModel.h" |
22 | 23 |
23 #include "system/System.h" | 24 #include "system/System.h" |
25 #include "base/StorageAdviser.h" | 26 #include "base/StorageAdviser.h" |
26 #include "base/Exceptions.h" | 27 #include "base/Exceptions.h" |
27 #include "base/Profiler.h" | 28 #include "base/Profiler.h" |
28 #include "base/Thread.h" // for debug mutex locker | 29 #include "base/Thread.h" // for debug mutex locker |
29 | 30 |
30 //#define DEBUG_FFT_SERVER 1 | 31 #include <QWriteLocker> |
32 | |
33 #define DEBUG_FFT_SERVER 1 | |
31 //#define DEBUG_FFT_SERVER_FILL 1 | 34 //#define DEBUG_FFT_SERVER_FILL 1 |
32 | 35 |
33 #ifdef DEBUG_FFT_SERVER_FILL | 36 #ifdef DEBUG_FFT_SERVER_FILL |
34 #ifndef DEBUG_FFT_SERVER | 37 #ifndef DEBUG_FFT_SERVER |
35 #define DEBUG_FFT_SERVER 1 | 38 #define DEBUG_FFT_SERVER 1 |
324 if (i->second.first == server) { | 327 if (i->second.first == server) { |
325 if (i->second.second == 0) { | 328 if (i->second.second == 0) { |
326 std::cerr << "ERROR: FFTDataServer::releaseInstance(" | 329 std::cerr << "ERROR: FFTDataServer::releaseInstance(" |
327 << server << "): instance not allocated" << std::endl; | 330 << server << "): instance not allocated" << std::endl; |
328 } else if (--i->second.second == 0) { | 331 } else if (--i->second.second == 0) { |
332 /*!!! | |
329 if (server->m_lastUsedCache == -1) { // never used | 333 if (server->m_lastUsedCache == -1) { // never used |
330 #ifdef DEBUG_FFT_SERVER | 334 #ifdef DEBUG_FFT_SERVER |
331 std::cerr << "FFTDataServer::releaseInstance: instance " | 335 std::cerr << "FFTDataServer::releaseInstance: instance " |
332 << server << " has never been used, erasing" | 336 << server << " has never been used, erasing" |
333 << std::endl; | 337 << std::endl; |
334 #endif | 338 #endif |
335 delete server; | 339 delete server; |
336 m_servers.erase(i); | 340 m_servers.erase(i); |
337 } else { | 341 } else { |
342 */ | |
338 #ifdef DEBUG_FFT_SERVER | 343 #ifdef DEBUG_FFT_SERVER |
339 std::cerr << "FFTDataServer::releaseInstance: instance " | 344 std::cerr << "FFTDataServer::releaseInstance: instance " |
340 << server << " no longer in use, marking for possible collection" | 345 << server << " no longer in use, marking for possible collection" |
341 << std::endl; | 346 << std::endl; |
342 #endif | 347 #endif |
351 } | 356 } |
352 } | 357 } |
353 if (!found) m_releasedServers.push_back(server); | 358 if (!found) m_releasedServers.push_back(server); |
354 server->suspend(); | 359 server->suspend(); |
355 purgeLimbo(); | 360 purgeLimbo(); |
356 } | 361 //!!! } |
357 } else { | 362 } else { |
358 #ifdef DEBUG_FFT_SERVER | 363 #ifdef DEBUG_FFT_SERVER |
359 std::cerr << "FFTDataServer::releaseInstance: instance " | 364 std::cerr << "FFTDataServer::releaseInstance: instance " |
360 << server << " now has refcount " << i->second.second | 365 << server << " now has refcount " << i->second.second |
361 << std::endl; | 366 << std::endl; |
495 m_width(0), | 500 m_width(0), |
496 m_height(0), | 501 m_height(0), |
497 m_cacheWidth(0), | 502 m_cacheWidth(0), |
498 m_cacheWidthPower(0), | 503 m_cacheWidthPower(0), |
499 m_cacheWidthMask(0), | 504 m_cacheWidthMask(0), |
500 m_lastUsedCache(-1), | |
501 m_criteria(criteria), | 505 m_criteria(criteria), |
502 m_fftInput(0), | 506 m_fftInput(0), |
503 m_exiting(false), | 507 m_exiting(false), |
504 m_suspended(true), //!!! or false? | 508 m_suspended(true), //!!! or false? |
505 m_fillThread(0) | 509 m_fillThread(0) |
635 Profiler profiler("FFTDataServer::suspend", false); | 639 Profiler profiler("FFTDataServer::suspend", false); |
636 | 640 |
637 MutexLocker locker(&m_writeMutex, | 641 MutexLocker locker(&m_writeMutex, |
638 "FFTDataServer::suspend::m_writeMutex"); | 642 "FFTDataServer::suspend::m_writeMutex"); |
639 m_suspended = true; | 643 m_suspended = true; |
640 for (CacheVector::iterator i = m_caches.begin(); i != m_caches.end(); ++i) { | |
641 if (*i) (*i)->suspend(); | |
642 } | |
643 } | 644 } |
644 | 645 |
645 void | 646 void |
646 FFTDataServer::suspendWrites() | 647 FFTDataServer::suspendWrites() |
647 { | 648 { |
719 | 720 |
720 memoryCache = false; | 721 memoryCache = false; |
721 | 722 |
722 if ((recommendation & StorageAdviser::UseMemory) || | 723 if ((recommendation & StorageAdviser::UseMemory) || |
723 (recommendation & StorageAdviser::PreferMemory)) { | 724 (recommendation & StorageAdviser::PreferMemory)) { |
724 memoryCache = true; | 725 //!!! memoryCache = true; |
725 } | 726 } |
726 | 727 |
727 compactCache = canCompact && | 728 compactCache = canCompact && |
728 (recommendation & StorageAdviser::ConserveSpace); | 729 (recommendation & StorageAdviser::ConserveSpace); |
729 | 730 |
732 | 733 |
733 std::cerr << "Width " << w << " of " << m_width << ", height " << h << ", size " << w * h << std::endl; | 734 std::cerr << "Width " << w << " of " << m_width << ", height " << h << ", size " << w * h << std::endl; |
734 #endif | 735 #endif |
735 } | 736 } |
736 | 737 |
737 FFTCache * | 738 bool |
738 FFTDataServer::getCacheAux(size_t c) | 739 FFTDataServer::makeCache(int c) |
739 { | 740 { |
740 Profiler profiler("FFTDataServer::getCacheAux", false); | 741 QWriteLocker locker(&m_cacheVectorLock); |
741 #ifdef DEBUG_FFT_SERVER | |
742 std::cerr << "FFTDataServer(" << this << " [" << (void *)QThread::currentThreadId() << "])::getCacheAux" << std::endl; | |
743 #endif | |
744 | |
745 MutexLocker locker(&m_writeMutex, | |
746 "FFTDataServer::getCacheAux::m_writeMutex"); | |
747 | |
748 if (m_lastUsedCache == -1) { | |
749 m_fillThread->start(); | |
750 } | |
751 | |
752 if (int(c) != m_lastUsedCache) { | |
753 | |
754 #ifdef DEBUG_FFT_SERVER | |
755 std::cerr << "switch from " << m_lastUsedCache << " to " << c << std::endl; | |
756 #endif | |
757 | |
758 for (IntQueue::iterator i = m_dormantCaches.begin(); | |
759 i != m_dormantCaches.end(); ++i) { | |
760 if (*i == int(c)) { | |
761 m_dormantCaches.erase(i); | |
762 break; | |
763 } | |
764 } | |
765 | |
766 if (m_lastUsedCache >= 0) { | |
767 bool inDormant = false; | |
768 for (size_t i = 0; i < m_dormantCaches.size(); ++i) { | |
769 if (m_dormantCaches[i] == m_lastUsedCache) { | |
770 inDormant = true; | |
771 break; | |
772 } | |
773 } | |
774 if (!inDormant) { | |
775 m_dormantCaches.push_back(m_lastUsedCache); | |
776 } | |
777 while (m_dormantCaches.size() > 4) { | |
778 int dc = m_dormantCaches.front(); | |
779 m_dormantCaches.pop_front(); | |
780 m_caches[dc]->suspend(); | |
781 } | |
782 } | |
783 } | |
784 | 742 |
785 if (m_caches[c]) { | 743 if (m_caches[c]) { |
786 m_lastUsedCache = c; | 744 // someone else must have created the cache between our |
787 return m_caches[c]; | 745 // testing for it and taking the write lock |
788 } | 746 return true; |
747 } | |
748 | |
749 CacheBlock *cb = new CacheBlock; | |
789 | 750 |
790 QString name = QString("%1-%2").arg(m_fileBaseName).arg(c); | 751 QString name = QString("%1-%2").arg(m_fileBaseName).arg(c); |
791 | |
792 FFTCache *cache = 0; | |
793 | 752 |
794 size_t width = m_cacheWidth; | 753 size_t width = m_cacheWidth; |
795 if (c * m_cacheWidth + width > m_width) { | 754 if (c * m_cacheWidth + width > m_width) { |
796 width = m_width - c * m_cacheWidth; | 755 width = m_width - c * m_cacheWidth; |
797 } | 756 } |
799 bool memoryCache = false; | 758 bool memoryCache = false; |
800 bool compactCache = false; | 759 bool compactCache = false; |
801 | 760 |
802 getStorageAdvice(width, m_height, memoryCache, compactCache); | 761 getStorageAdvice(width, m_height, memoryCache, compactCache); |
803 | 762 |
804 try { | 763 bool success = false; |
805 | 764 |
806 if (memoryCache) { | 765 if (memoryCache) { |
807 | 766 |
808 cache = new FFTMemoryCache | 767 try { |
809 (compactCache ? FFTMemoryCache::Compact : | 768 |
810 m_polar ? FFTMemoryCache::Polar : | 769 cb->memoryCache = new FFTMemoryCache |
811 FFTMemoryCache::Rectangular); | 770 (compactCache ? FFTCache::Compact : |
812 | 771 m_polar ? FFTCache::Polar : |
813 } else { | 772 FFTCache::Rectangular, |
814 | 773 width, m_height); |
815 cache = new FFTFileCache | 774 |
816 (name, | 775 success = true; |
817 MatrixFile::ReadWrite, | 776 |
818 compactCache ? FFTFileCache::Compact : | 777 } catch (std::bad_alloc) { |
819 m_polar ? FFTFileCache::Polar : | 778 |
820 FFTFileCache::Rectangular); | 779 delete cb->memoryCache; |
821 } | 780 cb->memoryCache = 0; |
822 | |
823 cache->resize(width, m_height); | |
824 cache->reset(); | |
825 | |
826 } catch (std::bad_alloc) { | |
827 | |
828 delete cache; | |
829 cache = 0; | |
830 | |
831 if (memoryCache) { | |
832 | 781 |
833 std::cerr << "WARNING: Memory allocation failed when resizing" | 782 std::cerr << "WARNING: Memory allocation failed when creating" |
834 << " FFT memory cache no. " << c << " to " << width | 783 << " FFT memory cache no. " << c << " of " << width |
835 << "x" << m_height << " (of total width " << m_width | 784 << "x" << m_height << " (of total width " << m_width |
836 << "): falling back to disc cache" << std::endl; | 785 << "): falling back to disc cache" << std::endl; |
837 | 786 |
838 try { | 787 memoryCache = false; |
839 | 788 } |
840 purgeLimbo(0); | 789 } |
841 | 790 |
842 cache = new FFTFileCache(name, | 791 if (!memoryCache) { |
843 MatrixFile::ReadWrite, | 792 |
844 FFTFileCache::Compact); | 793 try { |
845 | 794 |
846 cache->resize(width, m_height); | 795 cb->fileCacheWriter = new FFTFileCacheWriter |
847 cache->reset(); | 796 (name, |
848 | 797 compactCache ? FFTCache::Compact : |
849 } catch (std::bad_alloc) { | 798 m_polar ? FFTCache::Polar : |
850 | 799 FFTCache::Rectangular, |
851 delete cache; | 800 width, m_height); |
852 cache = 0; | 801 |
853 } | 802 success = true; |
854 } | 803 |
855 | 804 } catch (std::exception e) { |
856 if (!cache) { | 805 |
857 std::cerr << "ERROR: Memory allocation failed when resizing" | 806 delete cb->fileCacheWriter; |
858 << " FFT file cache no. " << c << " to " << width | 807 cb->fileCacheWriter = 0; |
859 << "x" << m_height << " (of total width " << m_width | 808 |
860 << "): abandoning this cache" << std::endl; | 809 std::cerr << "ERROR: Failed to construct disc cache for FFT data: " |
861 | 810 << e.what() << std::endl; |
862 throw AllocationFailed("Failed to create or resize an FFT model slice"); | 811 } |
863 } | 812 } |
864 } | 813 |
865 | 814 m_caches[c] = cb; |
866 m_caches[c] = cache; | 815 |
867 m_lastUsedCache = c; | 816 return success; |
868 return cache; | 817 } |
869 } | 818 |
870 | 819 bool |
820 FFTDataServer::makeCacheReader(int c) | |
821 { | |
822 // preconditions: m_caches[c] exists and contains a file writer; | |
823 // m_cacheVectorLock is not locked by this thread | |
824 #ifdef DEBUG_FFT_SERVER | |
825 std::cerr << "FFTDataServer::makeCacheReader(" << c << ")" << std::endl; | |
826 #endif | |
827 | |
828 QThread *me = QThread::currentThread(); | |
829 QWriteLocker locker(&m_cacheVectorLock); | |
830 CacheBlock *cb(m_caches.at(c)); | |
831 if (!cb || !cb->fileCacheWriter) return false; | |
832 | |
833 try { | |
834 | |
835 cb->fileCacheReader[me] = new FFTFileCacheReader(cb->fileCacheWriter); | |
836 | |
837 } catch (std::exception e) { | |
838 | |
839 delete cb->fileCacheReader[me]; | |
840 cb->fileCacheReader.erase(me); | |
841 | |
842 std::cerr << "ERROR: Failed to construct disc cache reader for FFT data: " | |
843 << e.what() << std::endl; | |
844 return false; | |
845 } | |
846 | |
847 // erase a reader that looks like it may no longer going to be | |
848 // used by this thread for a while (leaving alone the current | |
849 // and previous cache readers) | |
850 int deleteCandidate = c - 2; | |
851 if (deleteCandidate < 0) deleteCandidate = c + 2; | |
852 if (deleteCandidate >= m_caches.size()) { | |
853 return true; | |
854 } | |
855 | |
856 cb = m_caches.at(deleteCandidate); | |
857 if (cb && cb->fileCacheReader.find(me) != cb->fileCacheReader.end()) { | |
858 #ifdef DEBUG_FFT_SERVER | |
859 std::cerr << "FFTDataServer::makeCacheReader: Deleting probably unpopular reader " << deleteCandidate << " for this thread (as I create reader " << c << ")" << std::endl; | |
860 #endif | |
861 delete cb->fileCacheReader[me]; | |
862 cb->fileCacheReader.erase(me); | |
863 } | |
864 | |
865 return true; | |
866 } | |
867 | |
871 float | 868 float |
872 FFTDataServer::getMagnitudeAt(size_t x, size_t y) | 869 FFTDataServer::getMagnitudeAt(size_t x, size_t y) |
873 { | 870 { |
874 Profiler profiler("FFTDataServer::getMagnitudeAt", false); | 871 Profiler profiler("FFTDataServer::getMagnitudeAt", false); |
875 | 872 |
876 if (x >= m_width || y >= m_height) return 0; | 873 if (x >= m_width || y >= m_height) return 0; |
877 | 874 |
878 size_t col; | 875 size_t col; |
879 FFTCache *cache = getCache(x, col); | 876 FFTCacheReader *cache = getCacheReader(x, col); |
880 if (!cache) return 0; | 877 if (!cache) return 0; |
881 | 878 |
879 //!!! n.b. can throw | |
882 if (!cache->haveSetColumnAt(col)) { | 880 if (!cache->haveSetColumnAt(col)) { |
883 Profiler profiler("FFTDataServer::getMagnitudeAt: filling"); | 881 Profiler profiler("FFTDataServer::getMagnitudeAt: filling"); |
884 #ifdef DEBUG_FFT_SERVER | 882 #ifdef DEBUG_FFT_SERVER |
885 std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn(" | 883 std::cerr << "FFTDataServer::getMagnitudeAt: calling fillColumn(" |
886 << x << ")" << std::endl; | 884 << x << ")" << std::endl; |
906 else if (minbin + count * step > m_height) { | 904 else if (minbin + count * step > m_height) { |
907 count = (m_height - minbin) / step; | 905 count = (m_height - minbin) / step; |
908 } | 906 } |
909 | 907 |
910 size_t col; | 908 size_t col; |
911 FFTCache *cache = getCache(x, col); | 909 FFTCacheReader *cache = getCacheReader(x, col); |
912 if (!cache) return false; | 910 if (!cache) return false; |
913 | 911 |
912 //!!! n.b. can throw | |
914 if (!cache->haveSetColumnAt(col)) { | 913 if (!cache->haveSetColumnAt(col)) { |
915 Profiler profiler("FFTDataServer::getMagnitudesAt: filling"); | 914 Profiler profiler("FFTDataServer::getMagnitudesAt: filling"); |
916 MutexLocker locker(&m_writeMutex, | 915 MutexLocker locker(&m_writeMutex, |
917 "FFTDataServer::getMagnitudesAt: m_writeMutex"); | 916 "FFTDataServer::getMagnitudesAt: m_writeMutex"); |
918 fillColumn(x, true); | 917 fillColumn(x, true); |
929 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt", false); | 928 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt", false); |
930 | 929 |
931 if (x >= m_width || y >= m_height) return 0; | 930 if (x >= m_width || y >= m_height) return 0; |
932 | 931 |
933 size_t col; | 932 size_t col; |
934 FFTCache *cache = getCache(x, col); | 933 FFTCacheReader *cache = getCacheReader(x, col); |
935 if (!cache) return 0; | 934 if (!cache) return 0; |
936 | 935 |
936 //!!! n.b. can throw | |
937 if (!cache->haveSetColumnAt(col)) { | 937 if (!cache->haveSetColumnAt(col)) { |
938 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling"); | 938 Profiler profiler("FFTDataServer::getNormalizedMagnitudeAt: filling"); |
939 // hold mutex so that write thread doesn't mess with class | 939 // hold mutex so that write thread doesn't mess with class |
940 // member data in fillColumn | 940 // member data in fillColumn |
941 MutexLocker locker(&m_writeMutex, | 941 MutexLocker locker(&m_writeMutex, |
957 else if (minbin + count * step > m_height) { | 957 else if (minbin + count * step > m_height) { |
958 count = (m_height - minbin) / step; | 958 count = (m_height - minbin) / step; |
959 } | 959 } |
960 | 960 |
961 size_t col; | 961 size_t col; |
962 FFTCache *cache = getCache(x, col); | 962 FFTCacheReader *cache = getCacheReader(x, col); |
963 if (!cache) return false; | 963 if (!cache) return false; |
964 | 964 |
965 //!!! n.b. can throw | |
965 if (!cache->haveSetColumnAt(col)) { | 966 if (!cache->haveSetColumnAt(col)) { |
966 Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling"); | 967 Profiler profiler("FFTDataServer::getNormalizedMagnitudesAt: filling"); |
967 MutexLocker locker(&m_writeMutex, | 968 MutexLocker locker(&m_writeMutex, |
968 "FFTDataServer::getNormalizedMagnitudesAt: m_writeMutex"); | 969 "FFTDataServer::getNormalizedMagnitudesAt: m_writeMutex"); |
969 fillColumn(x, true); | 970 fillColumn(x, true); |
982 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt", false); | 983 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt", false); |
983 | 984 |
984 if (x >= m_width) return 0; | 985 if (x >= m_width) return 0; |
985 | 986 |
986 size_t col; | 987 size_t col; |
987 FFTCache *cache = getCache(x, col); | 988 FFTCacheReader *cache = getCacheReader(x, col); |
988 if (!cache) return 0; | 989 if (!cache) return 0; |
989 | 990 |
991 //!!! n.b. can throw | |
990 if (!cache->haveSetColumnAt(col)) { | 992 if (!cache->haveSetColumnAt(col)) { |
991 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling"); | 993 Profiler profiler("FFTDataServer::getMaximumMagnitudeAt: filling"); |
992 // hold mutex so that write thread doesn't mess with class | 994 // hold mutex so that write thread doesn't mess with class |
993 // member data in fillColumn | 995 // member data in fillColumn |
994 MutexLocker locker(&m_writeMutex, | 996 MutexLocker locker(&m_writeMutex, |
1004 Profiler profiler("FFTDataServer::getPhaseAt", false); | 1006 Profiler profiler("FFTDataServer::getPhaseAt", false); |
1005 | 1007 |
1006 if (x >= m_width || y >= m_height) return 0; | 1008 if (x >= m_width || y >= m_height) return 0; |
1007 | 1009 |
1008 size_t col; | 1010 size_t col; |
1009 FFTCache *cache = getCache(x, col); | 1011 FFTCacheReader *cache = getCacheReader(x, col); |
1010 if (!cache) return 0; | 1012 if (!cache) return 0; |
1011 | 1013 |
1014 //!!! n.b. can throw | |
1012 if (!cache->haveSetColumnAt(col)) { | 1015 if (!cache->haveSetColumnAt(col)) { |
1013 Profiler profiler("FFTDataServer::getPhaseAt: filling"); | 1016 Profiler profiler("FFTDataServer::getPhaseAt: filling"); |
1014 // hold mutex so that write thread doesn't mess with class | 1017 // hold mutex so that write thread doesn't mess with class |
1015 // member data in fillColumn | 1018 // member data in fillColumn |
1016 MutexLocker locker(&m_writeMutex, | 1019 MutexLocker locker(&m_writeMutex, |
1032 else if (minbin + count * step > m_height) { | 1035 else if (minbin + count * step > m_height) { |
1033 count = (m_height - minbin) / step; | 1036 count = (m_height - minbin) / step; |
1034 } | 1037 } |
1035 | 1038 |
1036 size_t col; | 1039 size_t col; |
1037 FFTCache *cache = getCache(x, col); | 1040 FFTCacheReader *cache = getCacheReader(x, col); |
1038 if (!cache) return false; | 1041 if (!cache) return false; |
1039 | 1042 |
1043 //!!! n.b. can throw | |
1040 if (!cache->haveSetColumnAt(col)) { | 1044 if (!cache->haveSetColumnAt(col)) { |
1041 Profiler profiler("FFTDataServer::getPhasesAt: filling"); | 1045 Profiler profiler("FFTDataServer::getPhasesAt: filling"); |
1042 MutexLocker locker(&m_writeMutex, | 1046 MutexLocker locker(&m_writeMutex, |
1043 "FFTDataServer::getPhasesAt: m_writeMutex"); | 1047 "FFTDataServer::getPhasesAt: m_writeMutex"); |
1044 fillColumn(x, true); | 1048 fillColumn(x, true); |
1061 imaginary = 0; | 1065 imaginary = 0; |
1062 return; | 1066 return; |
1063 } | 1067 } |
1064 | 1068 |
1065 size_t col; | 1069 size_t col; |
1066 FFTCache *cache = getCache(x, col); | 1070 FFTCacheReader *cache = getCacheReader(x, col); |
1067 | 1071 |
1068 if (!cache) { | 1072 if (!cache) { |
1069 real = 0; | 1073 real = 0; |
1070 imaginary = 0; | 1074 imaginary = 0; |
1071 return; | 1075 return; |
1072 } | 1076 } |
1073 | 1077 |
1078 //!!! n.b. can throw | |
1074 if (!cache->haveSetColumnAt(col)) { | 1079 if (!cache->haveSetColumnAt(col)) { |
1075 Profiler profiler("FFTDataServer::getValuesAt: filling"); | 1080 Profiler profiler("FFTDataServer::getValuesAt: filling"); |
1076 #ifdef DEBUG_FFT_SERVER | 1081 #ifdef DEBUG_FFT_SERVER |
1077 std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl; | 1082 std::cerr << "FFTDataServer::getValuesAt(" << x << ", " << y << "): filling" << std::endl; |
1078 #endif | 1083 #endif |
1092 Profiler profiler("FFTDataServer::isColumnReady", false); | 1097 Profiler profiler("FFTDataServer::isColumnReady", false); |
1093 | 1098 |
1094 if (x >= m_width) return true; | 1099 if (x >= m_width) return true; |
1095 | 1100 |
1096 if (!haveCache(x)) { | 1101 if (!haveCache(x)) { |
1102 /*!!! | |
1097 if (m_lastUsedCache == -1) { | 1103 if (m_lastUsedCache == -1) { |
1098 if (m_suspended) { | 1104 if (m_suspended) { |
1099 std::cerr << "FFTDataServer::isColumnReady(" << x << "): no cache, calling resume" << std::endl; | 1105 std::cerr << "FFTDataServer::isColumnReady(" << x << "): no cache, calling resume" << std::endl; |
1100 resume(); | 1106 resume(); |
1101 } | 1107 } |
1102 m_fillThread->start(); | 1108 m_fillThread->start(); |
1103 } | 1109 } |
1110 */ | |
1104 return false; | 1111 return false; |
1105 } | 1112 } |
1106 | 1113 |
1107 size_t col; | 1114 size_t col; |
1108 FFTCache *cache = getCache(x, col); | 1115 FFTCacheReader *cache = getCacheReader(x, col); |
1109 if (!cache) return true; | 1116 if (!cache) return true; |
1110 | 1117 |
1118 //!!! n.b. can throw | |
1111 return cache->haveSetColumnAt(col); | 1119 return cache->haveSetColumnAt(col); |
1112 } | 1120 } |
1113 | 1121 |
1114 void | 1122 void |
1115 FFTDataServer::fillColumn(size_t x, bool lockHeld) | 1123 FFTDataServer::fillColumn(size_t x, bool lockHeld) |
1131 | 1139 |
1132 if (x >= m_width) { | 1140 if (x >= m_width) { |
1133 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " | 1141 std::cerr << "WARNING: FFTDataServer::fillColumn(" << x << "): " |
1134 << "x > width (" << x << " > " << m_width << ")" | 1142 << "x > width (" << x << " > " << m_width << ")" |
1135 << std::endl; | 1143 << std::endl; |
1136 // abort(); //!!! | |
1137 return; | 1144 return; |
1138 } | 1145 } |
1139 | 1146 |
1140 size_t col; | 1147 size_t col; |
1141 #ifdef DEBUG_FFT_SERVER_FILL | 1148 #ifdef DEBUG_FFT_SERVER_FILL |
1142 std::cout << "FFTDataServer::fillColumn(" << x << ")" << std::endl; | 1149 std::cout << "FFTDataServer::fillColumn(" << x << ")" << std::endl; |
1143 #endif | 1150 #endif |
1144 FFTCache *cache = getCache(x, col); | 1151 FFTCacheWriter *cache = getCacheWriter(x, col); |
1145 if (!cache) return; | 1152 if (!cache) return; |
1146 | |
1147 { | |
1148 MutexLocker locker(lockHeld ? 0 : &m_writeMutex, | |
1149 "FFTDataServer::fillColumn::m_writeMutex [1]"); | |
1150 | |
1151 if (cache->haveSetColumnAt(col)) return; | |
1152 } | |
1153 | 1153 |
1154 int winsize = m_windowSize; | 1154 int winsize = m_windowSize; |
1155 int fftsize = m_fftSize; | 1155 int fftsize = m_fftSize; |
1156 int hs = fftsize/2; | 1156 int hs = fftsize/2; |
1157 | 1157 |
1268 if (m_suspended) { | 1268 if (m_suspended) { |
1269 // std::cerr << "FFTDataServer::fillColumn(" << x << "): calling resume" << std::endl; | 1269 // std::cerr << "FFTDataServer::fillColumn(" << x << "): calling resume" << std::endl; |
1270 // resume(); | 1270 // resume(); |
1271 } | 1271 } |
1272 } | 1272 } |
1273 | |
1274 void | |
1275 FFTDataServer::fillComplete() | |
1276 { | |
1277 for (int i = 0; i < int(m_caches.size()); ++i) { | |
1278 if (m_caches[i]->memoryCache) { | |
1279 m_caches[i]->memoryCache->allColumnsWritten(); | |
1280 } | |
1281 if (m_caches[i]->fileCacheWriter) { | |
1282 m_caches[i]->fileCacheWriter->allColumnsWritten(); | |
1283 } | |
1284 } | |
1285 } | |
1273 | 1286 |
1274 size_t | 1287 size_t |
1275 FFTDataServer::getFillCompletion() const | 1288 FFTDataServer::getFillCompletion() const |
1276 { | 1289 { |
1277 if (m_fillThread) return m_fillThread->getCompletion(); | 1290 if (m_fillThread) return m_fillThread->getCompletion(); |
1416 if (updateAt > maxUpdateAt) updateAt = maxUpdateAt; | 1429 if (updateAt > maxUpdateAt) updateAt = maxUpdateAt; |
1417 } | 1430 } |
1418 } | 1431 } |
1419 } | 1432 } |
1420 | 1433 |
1434 m_server.fillComplete(); | |
1421 m_completion = 100; | 1435 m_completion = 100; |
1422 m_extent = end; | 1436 m_extent = end; |
1423 } | 1437 |
1424 | 1438 #ifdef DEBUG_FFT_SERVER |
1439 std::cerr << "FFTDataServer::FillThread::run exiting" << std::endl; | |
1440 #endif | |
1441 } | |
1442 |