From 530e0133b9bba75f7ed33ccda676e3ba108ce1f3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Mar 2024 19:39:58 +0800 Subject: [PATCH 1/5] fix(stream): acquire the stream task in exec buffer if not in mnode store. --- source/dnode/mnode/impl/src/mndStream.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 60b522f6fa..ceb239ee96 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2153,11 +2153,20 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); if (pStream == NULL) { - mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId); - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - taosThreadMutexUnlock(&execInfo.lock); + mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); - return -1; + // not in meta-store yet, try to acquire the task in exec buffer + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (p == NULL) { + mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } else { + mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", + req.streamId, req.taskId); + } } int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); @@ -2175,7 +2184,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs int64_t checkpointId = mndStreamGenChkpId(pMnode); - mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId); + mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, pStream->uid, checkpointId); // TODO:handle error int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); From 46280bfee95e634e4bf36f1197d77f046faa3526 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Mar 2024 19:40:47 +0800 Subject: [PATCH 2/5] fix(stream): add some comments. --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ceb239ee96..93b4e70de7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2156,6 +2156,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId); // not in meta-store yet, try to acquire the task in exec buffer + // the checkpoint req arrives too soon before the completion of the create stream trans. STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (p == NULL) { From 3c7fe5fd01b464693f5fe2ac429084ba5f2af906 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Mar 2024 23:27:38 +0800 Subject: [PATCH 3/5] fix(stream): check null ptr. --- source/dnode/mnode/impl/src/mndStream.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 93b4e70de7..bed946bc24 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2170,34 +2170,39 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { } } - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream); + SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); - doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(pList, req.taskId, req.streamId, numOfTasks); taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *)); pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); } else { - doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks); + doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks); } int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs int64_t checkpointId = mndStreamGenChkpId(pMnode); - mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, pStream->uid, checkpointId); + mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - // TODO:handle error - int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + if (pStream != NULL) { + // TODO:handle error + int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); + mndReleaseStream(pMnode, pStream); + } else { + // todo: wait for the create stream trans completed, and launch the checkpoint trans + } // remove this entry taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); - mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); + mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); } - mndReleaseStream(pMnode, pStream); taosThreadMutexUnlock(&execInfo.lock); { From f3c306d582b1e2d7005ae3d3340b93d04df085d5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Mar 2024 23:28:20 +0800 Subject: [PATCH 4/5] fix(stream): add some comments. --- source/dnode/mnode/impl/src/mndStream.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bed946bc24..9a79f6ff5b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2194,6 +2194,8 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mndReleaseStream(pMnode, pStream); } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans + // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); + // sleep(500ms) } // remove this entry From e776cde461a11151549c1e9f746f676d843daef0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Mar 2024 09:02:21 +0800 Subject: [PATCH 5/5] fix(stream):fix unrelease stream obj. --- source/dnode/mnode/impl/src/mndStream.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9a79f6ff5b..6067af199e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2188,10 +2188,8 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int64_t checkpointId = mndStreamGenChkpId(pMnode); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - if (pStream != NULL) { - // TODO:handle error + if (pStream != NULL) { // TODO:handle error int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); - mndReleaseStream(pMnode, pStream); } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); @@ -2205,6 +2203,10 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); } + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + taosThreadMutexUnlock(&execInfo.lock); {