From b95ad74c7f72bc4e371bd2996daf5cae6d9bef61 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 09:43:14 +0800 Subject: [PATCH] fix(stream): not handle the check msg for follower tasks. --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tq/tq.c | 14 +++++++++++--- source/dnode/vnode/src/tq/tqStreamTask.c | 8 +------- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamRecover.c | 19 ++++++++++++++++--- 5 files changed, 30 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a19ebd67b0..a55d188978 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -661,6 +661,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); +int32_t streamSetStatusUnint(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0340f1bb25..95637fad69 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -877,6 +877,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamMeta* pMeta = pTq->pStreamMeta; SStreamTaskCheckReq req; SDecoder decoder; @@ -897,10 +898,17 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { .upstreamTaskId = req.upstreamTaskId, }; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); + // only the leader node handle the check request + if (!pMeta->leader) { + tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check msg", + taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); + return -1; + } + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", @@ -912,7 +920,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } - return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId); + return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3cba4567fe..3a5eeae561 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -95,7 +95,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { } pTask->taskExecInfo.init = taosGetTimestampMs(); - tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init); + tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init); streamSetStatusNormal(pTask); streamTaskCheckDownstream(pTask); @@ -111,12 +111,9 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; -// taosWLockLatch(&pMeta->lock); - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqDebug("vgId:%d no stream tasks existed to run", vgId); -// taosWUnLockLatch(&pMeta->lock); return 0; } @@ -124,7 +121,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); -// taosWUnLockLatch(&pMeta->lock); return -1; } @@ -135,8 +131,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); -// taosWUnLockLatch(&pMeta->lock); - return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fa545943eb..66f05367c2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -750,7 +750,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { } int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - ASSERT(pMeta->numOfStreamTasks <= numOfTasks); + ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index d28ec85dd5..2689e9ee70 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -205,21 +205,22 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); ASSERT(pInfo != NULL); + const char* id = pTask->id.idStr; if (stage == -1) { - qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr, + qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", id, upstreamTaskId, stage); return 0; } if (pInfo->stage == -1) { pInfo->stage = stage; - qDebug("s-task:%s receive check msg from upstream task:0x%x, init stage value:%" PRId64, pTask->id.idStr, + qDebug("s-task:%s receive check msg from upstream task:0x%x for the time, init stage value:%" PRId64, id, upstreamTaskId, stage); } if (pInfo->stage < stage) { qError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64, - pTask->id.idStr, upstreamTaskId, vgId, stage, pInfo->stage); + id, upstreamTaskId, vgId, stage, pInfo->stage); } return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0; @@ -355,6 +356,18 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { } } +int32_t streamSetStatusUnint(SStreamTask* pTask) { + int32_t status = atomic_load_8(&pTask->status.taskStatus); + if (status == TASK_STATUS__DROPPING) { + qError("s-task:%s cannot be set uninit, since in dropping state", pTask->id.idStr); + return -1; + } else { + qDebug("s-task:%s set task status to be uninit, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__UNINIT); + return 0; + } +} + // source int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);