fix(stream): acquire the stream task in exec buffer if not in mnode store.
This commit is contained in:
parent
a11b2c614e
commit
530e0133b9
|
@ -2153,11 +2153,20 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
|
||||
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||
if (pStream == NULL) {
|
||||
mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId);
|
||||
|
||||
return -1;
|
||||
// not in meta-store yet, try to acquire the task in exec buffer
|
||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||
void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||
if (p == NULL) {
|
||||
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
return -1;
|
||||
} else {
|
||||
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
|
||||
req.streamId, req.taskId);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
||||
|
@ -2175,7 +2184,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
int32_t total = taosArrayGetSize(*pReqTaskList);
|
||||
if (total == numOfTasks) { // all tasks has send the reqs
|
||||
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
||||
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
|
||||
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, pStream->uid, checkpointId);
|
||||
|
||||
// TODO:handle error
|
||||
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||
|
|
Loading…
Reference in New Issue