From 6274eea7c4401b2e1f6bc6d300221bb60fd98497 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Jan 2025 19:15:08 +0800 Subject: [PATCH 1/4] fix(stream): adjust the free stream meta position and check the close flag of streamMeta before starting scan wal. --- source/dnode/vnode/src/tq/tq.c | 20 +++++++++++++++----- source/dnode/vnode/src/tq/tqStreamTask.c | 23 +++++++++++++++++++---- source/libs/stream/src/streamHb.c | 1 - source/libs/stream/src/streamMeta.c | 1 + 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3bfc50fcb2..fbfe0bee53 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -75,12 +75,14 @@ int32_t tqOpen(const char* path, SVnode* pVnode) { if (pTq == NULL) { return terrno; } + pVnode->pTq = pTq; + pTq->pVnode = pVnode; + pTq->path = taosStrdup(path); if (pTq->path == NULL) { return terrno; } - pTq->pVnode = pVnode; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); if (pTq->pHandle == NULL) { @@ -131,11 +133,19 @@ void tqClose(STQ* pTq) { return; } + int32_t vgId = 0; + if (pTq->pVnode != NULL) { + vgId = TD_VID(pTq->pVnode); + } else if (pTq->pStreamMeta != NULL) { + vgId = pTq->pStreamMeta->vgId; + } + + // close the stream meta firstly + streamMetaClose(pTq->pStreamMeta); + void* pIter = taosHashIterate(pTq->pPushMgr, NULL); while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - int32_t vgId = TD_VID(pTq->pVnode); - if (pHandle->msg != NULL) { tqPushEmptyDataRsp(pHandle, vgId); rpcFreeCont(pHandle->msg->pCont); @@ -151,8 +161,8 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); tqMetaClose(pTq); - qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); - streamMetaClose(pTq->pStreamMeta); + + qDebug("vgId:%d end to close tq", vgId); taosMemoryFree(pTq); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index bc7e2e28e3..9eef07faf0 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -86,15 +86,30 @@ static void doStartScanWal(void* param, void* tmrId) { return; } + if (pMeta->closeFlag) { + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); + if (code == TSDB_CODE_SUCCESS) { + tqDebug("vgId:%d jump out of scan wal timer since closed", vgId); + } else { + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, + tstrerror(code)); + } + + taosMemoryFree(pParam); + return; + } + vgId = pMeta->vgId; pTq = pMeta->ahandle; tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, pTq->pVnode->restored); - code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); - if (code) { - tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); + if (pTq->pVnode != NULL) { + code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + if (code) { + tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); + } } code = taosReleaseRef(streamMetaRefPool, pParam->metaId); @@ -330,13 +345,13 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { int32_t vgId = pStreamMeta->vgId; + SArray* pTaskList = NULL; int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } // clone the task list, to avoid the task update during scan wal files - SArray* pTaskList = NULL; streamMetaWLock(pStreamMeta); pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); streamMetaWUnLock(pStreamMeta); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 53b6a38b35..7c157bb05e 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -331,7 +331,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } else { stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } -// taosMemoryFree(param); return; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0de256d86d..9a2eeb9311 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -576,6 +576,7 @@ void streamMetaClose(SStreamMeta* pMeta) { if (pMeta == NULL) { return; } + int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid); if (code) { stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code)); From c93fe999ba2a4a2d515d96e10f9e472c826cd19c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 18 Jan 2025 01:26:49 +0800 Subject: [PATCH 2/4] refactor(stream): injection error. --- source/dnode/vnode/src/inc/vnodeInt.h | 3 +++ source/dnode/vnode/src/tq/tq.c | 6 +++++- source/dnode/vnode/src/tq/tqStreamTask.c | 8 ++++++++ source/libs/stream/src/streamErrorInjection.c | 2 +- 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 940116317c..5bf0a9b199 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -255,6 +255,9 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); +// injection error +void streamMetaFreeTQDuringScanWalError(STQ* pTq); + int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); // tq-mq diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fbfe0bee53..48c5360c01 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -161,8 +161,12 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); tqMetaClose(pTq); + qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); + +#if 0 + streamMetaFreeTQDuringScanWalError(pTq); +#endif - qDebug("vgId:%d end to close tq", vgId); taosMemoryFree(pTq); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 9eef07faf0..bb04cd8dba 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -462,3 +462,11 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) { return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); } + +void streamMetaFreeTQDuringScanWalError(STQ* pTq) { + SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam)); + p->metaId = pTq->pStreamMeta->rid; + p->numOfTasks = 0; + + doStartScanWal(p, 0); +} \ No newline at end of file diff --git a/source/libs/stream/src/streamErrorInjection.c b/source/libs/stream/src/streamErrorInjection.c index 515845ba2b..8bbe403dcc 100644 --- a/source/libs/stream/src/streamErrorInjection.c +++ b/source/libs/stream/src/streamErrorInjection.c @@ -14,4 +14,4 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) // the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode taosMsleep(65*1000); -} \ No newline at end of file +} From b4a6523aa660449daf884e79c761d57f363fc243 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 18 Jan 2025 01:35:24 +0800 Subject: [PATCH 3/4] fix(stream): fix invalid read. --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 48c5360c01..5b19d4cd87 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -161,7 +161,7 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); tqMetaClose(pTq); - qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); + qDebug("vgId:%d end to close tq", vgId); #if 0 streamMetaFreeTQDuringScanWalError(pTq); From 84eaed0bbb56abdca195d438f26ad946d5b10110 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 19 Jan 2025 00:13:58 +0800 Subject: [PATCH 4/4] fix(stream): fix invalid read. --- source/dnode/vnode/src/tq/tqStreamTask.c | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index bb04cd8dba..9ea84830f1 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -22,6 +22,8 @@ typedef struct SBuildScanWalMsgParam { int64_t metaId; int32_t numOfTasks; + int8_t restored; + SMsgCb msgCb; } SBuildScanWalMsgParam; static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); @@ -74,7 +76,6 @@ int32_t tqScanWal(STQ* pTq) { static void doStartScanWal(void* param, void* tmrId) { int32_t vgId = 0; - STQ* pTq = NULL; int32_t code = 0; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; @@ -100,16 +101,17 @@ static void doStartScanWal(void* param, void* tmrId) { } vgId = pMeta->vgId; - pTq = pMeta->ahandle; tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, - pTq->pVnode->restored); + pParam->restored); +#if 0 + // wait for the vnode is freed, and invalid read may occur. + taosMsleep(10000); +#endif - if (pTq->pVnode != NULL) { - code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); - if (code) { - tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); - } + code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + if (code) { + tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } code = taosReleaseRef(streamMetaRefPool, pParam->metaId); @@ -135,6 +137,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { pParam->metaId = pMeta->rid; pParam->numOfTasks = numOfTasks; + pParam->restored = pTq->pVnode->restored; + pParam->msgCb = pTq->pVnode->msgCb; code = streamTimerGetInstance(&pTimer); if (code) {