diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 940116317c..5bf0a9b199 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -255,6 +255,9 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); +// injection error +void streamMetaFreeTQDuringScanWalError(STQ* pTq); + int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); // tq-mq diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fbfe0bee53..48c5360c01 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -161,8 +161,12 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); tqMetaClose(pTq); + qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); + +#if 0 + streamMetaFreeTQDuringScanWalError(pTq); +#endif - qDebug("vgId:%d end to close tq", vgId); taosMemoryFree(pTq); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 9eef07faf0..bb04cd8dba 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -462,3 +462,11 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) { return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); } + +void streamMetaFreeTQDuringScanWalError(STQ* pTq) { + SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam)); + p->metaId = pTq->pStreamMeta->rid; + p->numOfTasks = 0; + + doStartScanWal(p, 0); +} \ No newline at end of file diff --git a/source/libs/stream/src/streamErrorInjection.c b/source/libs/stream/src/streamErrorInjection.c index 515845ba2b..8bbe403dcc 100644 --- a/source/libs/stream/src/streamErrorInjection.c +++ b/source/libs/stream/src/streamErrorInjection.c @@ -14,4 +14,4 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) // the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode taosMsleep(65*1000); -} \ No newline at end of file +}