From 57a0c7b487d0a6fcdee2283a0c5953ff0ebd0556 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Aug 2024 23:20:37 +0800 Subject: [PATCH] fix(stream): init the in-mem task list when recv checkpoint-report. check the number of complete downstream check by checking it in hashmap, not just number. --- source/dnode/mnode/impl/src/mndStream.c | 4 ++++ source/libs/stream/src/streamMeta.c | 28 ++++++++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 35da6c379f..d2182f2c2c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2564,6 +2564,10 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) { } tDecoderClear(&decoder); + streamMutexLock(&execInfo.lock); + mndInitStreamExecInfo(pMnode, &execInfo); + streamMutexUnlock(&execInfo.lock); + mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " transId:%d", req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5ed9f274a2..e8afe96339 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1589,11 +1589,33 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) } } +// return true only if each of the first numOfTotal entries of pMeta->pTaskList yields a non-NULL taosHashGet() hit in pReadyTaskSet or pFailedTaskSet (NULL entries are skipped) +static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) { + for (int32_t i = 0; i < numOfTotal; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + if (pTaskId == NULL) { + continue; + } + + STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx)); + if (px == NULL) { + px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx)); + if (px == NULL) { + 
return false; + } + } + } + + return true; +} + int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId}; int32_t vgId = pMeta->vgId; + bool allRsp = true; streamMetaWLock(pMeta); SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); @@ -1618,9 +1640,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 return 0; } - SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; - STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); if (code) { if (code == TSDB_CODE_DUP_KEY) { @@ -1637,7 +1658,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - if (numOfRecv == numOfTotal) { + allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal); + if (allRsp) { pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;