From 5158bf6b96833752ee87a69af54498d92448b802 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 31 Oct 2024 15:50:56 +0800 Subject: [PATCH] fix issue --- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/inc/streamexecutorInt.h | 2 + source/libs/executor/src/streamfilloperator.c | 3 +- .../src/streamintervalsliceoperator.c | 7 +++ .../executor/src/streamtimesliceoperator.c | 47 +++++++++++++++---- .../8-stream/force_window_close_interval.py | 3 +- 6 files changed, 51 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index aa7344758a..1e59f3f620 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -856,6 +856,7 @@ typedef struct SStreamTimeSliceOperatorInfo { SGroupResInfo groupResInfo; bool ignoreNull; bool isHistoryOp; + SArray* pCloseTs; struct SOperatorInfo* pOperator; } SStreamTimeSliceOperatorInfo; diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 1acfb4d205..3ebf6726c7 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -100,6 +100,8 @@ int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, struct SOperatorInfo** ppOptInfo); +int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated); +void removeDuplicateTs(SArray* pTsArrray); #ifdef __cplusplus } diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 08be9a4a64..3b6f77ad41 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1272,7 +1272,7 @@ _end: return code; } -static int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) { +int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int64_t groupId = 0; @@ -1367,6 +1367,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); } + removeDuplicateTs(pInfo->pCloseTs); for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) { TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated); diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index 2b53159cb9..1179d8ce0c 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -19,6 +19,7 @@ #include "storageapi.h" #include "streamexecutorInt.h" #include "tcommon.h" +#include "tcompare.h" #include "tdatablock.h" #include "ttime.h" @@ -627,3 +628,9 @@ _error: (*ppOptInfo) = NULL; return code; } + +void removeDuplicateTs(SArray* pTsArrray) { + __compar_fn_t fn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, TSDB_ORDER_ASC); + taosArraySort(pTsArrray, fn); + taosArrayRemoveDuplicate(pTsArrray, fn, NULL); +} \ No newline at end of file diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 07c96d38df..113d5d5f5a 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -177,6 +177,8 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); + taosArrayDestroy(pInfo->pCloseTs); + taosMemoryFreeClear(param); } @@ -1449,11 +1451,15 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); - qDebug("===stream=== build interp res. key:%" PRId64 ",groupId:%" PRId64, pKey->ts, pKey->groupId); + qDebug("===stream=== build interp res. key:%" PRId64 ",groupId:%" PRIu64, pKey->ts, pKey->groupId); if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = pKey->groupId; } else if (pBlock->info.id.groupId != pKey->groupId) { - break; + if (pBlock->info.rows > 0) { + break; + } else { + pBlock->info.id.groupId = pKey->groupId; + } } SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId}; SSlicePoint prevPoint = {0}; @@ -1813,8 +1819,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR goto _end; } break; case STREAM_GET_RESULT: { - code = setAllResultKey(pAggSup, pBlock->info.window.skey, pInfo->pUpdatedMap); - QUERY_CHECK_CODE(code, lino, _end); + void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey); + QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno); continue; } default: @@ -1831,12 +1837,29 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); } - void* pIte = NULL; - int32_t iter = 0; - while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { - SWinKey* pKey = (SWinKey*)tSimpleHashGetKey(pIte, NULL); - void* tmp = taosArrayPush(pInfo->pUpdated, pKey); - QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + if (taosArrayGetSize(pInfo->pCloseTs) > 0) { + removeDuplicateTs(pInfo->pCloseTs); + int32_t size = taosArrayGetSize(pInfo->pCloseTs); + qDebug("build stream result, ts count:%d", size); + for (int32_t i = 0; i < size; i++) { + TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); + code = buildAllResultKey(&pInfo->streamAggSup, ts, pInfo->pUpdated); + QUERY_CHECK_CODE(code, lino, _end); + } + qDebug("build stream result, ts count:%d", taosArrayGetSize(pInfo->pUpdated)); + taosArrayClear(pInfo->pCloseTs); + if (size > 1024) { + taosArrayDestroy(pInfo->pCloseTs); + pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY)); + } + } else { + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { + SWinKey* pKey = (SWinKey*)tSimpleHashGetKey(pIte, NULL); + void* tmp = taosArrayPush(pInfo->pUpdated, pKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + } } taosArraySort(pInfo->pUpdated, winKeyCmprImpl); @@ -2088,6 +2111,10 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } + + pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY)); + QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno); + pInfo->pOperator = pOperator; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC; diff --git a/tests/system-test/8-stream/force_window_close_interval.py b/tests/system-test/8-stream/force_window_close_interval.py index 88b1628b25..d0e87847a9 100644 --- a/tests/system-test/8-stream/force_window_close_interval.py +++ b/tests/system-test/8-stream/force_window_close_interval.py @@ -275,8 +275,9 @@ class TDTestCase: time.sleep(self.tdCom.dataDict["interval"]) tdSql.query("show streams") tdLog.info(f"tdSql.queryResult:{tdSql.queryResult},tdSql.queryRows:{tdSql.queryRows}") + localQueryResult = tdSql.queryResult for stream_number in range(tdSql.queryRows): - stream_name = tdSql.queryResult[stream_number][0] + stream_name = localQueryResult[stream_number][0] tdCom.check_stream_task_status( stream_name=stream_name, vgroups=2, stream_timeout=20,check_wal_info=False )