fix(stream): check null ptr.

This commit is contained in:
Haojun Liao 2024-03-26 23:27:38 +08:00
parent 46280bfee9
commit 3c7fe5fd01
1 changed files with 13 additions and 8 deletions

View File

@ -2170,34 +2170,39 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
} }
} }
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream);
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) { if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(int32_t)); SArray *pList = taosArrayInit(4, sizeof(int32_t));
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks); doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *)); taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
} else { } else {
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks); doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
} }
int32_t total = taosArrayGetSize(*pReqTaskList); int32_t total = taosArrayGetSize(*pReqTaskList);
if (total == numOfTasks) { // all tasks has send the reqs if (total == numOfTasks) { // all tasks has send the reqs
int64_t checkpointId = mndStreamGenChkpId(pMnode); int64_t checkpointId = mndStreamGenChkpId(pMnode);
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, pStream->uid, checkpointId); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
// TODO:handle error if (pStream != NULL) {
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); // TODO:handle error
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
mndReleaseStream(pMnode, pStream);
} else {
// todo: wait for the create stream trans completed, and launch the checkpoint trans
}
// remove this entry // remove this entry
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
} }
mndReleaseStream(pMnode, pStream);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
{ {