From b1ba716299bdbbc3a6239ad4229a94c6137d7343 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Sep 2023 09:24:36 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 2 ++ source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- source/libs/stream/src/stream.c | 4 ++++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 627036c00d..12bc76a004 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1335,6 +1335,8 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { tDecodeStreamDispatchReq(&decoder, &req); tDecoderClear(&decoder); + tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 854478f41e..8992d07879 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -355,7 +355,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { const char* pStatus = streamGetTaskStatusStr(status); if (status != TASK_STATUS__NORMAL) { - tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, pStatus); + tqTrace("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, pStatus); streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6c12fbb822..2f3bf147d1 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -218,6 +218,10 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); ASSERT(pInfo != NULL); + if (!pTask->pMeta->leader) { + ASSERT(0); + } + // upstream task has restarted/leader-follower switch/transferred to other dnodes if (pReq->stage > pInfo->stage) { stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64