From c93fe999ba2a4a2d515d96e10f9e472c826cd19c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 18 Jan 2025 01:26:49 +0800 Subject: [PATCH] refactor(stream): injection error. --- source/dnode/vnode/src/inc/vnodeInt.h | 3 +++ source/dnode/vnode/src/tq/tq.c | 6 +++++- source/dnode/vnode/src/tq/tqStreamTask.c | 8 ++++++++ source/libs/stream/src/streamErrorInjection.c | 2 +- 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 940116317c..5bf0a9b199 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -255,6 +255,9 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); +// injection error +void streamMetaFreeTQDuringScanWalError(STQ* pTq); + int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); // tq-mq diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fbfe0bee53..48c5360c01 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -161,8 +161,12 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); tqMetaClose(pTq); + qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); + +#if 0 + streamMetaFreeTQDuringScanWalError(pTq); +#endif - qDebug("vgId:%d end to close tq", vgId); taosMemoryFree(pTq); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 9eef07faf0..bb04cd8dba 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -462,3 +462,11 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) { return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); } + +void streamMetaFreeTQDuringScanWalError(STQ* pTq) { + SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam)); + p->metaId = pTq->pStreamMeta->rid; + p->numOfTasks = 0; + + doStartScanWal(p, 0); +} \ No newline at end of file diff --git a/source/libs/stream/src/streamErrorInjection.c b/source/libs/stream/src/streamErrorInjection.c index 515845ba2b..8bbe403dcc 100644 --- a/source/libs/stream/src/streamErrorInjection.c +++ b/source/libs/stream/src/streamErrorInjection.c @@ -14,4 +14,4 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) // the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode taosMsleep(65*1000); -} \ No newline at end of file +}