From 53b2158c543148c2067be05c957400e8cb21b504 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 3 Nov 2023 09:30:50 +0800 Subject: [PATCH 01/28] refactor: do some internal refactor. --- include/libs/executor/executor.h | 6 +----- source/libs/executor/src/executor.c | 26 -------------------------- source/libs/stream/src/streamExec.c | 18 ++++++++++-------- 3 files changed, 11 insertions(+), 39 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 5990ae1c9c..f549f23c7f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -184,11 +184,7 @@ void qDestroyTask(qTaskInfo_t tinfo); void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/); - -int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); - -int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList); void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 60dc6f0185..4452eff8c7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -871,32 +871,6 @@ int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) { return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList); } -int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { - SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo; - if (pTaskInfo->pRoot == NULL) { - return TSDB_CODE_INVALID_PARA; - } - - int32_t nOptrWithVal = 0; - // int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); - // if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { - // taosMemoryFreeClear(*pOutput); - // *len = 0; - // } - return 0; -} - -int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) { - SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo; - - if (pTaskInfo == NULL || pInput == NULL || len == 0) { - return TSDB_CODE_INVALID_PARA; - } - - return 0; - // return decodeOperator(pTaskInfo->pRoot, pInput, len); -} - int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 49f691c558..05a62140f9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -647,23 +647,25 @@ int32_t streamExecTask(SStreamTask* pTask) { int32_t streamTaskReleaseState(SStreamTask* pTask) { stDebug("s-task:%s release exec state", pTask->id.idStr); void* pExecutor = pTask->exec.pExecutor; + + int32_t code = TSDB_CODE_SUCCESS; if (pExecutor != NULL) { - int32_t code = qStreamOperatorReleaseState(pExecutor); - return code; - } else { - return TSDB_CODE_SUCCESS; + code = qStreamOperatorReleaseState(pExecutor); } + + return code; } int32_t streamTaskReloadState(SStreamTask* pTask) { stDebug("s-task:%s reload exec state", pTask->id.idStr); void* pExecutor = pTask->exec.pExecutor; + + int32_t code = TSDB_CODE_SUCCESS; if (pExecutor != NULL) { - int32_t code = qStreamOperatorReloadState(pExecutor); - return code; - } else { - return TSDB_CODE_SUCCESS; + code = qStreamOperatorReloadState(pExecutor); } + + return code; } int32_t streamAlignTransferState(SStreamTask* pTask) { From 675c9ca61337b3d5575b6e9b6d4e5aaeb6b8870b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 3 Nov 2023 10:35:05 +0800 Subject: [PATCH 02/28] enh(tsdb): on-demand open stt file reader. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 130 +++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 16 +-- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 39 ++++--- 3 files changed, 105 insertions(+), 80 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index d1919d95ba..dff0352195 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -565,9 +565,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); if (ASCENDING_TRAVERSE(pReader->info.order)) { - w.skey = pScanInfo->lastKey + step; + w.skey = pScanInfo->lastProcKey + step; } else { - w.ekey = pScanInfo->lastKey + step; + w.ekey = pScanInfo->lastProcKey + step; } if (isEmptyQueryTimeWindow(&w)) { @@ -607,14 +607,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN clearBrinBlockIter(&iter); - pBlockNum->numOfLastFiles = pReader->status.pCurrentFileset->lvlArr->size; - int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; + pBlockNum->numOfSttFiles = pReader->status.pCurrentFileset->lvlArr->size; + int32_t total = pBlockNum->numOfSttFiles + pBlockNum->numOfBlocks; double el = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug( "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed " "time:%.2f ms %s", - numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfLastFiles, + numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfSttFiles, sizeInDisk / 1000.0, el, pReader->idStr); pReader->cost.numOfBlocks += total; @@ -1200,13 +1200,12 @@ static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* p } } -static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader* pLastBlockReader, int32_t order) { +static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo, int32_t order) { bool ascScan = ASCENDING_TRAVERSE(order); - bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); int64_t key = 0; - if (bHasDataInLastBlock) { - int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader); + if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) { + int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; key = ascScan ? TMIN(pBlock->record.firstKey, keyInStt) : TMAX(pBlock->record.lastKey, keyInStt); } else { key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey; @@ -1215,10 +1214,10 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader return key; } -static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, - SLastBlockReader* pLastBlockReader, int32_t order) { +static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo, + int32_t order) { bool ascScan = ASCENDING_TRAVERSE(order); - int64_t key = getBoarderKeyInFiles(pBlock, pLastBlockReader, order); + int64_t key = getBoarderKeyInFiles(pBlock, pScanInfo, order); return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) || (!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key)); @@ -1302,10 +1301,9 @@ typedef struct { } SDataBlockToLoadInfo; static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, - STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader, - STsdbReader* pReader) { - int32_t neighborIndex = 0; + STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) { SBrinRecord rec = {0}; + int32_t neighborIndex = 0; bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec); @@ -1319,9 +1317,11 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count) || (pBlockInfo->record.count <= 0); pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order); - if (hasDataInLastBlock(pLastBlockReader)) { - int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); - pInfo->overlapWithLastBlock = !(pBlockInfo->record.lastKey < tsLast || pBlockInfo->record.firstKey > tsLast); + ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); + if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) { + int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey; + pInfo->overlapWithLastBlock = + !(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt); } pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity; @@ -1336,9 +1336,9 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* // 5. delete info should not overlap with current block data // 6. current block should not contain the duplicated ts static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, - TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { + TSDBKEY keyInBuf) { SDataBlockToLoadInfo info = {0}; - getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader); + getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader); bool loadDataBlock = (info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf || @@ -1358,9 +1358,9 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock } static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, - TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { + TSDBKEY keyInBuf) { SDataBlockToLoadInfo info = {0}; - getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader); + getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader); bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || info.overlapWithDelInfo || info.overlapWithLastBlock); return isCleanFileBlock; @@ -1417,14 +1417,15 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return code; } -static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, - SVersionRange* pVerRange) { +static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, + SVersionRange* pVerRange) { int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1; while (1) { bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); if (!hasVal) { // the next value will be the accessed key in stt - pScanInfo->lastKeyInStt += step; + pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; + pScanInfo->sttKeyInfo.nextProcKey += step; return false; } @@ -1433,10 +1434,11 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc int64_t ver = pRow->pBlockData->aVersion[pRow->iRow]; pLastBlockReader->currentKey = key; - pScanInfo->lastKeyInStt = key; + pScanInfo->sttKeyInfo.nextProcKey = key; if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) { + pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; return true; } } @@ -1457,7 +1459,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas // avoid the fetch next row replace the referenced stt block in buffer doPinSttBlock(pLastBlockReader); - bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange); + bool hasVal = nextRowFromSttBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange); doUnpinSttBlock(pLastBlockReader); if (hasVal) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); @@ -1694,7 +1696,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } if (copied) { - pBlockScanInfo->lastKey = tsLastBlock; + pBlockScanInfo->lastProcKey = tsLastBlock; return TSDB_CODE_SUCCESS; } else { code = tsdbRowMergerAdd(pMerger, &fRow, NULL); @@ -2062,9 +2064,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea STbData* d = NULL; TSDBKEY startKey = {0}; if (ASCENDING_TRAVERSE(pReader->info.order)) { - startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->info.verRange.minVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey + 1, .version = pReader->info.verRange.minVer}; } else { - startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer}; + startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey - 1, .version = pReader->info.verRange.maxVer}; } int32_t code = @@ -2129,9 +2131,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan STimeWindow w = pLBlockReader->window; if (ASCENDING_TRAVERSE(pLBlockReader->order)) { - w.skey = pScanInfo->lastKeyInStt; + w.skey = pScanInfo->sttKeyInfo.nextProcKey; } else { - w.ekey = pScanInfo->lastKeyInStt; + w.ekey = pScanInfo->sttKeyInfo.nextProcKey; } int64_t st = taosGetTimestampUs(); @@ -2164,7 +2166,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan initMemDataIterator(pScanInfo, pReader); initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); - code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); + code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); int64_t el = taosGetTimestampUs() - st; pReader->cost.initLastBlockReader += (el / 1000.0); @@ -2209,7 +2211,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } if (copied) { - pBlockScanInfo->lastKey = key; + pBlockScanInfo->lastProcKey = key; return TSDB_CODE_SUCCESS; } else { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); @@ -2354,7 +2356,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); // it is a clean block, load it directly - if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf, pLastBlockReader) && + if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= pReader->resBlockInfo.capacity)) { if (asc || (!hasDataInLastBlock(pLastBlockReader))) { code = copyBlockDataToSDataBlock(pReader); @@ -2363,7 +2365,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } // record the last key value - pBlockScanInfo->lastKey = asc ? pRecord->lastKey : pRecord->firstKey; + pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey; goto _end; } } @@ -2527,7 +2529,7 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) { SReaderStatus* pStatus = &pReader->status; pBlockNum->numOfBlocks = 0; - pBlockNum->numOfLastFiles = 0; + pBlockNum->numOfSttFiles = 0; size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBrinBlk)); @@ -2564,7 +2566,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr return code; } - if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) { + if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) { break; } } @@ -2684,11 +2686,13 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } } -static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, SLastBlockReader* pLastBlockReader, bool asc) { - if(!hasDataInLastBlock(pLastBlockReader)) { +static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) { + ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); + + if(pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) { return true; } else { - int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; return (asc && pBlockInfo->record.lastKey < keyInStt) || (!asc && pBlockInfo->record.firstKey > keyInStt); } } @@ -2717,10 +2721,12 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return terrno; } - initLastBlockReader(pLastBlockReader, pScanInfo, pReader); - TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); + if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { + initLastBlockReader(pLastBlockReader, pScanInfo, pReader); + } - if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader)) { + TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); + if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf)) { code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2728,13 +2734,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // build composed data block code = buildComposedDataBlock(pReader); - } else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pLastBlockReader, pReader->info.order)) { + } else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pScanInfo, pReader->info.order)) { // data in memory that are earlier than current file block and stt blocks // rows in buffer should be less than the file block in asc, greater than file block in desc - int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pLastBlockReader, pReader->info.order); + int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - if (notOverlapWithSttFiles(pBlockInfo, pLastBlockReader, asc)) { + if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) { // whole block is required, return it directly SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; pInfo->rows = pBlockInfo->record.numRow; @@ -2745,7 +2751,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); // update the last key for the corresponding table - pScanInfo->lastKey = asc ? pInfo->window.ekey : pInfo->window.skey; + pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey; tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", @@ -2760,8 +2766,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { tsdbDebug("load data in last block firstly %s", pReader->idStr); int64_t st = taosGetTimestampUs(); + // let's load data from stt files + initLastBlockReader(pLastBlockReader, pScanInfo, pReader); + // no data in last block, no need to proceed. while (hasDataInLastBlock(pLastBlockReader)) { + ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); + code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2988,7 +2999,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) if (pBlockInfo) { STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pScanInfo) { - lastKey = pScanInfo->lastKey; + lastKey = pScanInfo->lastProcKey; } pDumpInfo->totalRows = pBlockInfo->record.numRow; @@ -3013,7 +3024,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl } // all data files are consumed, try data in buffer - if (num.numOfBlocks + num.numOfLastFiles == 0) { + if (num.numOfBlocks + num.numOfSttFiles == 0) { pReader->status.loadFromFile = false; taosArrayDestroy(pTableList); return code; @@ -3458,15 +3469,15 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { - while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) { + while (nextRowFromSttBlocks(pLastBlockReader, pScanInfo, pVerRange)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); } else { tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, - pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt, - idStr); + pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), + pScanInfo->sttKeyInfo.nextProcKey, idStr); break; } } @@ -3722,7 +3733,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT pBlock->info.dataLoad = 1; pBlock->info.rows += 1; - pScanInfo->lastKey = pTSRow->ts; + pScanInfo->lastProcKey = pTSRow->ts; return TSDB_CODE_SUCCESS; } @@ -3856,14 +3867,15 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t // todo extract method if (ASCENDING_TRAVERSE(pReader->info.order)) { int64_t skey = pReader->info.window.skey; - pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; - pInfo->lastKeyInStt = skey; + pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey; + pInfo->sttKeyInfo.nextProcKey = skey; } else { int64_t ekey = pReader->info.window.ekey; - pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - pInfo->lastKeyInStt = ekey; + pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + pInfo->sttKeyInfo.nextProcKey = ekey; } + pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); } @@ -4224,7 +4236,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { if (pBlockScanInfo) { // save lastKey to restore memory iterator STimeWindow w = pReader->resBlockInfo.pResBlock->info.window; - pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey; + pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey; // reset current current table's data block scan info, pBlockScanInfo->iterInit = false; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index a4d2eef093..af7cae33fc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -157,17 +157,18 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { int64_t skey = pTsdbReader->info.window.skey; - pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; - pScanInfo->lastKeyInStt = skey; + pScanInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey; + pScanInfo->sttKeyInfo.nextProcKey = skey; } else { int64_t ekey = pTsdbReader->info.window.ekey; - pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - pScanInfo->lastKeyInStt = ekey; + pScanInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + pScanInfo->sttKeyInfo.nextProcKey = ekey; } + pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, - pScanInfo->lastKey, pTsdbReader->idStr); + pScanInfo->lastProcKey, pTsdbReader->idStr); } taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); @@ -200,8 +201,8 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) { } pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - pInfo->lastKey = ts; - pInfo->lastKeyInStt = ts + step; + pInfo->lastProcKey = ts; + pInfo->sttKeyInfo.nextProcKey = ts + step; } } @@ -241,6 +242,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) { taosArrayClear(pScanInfo->pBlockList); taosArrayClear(pScanInfo->pBlockIdxList); taosArrayClear(pScanInfo->pFileDelData); // del data from each file set + pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; } void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index ea8efe37ee..47585ea6e3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -63,20 +63,31 @@ typedef struct STableDataBlockIdx { int32_t globalIndex; } STableDataBlockIdx; +typedef enum ESttKeyStatus { + STT_FILE_READER_UNINIT = 0x0, + STT_FILE_NO_DATA = 0x1, + STT_FILE_HAS_DATA = 0x2, +} ESttKeyStatus; + +typedef struct SSttKeyInfo { + ESttKeyStatus status; // this value should be updated when switch to the next fileset + int64_t nextProcKey; +} SSttKeyInfo; + typedef struct STableBlockScanInfo { - uint64_t uid; - TSKEY lastKey; - TSKEY lastKeyInStt; // last accessed key in stt - SArray* pBlockList; // block data index list, SArray - SArray* pBlockIdxList; // SArray - SArray* pMemDelData; // SArray - SArray* pFileDelData; // SArray from each file set - SIterInfo iter; // mem buffer skip list iterator - SIterInfo iiter; // imem buffer skip list iterator - SArray* delSkyline; // delete info for this table - int32_t fileDelIndex; // file block delete index - int32_t sttBlockDelIndex; // delete index for last block - bool iterInit; // whether to initialize the in-memory skip list iterator or not + uint64_t uid; + TSKEY lastProcKey; + SSttKeyInfo sttKeyInfo; + SArray* pBlockList; // block data index list, SArray + SArray* pBlockIdxList; // SArray + SArray* pMemDelData; // SArray + SArray* pFileDelData; // SArray from each file set + SIterInfo iter; // mem buffer skip list iterator + SIterInfo iiter; // imem buffer skip list iterator + SArray* delSkyline; // delete info for this table + int32_t fileDelIndex; // file block delete index + int32_t sttBlockDelIndex; // delete index for last block + bool iterInit; // whether to initialize the in-memory skip list iterator or not } STableBlockScanInfo; typedef struct SResultBlockInfo { @@ -108,7 +119,7 @@ typedef struct STableUidList { typedef struct { int32_t numOfBlocks; - int32_t numOfLastFiles; + int32_t numOfSttFiles; } SBlockNumber; typedef struct SBlockIndex { From b4d14d3847640880ce803a45abd3d35cc3c11012 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 3 Nov 2023 11:04:32 +0800 Subject: [PATCH 03/28] fix(tsdb): fix error during add on-demand opening stt file reader. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index dff0352195..4b5329220b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2356,9 +2356,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); // it is a clean block, load it directly - if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && - (pRecord->numRow <= pReader->resBlockInfo.capacity)) { - if (asc || (!hasDataInLastBlock(pLastBlockReader))) { + int64_t cap = pReader->resBlockInfo.capacity; + if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) { + if (asc || (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) { code = copyBlockDataToSDataBlock(pReader); if (code) { goto _end; @@ -2380,6 +2380,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } SBlockData* pBlockData = &pReader->status.fileBlockData; + initLastBlockReader(pLastBlockReader, pBlockScanInfo, pReader); while (1) { bool hasBlockData = false; From 2cf6b3c1e472714be4265d309ba36371c02c6d8f Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 3 Nov 2023 14:01:39 +0800 Subject: [PATCH 04/28] fix: release duplicate msgs in syncSnapBufferRecv --- source/libs/sync/src/syncSnapshot.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 8e186af51f..95952c960e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -865,6 +865,9 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end); if (pMsg->seq > pRcvBuf->cursor) { + if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) { + pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]); + } pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg; ppMsg[0] = NULL; pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); @@ -1002,7 +1005,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { } if (pMsg->term < raftStoreGetTerm(pSyncNode)) { - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term"); + sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term, + pMsg->seq); terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; return -1; } From de5c4d8d5d82ee01c3ab898323a9fa18cc0bc48c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 3 Nov 2023 15:07:40 +0800 Subject: [PATCH 05/28] other(tsdb): adjust some log info. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 0e15107606..36675a771c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -846,14 +846,14 @@ static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) { if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) { pInfo->blockData[0].pin = true; ASSERT(!pInfo->blockData[1].pin); - tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id); + tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id); return; } if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) { pInfo->blockData[1].pin = true; ASSERT(!pInfo->blockData[0].pin); - tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id); + tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id); return; } From e13f003b902f1e7055fb33d03f762e3b80f27dd5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 5 Nov 2023 01:23:23 +0800 Subject: [PATCH 06/28] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 40 +++++++++++++++++---------- source/libs/executor/src/executor.c | 2 +- source/libs/stream/src/streamMeta.c | 1 - source/libs/stream/src/streamQueue.c | 3 +- source/libs/stream/src/streamTask.c | 3 +- source/libs/stream/src/streamTaskSm.c | 1 - 6 files changed, 30 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fff58feeb1..969edae30d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1064,7 +1064,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms return code; } -static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { +static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { const char* id = pTask->id.idStr; int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; @@ -1105,7 +1105,7 @@ static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) } } -// this function should be executed by only one thread +// this function should be executed by only one thread, so we set an sentinel to protect this function int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamMeta* pMeta = pTq->pStreamMeta; @@ -1134,6 +1134,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } } + // let's decide which step should be executed now if (pTask->execInfo.step1Start == 0) { ASSERT(pTask->status.pauseAllowed == false); int64_t ts = taosGetTimestampMs(); @@ -1167,14 +1168,28 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - streamScanHistoryData(pTask); + EScanHistoryRet ret = streamScanHistoryData(pTask); + // todo update the step1 exec elapsed time double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; - if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE) { - int8_t status = streamTaskSetSchedStatusInactive(pTask); - tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); + if (ret == TASK_SCANHISTORY_QUIT || ret == TASK_SCANHISTORY_REXEC) { + int8_t status = streamTaskSetSchedStatusInactive(pTask); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); + + if (ret == TASK_SCANHISTORY_REXEC) { + streamStartScanHistoryAsync(pTask, 0); + } else { + char* p = NULL; + ETaskStatus s = streamTaskGetStatus(pTask, &p); + + if (s == TASK_STATUS__PAUSE) { + tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); + } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) { + tqDebug("s-task:%s status:%p not continue scan-history data", pTask->id.idStr, p); + } + } + streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1203,23 +1218,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); if (code == TSDB_CODE_SUCCESS) { - doStartStep2(pTask, pStreamTask, pTq); + doStartFillhistoryStep2(pTask, pStreamTask, pTq); } else { tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); } streamMetaReleaseTask(pMeta, pStreamTask); - } else { STimeWindow* pWindow = &pTask->dataRange.window; ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - // Not update the fill-history time window until the state transfer is completed if the related fill-history task - // exists. - tqDebug( - "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, startVer:%" PRId64 - ", window:%" PRId64 " - %" PRId64, - id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); + // Not update the fill-history time window until the state transfer is completed. + tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64 + ", window:%" PRId64 " - %" PRId64, + id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); code = streamTaskScanHistoryDataComplete(pTask); } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4452eff8c7..9ed7d6e033 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1046,7 +1046,7 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { } } -bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { +bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return pTaskInfo->streamInfo.recoverScanFinished; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 76945f17a9..7dfe88e6de 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1104,7 +1104,6 @@ void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-runlock", pMeta->vgId); taosRUnLockLatch(&pMeta->lock); - } void streamMetaWLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-wlock", pMeta->vgId); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index eae4605dbc..90e862005b 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -340,10 +340,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return 0; } -// the result should be put into the outputQ in any cases, otherwise, the result may be lost +// the result should be put into the outputQ in any cases, the result may be lost otherwise. int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { STaosQueue* pQueue = pTask->outputq.queue->pQueue; + // wait for the output queue is available for new data to dispatch while (streamQueueIsFull(pTask->outputq.queue)) { if (streamTaskShouldStop(pTask)) { stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f949d46315..a1245ba32c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -400,8 +400,7 @@ void tFreeStreamTask(SStreamTask* pTask) { taosMemoryFree(pTask->outputInfo.pTokenBucket); taosThreadMutexDestroy(&pTask->lock); - taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList); - pTask->outputInfo.pDownstreamUpdateList = NULL; + pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList); taosMemoryFree(pTask); stDebug("s-task:0x%x free task completed", taskId); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 09656cbe97..04b449aaaf 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -315,7 +315,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); taosThreadMutexUnlock(&pTask->lock); - stDebug("s-task:%s unlockx", pTask->id.idStr); return TSDB_CODE_STREAM_INVALID_STATETRANS; } From c71f6713a78eea4ec6bb3f455c488f3d545d2a44 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 5 Nov 2023 01:24:23 +0800 Subject: [PATCH 07/28] fix(stream): add need update node into update node list. --- source/libs/stream/src/streamStart.c | 60 +++++++++++++++------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 5ebc60dc13..4f757f09fb 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -318,6 +318,31 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { streamTaskOnHandleEventSuccess(pTask->status.pSM, event); } +static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { + int32_t vgId = pTask->pMeta->vgId; + + taosThreadMutexLock(&pTask->lock); + int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + bool existed = false; + for (int i = 0; i < num; ++i) { + SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i); + if (p->nodeId == nodeId) { + existed = true; + break; + } + } + + if (!existed) { + SDownstreamTaskEpset t = {.nodeId = nodeId}; + taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t); + + stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, + t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList)); + } + + taosThreadMutexUnlock(&pTask->lock); +} + int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; @@ -372,35 +397,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " "not check wait for downstream task nodeUpdate, and all tasks restart", id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); - } else { - if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { - stError( - "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " - "downstream again, nodeUpdate needed", - id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); - taosThreadMutexLock(&pTask->lock); - int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); - bool existed = false; - for (int i = 0; i < num; ++i) { - SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i); - if (p->nodeId == pRsp->downstreamNodeId) { - existed = true; - break; - } - } - - if (!existed) { - SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId}; - taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t); - stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId, - t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList)); - } - - taosThreadMutexUnlock(&pTask->lock); - return 0; - } + addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + } else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { + stError( + "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " + "downstream again, nodeUpdate needed", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); From d440da0d8c8ead2011e5f5bcb4a0a00e8f7fa49a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 5 Nov 2023 01:26:53 +0800 Subject: [PATCH 08/28] enh(stream): execute scan history by using time slice 1s. --- include/libs/executor/executor.h | 2 +- include/libs/stream/tstream.h | 8 +++++- source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/src/streamExec.c | 44 +++++++++++++++++++---------- 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index f549f23c7f..d0e2b45937 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -213,7 +213,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); -bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); +bool qStreamScanhistoryFinished(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2e4204ab34..0605f4b2e5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -775,10 +775,16 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); +typedef enum { + TASK_SCANHISTORY_CONT = 0x1, + TASK_SCANHISTORY_QUIT = 0x2, + TASK_SCANHISTORY_REXEC = 0x3, +} EScanHistoryRet; + // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); -int32_t streamScanHistoryData(SStreamTask* pTask); +EScanHistoryRet streamScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); // agg level diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 969edae30d..04eb13423d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1178,6 +1178,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { atomic_store_32(&pTask->status.inScanHistorySentinel, 0); if (ret == TASK_SCANHISTORY_REXEC) { + // todo wait for 100ms and retry streamStartScanHistoryAsync(pTask, 0); } else { char* p = NULL; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 05a62140f9..fa042b5687 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,7 +18,8 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 32 #define STREAM_RESULT_DUMP_THRESHOLD 300 -#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) +#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data +#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); @@ -48,10 +49,9 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl } streamDispatchStreamBlock(pTask); - return code; } - return 0; + return code; } static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, @@ -187,12 +187,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -int32_t streamScanHistoryData(SStreamTask* pTask) { +EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; bool finished = false; + int64_t st = taosGetTimestampMs(); qSetStreamOpOpen(exec); @@ -200,13 +201,14 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { if (streamTaskShouldPause(pTask)) { double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); - break; + return TASK_SCANHISTORY_QUIT; // quit from step1, not continue to handle the step2 } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, tstrerror(terrno)); + continue; } int32_t size = 0; @@ -214,26 +216,26 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { while (1) { if (streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return 0; + return TASK_SCANHISTORY_QUIT; } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { - stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr); - taosMsleep(10000); - continue; + stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return TASK_SCANHISTORY_REXEC; } SSDataBlock* output = NULL; uint64_t ts = 0; code = qExecTask(exec, &output, &ts); if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { - stError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code)); + stError("s-task:%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code)); continue; } // the generated results before fill-history task been paused, should be dispatched to sink node if (output == NULL) { - finished = qStreamRecoverScanFinished(exec); + finished = qStreamScanhistoryFinished(exec); break; } @@ -243,8 +245,9 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { taosArrayPush(pRes, &block); size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + numOfBlocks += 1; - if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { + if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached", pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD, SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD)); @@ -256,14 +259,25 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { - return code; + terrno = code; + stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code)); + return TASK_SCANHISTORY_REXEC; } } else { taosArrayDestroy(pRes); } + + int64_t el = taosGetTimestampMs() - st; + if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { + stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr, + pTask->info.fillHistory, pTask->info.taskLevel); + + // todo exec scanhistory in 100ms + return TASK_SCANHISTORY_REXEC; + } } - return 0; + return TASK_SCANHISTORY_CONT; } // wait for the stream task to be idle From 8f1e3197b2e91f4f04504e37191fd884f605b243 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 6 Nov 2023 17:19:09 +0800 Subject: [PATCH 09/28] enh: print lastApplyTerm of snap sender in format of int64 --- source/libs/sync/src/syncUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 9e6ea94e78..06847c081c 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -268,7 +268,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla taosPrintLog(flags, level, dflag, "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64 - " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 + " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64 ", seq:%d, ack:%d, " " buf:[%" PRId64 " %" PRId64 ", %" PRId64 "), finish:%d, as:%d, to-dnode:%d}" From d574b970153511958d0e40f9f5f3b98890f7c649 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 7 Nov 2023 11:03:42 +0800 Subject: [PATCH 10/28] fix: tbname can not contain dot --- source/libs/parser/src/parInsertSql.c | 14 +++++++++----- tests/script/tsim/insert/insert_stb.sim | 1 + 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index f50655f8e0..2b8516d37b 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1584,8 +1584,7 @@ typedef union SRowsDataContext{ SStbRowsDataContext* pStbRowsCxt; } SRowsDataContext; -static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, - char* ctbName, bool* pFoundCtbName) { +static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, bool* pFoundCtbName) { *pFoundCtbName = false; int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); if (code == TSDB_CODE_SUCCESS){ @@ -1595,7 +1594,13 @@ static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* if (pToken->n > 0) { if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) { - memcpy(pStbRowsCxt->ctbName.tname, pToken->z, pToken->n); + for (int i = 0; i < pToken->n; ++i) { + if (pToken->z[i] == '.') { + return buildInvalidOperationMsg(&pCxt->msg, "tbname can not contain '.'"); + } else { + pStbRowsCxt->ctbName.tname[i] = pToken->z[i]; + } + } pStbRowsCxt->ctbName.tname[pToken->n] = '\0'; *pFoundCtbName = true; } else { @@ -1677,8 +1682,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* } } else if (pCols->pColIndex[i] == tbnameIdx) { - char ctbName[TSDB_TABLE_NAME_LEN]; - code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, bFoundTbName); + code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, bFoundTbName); } if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) { diff --git a/tests/script/tsim/insert/insert_stb.sim b/tests/script/tsim/insert/insert_stb.sim index 5d9e244565..4c0797e2a7 100644 --- a/tests/script/tsim/insert/insert_stb.sim +++ b/tests/script/tsim/insert/insert_stb.sim @@ -70,6 +70,7 @@ sql_error insert into d2.st values(now, 1, 1) sql_error insert into d2.st(ts, f) values(now, 1); sql_error insert into d2.st(ts, f, tbname) values(now, 1); sql_error insert into d2.st(ts, f, tbname) values(now, 1, ''); +sql_error insert into d2.st(ts, f, tbname) values(now, 1, 'd2.ct2'); sql_error insert into d2.st(ts, tbname) values(now, 1, 34) sql_error insert into st using st2 tags(2) values(now,1); system sh/exec.sh -n dnode1 -s stop -x SIGINT From 433d3b73540cb570deec2d80688091884234941c Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 7 Nov 2023 15:47:36 +0800 Subject: [PATCH 11/28] fix: select tags const from table --- source/libs/planner/src/planLogicCreater.c | 2 +- tests/script/tsim/query/tag_scan.sim | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 222aec9813..e0d154a130 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -266,7 +266,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols if (NULL == pScanCols) { if (NULL == pScanPseudoCols) { - return SCAN_TYPE_TABLE; + return (!tagScan) ? SCAN_TYPE_TABLE : SCAN_TYPE_TAG; } return FUNCTION_TYPE_BLOCK_DIST_INFO == ((SFunctionNode*)nodesListGetNode(pScanPseudoCols, 0))->funcType ? SCAN_TYPE_BLOCK_INFO diff --git a/tests/script/tsim/query/tag_scan.sim b/tests/script/tsim/query/tag_scan.sim index 7404bd37fe..ee56a214a5 100644 --- a/tests/script/tsim/query/tag_scan.sim +++ b/tests/script/tsim/query/tag_scan.sim @@ -164,4 +164,9 @@ sql select count(*) from (select tags ts from stb34) if $data00 != 2 then return -1 endi + +sql select tags 2 from stb34 +if $rows != 1 then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT From e38e526dda0fe57f94b25124182f4105feab1a0b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Nov 2023 13:17:13 +0800 Subject: [PATCH 12/28] fix(stream):reset the starting flag for the followers. --- include/libs/stream/tstream.h | 4 +- source/dnode/vnode/src/tq/tqStreamTask.c | 65 +++++++++++++++++++++++- source/libs/stream/inc/streamInt.h | 2 - source/libs/stream/src/streamMeta.c | 6 +-- 4 files changed, 69 insertions(+), 8 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0605f4b2e5..a33a259ef7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -432,7 +432,8 @@ struct SStreamTask { typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; - int32_t startAllTasksFlag; + int32_t tasksWillRestart; + int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing int32_t elapsedTime; } STaskStartInfo; @@ -816,6 +817,7 @@ void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); +void streamMetaResetStartInfo(STaskStartInfo* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 2d94f23009..014a804abc 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -115,7 +115,70 @@ int32_t tqStartStreamTask(STQ* pTq) { return code; } -int32_t tqLaunchStreamTaskAsync(STQ* pTq) { +int32_t tqRestartStreamTasks(STQ* pTq) { + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = pMeta->vgId; + int32_t code = 0; + int64_t st = taosGetTimestampMs(); + + while(1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + + terrno = 0; + tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, + pMeta->updateInfo.transId); + + while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + + code = streamMetaReopen(pMeta); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + + int64_t el = taosGetTimestampMs() - st; + + tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); + + code = streamMetaLoadAllTasks(pTq->pStreamMeta); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + tqResetStreamTaskStatus(pTq); + + streamMetaWUnLock(pMeta); + tqStartStreamTasks(pTq); + } else { + streamMetaResetStartInfo(&pMeta->startInfo); + streamMetaWUnLock(pMeta); + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + + code = terrno; + return code; +} + +int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = pMeta->vgId; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index c63b51d745..09d26119f1 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -132,8 +132,6 @@ STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); -void streamMetaResetStartInfo(STaskStartInfo* pMeta); - SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueProcessSuccess(SStreamQueue* queue); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7dfe88e6de..806ae0986b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -228,12 +228,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } int32_t streamMetaReopen(SStreamMeta* pMeta) { - // backup the restart flag - int32_t restartFlag = pMeta->startInfo.startAllTasksFlag; streamMetaClear(pMeta); - pMeta->startInfo.startAllTasksFlag = restartFlag; - // NOTE: role should not be changed during reopen meta pMeta->streamBackendRid = -1; pMeta->streamBackend = NULL; @@ -1095,6 +1091,8 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); pStartInfo->startAllTasksFlag = 0; pStartInfo->readyTs = 0; + // reset the sentinel flag value to be 0 + atomic_store_32(&pStartInfo->taskStarting, 0); } void streamMetaRLock(SStreamMeta* pMeta) { From 7370daa6e0a6730f381533aeb11d7d3b81dbc423 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Nov 2023 15:45:44 +0800 Subject: [PATCH 13/28] fix(stream): record the start failure tasks. --- include/libs/stream/tstream.h | 5 +- source/dnode/vnode/src/tq/tqStreamTask.c | 3 +- source/libs/stream/src/streamMeta.c | 16 +++++- source/libs/stream/src/streamStart.c | 73 +++++++++++++++++------- 4 files changed, 72 insertions(+), 25 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a33a259ef7..8eda5de7cb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -434,7 +434,8 @@ typedef struct STaskStartInfo { int64_t readyTs; int32_t tasksWillRestart; int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. - SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing + SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing + SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed int32_t elapsedTime; } STaskStartInfo; @@ -812,7 +813,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); -int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask); +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 014a804abc..2370890006 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -73,6 +73,7 @@ int32_t tqStartStreamTask(STQ* pTq) { streamMetaWLock(pMeta); pTaskList = taosArrayDup(pMeta->pTaskList, NULL); taosHashClear(pMeta->startInfo.pReadyTaskSet); + taosHashClear(pMeta->startInfo.pFailedTaskSet); pMeta->startInfo.startTs = taosGetTimestampMs(); streamMetaWUnLock(pMeta); @@ -97,7 +98,7 @@ int32_t tqStartStreamTask(STQ* pTq) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskReadyInfo(pTask); + streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); streamMetaReleaseTask(pMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 806ae0986b..17cd9fac57 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -150,6 +150,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); if (pMeta->startInfo.pReadyTaskSet == NULL) { + goto _err; + } + + pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK); + if (pMeta->startInfo.pFailedTaskSet == NULL) { + goto _err; } pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); @@ -221,6 +227,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); + if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet); taosMemoryFree(pMeta); stError("failed to open stream meta"); @@ -298,7 +305,10 @@ void streamMetaClear(SStreamMeta* pMeta) { pMeta->numOfPausedTasks = 0; pMeta->chkptNotReadyTasks = 0; - streamMetaResetStartInfo(&pMeta->startInfo); + // the willrestart/starting flag can NOT be cleared + taosHashClear(pMeta->startInfo.pReadyTaskSet); + taosHashClear(pMeta->startInfo.pFailedTaskSet); + pMeta->startInfo.readyTs = 0; } void streamMetaClose(SStreamMeta* pMeta) { @@ -338,6 +348,7 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->startInfo.pReadyTaskSet); + taosHashCleanup(pMeta->startInfo.pFailedTaskSet); taosMemoryFree(pMeta->pHbInfo); taosMemoryFree(pMeta->path); @@ -1089,7 +1100,8 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) { void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { taosHashClear(pStartInfo->pReadyTaskSet); - pStartInfo->startAllTasksFlag = 0; + taosHashClear(pStartInfo->pFailedTaskSet); + pStartInfo->tasksWillRestart = 0; pStartInfo->readyTs = 0; // reset the sentinel flag value to be 0 atomic_store_32(&pStartInfo->taskStarting, 0); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 4f757f09fb..fdad5124ed 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -57,7 +57,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - streamMetaUpdateTaskReadyInfo(pTask); + streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); return TSDB_CODE_SUCCESS; } @@ -392,20 +392,22 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs doProcessDownstreamReadyRsp(pTask); } } else { // not ready, wait for 100ms and retry - if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { - stError( - "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " - "not check wait for downstream task nodeUpdate, and all tasks restart", - id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); + if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { + if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { + stError( + "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " + "not check wait for downstream task nodeUpdate, and all tasks restart", + id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); + } else { + stError( + "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " + "downstream again, nodeUpdate needed", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); - } else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { - stError( - "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " - "downstream again, nodeUpdate needed", - id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); - addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -981,28 +983,59 @@ void streamTaskEnablePause(SStreamTask* pTask) { pTask->status.pauseAllowed = 1; } -int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { +typedef struct STaskInitTs { + int64_t start; + int64_t end; + bool success; +} STaskInitTs; + +static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { + int32_t vgId = pMeta->vgId; + void* pIter = NULL; + size_t keyLen = 0; + + stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet), + succ ? "success" : "failed"); + + while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { + STaskInitTs* pInfo = pIter; + void* key = taosHashGetKey(pIter, &keyLen); + + SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); + stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, + (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); + } +} + +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) { SStreamMeta* pMeta = pTask->pMeta; streamMetaWLock(pMeta); STaskId id = streamTaskExtractKey(pTask); - taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); + STaskStartInfo* pStartInfo = &pMeta->startInfo; + + SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet; + + STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; + taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { - STaskStartInfo* pStartInfo = &pMeta->startInfo; - + if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { pStartInfo->readyTs = pTask->execInfo.start; pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; - streamMetaResetStartInfo(pStartInfo); - - stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64 + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); + + // print the initialization elapsed time and info + displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); + displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); + + streamMetaResetStartInfo(pStartInfo); } streamMetaWUnLock(pMeta); From 8c2f6bab0e60a6b364314c699466e38b5c516b5a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Nov 2023 18:55:53 +0800 Subject: [PATCH 14/28] fix(stream): exec scan-history in timer. --- include/libs/stream/tstream.h | 43 ++++++++++++------- source/dnode/vnode/src/tq/tq.c | 13 +++--- source/dnode/vnode/src/tq/tqSink.c | 2 +- source/dnode/vnode/src/tq/tqStreamTask.c | 7 ++-- source/libs/stream/inc/streamInt.h | 3 ++ source/libs/stream/src/streamDispatch.c | 1 - source/libs/stream/src/streamExec.c | 16 ++++--- source/libs/stream/src/streamStart.c | 53 ++++++++++++++++++++++-- 8 files changed, 102 insertions(+), 36 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8eda5de7cb..a063f39d92 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -241,6 +241,24 @@ typedef struct { SEpSet epset; } SDownstreamTaskEpset; +typedef enum { + TASK_SCANHISTORY_CONT = 0x1, + TASK_SCANHISTORY_QUIT = 0x2, + TASK_SCANHISTORY_REXEC = 0x3, +} EScanHistoryRet; + +typedef struct { + EScanHistoryRet ret; + int32_t idleTime; +} SScanhistoryDataInfo; + +typedef struct { + int32_t idleDuration; // idle time before use time slice the continue execute scan-history + int32_t numOfTicks; + tmr_h pTimer; + int32_t execCount; +} SScanhistorySchedInfo; + typedef struct { int64_t stbUid; char stbFullName[TSDB_TABLE_FNAME_LEN]; @@ -378,9 +396,10 @@ typedef struct STaskOutputInfo { union { STaskDispatcherFixed fixedDispatcher; STaskDispatcherShuffle shuffleDispatcher; - STaskSinkTb tbSink; - STaskSinkSma smaSink; - STaskSinkFetch fetchSink; + + STaskSinkTb tbSink; + STaskSinkSma smaSink; + STaskSinkFetch fetchSink; }; int8_t type; STokenBucket* pTokenBucket; @@ -414,7 +433,10 @@ struct SStreamTask { SStreamState* pState; // state backend SArray* pRspMsgList; SUpstreamInfo upstreamInfo; + // the followings attributes don't be serialized + SScanhistorySchedInfo schedHistoryInfo; + int32_t notReadyTasks; int32_t numOfWaitingUpstream; int64_t checkReqId; @@ -734,8 +756,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) // recover and fill history void streamTaskCheckDownstream(SStreamTask* pTask); -int32_t onNormalTaskReady(SStreamTask* pTask); -int32_t onScanhistoryTaskReady(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); @@ -757,7 +777,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); +int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); + int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common @@ -777,21 +799,14 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); -typedef enum { - TASK_SCANHISTORY_CONT = 0x1, - TASK_SCANHISTORY_QUIT = 0x2, - TASK_SCANHISTORY_REXEC = 0x3, -} EScanHistoryRet; - // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); -EScanHistoryRet streamScanHistoryData(SStreamTask* pTask); +SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); // agg level -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, - SRpcHandleInfo* pRpcInfo); +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo); int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 04eb13423d..4bdb7a2032 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1105,6 +1105,10 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask } } +static void ddxx() { + +} + // this function should be executed by only one thread, so we set an sentinel to protect this function int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; @@ -1168,18 +1172,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - EScanHistoryRet ret = streamScanHistoryData(pTask); + SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask); // todo update the step1 exec elapsed time double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; - if (ret == TASK_SCANHISTORY_QUIT || ret == TASK_SCANHISTORY_REXEC) { + if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) { int8_t status = streamTaskSetSchedStatusInactive(pTask); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); - if (ret == TASK_SCANHISTORY_REXEC) { - // todo wait for 100ms and retry - streamStartScanHistoryAsync(pTask, 0); + if (retInfo.ret == TASK_SCANHISTORY_REXEC) { + streamReExecScanHistoryFuture(pTask, retInfo.idleTime); } else { char* p = NULL; ETaskStatus s = streamTaskGetStatus(pTask, &p); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 51d51ebbef..4b64737936 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -25,7 +25,7 @@ typedef struct STableSinkInfo { tstr name; } STableSinkInfo; -static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks); +static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks); static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 2370890006..52e67ef456 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -22,6 +22,8 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); +static bool taskReadyForDataFromWal(SStreamTask* pTask); +static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -384,14 +386,13 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { return false; } -static bool taskReadyForDataFromWal(SStreamTask* pTask) { +bool taskReadyForDataFromWal(SStreamTask* pTask) { // non-source or fill-history tasks don't need to response the WAL scan action. if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { return false; } // not in ready state, do not handle the data from wal -// int32_t status = pTask->status.taskStatus; char* p = NULL; int32_t status = streamTaskGetStatus(pTask, &p); if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) { @@ -423,7 +424,7 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) { return true; } -static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { +bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { const char* id = pTask->id.idStr; int32_t numOfNewItems = 0; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 09d26119f1..aa9b26381e 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -140,6 +140,9 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); +int32_t onNormalTaskReady(SStreamTask* pTask); +int32_t onScanhistoryTaskReady(SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index edfc66762d..5665e7a917 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1007,7 +1007,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, info.msg.info = *pRpcInfo; taosThreadMutexLock(&pTask->lock); - stDebug("s-task:%s lock", pTask->id.idStr); if (pTask->pRspMsgList == NULL) { pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fa042b5687..d81a5f0c2f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -187,7 +187,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { +SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); int32_t code = TSDB_CODE_SUCCESS; @@ -201,7 +201,7 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { if (streamTaskShouldPause(pTask)) { double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); - return TASK_SCANHISTORY_QUIT; // quit from step1, not continue to handle the step2 + return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; // quit from step1, not continue to handle the step2 } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -216,13 +216,13 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { while (1) { if (streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return TASK_SCANHISTORY_QUIT; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return TASK_SCANHISTORY_REXEC; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; } SSDataBlock* output = NULL; @@ -261,7 +261,7 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { terrno = code; stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code)); - return TASK_SCANHISTORY_REXEC; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } else { taosArrayDestroy(pRes); @@ -271,13 +271,11 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr, pTask->info.fillHistory, pTask->info.taskLevel); - - // todo exec scanhistory in 100ms - return TASK_SCANHISTORY_REXEC; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } - return TASK_SCANHISTORY_CONT; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};; } // wait for the stream task to be idle diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index fdad5124ed..65a14aff6b 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -19,6 +19,10 @@ #include "wal.h" #include "streamsm.h" +#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms +#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec +#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE) + typedef struct SLaunchHTaskInfo { SStreamMeta* pMeta; STaskId id; @@ -81,6 +85,50 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } +static void doReExecScanhistory(void* param, void* tmrId) { + SStreamTask* pTask = param; + pTask->schedHistoryInfo.numOfTicks -= 1; + + char* p = NULL; + ETaskStatus status = streamTaskGetStatus(pTask, &p); + if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref); + } + + if (pTask->schedHistoryInfo.numOfTicks <= 0) { + streamStartScanHistoryAsync(pTask, 0); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, + pTask->info.fillHistory, ref); + } else { + taosTmrReset(doReExecScanhistory, 100, pTask, NULL, pTask->schedHistoryInfo.pTimer); + } +} + +int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) { + int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; + if (numOfTicks <= 0) { + numOfTicks = 1; + } else if (numOfTicks > SCANHISTORY_IDLE_TICK) { + numOfTicks = SCANHISTORY_IDLE_TICK; + } + + pTask->schedHistoryInfo.numOfTicks = numOfTicks; + + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref); + + if (pTask->schedHistoryInfo.pTimer == NULL) { + pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer); + } else { + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, pTask->schedHistoryInfo.pTimer); + } + + return TSDB_CODE_SUCCESS; +} + static int32_t doStartScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; if (pTask->info.fillHistory) { @@ -684,9 +732,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { int32_t hTaskId = pHTaskInfo->id.taskId; streamTaskGetStatus(pTask, &p); - stDebug( - "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", - pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); + stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", + pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask); From 9125fe44c6ba94cb2e2d75d5f7e4d530ff016c89 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Nov 2023 19:46:47 +0800 Subject: [PATCH 15/28] fix(stream): fix error. --- source/libs/stream/src/streamStart.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 65a14aff6b..e782ec84ba 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -103,7 +103,8 @@ static void doReExecScanhistory(void* param, void* tmrId) { stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, pTask->info.fillHistory, ref); } else { - taosTmrReset(doReExecScanhistory, 100, pTask, NULL, pTask->schedHistoryInfo.pTimer); + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, + &pTask->schedHistoryInfo.pTimer); } } @@ -123,7 +124,7 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) if (pTask->schedHistoryInfo.pTimer == NULL) { pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer); } else { - taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, pTask->schedHistoryInfo.pTimer); + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer); } return TSDB_CODE_SUCCESS; From c9a1cc50fb015aac73215fbe77df1e59421050ef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Nov 2023 19:49:25 +0800 Subject: [PATCH 16/28] refactor: do some internal refactor. --- source/libs/stream/src/streamStart.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index e782ec84ba..9044e65eb6 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -34,6 +34,12 @@ typedef struct STaskRecheckInfo { void* checkTimer; } STaskRecheckInfo; +typedef struct STaskInitTs { + int64_t start; + int64_t end; + bool success; +} STaskInitTs; + static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); @@ -1031,12 +1037,6 @@ void streamTaskEnablePause(SStreamTask* pTask) { pTask->status.pauseAllowed = 1; } -typedef struct STaskInitTs { - int64_t start; - int64_t end; - bool success; -} STaskInitTs; - static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { int32_t vgId = pMeta->vgId; void* pIter = NULL; From 1301b92844163e660d815094dcb855ac834edd54 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 16:19:09 +0800 Subject: [PATCH 17/28] fix(stream): fix errors in scan-history, introduced by refactor --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 19 ++-- source/libs/stream/src/streamExec.c | 151 ++++++++++++++++------------ 3 files changed, 100 insertions(+), 72 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a063f39d92..2f80b9787b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -802,7 +802,7 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); -SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask); +SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); // agg level diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4bdb7a2032..bb4d7d42b9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1152,7 +1152,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } } else { if (pTask->execInfo.step2Start == 0) { - tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start); + tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs", + id, pTask->execInfo.step1Start, pTask->execInfo.step1El); } else { tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); @@ -1172,10 +1173,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask); + int64_t st = taosGetTimestampMs(); + SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st); - // todo update the step1 exec elapsed time - double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; + double el = (taosGetTimestampMs() - st) / 1000.0; + pTask->execInfo.step1El += el; if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) { int8_t status = streamTaskSetSchedStatusInactive(pTask); @@ -1188,9 +1190,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ETaskStatus s = streamTaskGetStatus(pTask, &p); if (s == TASK_STATUS__PAUSE) { - tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); + tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, + el, pTask->execInfo.step1El, status); } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) { - tqDebug("s-task:%s status:%p not continue scan-history data", pTask->id.idStr, p); + tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, p, + pTask->execInfo.step1El); } } @@ -1199,7 +1203,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // the following procedure should be executed, no matter status is stop/pause or not - tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el); + tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El); if (pTask->info.fillHistory) { SStreamTask* pStreamTask = NULL; @@ -1508,6 +1512,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, streamSchedExec(pTask); } } else if (status == TASK_STATUS__UNINIT) { + // todo: fill-history task init ? if (pTask->info.fillHistory == 0) { EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; streamTaskHandleEvent(pTask->status.pSM, event); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d81a5f0c2f..a6101b0932 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -187,95 +187,118 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); +static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) { + int32_t code = TSDB_CODE_SUCCESS; + if (taosArrayGetSize(pRes) > 0) { + SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); + code = doOutputResultBlockImpl(pTask, pStreamBlocks); + if (code != TSDB_CODE_SUCCESS) { + stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); + } + } else { + taosArrayDestroy(pRes); + } + return code; +} +static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) { int32_t code = TSDB_CODE_SUCCESS; + void* exec = pTask->exec.pExecutor; + int32_t numOfBlocks = 0; + + while (1) { + if (streamTaskShouldStop(pTask)) { + break; + } + + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel); + break; + } + + SSDataBlock* output = NULL; + uint64_t ts = 0; + code = qExecTask(exec, &output, &ts); + if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { + stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr, + tstrerror(code)); + continue; + } + + // the generated results before fill-history task been paused, should be dispatched to sink node + if (output == NULL) { + (*pFinish) = qStreamScanhistoryFinished(exec); + break; + } + + SSDataBlock block = {0}; + assignOneDataBlock(&block, output); + block.info.childId = pTask->info.selfChildId; + taosArrayPush(pRes, &block); + + (*pSize) += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + numOfBlocks += 1; + + if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { + stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached", + pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD, + SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD)); + break; + } + } +} + +SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + void* exec = pTask->exec.pExecutor; bool finished = false; - int64_t st = taosGetTimestampMs(); qSetStreamOpOpen(exec); - while (!finished) { + while (1) { if (streamTaskShouldPause(pTask)) { - double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; - stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; // quit from step1, not continue to handle the step2 + stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr); + // quit from step1, not continue to handle the step2 + return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, tstrerror(terrno)); + stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, + tstrerror(terrno)); continue; } int32_t size = 0; - int32_t numOfBlocks = 0; - while (1) { - if (streamTaskShouldStop(pTask)) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; - } + streamScanHistoryDataImpl(pTask, pRes, &size, &finished); - if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { - stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel); - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; - } - - SSDataBlock* output = NULL; - uint64_t ts = 0; - code = qExecTask(exec, &output, &ts); - if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { - stError("s-task:%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code)); - continue; - } - - // the generated results before fill-history task been paused, should be dispatched to sink node - if (output == NULL) { - finished = qStreamScanhistoryFinished(exec); - break; - } - - SSDataBlock block = {0}; - assignOneDataBlock(&block, output); - block.info.childId = pTask->info.selfChildId; - taosArrayPush(pRes, &block); - - size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); - numOfBlocks += 1; - - if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { - stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached", - pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD, - SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD)); - break; - } + if(streamTaskShouldStop(pTask)) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; } - if (taosArrayGetSize(pRes) > 0) { - SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); - code = doOutputResultBlockImpl(pTask, pStreamBlocks); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code)); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; - } - } else { - taosArrayDestroy(pRes); - } + // dispatch the generated results + int32_t code = handleResultBlocks(pTask, pRes, size); int64_t el = taosGetTimestampMs() - st; + + // downstream task input queue is full, try in 5sec + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; + } + + if (finished) { + return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; + } + if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { - stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr, - pTask->info.fillHistory, pTask->info.taskLevel); + stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", + pTask->id.idStr, pTask->info.fillHistory, el / 1000.0); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } - - return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};; } // wait for the stream task to be idle @@ -285,7 +308,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { int64_t st = taosGetTimestampMs(); while (!streamTaskIsIdle(pStreamTask)) { stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel, - pStreamTask->id.idStr); + pStreamTask->id.idStr); taosMsleep(100); } From ed0c580d1173f3c3d66c91f9025e69214745fa4c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 17:04:55 +0800 Subject: [PATCH 18/28] refactor: do some internal refactor. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamQueue.c | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index aa9b26381e..4a7d3a2a05 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -127,7 +127,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 90e862005b..e004c9a403 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -374,7 +374,8 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc return TSDB_CODE_SUCCESS; } -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate) { +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, + const char* id) { if (numCap < 10 || numRate < 10 || pBucket == NULL) { stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); return TSDB_CODE_INVALID_PARA; @@ -389,6 +390,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t pBucket->quotaRemain = pBucket->quotaCapacity; pBucket->fillTimestamp = taosGetTimestampMs(); + stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate); return TSDB_CODE_SUCCESS; } From 884770dab1f800d800038bbb2f3a82794fec71ee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 17:11:36 +0800 Subject: [PATCH 19/28] fix(stream): fix syntax error. --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a1245ba32c..a7fb590d1b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -446,7 +446,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i // 2MiB per second for sink task // 50 times sink operator per second - streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate); + streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); From 2bdde122fcd2bdc37d1a9717b3173781160ea86d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 17:18:58 +0800 Subject: [PATCH 20/28] refactor: wait a while when no token available. --- source/libs/stream/src/streamQueue.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index e004c9a403..239ab883ae 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -159,7 +159,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { - stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id); + stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); + taosMsleep(10); return TSDB_CODE_SUCCESS; } From 0979ff944b6627aaa63b3a9efb5938109e7b022a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 17:33:30 +0800 Subject: [PATCH 21/28] fix(stream): update the fill-time for quota limitation. --- source/libs/stream/src/streamQueue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 239ab883ae..22a1c22be4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -160,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); - taosMsleep(10); +// taosMsleep(10); return TSDB_CODE_SUCCESS; } @@ -410,10 +410,11 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) { double incSize = (delta / 1000.0) * pBucket->quotaRate; if (incSize > 0) { pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); + pBucket->fillTimestamp = now; } if (incNum > 0 || incSize > 0) { - stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 + stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle for %.2f Sec, %s", pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id); } From 26b96b149d5771c54e45a7fab1500528dcfd3819 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 17:49:49 +0800 Subject: [PATCH 22/28] refactor: wait for a while when no quota available. --- source/libs/stream/src/streamQueue.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 22a1c22be4..63ee702ada 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -160,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); -// taosMsleep(10); + taosMsleep(10); return TSDB_CODE_SUCCESS; } @@ -414,9 +414,8 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) { } if (incNum > 0 || incSize > 0) { - stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 - " idle for %.2f Sec, %s", - pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id); + stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s", + pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id); } } From 73d75aac25e234f2c3aa7873a99998f8850170b5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 18:18:58 +0800 Subject: [PATCH 23/28] fix(stream): add ref for task. --- source/libs/stream/src/streamStart.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 9044e65eb6..18a8106443 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -98,8 +98,10 @@ static void doReExecScanhistory(void* param, void* tmrId) { char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + streamMetaReleaseTask(pTask->pMeta, pTask); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref); + return; } if (pTask->schedHistoryInfo.numOfTicks <= 0) { @@ -108,6 +110,9 @@ static void doReExecScanhistory(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, pTask->info.fillHistory, ref); + + // release the task. + streamMetaReleaseTask(pTask->pMeta, pTask); } else { taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer); @@ -122,6 +127,10 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) numOfTicks = SCANHISTORY_IDLE_TICK; } + // add ref for task + SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); + ASSERT(p != NULL); + pTask->schedHistoryInfo.numOfTicks = numOfTicks; int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); From 2674698b36dce7f92cdb63a6fa90b2bc07b4dd56 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Nov 2023 11:57:21 +0800 Subject: [PATCH 24/28] fix(stream): restart tasks in stream threads, instead of write thread. --- source/dnode/vnode/src/inc/tq.h | 7 +--- source/dnode/vnode/src/inc/vnodeInt.h | 7 +++- source/dnode/vnode/src/tq/tq.c | 51 +++++------------------- source/dnode/vnode/src/tq/tqStreamTask.c | 15 +++---- source/dnode/vnode/src/vnd/vnodeSync.c | 4 +- 5 files changed, 25 insertions(+), 59 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 76d89be802..675bfa334a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -43,9 +43,9 @@ extern "C" { typedef struct STqOffsetStore STqOffsetStore; -// tqPush #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) -#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2) +#define STREAM_EXEC_START_ALL_TASKS_ID (-2) +#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3) #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) // tqExec @@ -156,9 +156,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); -int32_t tqScanWal(STQ* pTq); -int32_t tqStartStreamTask(STQ* pTq); int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 89d70bfabb..2a9510f6ad 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -231,7 +231,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqLaunchStreamTaskAsync(STQ* pTq); + +int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart); +int32_t tqRestartStreamTasks(STQ* pTq); +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); +int32_t tqScanWal(STQ* pTq); +int32_t tqStartStreamTasks(STQ* pTq); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bb4d7d42b9..1c1a4a192c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1317,14 +1317,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t taskId = pReq->taskId; int32_t vgId = TD_VID(pTq->pVnode); - if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) { - tqStartStreamTask(pTq); - return 0; - } - if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal tqScanWal(pTq); return 0; + } else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { + tqStartStreamTasks(pTq); + return 0; + } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { + tqRestartStreamTasks(pTq); + return 0; } SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); @@ -1911,7 +1912,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - pMeta->startInfo.startAllTasksFlag = 1; + pMeta->startInfo.tasksWillRestart = 1; if (updateTasks < numOfTasks) { tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, @@ -1920,45 +1921,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } else { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->startInfo.startAllTasksFlag = 0; + pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { - tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId); - terrno = 0; - - streamMetaWUnLock(pMeta); - - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - - int32_t code = streamMetaReopen(pMeta); - if (code != 0) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - tqError("vgId:%d failed to load stream tasks", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); - tqResetStreamTaskStatus(pTq); - tqLaunchStreamTaskAsync(pTq); - } else { - tqInfo("vgId:%d, follower node not start stream tasks", vgId); - } - streamMetaWUnLock(pMeta); + tqStartStreamTaskAsync(pTq, true); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 52e67ef456..5f7a33499b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -60,7 +60,7 @@ int32_t tqScanWal(STQ* pTq) { return 0; } -int32_t tqStartStreamTask(STQ* pTq) { +int32_t tqStartStreamTasks(STQ* pTq) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -125,7 +125,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { int64_t st = taosGetTimestampMs(); while(1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskRestarting, 0, 1); if (startVal == 0) { break; } @@ -155,7 +155,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); + tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); code = streamMetaLoadAllTasks(pTq->pStreamMeta); if (code != TSDB_CODE_SUCCESS) { @@ -168,15 +168,12 @@ int32_t tqRestartStreamTasks(STQ* pTq) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqResetStreamTaskStatus(pTq); - - streamMetaWUnLock(pMeta); tqStartStreamTasks(pTq); } else { - streamMetaResetStartInfo(&pMeta->startInfo); - streamMetaWUnLock(pMeta); tqInfo("vgId:%d, follower node not start stream tasks", vgId); } + streamMetaWUnLock(pMeta); code = terrno; return code; } @@ -198,10 +195,10 @@ int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) { return -1; } - tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks); + tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; - pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID; + pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 4a0c987e57..3944f8ed91 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -557,7 +557,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; streamMetaWLock(pMeta); - if (pMeta->startInfo.startAllTasksFlag) { + if (pMeta->startInfo.tasksWillRestart) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); streamMetaWUnLock(pMeta); return; @@ -570,7 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); tqResetStreamTaskStatus(pVnode->pTq); - tqLaunchStreamTaskAsync(pVnode->pTq); + tqStartStreamTaskAsync(pVnode->pTq, false); } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); From e558d8e497bcf9735aec2a2b51504503ebda2ea6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 18:53:21 +0800 Subject: [PATCH 25/28] fix: fix syntax error. --- include/libs/stream/tstream.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2f80b9787b..eab3ecf04e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -372,7 +372,9 @@ typedef struct STaskExecStatisInfo { int64_t init; int64_t start; int64_t step1Start; + double step1El; int64_t step2Start; + double step2El; int32_t updateCount; int64_t latestUpdateTs; int32_t processDataBlocks; From 0f983698018fc6d6d5adb548ab025524736471ed Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 7 Nov 2023 19:39:30 +0800 Subject: [PATCH 26/28] fix: alter local keepAliveIdle convert type --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cb67fc1ba3..e3ccf93940 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1276,7 +1276,7 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { if (strcasecmp("keepColumnName", name) == 0) { tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; } else if (strcasecmp("keepAliveIdle", name) == 0) { - tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->bval; + tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32; } else { matchItem = false; } From 7bfea5a82e69b596f23c0048d174e519b550dec1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 22:31:35 +0800 Subject: [PATCH 27/28] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 5f7a33499b..dde8614867 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -125,7 +125,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { int64_t st = taosGetTimestampMs(); while(1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskRestarting, 0, 1); + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); if (startVal == 0) { break; } From e052d3cd625050b7364e89a42520d03bce113360 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Nov 2023 00:10:08 +0800 Subject: [PATCH 28/28] fix(stream): check null. --- source/dnode/vnode/src/tq/tqStreamTask.c | 1 - source/libs/stream/src/streamStart.c | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index dde8614867..26849f8578 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -511,7 +511,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); - tqDebug("s-task:%s lock", pTask->id.idStr); char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 18a8106443..e672b256da 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1059,8 +1059,12 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) void* key = taosHashGetKey(pIter, &keyLen); SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); - stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, - (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); + if (pTask1 == NULL) { + stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); + } else { + stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, + (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); + } } }