From 77961ea79102b7b318dc29038a559ea8392f69e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 09:45:42 +0800 Subject: [PATCH 1/6] fix(stream): handle disorder node Update trans Id. --- source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 21 +++++++++++++++------ source/libs/stream/src/streamStart.c | 1 - 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ba96dc0adf..d34e23c0ba 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch int32_t tsTransPullupInterval = 2; int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointInterval = 60; +int32_t tsStreamCheckpointInterval = 300; float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 16; int32_t tsTtlUnit = 86400; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index ee4f5366d6..04c0c0d204 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -195,13 +195,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM const char* idstr = pTask->id.idStr; if (pMeta->updateInfo.transId != req.transId) { - ASSERT(req.transId > pMeta->updateInfo.transId); - tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, - vgId, req.transId, pMeta->updateInfo.transId); + if (req.transId < pMeta->updateInfo.transId) { + tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, + pMeta->updateInfo.transId, req.transId); + rsp.code = TSDB_CODE_SUCCESS; + streamMetaWUnLock(pMeta); - // info needs to be kept till the new trans to update the nodeEp arrived. - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = req.transId; + taosArrayDestroy(req.pNodeList); + return rsp.code; + } else { + tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, + vgId, req.transId, pMeta->updateInfo.transId); + + // info needs to be kept till the new trans to update the nodeEp arrived. + taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = req.transId; + } } else { tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 1f6c5add42..7b8e6e2129 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -432,7 +432,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs STaskId* pId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); } - } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms ASSERT(left > 0); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, From fae53efed9cb0beb42f4168cb8b0599164bcef7a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 09:58:20 +0800 Subject: [PATCH 2/6] fix(stream): add some logs for retry for notready/timeout downstream tasks. and do some internal refactor. --- include/libs/stream/tstream.h | 4 +++- source/libs/stream/src/streamMeta.c | 6 +++--- source/libs/stream/src/streamStart.c | 8 ++++---- source/libs/stream/src/streamTask.c | 21 ++++++++++++++------- 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1b30fdcb01..565f6b5938 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -424,7 +424,7 @@ typedef struct STaskOutputInfo { }; int8_t type; STokenBucket* pTokenBucket; - SArray* pDownstreamUpdateList; + SArray* pNodeEpsetUpdateList; } STaskOutputInfo; typedef struct SUpstreamInfo { @@ -445,6 +445,8 @@ typedef struct STaskCheckInfo { int32_t notReadyTasks; int32_t inCheckProcess; int32_t stopCheckProcess; + int32_t notReadyRetryCount; + int32_t timeoutRetryCount; tmr_h checkRspTmr; TdThreadMutex checkInfoLock; } STaskCheckInfo; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f7f790fbe7..a464594233 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1073,9 +1073,9 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { taosThreadMutexLock(&pTask->lock); - int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); for (int j = 0; j < num; ++j) { - SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j); + SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j); bool exist = existInHbMsg(pMsg, pTaskEpset); if (!exist) { @@ -1085,7 +1085,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { } } - taosArrayClear(pTask->outputInfo.pDownstreamUpdateList); + taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList); taosThreadMutexUnlock(&pTask->lock); } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 7b8e6e2129..90d0de5a0e 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -356,10 +356,10 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t vgId = pTask->pMeta->vgId; taosThreadMutexLock(&pTask->lock); - int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); bool existed = false; for (int i = 0; i < num; ++i) { - SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i); + SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i); if (p->nodeId == nodeId) { existed = true; break; @@ -368,10 +368,10 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { if (!existed) { SDownstreamTaskEpset t = {.nodeId = nodeId}; - taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t); + taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, - t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList)); + t.nodeId, (num + 1)); } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5849e1f00e..0827b5afe4 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -470,7 +470,7 @@ void tFreeStreamTask(SStreamTask* pTask) { taosMemoryFree(pTask->outputInfo.pTokenBucket); taosThreadMutexDestroy(&pTask->lock); - pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList); + pTask->outputInfo.pNodeEpsetUpdateList = taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList); taosMemoryFree(pTask); stDebug("s-task:0x%x free task completed", taskId); @@ -571,8 +571,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i // 2MiB per second for sink task // 50 times sink operator per second streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); - pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); - if (pOutputInfo->pDownstreamUpdateList == NULL) { + pOutputInfo->pNodeEpsetUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); + if (pOutputInfo->pNodeEpsetUpdateList == NULL) { stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return TSDB_CODE_OUT_OF_MEMORY; } @@ -1098,8 +1098,11 @@ static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) pInfo->notReadyTasks = 0; pInfo->inCheckProcess = 0; pInfo->stopCheckProcess = 0; - taosArrayClear(pInfo->pList); + pInfo->notReadyRetryCount = 0; + pInfo->timeoutRetryCount = 0; + + taosArrayClear(pInfo->pList); return 0; } @@ -1292,11 +1295,13 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady); + pInfo->notReadyRetryCount += 1; + stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, + numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); } + // todo add into node update list and send to mnode if (numOfTimeout > 0) { - pInfo->startTs = now; ASSERT(pTask->status.downstreamReady == 0); for (int32_t i = 0; i < numOfTimeout; ++i) { @@ -1309,7 +1314,9 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now); + pInfo->timeoutRetryCount += 1; + stDebug("s-task:%s %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, + numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); } taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); From ae4e2bf67133fbaeabbfe0eb69930b2d2fc98cbb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 10:00:54 +0800 Subject: [PATCH 3/6] fix(stream): suppress some warnings by ide. --- source/client/test/clientTests.cpp | 6 +++++- source/libs/stream/src/streamTask.c | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b534671acb..1498476634 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -827,7 +827,11 @@ TEST(clientCase, projection_query_tables) { // } // taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use cache_1"); + TAOS_RES* pRes = taos_query(pConn, "alter local 'fqdn 127.0.0.1'"); + if (taos_errno(pRes) != 0) { + printf("failed to exec query, %s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1"); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0827b5afe4..962cadfbd6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -261,8 +261,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &taskId)) return -1; pTask->streamTaskId.taskId = taskId; - if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; - if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; + if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1; + if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1; if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; From 815489d20ad2b3768264e0f774c6ac2d0e63d9c6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 11:10:20 +0800 Subject: [PATCH 4/6] fix(tsdb): set the pk info when converting fileblockinfo to binrecord. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 69 +++++++++++++++---------- 1 file changed, 43 insertions(+), 26 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 8a569e0260..cf95dd3f6f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1054,14 +1054,30 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo } } -static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo) { +static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo, SBlockLoadSuppInfo* pSupp) { record->uid = pBlockInfo->uid; - record->firstKey = (STsdbRowKey){ - .key = {.ts = pBlockInfo->firstKey, .numOfPKs = 0}, - }; - record->lastKey = (STsdbRowKey){ - .key = {.ts = pBlockInfo->lastKey, .numOfPKs = 0}, - }; + record->firstKey = (STsdbRowKey){.key = {.ts = pBlockInfo->firstKey, .numOfPKs = pSupp->numOfPks}}; + record->lastKey = (STsdbRowKey){.key = {.ts = pBlockInfo->lastKey, .numOfPKs = pSupp->numOfPks}}; + + if (pSupp->numOfPks > 0) { + SValue* pFirst = &record->firstKey.key.pks[0]; + SValue* pLast = &record->lastKey.key.pks[0]; + + pFirst->type = pSupp->pk.type; + pLast->type = pSupp->pk.type; + + if (IS_VAR_DATA_TYPE(pFirst->type)) { + pFirst->pData = (uint8_t*) varDataVal(pBlockInfo->firstPk.pData); + pFirst->nData = varDataLen(pBlockInfo->firstPk.pData); + + pLast->pData = (uint8_t*) varDataVal(pBlockInfo->lastPk.pData); + pLast->nData = varDataLen(pBlockInfo->lastPk.pData); + } else { + pFirst->val = pBlockInfo->firstPk.val; + pLast->val = pBlockInfo->lastPk.val; + } + } + record->minVer = pBlockInfo->minVer; record->maxVer = pBlockInfo->maxVer; record->blockOffset = pBlockInfo->blockOffset; @@ -1091,7 +1107,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro int32_t step = asc ? 1 : -1; SBrinRecord tmp; - blockInfoToRecord(&tmp, pBlockInfo); + blockInfoToRecord(&tmp, pBlockInfo, pSupInfo); SBrinRecord* pRecord = &tmp; // no data exists, return directly. @@ -1290,7 +1306,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBrinRecord tmp; - blockInfoToRecord(&tmp, pBlockInfo); + blockInfoToRecord(&tmp, pBlockInfo, pSup); SBrinRecord* pRecord = &tmp; code = tsdbDataFileReadBlockDataByColumn(pReader->pFileReader, pRecord, pBlockData, pSchema, &pSup->colId[1], pSup->numOfCols - 1); @@ -1327,7 +1343,7 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order, - SBrinRecord* pRecord) { + SBrinRecord* pRecord, SBlockLoadSuppInfo* pSupInfo) { bool asc = ASCENDING_TRAVERSE(order); int32_t step = asc ? 1 : -1; @@ -1341,7 +1357,7 @@ static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockIn STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); - blockInfoToRecord(pRecord, p); + blockInfoToRecord(pRecord, p, pSupInfo); *nextIndex = pBlockInfo->tbBlockIdx + step; return true; @@ -1464,7 +1480,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; bool hasNeighbor = - getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec); + getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec, pSupInfo); // overlap with neighbor if (hasNeighbor) { @@ -1473,7 +1489,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* } SBrinRecord pRecord; - blockInfoToRecord(&pRecord, pBlockInfo); + blockInfoToRecord(&pRecord, pBlockInfo, pSupInfo); // has duplicated ts of different version in this block pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0); @@ -2411,15 +2427,16 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, bool* loadNeighbor) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t order = pReader->info.order; - SDataBlockIter* pIter = &pReader->status.blockIter; - int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; - int32_t nextIndex = -1; - SBrinRecord rec = {0}; + int32_t code = TSDB_CODE_SUCCESS; + int32_t order = pReader->info.order; + SDataBlockIter* pIter = &pReader->status.blockIter; + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; + int32_t nextIndex = -1; + SBrinRecord rec = {0}; *loadNeighbor = false; - bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec); + bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec, pSupInfo); if (!hasNeighbor) { // do nothing return code; } @@ -4868,11 +4885,11 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, return TSDB_CODE_SUCCESS; } - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; - if (pResBlock->info.id.uid != pFBlock->uid) { + if (pResBlock->info.id.uid != pBlockInfo->uid) { return TSDB_CODE_SUCCESS; } @@ -4880,10 +4897,10 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, TARRAY2_CLEAR(&pSup->colAggArray, 0); SBrinRecord pRecord; - blockInfoToRecord(&pRecord, pFBlock); + blockInfoToRecord(&pRecord, pBlockInfo, pSup); code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pRecord, &pSup->colAggArray); if (code != TSDB_CODE_SUCCESS) { - tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code), + tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pBlockInfo->uid, tstrerror(code), pReader->idStr); return code; } @@ -4912,7 +4929,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, } // do fill all null column value SMA info - doFillNullColSMA(pSup, pFBlock->numRow, numOfCols, pTsAgg); + doFillNullColSMA(pSup, pBlockInfo->numRow, numOfCols, pTsAgg); size_t size = pSup->colAggArray.size; @@ -4938,7 +4955,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, // double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; pReader->cost.smaLoadTime += 0; // elapsedTime; - tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr); + tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pBlockInfo->uid, pReader->idStr); return code; } From a0e68c2bfb038920ae1412470d14269fc66c276c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 11:14:53 +0800 Subject: [PATCH 5/6] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- .../{streamStart.c => streamStartHistory.c} | 244 +---------- source/libs/stream/src/streamTask.c | 387 +----------------- 3 files changed, 7 insertions(+), 626 deletions(-) rename source/libs/stream/src/{streamStart.c => streamStartHistory.c} (69%) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 565f6b5938..260c1c54d2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -850,7 +850,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t* pNotReady, const char* id); -void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo); +void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStartHistory.c similarity index 69% rename from source/libs/stream/src/streamStart.c rename to source/libs/stream/src/streamStartHistory.c index 90d0de5a0e..04a99feab0 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -32,12 +32,12 @@ typedef struct SLaunchHTaskInfo { static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId); +static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, + int32_t hTaskId); static void tryLaunchHistoryTask(void* param, void* tmrId); -static void doProcessDownstreamReadyRsp(SStreamTask* pTask); -static void doExecScanhistoryInFuture(void* param, void* tmrId); -static int32_t doStartScanHistoryTask(SStreamTask* pTask); -static int32_t streamTaskStartScanHistory(SStreamTask* pTask); +static void doExecScanhistoryInFuture(void* param, void* tmrId); +static int32_t doStartScanHistoryTask(SStreamTask* pTask); +static int32_t streamTaskStartScanHistory(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); @@ -165,67 +165,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { return 0; } -// check status -void streamTaskCheckDownstream(SStreamTask* pTask) { - SDataRange* pRange = &pTask->dataRange; - STimeWindow* pWindow = &pRange->window; - - SStreamTaskCheckReq req = { - .streamId = pTask->id.streamId, - .upstreamTaskId = pTask->id.taskId, - .upstreamNodeId = pTask->info.nodeId, - .childId = pTask->info.selfChildId, - .stage = pTask->pMeta->stage, - }; - - ASSERT(pTask->status.downstreamReady == 0); - - // serialize streamProcessScanHistoryFinishRsp - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - streamTaskStartMonitorCheckRsp(pTask); - - req.reqId = tGenIdPI64(); - req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; - req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - - streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); - - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 - " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, - pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); - - streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); - - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - streamTaskStartMonitorCheckRsp(pTask); - - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - - int32_t numOfVgs = taosArrayGetSize(vgInfo); - stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, - pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); - - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.reqId = tGenIdPI64(); - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; - - streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); - - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 - " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); - streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } else { // for sink task, set it ready directly. - stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); - doProcessDownstreamReadyRsp(pTask); - } -} - int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) { SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); @@ -327,121 +266,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -void doProcessDownstreamReadyRsp(SStreamTask* pTask) { - EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; - streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); - - int64_t checkTs = pTask->execInfo.checkTs; - int64_t readyTs = pTask->execInfo.readyTs; - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true); - - if (pTask->status.taskStatus == TASK_STATUS__HALT) { - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); - - // halt it self for count window stream task until the related fill history task completed. - stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, - pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); - } - - // start the related fill-history task, when current task is ready - // not invoke in success callback due to the deadlock. - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr); - streamLaunchFillHistoryTask(pTask); - } -} - -static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { - int32_t vgId = pTask->pMeta->vgId; - - taosThreadMutexLock(&pTask->lock); - int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); - bool existed = false; - for (int i = 0; i < num; ++i) { - SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i); - if (p->nodeId == nodeId) { - existed = true; - break; - } - } - - if (!existed) { - SDownstreamTaskEpset t = {.nodeId = nodeId}; - taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); - - stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, - t.nodeId, (num + 1)); - } - - taosThreadMutexUnlock(&pTask->lock); -} - -int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { - ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); - - int64_t now = taosGetTimestampMs(); - const char* id = pTask->id.idStr; - STaskCheckInfo* pInfo = &pTask->taskCheckInfo; - int32_t total = streamTaskGetNumOfDownstream(pTask); - int32_t left = -1; - - if (streamTaskShouldStop(pTask)) { - stDebug("s-task:%s should stop, do not do check downstream again", id); - return TSDB_CODE_SUCCESS; - } - - if (pRsp->status == TASK_DOWNSTREAM_READY) { - int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_SUCCESS; - } - - if (left == 0) { - doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag - streamTaskStopMonitorCheckRsp(pInfo, id); - } else { - stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, - pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); - } - } else { // not ready, wait for 100ms and retry - int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_SUCCESS; // return success in any cases. - } - - if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { - if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { - stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 - ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", - id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); - addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); - } else { - stError( - "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " - "downstream again, nodeUpdate needed", - id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); - addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); - } - - int32_t startTs = pTask->execInfo.checkTs; - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); - - // automatically set the related fill-history task to be failed. - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); - } - } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms - ASSERT(left > 0); - stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, - pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); - } - } - - return 0; -} - int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, int32_t taskId) { SEncoder encoder; @@ -797,64 +621,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe } } -int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 962cadfbd6..9f4e6aaea1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -21,8 +21,6 @@ #include "ttimer.h" #include "wal.h" -#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec - static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated); static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate); @@ -442,7 +440,7 @@ void tFreeStreamTask(SStreamTask* pTask) { taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); } - streamTaskCleanCheckInfo(&pTask->taskCheckInfo); + streamTaskCleanupCheckInfo(&pTask->taskCheckInfo); if (pTask->pState) { stDebug("s-task:0x%x start to free task state", taskId); @@ -994,386 +992,3 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { tmsgSendReq(&pTask->info.mnodeEpset, &msg); return 0; } - -static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { - taosArrayClear(pInfo->pList); - - if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { - pInfo->notReadyTasks = 1; - } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos); - ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum); - } - - pInfo->startTs = startTs; - return TSDB_CODE_SUCCESS; -} - -static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) { - for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); - if (p->taskId == taskId) { - return p; - } - } - - return NULL; -} - -int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { - SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; - - taosThreadMutexLock(&pInfo->checkInfoLock); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; - } - - taosArrayPush(pInfo->pList, &info); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; -} - -int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, - int32_t* pNotReady, const char* id) { - taosThreadMutexLock(&pInfo->checkInfoLock); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - - if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 - " expired check-rsp recv from downstream task:0x%x, discarded", - id, reqId, p->reqId, taskId); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_FAILED; - } - - // subtract one not-ready-task, since it is ready now - if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { - *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); - } else { - *pNotReady = pInfo->notReadyTasks; - } - - p->status = status; - p->rspTs = rspTs; - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; - } - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId, - reqId); - return TSDB_CODE_FAILED; -} - -static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { - if (pInfo->inCheckProcess == 0) { - pInfo->inCheckProcess = 1; - } else { - ASSERT(pInfo->startTs > 0); - stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs); - return TSDB_CODE_FAILED; - } - - stDebug("s-task:%s set the in-check-procedure flag", id); - return 0; -} - -static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) { - if (!pInfo->inCheckProcess) { - stWarn("s-task:%s already not in-check-procedure", id); - } - - int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; - stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); - - pInfo->startTs = 0; - pInfo->notReadyTasks = 0; - pInfo->inCheckProcess = 0; - pInfo->stopCheckProcess = 0; - - pInfo->notReadyRetryCount = 0; - pInfo->timeoutRetryCount = 0; - - taosArrayClear(pInfo->pList); - return 0; -} - -static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { - SStreamTaskCheckReq req = { - .streamId = pTask->id.streamId, - .upstreamTaskId = pTask->id.taskId, - .upstreamNodeId = pTask->info.nodeId, - .childId = pTask->info.selfChildId, - .stage = pTask->pMeta->stage, - }; - - STaskOutputInfo* pOutputInfo = &pTask->outputInfo; - if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { - req.reqId = p->reqId; - req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; - req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); - - streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); - } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgs = taosArrayGetSize(vgInfo); - - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - - if (p->taskId == pVgInfo->taskId) { - req.reqId = p->reqId; - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; - - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 - " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, - p->reqId); - streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - break; - } - } - } else { - ASSERT(0); - } -} - -static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, - int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); - if (p->status == TASK_DOWNSTREAM_READY) { - (*numOfReady) += 1; - } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { - stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id, - p->taskId); - (*numOfFault) += 1; - } else { // TASK_DOWNSTREAM_NOT_READY - if (p->rspTs == 0) { // not response yet - ASSERT(p->status == -1); - if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. - taosArrayPush(pTimeoutList, &p->taskId); - } else { // el < CHECK_NOT_RSP_DURATION - (*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp - } - } else { - taosArrayPush(pNotReadyList, &p->taskId); - } - } - } -} - -static void rspMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - SStreamTaskState* pStat = streamTaskGetStatus(pTask); - STaskCheckInfo* pInfo = &pTask->taskCheckInfo; - int32_t vgId = pTask->pMeta->vgId; - int64_t now = taosGetTimestampMs(); - int64_t el = now - pInfo->startTs; - ETaskStatus state = pStat->state; - const char* id = pTask->id.idStr; - int32_t numOfReady = 0; - int32_t numOfFault = 0; - int32_t numOfNotRsp = 0; - int32_t numOfNotReady = 0; - int32_t numOfTimeout = 0; - - stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); - - if (state == TASK_STATUS__STOP) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); - - taosThreadMutexLock(&pInfo->checkInfoLock); - streamTaskCompleteCheckRsp(pInfo, id); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } - return; - } - - if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); - - taosThreadMutexLock(&pInfo->checkInfoLock); - streamTaskCompleteCheckRsp(pInfo, id); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return; - } - - taosThreadMutexLock(&pInfo->checkInfoLock); - if (pInfo->notReadyTasks == 0) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, - vgId, ref); - - streamTaskCompleteCheckRsp(pInfo, id); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return; - } - - SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t)); - SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); - - if (pStat->state == TASK_STATUS__UNINIT) { - getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); - } else { // unexpected status - stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); - } - - numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); - numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); - - // fault tasks detected, not try anymore - ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); - if (numOfFault > 0) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug( - "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); - - streamTaskCompleteCheckRsp(pInfo, id); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - - taosArrayDestroy(pNotReadyList); - taosArrayDestroy(pTimeoutList); - return; - } - - // checking of downstream tasks has been stopped by other threads - if (pInfo->stopCheckProcess == 1) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug( - "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, " - "fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); - - streamTaskCompleteCheckRsp(pInfo, id); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - - // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); - } - - taosArrayDestroy(pNotReadyList); - taosArrayDestroy(pTimeoutList); - return; - } - - if (numOfNotReady > 0) { // check to make sure not in recheck timer - ASSERT(pTask->status.downstreamReady == 0); - - // reset the info, and send the check msg to failure downstream again - for (int32_t i = 0; i < numOfNotReady; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - p->rspTs = 0; - p->status = -1; - doSendCheckMsg(pTask, p); - } - } - - pInfo->notReadyRetryCount += 1; - stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, - numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); - } - - // todo add into node update list and send to mnode - if (numOfTimeout > 0) { - ASSERT(pTask->status.downstreamReady == 0); - - for (int32_t i = 0; i < numOfTimeout; ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); - - SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); - if (p != NULL) { - ASSERT(p->status == -1 && p->rspTs == 0); - doSendCheckMsg(pTask, p); - } - } - - pInfo->timeoutRetryCount += 1; - stDebug("s-task:%s %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, - numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); - } - - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - - stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, - numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); - - taosArrayDestroy(pNotReadyList); - taosArrayDestroy(pTimeoutList); -} - -int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { - STaskCheckInfo* pInfo = &pTask->taskCheckInfo; - - taosThreadMutexLock(&pInfo->checkInfoLock); - int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); - if (code != TSDB_CODE_SUCCESS) { - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_FAILED; - } - - streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); - - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); - - if (pInfo->checkRspTmr == NULL) { - pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); - } else { - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); - } - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return 0; -} - -int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { - taosThreadMutexLock(&pInfo->checkInfoLock); - streamTaskCompleteCheckRsp(pInfo, id); - - pInfo->stopCheckProcess = 1; - taosThreadMutexUnlock(&pInfo->checkInfoLock); - - stDebug("s-task:%s set stop check rsp mon", id); - return TSDB_CODE_SUCCESS; -} - -void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) { - ASSERT(pInfo->inCheckProcess == 0); - - pInfo->pList = taosArrayDestroy(pInfo->pList); - if (pInfo->checkRspTmr != NULL) { - /*bool ret = */ taosTmrStop(pInfo->checkRspTmr); - pInfo->checkRspTmr = NULL; - } - - taosThreadMutexDestroy(&pInfo->checkInfoLock); -} \ No newline at end of file From e5069669cb54c6f8e4bcfad64bbf3eb962d2aa31 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 11:16:40 +0800 Subject: [PATCH 6/6] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckStatus.c | 665 +++++++++++++++++++++ 1 file changed, 665 insertions(+) create mode 100644 source/libs/stream/src/streamCheckStatus.c diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c new file mode 100644 index 0000000000..42bf334fa3 --- /dev/null +++ b/source/libs/stream/src/streamCheckStatus.c @@ -0,0 +1,665 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "cos.h" +#include "rsync.h" +#include "streamBackendRocksdb.h" +#include "streamInt.h" + +#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec + +static void processDownstreamReadyRsp(SStreamTask* pTask); +static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); +static void rspMonitorFn(void* param, void* tmrId); +static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); +static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); +static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id); +static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p); +static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, + int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); + +static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); + +// check status +void streamTaskCheckDownstream(SStreamTask* pTask) { + SDataRange* pRange = &pTask->dataRange; + STimeWindow* pWindow = &pRange->window; + + SStreamTaskCheckReq req = { + .streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, + .upstreamNodeId = pTask->info.nodeId, + .childId = pTask->info.selfChildId, + .stage = pTask->pMeta->stage, + }; + + ASSERT(pTask->status.downstreamReady == 0); + + // serialize streamProcessScanHistoryFinishRsp + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + streamTaskStartMonitorCheckRsp(pTask); + + req.reqId = tGenIdPI64(); + req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; + req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 + " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, + pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); + + streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); + + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + streamTaskStartMonitorCheckRsp(pTask); + + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; + + int32_t numOfVgs = taosArrayGetSize(vgInfo); + stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, + pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); + + for (int32_t i = 0; i < numOfVgs; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + req.reqId = tGenIdPI64(); + req.downstreamNodeId = pVgInfo->vgId; + req.downstreamTaskId = pVgInfo->taskId; + + streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); + + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 + " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); + streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + } + } else { // for sink task, set it ready directly. + stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); + processDownstreamReadyRsp(pTask); + } +} + +int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { + ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); + + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + int32_t total = streamTaskGetNumOfDownstream(pTask); + int32_t left = -1; + + if (streamTaskShouldStop(pTask)) { + stDebug("s-task:%s should stop, do not do check downstream again", id); + return TSDB_CODE_SUCCESS; + } + + if (pRsp->status == TASK_DOWNSTREAM_READY) { + int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SUCCESS; + } + + if (left == 0) { + processDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag + streamTaskStopMonitorCheckRsp(pInfo, id); + } else { + stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, + pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); + } + } else { // not ready, wait for 100ms and retry + int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SUCCESS; // return success in any cases. + } + + if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { + if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { + stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 + ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", + id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); + addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); + } else { + stError( + "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " + "downstream again, nodeUpdate needed", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + } + + int32_t startTs = pTask->execInfo.checkTs; + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); + + // automatically set the related fill-history task to be failed. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pId = &pTask->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); + } + } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms + ASSERT(left > 0); + stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, + pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); + } + } + + return 0; +} + +int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { + SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; + + taosThreadMutexLock(&pInfo->checkInfoLock); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; + } + + taosArrayPush(pInfo->pList, &info); + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; +} + +int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + + taosThreadMutexLock(&pInfo->checkInfoLock); + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_FAILED; + } + + streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); + + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); + + if (pInfo->checkRspTmr == NULL) { + pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + } else { + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return 0; +} + +int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { + taosThreadMutexLock(&pInfo->checkInfoLock); + streamTaskCompleteCheckRsp(pInfo, false, id); + + pInfo->stopCheckProcess = 1; + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + stDebug("s-task:%s set stop check rsp mon", id); + return TSDB_CODE_SUCCESS; +} + +void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) { + ASSERT(pInfo->inCheckProcess == 0); + + pInfo->pList = taosArrayDestroy(pInfo->pList); + if (pInfo->checkRspTmr != NULL) { + /*bool ret = */ taosTmrStop(pInfo->checkRspTmr); + pInfo->checkRspTmr = NULL; + } + + taosThreadMutexDestroy(&pInfo->checkInfoLock); +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +void processDownstreamReadyRsp(SStreamTask* pTask) { + EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; + streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); + + int64_t checkTs = pTask->execInfo.checkTs; + int64_t readyTs = pTask->execInfo.readyTs; + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true); + + if (pTask->status.taskStatus == TASK_STATUS__HALT) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); + + // halt it self for count window stream task until the related fill history task completed. + stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, + pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); + } + + // start the related fill-history task, when current task is ready + // not invoke in success callback due to the deadlock. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr); + streamLaunchFillHistoryTask(pTask); + } +} + +void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { + int32_t vgId = pTask->pMeta->vgId; + + taosThreadMutexLock(&pTask->lock); + int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); + bool existed = false; + for (int i = 0; i < num; ++i) { + SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i); + if (p->nodeId == nodeId) { + existed = true; + break; + } + } + + if (!existed) { + SDownstreamTaskEpset t = {.nodeId = nodeId}; + taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); + + stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, + t.nodeId, (num + 1)); + } + + taosThreadMutexUnlock(&pTask->lock); +} + +int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { + taosArrayClear(pInfo->pList); + + if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { + pInfo->notReadyTasks = 1; + } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos); + ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum); + } + + pInfo->startTs = startTs; + return TSDB_CODE_SUCCESS; +} + +SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) { + for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); + if (p->taskId == taskId) { + return p; + } + } + + return NULL; +} + +int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, + int32_t* pNotReady, const char* id) { + taosThreadMutexLock(&pInfo->checkInfoLock); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + + if (reqId != p->reqId) { + stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 + " expired check-rsp recv from downstream task:0x%x, discarded", + id, reqId, p->reqId, taskId); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_FAILED; + } + + // subtract one not-ready-task, since it is ready now + if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { + *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); + } else { + *pNotReady = pInfo->notReadyTasks; + } + + p->status = status; + p->rspTs = rspTs; + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId, + reqId); + return TSDB_CODE_FAILED; +} + +int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { + if (pInfo->inCheckProcess == 0) { + pInfo->inCheckProcess = 1; + } else { + ASSERT(pInfo->startTs > 0); + stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs); + return TSDB_CODE_FAILED; + } + + stDebug("s-task:%s set the in-check-procedure flag", id); + return 0; +} + +int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) { + if (lock) { + taosThreadMutexLock(&pInfo->checkInfoLock); + } + + if (!pInfo->inCheckProcess) { + stWarn("s-task:%s already not in-check-procedure", id); + } + + int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; + stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); + + pInfo->startTs = 0; + pInfo->notReadyTasks = 0; + pInfo->inCheckProcess = 0; + pInfo->stopCheckProcess = 0; + + pInfo->notReadyRetryCount = 0; + pInfo->timeoutRetryCount = 0; + + taosArrayClear(pInfo->pList); + + if (lock) { + taosThreadMutexUnlock(&pInfo->checkInfoLock); + } + + return 0; +} + +void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { + SStreamTaskCheckReq req = { + .streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, + .upstreamNodeId = pTask->info.nodeId, + .childId = pTask->info.selfChildId, + .stage = pTask->pMeta->stage, + }; + + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; + if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { + req.reqId = p->reqId; + req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; + req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); + + streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); + } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgs = taosArrayGetSize(vgInfo); + + for (int32_t i = 0; i < numOfVgs; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + + if (p->taskId == pVgInfo->taskId) { + req.reqId = p->reqId; + req.downstreamNodeId = pVgInfo->vgId; + req.downstreamTaskId = pVgInfo->taskId; + + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 + " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, + p->reqId); + streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + break; + } + } + } else { + ASSERT(0); + } +} + +void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, + int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) { + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); + if (p->status == TASK_DOWNSTREAM_READY) { + (*numOfReady) += 1; + } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { + stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id, + p->taskId); + (*numOfFault) += 1; + } else { // TASK_DOWNSTREAM_NOT_READY + if (p->rspTs == 0) { // not response yet + ASSERT(p->status == -1); + if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. + taosArrayPush(pTimeoutList, &p->taskId); + } else { // el < CHECK_NOT_RSP_DURATION + (*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp + } + } else { + taosArrayPush(pNotReadyList, &p->taskId); + } + } + } +} + +void rspMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + SStreamTaskState* pStat = streamTaskGetStatus(pTask); + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + int32_t vgId = pTask->pMeta->vgId; + int64_t now = taosGetTimestampMs(); + int64_t el = now - pInfo->startTs; + ETaskStatus state = pStat->state; + const char* id = pTask->id.idStr; + int32_t numOfReady = 0; + int32_t numOfFault = 0; + int32_t numOfNotRsp = 0; + int32_t numOfNotReady = 0; + int32_t numOfTimeout = 0; + + stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); + + if (state == TASK_STATUS__STOP) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); + + streamTaskCompleteCheckRsp(pInfo, true, id); + + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pHId = &pTask->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); + } + return; + } + + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); + + streamTaskCompleteCheckRsp(pInfo, true, id); + return; + } + + taosThreadMutexLock(&pInfo->checkInfoLock); + if (pInfo->notReadyTasks == 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, + vgId, ref); + + streamTaskCompleteCheckRsp(pInfo, false, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return; + } + + SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t)); + SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); + + if (pStat->state == TASK_STATUS__UNINIT) { + getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); + } else { // unexpected status + stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); + } + + numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); + numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); + + // fault tasks detected, not try anymore + ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); + if (numOfFault > 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug( + "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " + "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + + streamTaskCompleteCheckRsp(pInfo, false, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); + return; + } + + // checking of downstream tasks has been stopped by other threads + if (pInfo->stopCheckProcess == 1) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug( + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, " + "fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + + streamTaskCompleteCheckRsp(pInfo, false, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pHId = &pTask->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); + } + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); + return; + } + + if (numOfNotReady > 0) { // check to make sure not in recheck timer + ASSERT(pTask->status.downstreamReady == 0); + + // reset the info, and send the check msg to failure downstream again + for (int32_t i = 0; i < numOfNotReady; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + p->rspTs = 0; + p->status = -1; + doSendCheckMsg(pTask, p); + } + } + + pInfo->notReadyRetryCount += 1; + stDebug("s-task:%s %d downstream task(s) not ready, send check msg again, retry:%d start time:%" PRId64, id, + numOfNotReady, pInfo->notReadyRetryCount, pInfo->startTs); + } + + // todo add into node update list and send to mnode + if (numOfTimeout > 0) { + ASSERT(pTask->status.downstreamReady == 0); + + for (int32_t i = 0; i < numOfTimeout; ++i) { + int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); + + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + ASSERT(p->status == -1 && p->rspTs == 0); + doSendCheckMsg(pTask, p); + } + } + + pInfo->timeoutRetryCount += 1; + + // timeout more than 100 sec, add into node update list + if (pInfo->timeoutRetryCount > 10) { + pInfo->timeoutRetryCount = 0; + stDebug("s-task:%s vgId:%d %d downstream task(s) timeout more than 100sec, add into nodeUpate list", id, vgId, + numOfTimeout); + } else { + stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, + vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); + } + } + + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id, + numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); +} + +int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->stage) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; + tEndDecode(pDecoder); + return 0; +}