diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ba366a2e02..bf223e8c28 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -681,7 +681,8 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeI void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal); void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask); void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId); -int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pInfo, int32_t code); +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, + SRpcHandleInfo* pInfo, int32_t code); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 688be61015..0f52483149 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -883,7 +883,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready", pTask->id.idStr, (int32_t)pReq->downstreamTaskId); - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; @@ -901,7 +901,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) // re-send the lost checkpoint-trigger msg to downstream task tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr, (int32_t)pReq->downstreamTaskId, checkpointId, transId); - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_SUCCESS); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS); } else { // not send checkpoint-trigger yet, wait int32_t recv = 0, total = 0; streamTaskGetTriggerRecvStatus(pTask, &recv, &total); @@ -914,7 +914,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "sending checkpoint-source/trigger", pTask->id.idStr, recv, total); } - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } } else { // upstream not recv the checkpoint-source/trigger till now ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT); @@ -922,7 +922,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "upstream sending checkpoint-source/trigger", pTask->id.idStr); - streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); + streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS); } streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d986a36343..fc281e9c79 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -104,8 +104,15 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri return TSDB_CODE_SUCCESS; } -int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo, int32_t code) { - SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp)); +int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, + SRpcHandleInfo* pRpcInfo, int32_t code) { + int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); + + void* pBuf = rpcMallocCont(size); + SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + + ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); + pRsp->streamId = pTask->id.streamId; pRsp->upstreamTaskId = pTask->id.taskId; pRsp->taskId = dstTaskId; @@ -120,7 +127,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->rspCode = code; - SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo}; + SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); return 0; }