refactor: update logs, and set correct vgId.

This commit is contained in:
Haojun Liao 2024-11-15 13:48:42 +08:00
parent a79f63caeb
commit b74a1bc726
2 changed files with 11 additions and 11 deletions

View File

@ -1003,7 +1003,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask); int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
if (pTask == NULL || (code != 0)) { if (pTask == NULL || (code != 0)) {
tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64 tqError("vgId:%d process retrieve checkpoint-trigger, checkpointId:%" PRId64
" from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already", " from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId); pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
@ -1098,8 +1098,8 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg)
} }
tqDebug( tqDebug(
"s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, " "s-task:%s recv re-send checkpoint-trigger msg through retrieve/rsp channel, upstream:0x%x, checkpointId:%" PRId64
"checkpointId:%" PRId64 ", transId:%d", ", transId:%d",
pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId); pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);
code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp); code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);

View File

@ -351,6 +351,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
pActiveInfo->activeId = checkpointId; pActiveInfo->activeId = checkpointId;
pActiveInfo->transId = transId; pActiveInfo->transId = transId;
if (pTask->chkInfo.startTs == 0) {
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
}
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
@ -407,11 +412,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
} }
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
if (pTask->chkInfo.startTs == 0) {
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
}
// todo: handle this // todo: handle this
// update the child Id for downstream tasks // update the child Id for downstream tasks
code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
@ -1149,16 +1149,16 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret); tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
if (ret < 0) { if (ret < 0) {
stError("encode stream hb msg rsp failed, code:%s", tstrerror(code)); stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(code));
} }
buf = rpcMallocCont(tlen + sizeof(SMsgHead)); buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) { if (buf == NULL) {
stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId); stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId);
continue; continue;
} }
((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(vgId); ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);