diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 686635cfe2..3f67503454 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1003,7 +1003,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask); if (pTask == NULL || (code != 0)) { - tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64 + tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64 " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -1098,8 +1098,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) } tqDebug( - "s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, " - "checkpointId:%" PRId64 ", transId:%d", + "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64 + ", transId:%d", pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId); code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 73495f5741..20f49216e7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -351,6 +351,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock pActiveInfo->activeId = checkpointId; pActiveInfo->transId = transId; + if (pTask->chkInfo.startTs == 0) { + pTask->chkInfo.startTs = taosGetTimestampMs(); + pTask->execInfo.checkpoint += 1; + } + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); @@ -407,11 +412,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - if (pTask->chkInfo.startTs == 0) { - pTask->chkInfo.startTs = taosGetTimestampMs(); - pTask->execInfo.checkpoint += 1; - } - // todo: handle this // update the child Id for downstream tasks code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); @@ -1149,16 +1149,16 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret); if (ret < 0) { - stError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); + stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(code)); } buf = rpcMallocCont(tlen + sizeof(SMsgHead)); if (buf == NULL) { - stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId); + stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId); continue; } - ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(vgId); + ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncoderInit(&encoder, abuf, tlen);