Merge pull request #25193 from taosdata/fix/3_liaohj

fix(stream): acquire the stream task in exec buffer if not in mnode store.
This commit is contained in:
Haojun Liao 2024-03-27 13:17:14 +08:00 committed by GitHub
commit 0bfd606e5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 31 additions and 12 deletions

View File

@ -2153,41 +2153,60 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
if (pStream == NULL) {
mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId);
mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock);
return -1;
} else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
req.streamId, req.taskId);
}
}
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream);
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(int32_t));
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
} else {
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
}
int32_t total = taosArrayGetSize(*pReqTaskList);
if (total == numOfTasks) { // all tasks has send the reqs
int64_t checkpointId = mndStreamGenChkpId(pMnode);
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
// TODO:handle error
if (pStream != NULL) { // TODO:handle error
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
} else {
// todo: wait for the create stream trans completed, and launch the checkpoint trans
// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
// sleep(500ms)
}
// remove this entry
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
}
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
}
taosThreadMutexUnlock(&execInfo.lock);
{