diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index bb04cd8dba..9ea84830f1 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -22,6 +22,8 @@ typedef struct SBuildScanWalMsgParam { int64_t metaId; int32_t numOfTasks; + int8_t restored; + SMsgCb msgCb; } SBuildScanWalMsgParam; static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); @@ -74,7 +76,6 @@ int32_t tqScanWal(STQ* pTq) { static void doStartScanWal(void* param, void* tmrId) { int32_t vgId = 0; - STQ* pTq = NULL; int32_t code = 0; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; @@ -100,16 +101,17 @@ static void doStartScanWal(void* param, void* tmrId) { } vgId = pMeta->vgId; - pTq = pMeta->ahandle; tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, - pTq->pVnode->restored); + pParam->restored); +#if 0 + // wait for the vnode is freed, and invalid read may occur. + taosMsleep(10000); +#endif - if (pTq->pVnode != NULL) { - code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); - if (code) { - tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); - } + code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + if (code) { + tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } code = taosReleaseRef(streamMetaRefPool, pParam->metaId); @@ -135,6 +137,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { pParam->metaId = pMeta->rid; pParam->numOfTasks = numOfTasks; + pParam->restored = pTq->pVnode->restored; + pParam->msgCb = pTq->pVnode->msgCb; code = streamTimerGetInstance(&pTimer); if (code) {