From d397c9a9b75e0f9cc0c3d46653952ee2ca98dcb5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Nov 2024 19:22:37 +0800 Subject: [PATCH 1/3] fix(stream): remove failed task in hash table and array list. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamMeta.c | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 326e8d4ada..1ea524dc78 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -652,7 +652,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve streamMetaWUnLock(pMeta); if (code < 0) { - tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks, + tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 86f305df60..a6f87711bf 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -723,8 +723,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask); code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)); - if (code) { // todo remove it from task list + if (code) { stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); + void* pUnused = taosArrayPop(pMeta->pTaskList); int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret != 0) { @@ -734,6 +735,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { + int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + void* pUnused = taosArrayPop(pMeta->pTaskList); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); @@ -742,6 +746,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if ((code = streamMetaCommit(pMeta)) != 0) { + int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + void* pUnused = taosArrayPop(pMeta->pTaskList); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); From 96d4a2908e4f9cb608966db29306469a101d1e25 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 23 Nov 2024 02:12:15 +0800 Subject: [PATCH 2/3] fix(stream): check failedId before sending checkpoint msg --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamCheckpoint.c | 41 ++++++++++++----------- source/libs/stream/src/streamDispatch.c | 10 +++++- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index de10d6844e..53b8e0e0b9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -714,7 +714,7 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId); bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId); void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); -int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); +int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pInfo, int32_t code); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2280b7f06f..9eab33cee8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1208,34 +1208,37 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_ // record the dispatch checkpoint trigger info in the list // memory insufficient may cause the stream computing stopped -int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { +int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) { SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; int64_t now = taosGetTimestampMs(); int32_t code = 0; streamMutexLock(&pInfo->lock); - pInfo->dispatchTrigger = true; - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; + if (sendingChkptId > pInfo->failedId) { + pInfo->dispatchTrigger = true; + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; - STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; - void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); - if (px == NULL) { // pause the stream task, if memory not enough - code = terrno; - } - } else { - for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { - SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i); - if (pVgInfo == NULL) { - continue; - } - - STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; - void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); + STaskTriggerSendInfo p = { + .sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; + void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); if (px == NULL) { // pause the stream task, if memory not enough code = terrno; - break; + } + } else { + for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { + SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i); + if (pVgInfo == NULL) { + continue; + } + + STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; + void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); + if (px == NULL) { // pause the stream task, if memory not enough + code = terrno; + break; + } } } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5807240f5e..b4fcf1edc9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -845,7 +845,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { streamMutexUnlock(&pTask->msgInfo.lock); code = doBuildDispatchMsg(pTask, pBlock); + + int64_t chkptId = 0; if (code == 0) { + if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + SSDataBlock* p = taosArrayGet(pBlock->blocks, 0); + if (pBlock != NULL) { + chkptId = p->info.version; + } + } destroyStreamDataBlock(pBlock); } else { // todo handle build dispatch msg failed } @@ -862,7 +870,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { continue; } - code = streamTaskInitTriggerDispatchInfo(pTask); + code = streamTaskInitTriggerDispatchInfo(pTask, chkptId); if (code != TSDB_CODE_SUCCESS) { // todo handle error } } From 99cab908275e015566a00d40df7bce3bc40400a5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Nov 2024 18:46:02 +0800 Subject: [PATCH 3/3] refactor: send success rsp to vnode if hb already processed. --- source/dnode/mnode/impl/src/mndStreamHb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 941956ae2b..8e4b307276 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -379,9 +379,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { - mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId); + mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId); - terrno = TSDB_CODE_INVALID_MSG; + terrno = TSDB_CODE_SUCCESS; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock);