diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 52ee6d0b14..ea8801bf1f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -76,6 +76,8 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, ST static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } +static void resetProcMemTableListIndex(SReaderStatus* pStatus); + static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { pSupInfo->smaValid = true; @@ -2565,7 +2567,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { } if (pReader->status.bProcMemPreFileset) { - resetTableListIndex(&pReader->status); + uInfo("has mem preset"); + pReader->status.procMemUidList.tableUidList = pReader->status.uidList.tableUidList; + resetProcMemTableListIndex(&pReader->status); } if (!pReader->status.bProcMemPreFileset) { @@ -2573,6 +2577,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); + uInfo("new duration notification. %d", fid); } } @@ -2643,6 +2648,14 @@ static void resetTableListIndex(SReaderStatus* pStatus) { pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); } +static void resetProcMemTableListIndex(SReaderStatus* pStatus) { + STableUidList* pList = &pStatus->procMemUidList; + + pList->currentIndex = 0; + uint64_t uid = pList->tableUidList[0]; + pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); +} + static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) { pOrderedCheckInfo->currentIndex += 1; if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { @@ -2655,6 +2668,18 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt return (pStatus->pTableIter != NULL); } +static bool moveToNextTablePreFileSet(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) { + pOrderedCheckInfo->currentIndex += 1; + if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { + pStatus->pProcMemTableIter = NULL; + return false; + } + + uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex]; + pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); + return (pStatus->pProcMemTableIter != NULL); +} + static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; @@ -2889,6 +2914,48 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; } +static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, int64_t endKey) { + SReaderStatus* pStatus = &pReader->status; + STableUidList* pUidList = &pStatus->procMemUidList; + + tsdbDebug("seq load data blocks from cache, %s", pReader->idStr); + + while (1) { + if (pReader->code != TSDB_CODE_SUCCESS) { + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + return pReader->code; + } + + STableBlockScanInfo** pBlockScanInfo = pStatus->pProcMemTableIter; + if (pReader->pIgnoreTables && + taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) { + bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + continue; + } + + initMemDataIterator(*pBlockScanInfo, pReader); + initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + + int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pReader->resBlockInfo.pResBlock->info.rows > 0) { + return TSDB_CODE_SUCCESS; + } + + // current table is exhausted, let's try next table + bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + } +} + static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; @@ -4244,6 +4311,37 @@ _err: return code; } +static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SReaderStatus* pStatus = &pReader->status; + + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; + + int32_t fid = pReader->status.pCurrentFileset->fid; + STimeWindow win = {0}; + tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); + + int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey; + code = buildBlockFromBufferPreFilesetSequentially(pReader, endKey); + uInfo("zsl mem block rows: %ld", pBlock->info.rows); + if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { + return code; + } else { + pStatus->bProcMemPreFileset = false; + if (pReader->notifyFn) { + STsdReaderNotifyInfo info = {0}; + info.duration.filesetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); + uInfo("new duration notification. mem pre fileset : %d", fid); + } + if (pStatus->pNextFileBlock && pStatus->pNextFileBlock->info.rows > 0) { + copyDataBlock(pBlock, pStatus->pNextFileBlock); + blockDataDestroy(pStatus->pNextFileBlock); + } + } + return code; +} + static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; int32_t code = TSDB_CODE_SUCCESS; @@ -4251,22 +4349,9 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { if (pStatus->loadFromFile) { if (pStatus->bProcMemPreFileset) { - int32_t fid = pReader->status.pCurrentFileset->fid; - STimeWindow win = {0}; - tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); - - int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey; - code = buildBlockFromBufferSequentially(pReader, endKey); + code = buildMemoryBlockPreFileset(pReader); if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; - } else { - pStatus->bProcMemPreFileset = false; - if (pReader->notifyFn) { - STsdReaderNotifyInfo info = {0}; - info.duration.filesetId = fid; - pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); - } - resetTableListIndex(pStatus); } } @@ -4274,6 +4359,17 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { if (code != TSDB_CODE_SUCCESS) { return code; } + if (pStatus->bProcMemPreFileset && pBlock->info.rows > 0) { + pStatus->pNextFileBlock = createOneDataBlock(pBlock, true); + blockDataCleanup(pBlock); + code = buildMemoryBlockPreFileset(pReader); + if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { + return code; + } + } + + + uInfo("zsl file block rows %ld", pBlock->info.rows); if (pBlock->info.rows <= 0) { resetTableListIndex(&pReader->status); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 43cd499aca..09ab064806 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -216,6 +216,9 @@ typedef struct SReaderStatus { int64_t prevFilesetStartKey; int64_t prevFilesetEndKey; bool bProcMemFirstFileset; + STableUidList procMemUidList; + STableBlockScanInfo** pProcMemTableIter; + SSDataBlock* pNextFileBlock; } SReaderStatus; struct STsdbReader {