From fcf4cc716942fece5c0848b88681bf145dee766a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Sep 2023 15:58:24 +0800 Subject: [PATCH] fix(stream): fix the invalid counter for ready tasks in vnode, fix an invalid read. --- include/libs/stream/tstream.h | 8 ++++---- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqStreamTask.c | 4 ++-- source/libs/stream/src/streamData.c | 3 ++- source/libs/stream/src/streamMeta.c | 5 +++++ source/libs/stream/src/streamRecover.c | 18 +++++++++++------- 6 files changed, 25 insertions(+), 15 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c4ab632837..ce4d7fa4b6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -392,10 +392,10 @@ struct SStreamTask { }; typedef struct STaskStartInfo { - int64_t ts; - int32_t startedAfterNodeUpdate; - int32_t readyTasks; - int32_t elapsedTime; + int64_t ts; + int32_t startedAfterNodeUpdate; + SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing + int32_t elapsedTime; } STaskStartInfo; // meta diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a105efddda..1d5b3e50f2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1847,7 +1847,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - vInfo("vgId:%d restart all stream tasks", vgId); + vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqStartStreamTasks(pTq); tqCheckAndRunStreamTaskAsync(pTq); } else { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 34cc74852f..c3ef52e96f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -72,6 +72,8 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { SArray* pTaskList = NULL; taosWLockLatch(&pMeta->lock); pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + taosHashClear(pMeta->startInfo.pReadyTaskSet); + pMeta->startInfo.ts = taosGetTimestampMs(); taosWUnLockLatch(&pMeta->lock); // broadcast the check downstream tasks msg @@ -235,8 +237,6 @@ int32_t tqStartStreamTasks(STQ* pTq) { return TSDB_CODE_SUCCESS; } - pMeta->startInfo.ts = taosGetTimestampMs(); - for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 51487e5588..a0fee6be8f 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -184,12 +184,13 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* return NULL; } + streamQueueItemIncSize((SStreamQueueItem*)pMerged, streamQueueItemGetSize(pElem)); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); taosFreeQitem(dst); taosFreeQitem(pElem); - streamQueueItemIncSize((SStreamQueueItem*)pMerged, streamQueueItemGetSize(pElem)); return (SStreamQueueItem*)pMerged; } else { stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2b48e18de5..54f5c02716 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -151,6 +151,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } + pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); + if (pMeta->startInfo.pReadyTaskSet == NULL) { + + } + pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); if (pMeta->pHbInfo == NULL) { goto _err; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 1bf4f2bf98..2a277f3fca 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -53,13 +53,17 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus)); taosWLockLatch(&pMeta->lock); - pMeta->startInfo.readyTasks += 1; + + STaskId id = extractStreamTaskKey(pTask); + taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - if (pMeta->startInfo.readyTasks == numOfTotal) { + + if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { // reset value for next time start - pMeta->startInfo.readyTasks = 0; + taosHashClear(pMeta->startInfo.pReadyTaskSet); pMeta->startInfo.startedAfterNodeUpdate = 0; pMeta->startInfo.elapsedTime = pTask->execInfo.start - pMeta->startInfo.ts; + stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2f sec", vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pMeta->startInfo.elapsedTime / 1000.0); } @@ -174,7 +178,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 "check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } @@ -272,10 +276,10 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ id, upstreamTaskId, vgId, stage, pInfo->stage); } - if (pTask->status.downstreamReady != 1) { - return TASK_DOWNSTREAM_NOT_READY; - } else if (pInfo->stage != stage) { + if (pInfo->stage != stage) { return TASK_SELF_NEW_STAGE; + } else if (pTask->status.downstreamReady != 1) { + return TASK_DOWNSTREAM_NOT_READY; } else { return TASK_DOWNSTREAM_READY; }