diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 60b522f6fa..ceb239ee96 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2153,11 +2153,20 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); if (pStream == NULL) { - mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - taosThreadMutexUnlock(&execInfo.lock); + mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); - return -1; + // not in meta-store yet, try to acquire the task in exec buffer + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } } int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); @@ -2175,7 +2184,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs int64_t checkpointId = mndStreamGenChkpId(pMnode); - mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId); + mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, pStream->uid, checkpointId); // TODO:handle error int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);