From af9dc441b20e646882a8cbb4a23bfb34f9518f8f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 Jan 2024 18:40:06 +0800 Subject: [PATCH] fix(stream): reset the saved value for fill-history task. --- include/libs/executor/executor.h | 1 + include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tqCommon/tqCommon.c | 13 ++---- source/libs/executor/src/executor.c | 51 ++++++++++++++++++++++ source/libs/stream/src/streamStart.c | 5 +++ 5 files changed, 62 insertions(+), 9 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index f78b7a3126..be11d04ff8 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -210,6 +210,7 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); +int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index dbcdef57b0..d11a4ad23b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -825,6 +825,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); +int32_t streamResetParamForScanHistory(SStreamTask* pTask); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 5467e3dadd..2e83cd5bc5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -728,6 +728,10 @@ int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) { STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); streamTaskResetStatus(*pTask); + + if ((*pTask)->info.fillHistory == 1) { + streamResetParamForScanHistory(*pTask); + } } return 0; @@ -760,19 +764,10 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } streamMetaWLock(pMeta); -// streamMetaClear(pMeta); int64_t el = taosGetTimestampMs() - st; tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); -// code = streamMetaLoadAllTasks(pMeta); -// if (code != TSDB_CODE_SUCCESS) { -// tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); -// streamMetaWUnLock(pMeta); -// code = terrno; -// return code; -// } - { STaskStartInfo* pStartInfo = &pMeta->startInfo; taosHashClear(pStartInfo->pReadyTaskSet); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6cee79bff2..c29fae0cef 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1023,6 +1023,57 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { return 0; } +int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + SOperatorInfo* pOperator = pTaskInfo->pRoot; + + while (1) { + int32_t type = pOperator->operatorType; + if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + STimeWindowAggSupp* pSup = &pInfo->twAggSup; + + pSup->calTriggerSaved = 0; + pSup->deleteMarkSaved = 0; + qInfo("reset stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + STimeWindowAggSupp* pSup = &pInfo->twAggSup; + + pSup->calTriggerSaved = 0; + pSup->deleteMarkSaved = 0; + qInfo("reset stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + STimeWindowAggSupp* pSup = &pInfo->twAggSup; + + pSup->calTriggerSaved = 0; + pSup->deleteMarkSaved = 0; + qInfo("reset stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) { + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + STimeWindowAggSupp* pSup = &pInfo->twAggSup; + + pSup->calTriggerSaved = 0; + pSup->deleteMarkSaved = 0; + qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + } + + // iterate operator tree + if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { + return 0; + } else { + pOperator = pOperator->pDownstream[0]; + } + } +} + int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; const char* id = GET_TASKID(pTaskInfo); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 0dcc94a530..0c2993e296 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -544,6 +544,11 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) { return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); } +int32_t streamResetParamForScanHistory(SStreamTask* pTask) { + stDebug("s-task:%s reset operator option for scan-history data", pTask->id.idStr); + return qResetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); +} + int32_t streamRestoreParam(SStreamTask* pTask) { stDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr); return qRestoreStreamOperatorOption(pTask->exec.pExecutor);