From 62cfcefbb0e26c29c06ff1d22f414a71018274c8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Dec 2023 00:34:04 +0800 Subject: [PATCH] refactor: remove stream-scan-history event for stream task. --- include/libs/stream/tstream.h | 19 +++-- source/dnode/mnode/impl/src/mndStream.c | 88 +++++++++++----------- source/dnode/vnode/src/tq/tq.c | 27 +++---- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/parser/src/parTranslater.c | 2 +- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamStart.c | 20 +++-- source/libs/stream/src/streamTaskSm.c | 32 ++++---- 9 files changed, 99 insertions(+), 95 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2d2db5c1dc..0c09e0e1ea 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -81,7 +81,7 @@ typedef enum ETaskStatus { TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore - TASK_STATUS__STREAM_SCAN_HISTORY, +// TASK_STATUS__STREAM_SCAN_HISTORY, } ETaskStatus; enum { @@ -138,15 +138,14 @@ enum { typedef enum EStreamTaskEvent { TASK_EVENT_INIT = 0x1, TASK_EVENT_INIT_SCANHIST = 0x2, - TASK_EVENT_INIT_STREAM_SCANHIST = 0x3, - TASK_EVENT_SCANHIST_DONE = 0x4, - TASK_EVENT_STOP = 0x5, - TASK_EVENT_GEN_CHECKPOINT = 0x6, - TASK_EVENT_CHECKPOINT_DONE = 0x7, - TASK_EVENT_PAUSE = 0x8, - TASK_EVENT_RESUME = 0x9, - TASK_EVENT_HALT = 0xA, - TASK_EVENT_DROPPING = 0xB, + TASK_EVENT_SCANHIST_DONE = 0x3, + TASK_EVENT_STOP = 0x4, + TASK_EVENT_GEN_CHECKPOINT = 0x5, + TASK_EVENT_CHECKPOINT_DONE = 0x6, + TASK_EVENT_PAUSE = 0x7, + TASK_EVENT_RESUME = 0x8, + TASK_EVENT_HALT = 0x9, + TASK_EVENT_DROPPING = 0xA, } EStreamTaskEvent; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 25f51b85df..2464a9207e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2886,38 +2886,38 @@ static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { return NULL; } -static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { - if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { - if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { - int32_t numOfReady = 0; - int32_t numOfTotal = 0; - for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { - STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); - if (pTaskEntry->id.streamId == pId->streamId) { - numOfTotal++; - - if (pTaskEntry->id.taskId != pId->taskId) { - STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); - if (pEntry->status == TASK_STATUS__READY) { - numOfReady++; - } - } - } - } - - if (numOfReady > 0) { - mDebug("stream:0x%" PRIx64 - " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task", - pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady); - return true; - } else { - return false; - } - } - } - - return false; -} +//static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { +// if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { +// if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { +// int32_t numOfReady = 0; +// int32_t numOfTotal = 0; +// for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { +// STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); +// if (pTaskEntry->id.streamId == pId->streamId) { +// numOfTotal++; +// +// if (pTaskEntry->id.taskId != pId->taskId) { +// STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); +// if (pEntry->status == TASK_STATUS__READY) { +// numOfReady++; +// } +// } +// } +// } +// +// if (numOfReady > 0) { +// mDebug("stream:0x%" PRIx64 +// " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task", +// pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady); +// return true; +// } else { +// return false; +// } +// } +// } +// +// return false; +//} // currently only handle the sink task // 1. sink task, drop related fill-history task msg is missing @@ -3091,18 +3091,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (p->status != TASK_STATUS__READY) { mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); - if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) { - bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo); - if (drop) { - SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId); - if (pStreamObj == NULL) { - mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid); - } else { - mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj); - mndReleaseStream(pMnode, pStreamObj); - } - } - } +// if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) { +// bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo); +// if (drop) { +// SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId); +// if (pStreamObj == NULL) { +// mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid); +// } else { +// mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj); +// mndReleaseStream(pMnode, pStreamObj); +// } +// } +// } } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c8cc4f9d54..a8b5842b93 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1037,15 +1037,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pStreamTask); } else { - STimeWindow* pWindow = &pTask->dataRange.window; - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask)); - - // Not update the fill-history time window until the state transfer is completed. - tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64 - ", window:%" PRId64 " - %" PRId64, - id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); - - code = streamTaskScanHistoryDataComplete(pTask); + ASSERT(0); +// STimeWindow* pWindow = &pTask->dataRange.window; +// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask)); +// +// // Not update the fill-history time window until the state transfer is completed. +// tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64 +// ", window:%" PRId64 " - %" PRId64, +// id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); +// +// code = streamTaskScanHistoryDataComplete(pTask); } atomic_store_32(&pTask->status.inScanHistorySentinel, 0); @@ -1170,7 +1171,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } else if (status == TASK_STATUS__UNINIT) { // todo: fill-history task init ? if (pTask->info.fillHistory == 0) { - EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT; streamTaskHandleEvent(pTask->status.pSM, event); } } @@ -1363,9 +1364,9 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask, NULL); - if (status == TASK_STATUS__STREAM_SCAN_HISTORY) { - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - } +// if (status == TASK_STATUS__STREAM_SCAN_HISTORY) { +// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); +// } SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f576345f64..2991e3cef5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -545,7 +545,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId); if (p != NULL && restored && p->info.fillHistory == 0) { - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + EStreamTaskEvent event = /*(HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT; streamTaskHandleEvent(p->status.pSM, event); } else if (!restored) { tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3bb24566c2..6f3915c4d7 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7989,7 +7989,7 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void } if (TSDB_CODE_SUCCESS == code) { if (interval.interval > 0) { - pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval); + pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision); } else { pStmt->pReq->lastTs = lastTs; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 25f32195be..7d2a572cda 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -646,7 +646,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { ETaskStatus st = streamTaskGetStatus(pTask, NULL); - return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__STREAM_SCAN_HISTORY || + return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__STREAM_SCAN_HISTORY*/ || st == TASK_STATUS__CK); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d09d370a36..f2dcc3bcc6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1484,7 +1484,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { continue; } - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + EStreamTaskEvent event = /*(HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT; int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, event); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index ea5e2edc09..70f83e27a5 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -52,7 +52,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); ETaskStatus status = streamTaskGetStatus(pTask, &p); - if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) && + if ((status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", @@ -158,7 +158,7 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { ETaskStatus status = streamTaskGetStatus(pTask, NULL); ASSERT(pTask->status.downstreamReady == 1 && - ((status == TASK_STATUS__SCAN_HISTORY) || (status == TASK_STATUS__STREAM_SCAN_HISTORY))); + ((status == TASK_STATUS__SCAN_HISTORY)/* || (status == TASK_STATUS__STREAM_SCAN_HISTORY)*/)); if (level == TASK_LEVEL__SOURCE) { return doStartScanHistoryTask(pTask); @@ -374,10 +374,14 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); - ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); + ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/); - stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p); - streamTaskStartScanHistory(pTask); + if (pTask->info.fillHistory == 1) { + stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p); + streamTaskStartScanHistory(pTask); + } else { + stDebug("s-task:%s scan wal data, status:%s", id, p); + } // NOTE: there will be an deadlock if launch fill history here. // // start the related fill-history task, when current task is ready @@ -391,7 +395,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) { EStreamTaskEvent event; if (pTask->info.fillHistory == 0) { - event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT; } else { event = TASK_EVENT_INIT_SCANHIST; } @@ -631,7 +635,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); - if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) { + if (status != TASK_STATUS__SCAN_HISTORY /*&& status != TASK_STATUS__STREAM_SCAN_HISTORY*/) { stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p, pReq->upstreamTaskId); @@ -693,7 +697,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { return TSDB_CODE_INVALID_MSG; } - ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY); + ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/); SStreamMeta* pMeta = pTask->pMeta; // execute in the scan history complete call back msg, ready to process data from inputQ diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 51e6ad23fe..4bd6483f7f 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -31,14 +31,14 @@ SStreamTaskState StreamTaskStatusList[9] = { {.state = TASK_STATUS__HALT, .name = "halt"}, {.state = TASK_STATUS__PAUSE, .name = "paused"}, {.state = TASK_STATUS__CK, .name = "checkpoint"}, - {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"}, +// {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"}, }; SStreamEventInfo StreamTaskEventList[12] = { {.event = 0, .name = ""}, // dummy event, place holder {.event = TASK_EVENT_INIT, .name = "initialize"}, {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"}, - {.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"}, +// {.event = TASK_EVENT_INIT_STREAM_SCANHIST, .name = "stream-scan-history-init"}, {.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"}, {.event = TASK_EVENT_STOP, .name = "stopping"}, {.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"}, @@ -110,12 +110,12 @@ int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) { // todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) { - if (event == TASK_EVENT_INIT_STREAM_SCANHIST || event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) { + if (/*event == TASK_EVENT_INIT_STREAM_SCANHIST || */event == TASK_EVENT_INIT || event == TASK_EVENT_INIT_SCANHIST) { return (state != TASK_STATUS__UNINIT); } if (event == TASK_EVENT_SCANHIST_DONE) { - return (state != TASK_STATUS__SCAN_HISTORY && state != TASK_STATUS__STREAM_SCAN_HISTORY); + return (state != TASK_STATUS__SCAN_HISTORY/* && state != TASK_STATUS__STREAM_SCAN_HISTORY*/); } if (event == TASK_EVENT_GEN_CHECKPOINT) { @@ -482,14 +482,14 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); - taosArrayPush(streamTaskSMTrans, &trans); +// trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); +// taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); - taosArrayPush(streamTaskSMTrans, &trans); +// trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); +// taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); @@ -499,8 +499,8 @@ void doInitStateTransferTable(void) { SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); - taosArrayPush(streamTaskSMTrans, &trans); +// trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); +// taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); @@ -519,8 +519,8 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE}; - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); - taosArrayPush(streamTaskSMTrans, &trans); +// trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); +// taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true); @@ -554,8 +554,8 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); - taosArrayPush(streamTaskSMTrans, &trans); +// trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true); +// taosArrayPush(streamTaskSMTrans, &trans); // dropping related event trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); @@ -574,7 +574,7 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); - taosArrayPush(streamTaskSMTrans, &trans); +// trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); +// taosArrayPush(streamTaskSMTrans, &trans); } //clang-format on \ No newline at end of file