diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 93b4e70de7..bed946bc24 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2170,34 +2170,39 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { } } - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream); + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); - doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(pList, req.taskId, req.streamId, numOfTasks); taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *)); pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); } else { - doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks); } int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs int64_t checkpointId = mndStreamGenChkpId(pMnode); - mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, pStream->uid, checkpointId); + mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - // TODO:handle error - int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + if (pStream != NULL) { + // TODO:handle error + int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + mndReleaseStream(pMnode, pStream); + } else { + // todo: wait for the create stream trans completed, and launch the checkpoint trans + } // remove this entry taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); } - mndReleaseStream(pMnode, pStream); taosThreadMutexUnlock(&execInfo.lock); {