diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index f3c5d9ec93..d71f1729b6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -58,6 +58,7 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbRea static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); +static void resetTableListIndex(SReaderStatus* pStatus); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -241,11 +242,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); - if (pReader->notifyFn) { - STsdReaderNotifyInfo info = {0}; - info.duration.fileSetId = fid; - pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); - } + *hasNext = true; return TSDB_CODE_SUCCESS; } @@ -436,6 +433,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } + pReader->bDurationOrder = true; + tsdbInitReaderLock(pReader); *ppReader = pReader; @@ -2550,6 +2549,8 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) { + pReader->status.bProcMemPreFileset = true; + resetTableListIndex(&pReader->status); break; } } @@ -2931,7 +2932,7 @@ static int32_t readRowsCountFromMem(STsdbReader* pReader) { return code; } -static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { +static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; @@ -2948,13 +2949,12 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { if (!hasNexTable) { return TSDB_CODE_SUCCESS; } - pBlockScanInfo = pStatus->pTableIter; + continue; } initMemDataIterator(*pBlockScanInfo, pReader); initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); - int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4350,6 +4350,70 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) { return pBlock->info.rows > 0; } +static int32_t doTsdbNextDataBlockDurationOrder(STsdbReader* pReader) { + SReaderStatus* pStatus = &pReader->status; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; + + if (pStatus->loadFromFile) { + if (pStatus->bProcMemPreFileset) { + int32_t fid = pReader->status.pCurrentFileset->fid; + STimeWindow win = {0}; + tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); + + code = buildBlockFromBufferSequentially(pReader, win.skey); + if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { + return code; + } else { + pStatus->bProcMemPreFileset = false; + if (pReader->notifyFn) { + STsdReaderNotifyInfo info = {0}; + info.duration.fileSetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_DURATION, &info, pReader->notifyParam); + } + } + } + + code = buildBlockFromFiles(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pBlock->info.rows <= 0) { + resetTableListIndex(&pReader->status); + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; + code = buildBlockFromBufferSequentially(pReader, endKey); + } + } else { // no data in files, let's try the buffer + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; + code = buildBlockFromBufferSequentially(pReader, endKey); + } + return code; +} + +static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) { + SReaderStatus* pStatus = &pReader->status; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; + + if (pStatus->loadFromFile) { + code = buildBlockFromFiles(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pBlock->info.rows <= 0) { + resetTableListIndex(&pReader->status); + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; + code = buildBlockFromBufferSequentially(pReader, endKey); + } + } else { // no data in files, let's try the buffer + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; + code = buildBlockFromBufferSequentially(pReader, endKey); + } + return code; +} + static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { int32_t code = TSDB_CODE_SUCCESS; @@ -4367,19 +4431,10 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (READ_MODE_COUNT_ONLY == pReader->info.readMode) { return tsdbReadRowsCountOnly(pReader); } - - if (pStatus->loadFromFile) { - code = buildBlockFromFiles(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (pBlock->info.rows <= 0) { - resetTableListIndex(&pReader->status); - code = buildBlockFromBufferSequentially(pReader); - } - } else { // no data in files, let's try the buffer - code = buildBlockFromBufferSequentially(pReader); + if (!pReader->bDurationOrder) { + code = doTsdbNextDataBlockFilesFirst(pReader); + } else { + code = doTsdbNextDataBlockDurationOrder(pReader); } *hasNext = pBlock->info.rows > 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index fb32190115..2d5b6f38a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -206,6 +206,7 @@ typedef struct SReaderStatus { SArray* pLDataIterArray; SRowMerger merger; SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data + bool bProcMemPreFileset; } SReaderStatus; struct STsdbReader { @@ -228,6 +229,7 @@ struct STsdbReader { SBlockInfoBuf blockInfoBuf; EContentData step; STsdbReader* innerReader[2]; + bool bDurationOrder; // duration by duration output TsdReaderNotifyCbFn notifyFn; void* notifyParam; };