diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index ea8801bf1f..ca1ce02210 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -76,7 +76,7 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, ST static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static void resetProcMemTableListIndex(SReaderStatus* pStatus); +static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus); static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { @@ -2567,9 +2567,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { } if (pReader->status.bProcMemPreFileset) { - uInfo("has mem preset"); + tsdbDebug("will start pre-fileset %d buffer processing. %s", fid, pReader->idStr); pReader->status.procMemUidList.tableUidList = pReader->status.uidList.tableUidList; - resetProcMemTableListIndex(&pReader->status); + resetPreFilesetMemTableListIndex(&pReader->status); } if (!pReader->status.bProcMemPreFileset) { @@ -2577,7 +2577,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); - uInfo("new duration notification. %d", fid); + tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr); } } @@ -2648,7 +2648,7 @@ static void resetTableListIndex(SReaderStatus* pStatus) { pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); } -static void resetProcMemTableListIndex(SReaderStatus* pStatus) { +static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus) { STableUidList* pList = &pStatus->procMemUidList; pList->currentIndex = 0; @@ -2668,14 +2668,15 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt return (pStatus->pTableIter != NULL); } -static bool moveToNextTablePreFileSet(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) { - pOrderedCheckInfo->currentIndex += 1; - if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { +static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) { + STableUidList* pUidList = &pStatus->procMemUidList; + pUidList->currentIndex += 1; + if (pUidList->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { pStatus->pProcMemTableIter = NULL; return false; } - uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex]; + uint64_t uid = pUidList->tableUidList[pUidList->currentIndex]; pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); return (pStatus->pProcMemTableIter != NULL); } @@ -2914,11 +2915,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; } -static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, int64_t endKey) { +static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) { SReaderStatus* pStatus = &pReader->status; - STableUidList* pUidList = &pStatus->procMemUidList; - tsdbDebug("seq load data blocks from cache, %s", pReader->idStr); + tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr); while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { @@ -2929,7 +2929,7 @@ static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, STableBlockScanInfo** pBlockScanInfo = pStatus->pProcMemTableIter; if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) { - bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus); + bool hasNexTable = moveToNextTableForPreFileSetMem(pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; } @@ -2949,7 +2949,7 @@ static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, } // current table is exhausted, let's try next table - bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus); + bool hasNexTable = moveToNextTableForPreFileSetMem(pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; } @@ -4311,7 +4311,7 @@ _err: return code; } -static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) { +static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; SReaderStatus* pStatus = &pReader->status; @@ -4322,21 +4322,22 @@ static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) { tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey; - code = buildBlockFromBufferPreFilesetSequentially(pReader, endKey); - uInfo("zsl mem block rows: %ld", pBlock->info.rows); + code = buildBlockFromBufferSeqForPreFileset(pReader, endKey); if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; } else { + tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr); pStatus->bProcMemPreFileset = false; if (pReader->notifyFn) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); - uInfo("new duration notification. mem pre fileset : %d", fid); + tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr); } - if (pStatus->pNextFileBlock && pStatus->pNextFileBlock->info.rows > 0) { - copyDataBlock(pBlock, pStatus->pNextFileBlock); - blockDataDestroy(pStatus->pNextFileBlock); + if (pStatus->pNextFilesetBlock && pStatus->pNextFilesetBlock->info.rows > 0) { + tsdbDebug("return the saved block from fileset %d files, %s", fid, pReader->idStr); + copyDataBlock(pBlock, pStatus->pNextFilesetBlock); + blockDataDestroy(pStatus->pNextFilesetBlock); } } return code; @@ -4349,7 +4350,7 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { if (pStatus->loadFromFile) { if (pStatus->bProcMemPreFileset) { - code = buildMemoryBlockPreFileset(pReader); + code = buildFromPreFilesetBuffer(pReader); if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; } @@ -4359,18 +4360,16 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { if (code != TSDB_CODE_SUCCESS) { return code; } + if (pStatus->bProcMemPreFileset && pBlock->info.rows > 0) { - pStatus->pNextFileBlock = createOneDataBlock(pBlock, true); + pStatus->pNextFilesetBlock = createOneDataBlock(pBlock, true); blockDataCleanup(pBlock); - code = buildMemoryBlockPreFileset(pReader); + code = buildFromPreFilesetBuffer(pReader); if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; } } - - uInfo("zsl file block rows %ld", pBlock->info.rows); - if (pBlock->info.rows <= 0) { resetTableListIndex(&pReader->status); int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 09ab064806..fe3441d058 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -210,6 +210,8 @@ typedef struct SReaderStatus { SArray* pLDataIterArray; SRowMerger merger; SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data + // the following for preceeds fileset memory processing + // TODO: refactor into seperate struct bool bProcMemPreFileset; int64_t memTableMaxKey; int64_t memTableMinKey; @@ -218,7 +220,7 @@ typedef struct SReaderStatus { bool bProcMemFirstFileset; STableUidList procMemUidList; STableBlockScanInfo** pProcMemTableIter; - SSDataBlock* pNextFileBlock; + SSDataBlock* pNextFilesetBlock; } SReaderStatus; struct STsdbReader {