From 53f9af06ffd788c2782c00f58947997a22d8a375 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jun 2024 15:44:38 +0800 Subject: [PATCH] fix(stream): fix invalid write. --- source/dnode/mnode/impl/src/mndStream.c | 5 ++--- source/libs/stream/src/streamCheckpoint.c | 6 +++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7aa95202dd..af20617457 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1934,9 +1934,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) { - mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 - " in checkpoint/uninit status, not ready for pause", - pStream->name, pStream->uid, pEntry->nodeId, pEntry->id.taskId); + mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name, + pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status)); readyToPause = false; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fc281e9c79..6dee0d363b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -813,6 +813,7 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { const char* pId = pTask->id.idStr; int32_t size = taosArrayGetSize(pNotSendList); int32_t numOfUpstream = streamTaskGetNumOfUpstream(pTask); + int64_t checkpointId = pTask->chkInfo.pActiveInfo->activeId; if (size <= 0) { stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId); @@ -838,15 +839,14 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { pReq->downstreamNodeId = vgId; pReq->upstreamTaskId = pUpstreamTask->taskId; pReq->upstreamNodeId = pUpstreamTask->nodeId; - pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId; - + pReq->checkpointId = checkpointId; SRpcMsg rpcMsg = {0}; initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq)); code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg); stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId, vgId, - pUpstreamTask->taskId, pUpstreamTask->nodeId, pReq->checkpointId); + pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId); } return TSDB_CODE_SUCCESS;