fix(stream): fix invalid write.

This commit is contained in:
Haojun Liao 2024-06-21 15:44:38 +08:00
parent 53b51b9b71
commit 53f9af06ff
2 changed files with 5 additions and 6 deletions

View File

@ -1934,9 +1934,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64
" in checkpoint/uninit status, not ready for pause",
pStream->name, pStream->uid, pEntry->nodeId, pEntry->id.taskId);
mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
readyToPause = false;
}

View File

@ -813,6 +813,7 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
const char* pId = pTask->id.idStr;
int32_t size = taosArrayGetSize(pNotSendList);
int32_t numOfUpstream = streamTaskGetNumOfUpstream(pTask);
int64_t checkpointId = pTask->chkInfo.pActiveInfo->activeId;
if (size <= 0) {
stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
@ -838,15 +839,14 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
pReq->downstreamNodeId = vgId;
pReq->upstreamTaskId = pUpstreamTask->taskId;
pReq->upstreamNodeId = pUpstreamTask->nodeId;
pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
pReq->checkpointId = checkpointId;
SRpcMsg rpcMsg = {0};
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));
code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId, vgId,
pUpstreamTask->taskId, pUpstreamTask->nodeId, pReq->checkpointId);
pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
}
return TSDB_CODE_SUCCESS;