From 975cb9c907e5ff0e879b3b99dcdc73f2f363b5c3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 15:54:15 +0800 Subject: [PATCH 1/8] fix(stream): add some logs. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 7 ++----- source/libs/stream/src/streamMeta.c | 4 ++++ source/libs/stream/src/streamSched.c | 6 +++++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 6b7e857120..3871011407 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1119,10 +1119,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t int32_t vgId = pMeta->vgId; int32_t code = 0; - if (pTask == NULL) { - return -1; - } - streamTaskResume(pTask); ETaskStatus status = streamTaskGetStatus(pTask).state; @@ -1150,7 +1146,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } } - streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1173,6 +1168,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); if (code != 0) { + streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1186,6 +1182,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m streamMutexUnlock(&pHTask->lock); code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode); + streamMetaReleaseTask(pMeta, pHTask); } return code; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 44c9e76906..29152c6205 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -759,6 +759,10 @@ void streamMetaAcquireOneTask(SStreamTask* pTask) { } void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { + if (pTask == NULL) { + return; + } + int32_t taskId = pTask->id.taskId; int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 98920e6f70..095a5af6d4 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -63,7 +63,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 pRunReq->reqType = execType; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - return tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + if (code) { + stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); + } + return code; } void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } From 255a3e9258f139e4c3919e67b3092c0760f9efae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 16:21:11 +0800 Subject: [PATCH 2/8] fix(util): reset the returned length value. --- source/common/src/tdatablock.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7a67522231..98e58c8bd7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2610,7 +2610,10 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf if (code < 0) { uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code)); goto _exit; + } else { // reset the length value + code = TSDB_CODE_SUCCESS; } + len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); if (len >= size - 1) goto _exit; } break; From dfb47ac5a909875efc8688168a917cc0f044e3e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 18:28:14 +0800 Subject: [PATCH 3/8] fix(stream): only keep the latest pause operation status. --- source/libs/stream/src/streamTaskSm.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 3501d30be4..a10c4c30d5 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -485,6 +485,11 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ static void keepPrevInfo(SStreamTaskSM* pSM) { STaskStateTrans* pTrans = pSM->pActiveTrans; + // we only keep the latest pause state + if (pSM->prev.state.state == TASK_STATUS__PAUSE && pSM->current.state == TASK_STATUS__PAUSE) { + return; + } + pSM->prev.state = pSM->current; pSM->prev.evt = pTrans->event; } @@ -501,9 +506,10 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even if (pTrans == NULL) { ETaskStatus s = pSM->current.state; - if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && - s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) { - stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); + if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT && + s != TASK_STATUS__READY) { + stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, + GET_EVT_NAME(pSM->prev.evt)); } // the pSM->prev.evt may be 0, so print string is not appropriate. From 0c7b3cfb39bd5759c6bf50e2dd9e6b67a1b8456e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 18:34:52 +0800 Subject: [PATCH 4/8] fix(stream): avoid the later pause overwrite the previous pause state. --- source/libs/stream/src/streamTaskSm.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index a10c4c30d5..17d5d884a7 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -485,11 +485,6 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ static void keepPrevInfo(SStreamTaskSM* pSM) { STaskStateTrans* pTrans = pSM->pActiveTrans; - // we only keep the latest pause state - if (pSM->prev.state.state == TASK_STATUS__PAUSE && pSM->current.state == TASK_STATUS__PAUSE) { - return; - } - pSM->prev.state = pSM->current; pSM->prev.evt = pTrans->event; } @@ -527,10 +522,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even return TSDB_CODE_STREAM_INVALID_STATETRANS; } - keepPrevInfo(pSM); + // repeat pause will not overwrite the previous pause state + if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) { + keepPrevInfo(pSM); - pSM->current = pTrans->next; - pSM->pActiveTrans = NULL; + pSM->current = pTrans->next; + pSM->pActiveTrans = NULL; + } // todo remove it // todo: handle the error code From 2a3cb14d2e19f386eaaba267e24b05e6760eceaf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Oct 2024 19:28:08 +0800 Subject: [PATCH 5/8] other: update logs. --- source/libs/stream/src/streamTimer.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 8b77fe7cb1..0da9acfd1d 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* } } - stDebug("vgId:%d start %s tmr succ", vgId, pMsg); + stTrace("vgId:%d start %s tmr succ", vgId, pMsg); } void streamTmrStop(tmr_h tmrId) { From a58371b0ee4fa69b32fef56f8ecd316a73b79d88 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Oct 2024 19:46:54 +0800 Subject: [PATCH 6/8] fix(stream): reset the activeTrans if pause recv repeatly. --- source/libs/stream/src/streamTaskSm.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 17d5d884a7..c3a2742aa2 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -525,11 +525,12 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even // repeat pause will not overwrite the previous pause state if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) { keepPrevInfo(pSM); - pSM->current = pTrans->next; - pSM->pActiveTrans = NULL; + } else { + stDebug("s-task:%s repeat pause evt recv, not update prev status", id); } + pSM->pActiveTrans = NULL; // todo remove it // todo: handle the error code // on success callback, add into lock if necessary, or maybe we should add an option for this? From 913c7c7831b7a6baca0f1b65ae8aac8cd1f00373 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Oct 2024 18:36:49 +0800 Subject: [PATCH 7/8] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStream.c | 3 ++- source/dnode/mnode/impl/src/mndStreamTransAct.c | 9 +++------ source/dnode/vnode/src/tsdb/tsdbRead2.c | 1 + 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 69d3de25fc..a4327b777f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1783,7 +1783,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - int32_t code = 0; + int32_t code = 0; if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { return code; @@ -1811,6 +1811,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return 0; } + mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid); if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { sdbRelease(pMnode->pSdb, pStream); return -1; diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 3ecd192222..0f74571b34 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -61,7 +61,6 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -93,7 +92,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT if (pReq == NULL) { mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -106,19 +104,18 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { - terrno = code; taosMemoryFree(pReq); - return terrno; + return code; } code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); - return terrno; + return code; } mDebug("set the resume action for trans:%d", pTrans->id); - return 0; + return code; } static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 36bfb56120..990373b87b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -5909,6 +5909,7 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_ } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing } else { code = TSDB_CODE_INVALID_PARA; + tsdbError("invalid mr.me.type:%d, code:%s", mr.me.type, tstrerror(code)); metaReaderClear(&mr); return code; } From 4edbb2f91857d93f522b8050cb3b8610320d80fe Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 16 Oct 2024 22:07:37 +0800 Subject: [PATCH 8/8] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamCheckStatus.c | 2 +- source/libs/stream/src/streamCheckpoint.c | 3 ++- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamMeta.c | 5 +++-- source/libs/stream/src/streamSched.c | 4 ++-- source/libs/stream/src/streamTask.c | 3 +++ 8 files changed, 14 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e6d750468e..58c1707e1f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -754,7 +754,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -void streamMetaAcquireOneTask(SStreamTask* pTask); +int32_t streamMetaAcquireOneTask(SStreamTask* pTask); void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3871011407..a00e92997c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -692,7 +692,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if ((ppTask != NULL) && ((*ppTask) != NULL)) { - streamMetaAcquireOneTask(*ppTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask); SStreamTask* pTask = *ppTask; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 75bcc326b3..c1c54b3c0b 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -299,7 +299,7 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { return; } - /*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e44bca123b..be914d9746 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -347,7 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (old == 0) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); pTmrInfo->launchChkptId = pActiveInfo->activeId; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 78cbd844a0..5dfcb39cb9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1158,7 +1158,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { if (old == 0) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 29152c6205..7e9b60b61a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -753,9 +753,10 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task return code; } -void streamMetaAcquireOneTask(SStreamTask* pTask) { +int32_t streamMetaAcquireOneTask(SStreamTask* pTask) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref); + return ref; } void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { @@ -866,7 +867,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; - // it is an fill-history task, remove the related stream task's id that points to it + // it is a fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 095a5af6d4..cdaa603e38 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -22,7 +22,7 @@ static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { int64_t delaySchema = pTask->info.delaySchedParam; if (delaySchema != 0 && pTask->info.fillHistory == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + int32_t ref = streamMetaAcquireOneTask(pTask); stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, pTask->info.delaySchedParam); @@ -80,7 +80,7 @@ void streamTaskResumeInFuture(SStreamTask* pTask) { pTask->status.schedIdleTime, ref); // add one ref count for task - streamMetaAcquireOneTask(pTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr"); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 71a2ed3e4a..727701e03e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -258,10 +258,12 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->inputq.queue) { streamQueueClose(pTask->inputq.queue, pTask->id.taskId); + pTask->inputq.queue = NULL; } if (pTask->outputq.queue) { streamQueueClose(pTask->outputq.queue, pTask->id.taskId); + pTask->outputq.queue = NULL; } if (pTask->exec.qmsg) { @@ -275,6 +277,7 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->exec.pWalReader != NULL) { walCloseReader(pTask->exec.pWalReader); + pTask->exec.pWalReader = NULL; } streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);