diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 940116317c..5bf0a9b199 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -255,6 +255,9 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); +// injection error +void streamMetaFreeTQDuringScanWalError(STQ* pTq); + int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); // tq-mq diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3bfc50fcb2..5b19d4cd87 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -75,12 +75,14 @@ int32_t tqOpen(const char* path, SVnode* pVnode) { if (pTq == NULL) { return terrno; } + pVnode->pTq = pTq; + pTq->pVnode = pVnode; + pTq->path = taosStrdup(path); if (pTq->path == NULL) { return terrno; } - pTq->pVnode = pVnode; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); if (pTq->pHandle == NULL) { @@ -131,11 +133,19 @@ void tqClose(STQ* pTq) { return; } + int32_t vgId = 0; + if (pTq->pVnode != NULL) { + vgId = TD_VID(pTq->pVnode); + } else if (pTq->pStreamMeta != NULL) { + vgId = pTq->pStreamMeta->vgId; + } + + // close the stream meta firstly + streamMetaClose(pTq->pStreamMeta); + void* pIter = taosHashIterate(pTq->pPushMgr, NULL); while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - int32_t vgId = TD_VID(pTq->pVnode); - if (pHandle->msg != NULL) { tqPushEmptyDataRsp(pHandle, vgId); rpcFreeCont(pHandle->msg->pCont); @@ -151,8 +161,12 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); tqMetaClose(pTq); - qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); - streamMetaClose(pTq->pStreamMeta); + qDebug("vgId:%d end to close tq", vgId); + +#if 0 + streamMetaFreeTQDuringScanWalError(pTq); +#endif + taosMemoryFree(pTq); } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index bc7e2e28e3..9ea84830f1 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -22,6 +22,8 @@ typedef struct SBuildScanWalMsgParam { int64_t metaId; int32_t numOfTasks; + int8_t restored; + SMsgCb msgCb; } SBuildScanWalMsgParam; static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); @@ -74,7 +76,6 @@ int32_t tqScanWal(STQ* pTq) { static void doStartScanWal(void* param, void* tmrId) { int32_t vgId = 0; - STQ* pTq = NULL; int32_t code = 0; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; @@ -86,13 +87,29 @@ static void doStartScanWal(void* param, void* tmrId) { return; } + if (pMeta->closeFlag) { + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); + if (code == TSDB_CODE_SUCCESS) { + tqDebug("vgId:%d jump out of scan wal timer since closed", vgId); + } else { + tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, + tstrerror(code)); + } + + taosMemoryFree(pParam); + return; + } + vgId = pMeta->vgId; - pTq = pMeta->ahandle; tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, - pTq->pVnode->restored); + pParam->restored); +#if 0 + // wait for the vnode is freed, and invalid read may occur. + taosMsleep(10000); +#endif - code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); if (code) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } @@ -120,6 +137,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { pParam->metaId = pMeta->rid; pParam->numOfTasks = numOfTasks; + pParam->restored = pTq->pVnode->restored; + pParam->msgCb = pTq->pVnode->msgCb; code = streamTimerGetInstance(&pTimer); if (code) { @@ -330,13 +349,13 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { int32_t vgId = pStreamMeta->vgId; + SArray* pTaskList = NULL; int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } // clone the task list, to avoid the task update during scan wal files - SArray* pTaskList = NULL; streamMetaWLock(pStreamMeta); pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); streamMetaWUnLock(pStreamMeta); @@ -447,3 +466,11 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) { return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); } + +void streamMetaFreeTQDuringScanWalError(STQ* pTq) { + SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam)); + p->metaId = pTq->pStreamMeta->rid; + p->numOfTasks = 0; + + doStartScanWal(p, 0); +} \ No newline at end of file diff --git a/source/libs/stream/src/streamErrorInjection.c b/source/libs/stream/src/streamErrorInjection.c index 515845ba2b..8bbe403dcc 100644 --- a/source/libs/stream/src/streamErrorInjection.c +++ b/source/libs/stream/src/streamErrorInjection.c @@ -14,4 +14,4 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId) // the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode taosMsleep(65*1000); -} \ No newline at end of file +} diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 53b6a38b35..7c157bb05e 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -331,7 +331,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } else { stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } -// taosMemoryFree(param); return; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0de256d86d..9a2eeb9311 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -576,6 +576,7 @@ void streamMetaClose(SStreamMeta* pMeta) { if (pMeta == NULL) { return; } + int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid); if (code) { stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));