diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 897a53ac81..f0e212efc3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1777,7 +1777,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); - tqError("vgId:%d failed to decode checkpoint source msg, code:%s", vgId, tstrerror(code)); + tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); return code; } tDecoderClear(&decoder); @@ -1790,18 +1790,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } - code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode - // todo: when generating checkpoint, leader of mnode has transfer to other DNode? - - // set the initial value for generating check point int32_t total = 0; taosWLockLatch(&pMeta->lock); + // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo opt perf pMeta->mgmtInfo.epset = req.mgmtEps; pMeta->mgmtInfo.mnodeId = req.mnodeId; @@ -1813,9 +1805,16 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { total = taosArrayGetSize(pMeta->pTaskList); taosWUnLockLatch(&pMeta->lock); - qDebug("s-task:%s level:%d receive checkpoint source msg from mnode id:%" PRId64 ", total source checkpoint req:%d", - pTask->id.idStr, pTask->info.taskLevel, req.checkpointId, total); + qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d", + pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total); + code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode + // todo: when generating checkpoint, leader of mnode has transfer to other DNode? streamProcessCheckpointSourceReq(pTask, &req); streamMetaReleaseTask(pMeta, pTask); return code; @@ -1934,7 +1933,7 @@ _end: taosWUnLockLatch(&pMeta->lock); if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - vInfo("vgId:%d, restart to all stream tasks", vgId); + vInfo("vgId:%d, restart all stream tasks", vgId); tqCheckStreamStatus(pTq); } } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index f2e3b8529a..3c731df071 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -250,7 +250,7 @@ const char* streamGetBlockTypeStr(int32_t type) { case STREAM_INPUT__CHECKPOINT: return "checkpoint"; case STREAM_INPUT__CHECKPOINT_TRIGGER: - return "checkpoint-triggre"; + return "checkpoint-trigger"; case STREAM_INPUT__TRANS_STATE: return "trans-state"; default: diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index af283c590a..9320fa29d5 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -178,10 +178,9 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p .stage = pTask->pMeta->stage, }; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, - pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage); - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, + pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage); streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e805bf0e72..10fb844117 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -419,7 +419,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); for (int32_t i = 0; i < numOfUpstream; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); if (pInfo->nodeId == nodeId) { epsetAssign(&pInfo->epSet, pEpSet); qDebug("s-task:0x%x update the upstreamInfo, nodeId:%d newEpset:%s", pTask->id.taskId, nodeId, buf);