From 4dc39e74c5944aaebe57b1a9535f652f7a5cd4ef Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 14 Dec 2023 16:57:04 +0800 Subject: [PATCH 1/6] fix: the block return from buildBlockFromFiles should be of new fileset --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 126 ++++++++++++++++++--- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 3 + 2 files changed, 114 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 52ee6d0b14..ea8801bf1f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -76,6 +76,8 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, ST static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } +static void resetProcMemTableListIndex(SReaderStatus* pStatus); + static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { pSupInfo->smaValid = true; @@ -2565,7 +2567,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { } if (pReader->status.bProcMemPreFileset) { - resetTableListIndex(&pReader->status); + uInfo("has mem preset"); + pReader->status.procMemUidList.tableUidList = pReader->status.uidList.tableUidList; + resetProcMemTableListIndex(&pReader->status); } if (!pReader->status.bProcMemPreFileset) { @@ -2573,6 +2577,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); + uInfo("new duration notification. 
%d", fid); } } @@ -2643,6 +2648,14 @@ static void resetTableListIndex(SReaderStatus* pStatus) { pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); } +static void resetProcMemTableListIndex(SReaderStatus* pStatus) { + STableUidList* pList = &pStatus->procMemUidList; + + pList->currentIndex = 0; + uint64_t uid = pList->tableUidList[0]; + pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); +} + static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) { pOrderedCheckInfo->currentIndex += 1; if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { @@ -2655,6 +2668,18 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt return (pStatus->pTableIter != NULL); } +static bool moveToNextTablePreFileSet(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) { + pOrderedCheckInfo->currentIndex += 1; + if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { + pStatus->pProcMemTableIter = NULL; + return false; + } + + uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex]; + pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); + return (pStatus->pProcMemTableIter != NULL); +} + static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; @@ -2889,6 +2914,48 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return (pReader->code != TSDB_CODE_SUCCESS) ? 
pReader->code : code; } +static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, int64_t endKey) { + SReaderStatus* pStatus = &pReader->status; + STableUidList* pUidList = &pStatus->procMemUidList; + + tsdbDebug("seq load data blocks from cache, %s", pReader->idStr); + + while (1) { + if (pReader->code != TSDB_CODE_SUCCESS) { + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + return pReader->code; + } + + STableBlockScanInfo** pBlockScanInfo = pStatus->pProcMemTableIter; + if (pReader->pIgnoreTables && + taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) { + bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + continue; + } + + initMemDataIterator(*pBlockScanInfo, pReader); + initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + + int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pReader->resBlockInfo.pResBlock->info.rows > 0) { + return TSDB_CODE_SUCCESS; + } + + // current table is exhausted, let's try next table + bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus); + if (!hasNexTable) { + return TSDB_CODE_SUCCESS; + } + } +} + static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t endKey) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; @@ -4244,6 +4311,37 @@ _err: return code; } +static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SReaderStatus* pStatus = &pReader->status; + + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; + + int32_t fid = pReader->status.pCurrentFileset->fid; + STimeWindow win = {0}; + tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); + + int64_t endKey = 
(ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey; + code = buildBlockFromBufferPreFilesetSequentially(pReader, endKey); + uInfo("zsl mem block rows: %ld", pBlock->info.rows); + if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { + return code; + } else { + pStatus->bProcMemPreFileset = false; + if (pReader->notifyFn) { + STsdReaderNotifyInfo info = {0}; + info.duration.filesetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); + uInfo("new duration notification. mem pre fileset : %d", fid); + } + if (pStatus->pNextFileBlock && pStatus->pNextFileBlock->info.rows > 0) { + copyDataBlock(pBlock, pStatus->pNextFileBlock); + blockDataDestroy(pStatus->pNextFileBlock); + } + } + return code; +} + static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; int32_t code = TSDB_CODE_SUCCESS; @@ -4251,22 +4349,9 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { if (pStatus->loadFromFile) { if (pStatus->bProcMemPreFileset) { - int32_t fid = pReader->status.pCurrentFileset->fid; - STimeWindow win = {0}; - tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); - - int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? 
win.skey : win.ekey; - code = buildBlockFromBufferSequentially(pReader, endKey); + code = buildMemoryBlockPreFileset(pReader); if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; - } else { - pStatus->bProcMemPreFileset = false; - if (pReader->notifyFn) { - STsdReaderNotifyInfo info = {0}; - info.duration.filesetId = fid; - pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); - } - resetTableListIndex(pStatus); } } @@ -4274,6 +4359,17 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { if (code != TSDB_CODE_SUCCESS) { return code; } + if (pStatus->bProcMemPreFileset && pBlock->info.rows > 0) { + pStatus->pNextFileBlock = createOneDataBlock(pBlock, true); + blockDataCleanup(pBlock); + code = buildMemoryBlockPreFileset(pReader); + if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { + return code; + } + } + + + uInfo("zsl file block rows %ld", pBlock->info.rows); if (pBlock->info.rows <= 0) { resetTableListIndex(&pReader->status); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 43cd499aca..09ab064806 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -216,6 +216,9 @@ typedef struct SReaderStatus { int64_t prevFilesetStartKey; int64_t prevFilesetEndKey; bool bProcMemFirstFileset; + STableUidList procMemUidList; + STableBlockScanInfo** pProcMemTableIter; + SSDataBlock* pNextFileBlock; } SReaderStatus; struct STsdbReader { From 5bf364cc0a1ff50ce911aa53b412b2974d403a49 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 15 Dec 2023 08:23:25 +0800 Subject: [PATCH 2/6] enhance: refactor code --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 53 +++++++++++----------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 4 +- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index ea8801bf1f..ca1ce02210 
100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -76,7 +76,7 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, ST static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static void resetProcMemTableListIndex(SReaderStatus* pStatus); +static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus); static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { @@ -2567,9 +2567,9 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { } if (pReader->status.bProcMemPreFileset) { - uInfo("has mem preset"); + tsdbDebug("will start pre-fileset %d buffer processing. %s", fid, pReader->idStr); pReader->status.procMemUidList.tableUidList = pReader->status.uidList.tableUidList; - resetProcMemTableListIndex(&pReader->status); + resetPreFilesetMemTableListIndex(&pReader->status); } if (!pReader->status.bProcMemPreFileset) { @@ -2577,7 +2577,7 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); - uInfo("new duration notification. 
%d", fid); + tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr); } } @@ -2648,7 +2648,7 @@ static void resetTableListIndex(SReaderStatus* pStatus) { pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); } -static void resetProcMemTableListIndex(SReaderStatus* pStatus) { +static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus) { STableUidList* pList = &pStatus->procMemUidList; pList->currentIndex = 0; @@ -2668,14 +2668,15 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt return (pStatus->pTableIter != NULL); } -static bool moveToNextTablePreFileSet(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) { - pOrderedCheckInfo->currentIndex += 1; - if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { +static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) { + STableUidList* pUidList = &pStatus->procMemUidList; + pUidList->currentIndex += 1; + if (pUidList->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) { pStatus->pProcMemTableIter = NULL; return false; } - uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex]; + uint64_t uid = pUidList->tableUidList[pUidList->currentIndex]; pStatus->pProcMemTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid)); return (pStatus->pProcMemTableIter != NULL); } @@ -2914,11 +2915,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return (pReader->code != TSDB_CODE_SUCCESS) ? 
pReader->code : code; } -static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, int64_t endKey) { +static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) { SReaderStatus* pStatus = &pReader->status; - STableUidList* pUidList = &pStatus->procMemUidList; - tsdbDebug("seq load data blocks from cache, %s", pReader->idStr); + tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr); while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { @@ -2929,7 +2929,7 @@ static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, STableBlockScanInfo** pBlockScanInfo = pStatus->pProcMemTableIter; if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) { - bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus); + bool hasNexTable = moveToNextTableForPreFileSetMem(pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; } @@ -2949,7 +2949,7 @@ static int32_t buildBlockFromBufferPreFilesetSequentially(STsdbReader* pReader, } // current table is exhausted, let's try next table - bool hasNexTable = moveToNextTablePreFileSet(pUidList, pStatus); + bool hasNexTable = moveToNextTableForPreFileSetMem(pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; } @@ -4311,7 +4311,7 @@ _err: return code; } -static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) { +static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; SReaderStatus* pStatus = &pReader->status; @@ -4322,21 +4322,22 @@ static int32_t buildMemoryBlockPreFileset(STsdbReader* pReader) { tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? 
win.skey : win.ekey; - code = buildBlockFromBufferPreFilesetSequentially(pReader, endKey); - uInfo("zsl mem block rows: %ld", pBlock->info.rows); + code = buildBlockFromBufferSeqForPreFileset(pReader, endKey); if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; } else { + tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr); pStatus->bProcMemPreFileset = false; if (pReader->notifyFn) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); - uInfo("new duration notification. mem pre fileset : %d", fid); + tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr); } - if (pStatus->pNextFileBlock && pStatus->pNextFileBlock->info.rows > 0) { - copyDataBlock(pBlock, pStatus->pNextFileBlock); - blockDataDestroy(pStatus->pNextFileBlock); + if (pStatus->pNextFilesetBlock && pStatus->pNextFilesetBlock->info.rows > 0) { + tsdbDebug("return the saved block from fileset %d files, %s", fid, pReader->idStr); + copyDataBlock(pBlock, pStatus->pNextFilesetBlock); + blockDataDestroy(pStatus->pNextFilesetBlock); } } return code; @@ -4349,7 +4350,7 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { if (pStatus->loadFromFile) { if (pStatus->bProcMemPreFileset) { - code = buildMemoryBlockPreFileset(pReader); + code = buildFromPreFilesetBuffer(pReader); if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; } @@ -4359,18 +4360,16 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { if (code != TSDB_CODE_SUCCESS) { return code; } + if (pStatus->bProcMemPreFileset && pBlock->info.rows > 0) { - pStatus->pNextFileBlock = createOneDataBlock(pBlock, true); + pStatus->pNextFilesetBlock = createOneDataBlock(pBlock, true); blockDataCleanup(pBlock); - code = buildMemoryBlockPreFileset(pReader); + code = buildFromPreFilesetBuffer(pReader); if (code 
!= TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { return code; } } - - uInfo("zsl file block rows %ld", pBlock->info.rows); - if (pBlock->info.rows <= 0) { resetTableListIndex(&pReader->status); int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 09ab064806..fe3441d058 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -210,6 +210,8 @@ typedef struct SReaderStatus { SArray* pLDataIterArray; SRowMerger merger; SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data + // the following for preceeds fileset memory processing + // TODO: refactor into seperate struct bool bProcMemPreFileset; int64_t memTableMaxKey; int64_t memTableMinKey; @@ -218,7 +220,7 @@ typedef struct SReaderStatus { bool bProcMemFirstFileset; STableUidList procMemUidList; STableBlockScanInfo** pProcMemTableIter; - SSDataBlock* pNextFileBlock; + SSDataBlock* pNextFilesetBlock; } SReaderStatus; struct STsdbReader { From d3146a5bec2e89ca4cc30d9e2b5664df3f776f4e Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 15 Dec 2023 09:51:06 +0800 Subject: [PATCH 3/6] fix: when no file block, no proc mem before fileset --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index ca1ce02210..e1dc226bb9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4361,12 +4361,18 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { return code; } - if (pStatus->bProcMemPreFileset && pBlock->info.rows > 0) { - pStatus->pNextFilesetBlock = createOneDataBlock(pBlock, true); - blockDataCleanup(pBlock); - code = buildFromPreFilesetBuffer(pReader); - if (code != TSDB_CODE_SUCCESS || 
pBlock->info.rows > 0) { - return code; + tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s", + pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr); + if (pStatus->bProcMemPreFileset) { + if ( pBlock->info.rows > 0) { + pStatus->pNextFilesetBlock = createOneDataBlock(pBlock, true); + blockDataCleanup(pBlock); + code = buildFromPreFilesetBuffer(pReader); + if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { + return code; + } + } else { + pStatus->bProcMemPreFileset = false; } } From cc4a6c6d55a3940452a7cf86283dd07e8889a0cb Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 15 Dec 2023 15:04:47 +0800 Subject: [PATCH 4/6] fix: add new duration block event --- include/libs/executor/storageapi.h | 3 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 19 ++- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 1 - source/libs/executor/inc/executorInt.h | 9 +- source/libs/executor/src/scanoperator.c | 133 ++++++++++++++------- 5 files changed, 105 insertions(+), 60 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 712ae7c95b..2402182eae 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -153,7 +153,8 @@ typedef struct { // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ typedef enum { - TSD_READER_NOTIFY_DURATION_START + TSD_READER_NOTIFY_DURATION_START, + TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, } ETsdReaderNotifyType; typedef union { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e1dc226bb9..6f9f81773a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4332,12 +4332,7 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; 
pReader->notifyFn(TSD_READER_NOTIFY_DURATION_START, &info, pReader->notifyParam); - tsdbDebug("new duration %d start notification when no buffer preceeding fileset, %s", fid, pReader->idStr); - } - if (pStatus->pNextFilesetBlock && pStatus->pNextFilesetBlock->info.rows > 0) { - tsdbDebug("return the saved block from fileset %d files, %s", fid, pReader->idStr); - copyDataBlock(pBlock, pStatus->pNextFilesetBlock); - blockDataDestroy(pStatus->pNextFilesetBlock); + tsdbDebug("new duration %d start notification when buffer pre-fileset, %s", fid, pReader->idStr); } } return code; @@ -4364,12 +4359,12 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s", pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr); if (pStatus->bProcMemPreFileset) { - if ( pBlock->info.rows > 0) { - pStatus->pNextFilesetBlock = createOneDataBlock(pBlock, true); - blockDataCleanup(pBlock); - code = buildFromPreFilesetBuffer(pReader); - if (code != TSDB_CODE_SUCCESS || pBlock->info.rows > 0) { - return code; + if (pBlock->info.rows > 0) { + if (pReader->notifyFn) { + int32_t fid = pReader->status.pCurrentFileset->fid; + STsdReaderNotifyInfo info = {0}; + info.duration.filesetId = fid; + pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam); } } else { pStatus->bProcMemPreFileset = false; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index fe3441d058..3679015e9c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -220,7 +220,6 @@ typedef struct SReaderStatus { bool bProcMemFirstFileset; STableUidList procMemUidList; STableBlockScanInfo** pProcMemTableIter; - SSDataBlock* pNextFilesetBlock; } SReaderStatus; struct STsdbReader { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 
f7e55b71be..e3e504cdbc 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -298,9 +298,14 @@ typedef struct STableMergeScanInfo { SHashObj* mSkipTables; int64_t mergeLimit; SSortExecInfo sortExecInfo; - bool bNewFileset; - bool bOnlyRetrieveBlock; + bool filesetDelimited; + bool bNewFilesetEvent; + bool bNextDurationBlockEvent; + int32_t numNextDurationBlocks; + SSDataBlock* nextDurationBlocks[2]; + bool rtnNextDurationBlocks; + int32_t nextDurationBlocksIdx; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ea73f60468..bce8325195 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3240,6 +3240,53 @@ static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock return TSDB_CODE_SUCCESS; } +static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + + SSDataBlock* pBlock = pInfo->pReaderBlock; + int32_t code = 0; + bool hasNext = false; + STsdbReader* reader = pInfo->base.dataReader; + + code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); + if (code != 0) { + pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, code); + } + + if (!hasNext || isTaskKilled(pTaskInfo)) { + if (isTaskKilled(pTaskInfo)) { + qInfo("table merge scan fetch next data block found task killed. 
%s", GET_TASKID(pTaskInfo)); + pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + } + *pFinished = true; + return; + } + + uint32_t status = 0; + code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); + + if (code != TSDB_CODE_SUCCESS) { + qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, code); + } + + if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { + *pFinished = true; + return; + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + *pSkipped = true; + return; + } + return; +} + static SSDataBlock* getBlockForTableMergeScan(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -3255,53 +3302,42 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (true) { - if (!pInfo->bOnlyRetrieveBlock) { - code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); - if (code != 0) { - pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, code); - } - - if (!hasNext || isTaskKilled(pTaskInfo)) { - pInfo->bNewFileset = false; - if (isTaskKilled(pTaskInfo)) { - qInfo("table merge scan fetch next data block found task killed. 
%s", GET_TASKID(pTaskInfo)); - pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); + if (pInfo->rtnNextDurationBlocks) { + if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { + copyDataBlock(pBlock, pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]); + blockDataDestroy(pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]); + ++pInfo->nextDurationBlocksIdx; + if (pInfo->nextDurationBlocksIdx >= pInfo->numNextDurationBlocks) { + pInfo->rtnNextDurationBlocks = false; + pInfo->nextDurationBlocksIdx = 0; } + } + } else { + bool bFinished = false; + bool bSkipped = false; + doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); + if (bFinished) { + pInfo->bNewFilesetEvent = false; break; } - if (pInfo->bNewFileset) { - pInfo->bOnlyRetrieveBlock = true; - return NULL; + if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) { + if (!bSkipped) { + pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); + ++pInfo->numNextDurationBlocks; + ASSERT(pInfo->numNextDurationBlocks <= 2); + } + if (pInfo->bNewFilesetEvent) { + pInfo->rtnNextDurationBlocks = true; + return NULL; + } + if (pInfo->bNextDurationBlockEvent) { + pInfo->bNextDurationBlockEvent = false; + continue; + } } + if (bSkipped) continue; } - // process this data block based on the probabilities - bool processThisBlock = processBlockWithProbability(&pInfo->sample); - if (!processThisBlock) { - continue; - } - - uint32_t status = 0; - code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); - if (pInfo->bOnlyRetrieveBlock) { - pInfo->bOnlyRetrieveBlock = false; - } - if (code != TSDB_CODE_SUCCESS) { - qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, code); - } - - if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { - break; - } - - // current block is filter out according to filter condition, continue load the next block - if (status == FUNC_DATA_REQUIRED_FILTEROUT || 
pBlock->info.rows == 0) { - continue; - } - pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); if (pInfo->mergeLimit != -1) { @@ -3317,6 +3353,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { return NULL; } + SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { int32_t tsTargetSlotId = 0; for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) { @@ -3348,7 +3385,11 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) { STableMergeScanInfo* pTmsInfo = param; - pTmsInfo->bNewFileset = true; + if (type == TSD_READER_NOTIFY_DURATION_START) { + pTmsInfo->bNewFilesetEvent = true; + } else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) { + pTmsInfo->bNextDurationBlockEvent = true; + } return; } @@ -3358,7 +3399,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; - pInfo->bNewFileset = false; + pInfo->bNewFilesetEvent = false; + pInfo->bNextDurationBlockEvent = false; + pInfo->numNextDurationBlocks = 0; + pInfo->nextDurationBlocksIdx = 0; + pInfo->rtnNextDurationBlocks = false; pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -3535,7 +3580,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += pBlock->info.rows; return pBlock; } else { - if (pInfo->bNewFileset) { + if (pInfo->bNewFilesetEvent) { stopDurationForGroupTableMergeScan(pOperator); startDurationForGroupTableMergeScan(pOperator); } else { From ce48598f0d246eb1d74068bf71160f4d33f66d1f Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 15 Dec 2023 17:16:20 +0800 Subject: [PATCH 5/6] fix: pass first round test --- docs/en/12-taos-sql/05-insert.md | 6 +++--- 
docs/zh/12-taos-sql/05-insert.md | 6 +++--- source/libs/executor/src/scanoperator.c | 23 ++++++++++++----------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/docs/en/12-taos-sql/05-insert.md b/docs/en/12-taos-sql/05-insert.md index e5f502c563..f6e39a9734 100644 --- a/docs/en/12-taos-sql/05-insert.md +++ b/docs/en/12-taos-sql/05-insert.md @@ -158,8 +158,8 @@ Automatically creating table and the table name is specified through the `tbname ```sql INSERT INTO meters(tbname, location, groupId, ts, current, phase) - values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32) - values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33) - values('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33) + values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 0.32) + ('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 0.33) + ('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 0.33) ``` diff --git a/docs/zh/12-taos-sql/05-insert.md b/docs/zh/12-taos-sql/05-insert.md index efcd5dd962..583d047c43 100644 --- a/docs/zh/12-taos-sql/05-insert.md +++ b/docs/zh/12-taos-sql/05-insert.md @@ -158,7 +158,7 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/c 自动建表, 表名通过tbname列指定 ```sql INSERT INTO meters(tbname, location, groupId, ts, current, phase) - values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32) - values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33) - values('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33) + values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 0.32) + ('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 0.33) + ('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 0.33) ``` diff --git a/source/libs/executor/src/scanoperator.c 
b/source/libs/executor/src/scanoperator.c index bce8325195..6480aa9104 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3268,7 +3268,6 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); - if (code != TSDB_CODE_SUCCESS) { qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo)); T_LONG_JMP(pTaskInfo->env, code); @@ -3294,7 +3293,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSDataBlock* pBlock = pInfo->pReaderBlock; + SSDataBlock* pBlock = NULL; int32_t code = 0; int64_t st = taosGetTimestampUs(); @@ -3304,18 +3303,24 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { while (true) { if (pInfo->rtnNextDurationBlocks) { if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { - copyDataBlock(pBlock, pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]); - blockDataDestroy(pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]); + pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; ++pInfo->nextDurationBlocksIdx; - if (pInfo->nextDurationBlocksIdx >= pInfo->numNextDurationBlocks) { - pInfo->rtnNextDurationBlocks = false; - pInfo->nextDurationBlocksIdx = 0; + } else { + for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) { + blockDataDestroy(pInfo->nextDurationBlocks[i]); } + pInfo->rtnNextDurationBlocks = false; + pInfo->nextDurationBlocksIdx = 0; + pInfo->numNextDurationBlocks = 0; + continue; } } else { + bool bFinished = false; bool bSkipped = false; doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); + pBlock = pInfo->pReaderBlock; + if (bFinished) { pInfo->bNewFilesetEvent = false; break; @@ -3346,7 +3351,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { 
pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - return pBlock; } @@ -3401,9 +3405,6 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->bNewFilesetEvent = false; pInfo->bNextDurationBlockEvent = false; - pInfo->numNextDurationBlocks = 0; - pInfo->nextDurationBlocksIdx = 0; - pInfo->rtnNextDurationBlocks = false; pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; From b2ee43540b85f29e945a0640563441852e221ab6 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 15 Dec 2023 21:55:25 +0800 Subject: [PATCH 6/6] enhance: add log and clean up --- source/libs/executor/src/scanoperator.c | 36 ++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6480aa9104..3e8574f49f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3302,12 +3302,15 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (true) { if (pInfo->rtnNextDurationBlocks) { + qDebug("%s table merge scan return already fetched new duration blocks. 
index %d num of blocks %d", + GET_TASKID(pTaskInfo), pInfo->nextDurationBlocksIdx, pInfo->numNextDurationBlocks); if (pInfo->nextDurationBlocksIdx < pInfo->numNextDurationBlocks) { pBlock = pInfo->nextDurationBlocks[pInfo->nextDurationBlocksIdx]; ++pInfo->nextDurationBlocksIdx; } else { for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) { blockDataDestroy(pInfo->nextDurationBlocks[i]); + pInfo->nextDurationBlocks[i] = NULL; } pInfo->rtnNextDurationBlocks = false; pInfo->nextDurationBlocksIdx = 0; @@ -3320,7 +3323,8 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { bool bSkipped = false; doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped); pBlock = pInfo->pReaderBlock; - + qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d", + GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent); if (bFinished) { pInfo->bNewFilesetEvent = false; break; @@ -3330,7 +3334,11 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { if (!bSkipped) { pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); ++pInfo->numNextDurationBlocks; - ASSERT(pInfo->numNextDurationBlocks <= 2); + if (pInfo->numNextDurationBlocks > 2) { + qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), pInfo->numNextDurationBlocks); + pInfo->bNewFilesetEvent = false; + break; + } } if (pInfo->bNewFilesetEvent) { pInfo->rtnNextDurationBlocks = true; @@ -3394,6 +3402,8 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* } else if (type == TSD_READER_NOTIFY_NEXT_DURATION_BLOCK) { pTmsInfo->bNextDurationBlockEvent = true; } + qDebug("table merge scan receive notification. 
type %d, fileset %d", type, info->duration.filesetId); + return; } @@ -3403,6 +3413,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; + qDebug("%s table merge scan start duration ", GET_TASKID(pTaskInfo)); pInfo->bNewFilesetEvent = false; pInfo->bNextDurationBlockEvent = false; @@ -3434,6 +3445,8 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + qDebug("%s table merge scan stop duration ", GET_TASKID(pTaskInfo)); SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; @@ -3451,6 +3464,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SReadHandle* pHandle = &pInfo->base.readHandle; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + qDebug("%s table merge scan start group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId); { size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); @@ -3498,10 +3512,19 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader); pInfo->base.dataReader = NULL; } - + for (int32_t i = 0; i < pInfo->numNextDurationBlocks; ++i) { /* release every prefetched next-duration block still held */ + if (pInfo->nextDurationBlocks[i]) { + blockDataDestroy(pInfo->nextDurationBlocks[i]); + pInfo->nextDurationBlocks[i] = NULL; + } + } /* reset the counters only after the loop: zeroing numNextDurationBlocks inside it stopped the loop after one pass and leaked the remaining blocks */ + pInfo->numNextDurationBlocks = 0; + pInfo->nextDurationBlocksIdx = 0; resetLimitInfoForNextGroup(&pInfo->limitInfo); taosHashCleanup(pInfo->mSkipTables); pInfo->mSkipTables = NULL; + qDebug("%s table merge scan stop group %"PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId); + return TSDB_CODE_SUCCESS; } @@ -3612,6 +3635,13 @@ void 
destroyTableMergeScanOperatorInfo(void* param) { pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; + for (int32_t i = 0; i < pTableScanInfo->numNextDurationBlocks; ++i) { + if (pTableScanInfo->nextDurationBlocks[i] != NULL) { + blockDataDestroy(pTableScanInfo->nextDurationBlocks[i]); + pTableScanInfo->nextDurationBlocks[i] = NULL; + } + } + taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL;