From 5b9f43453295759b3e0f64f1aca83962e072944e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 31 Oct 2024 17:47:54 +0800 Subject: [PATCH] fix issue for value --- source/libs/executor/inc/tfill.h | 1 + source/libs/executor/src/streamfilloperator.c | 12 ++++--- .../executor/src/streamtimesliceoperator.c | 36 ++++++++++++++----- .../8-stream/force_window_close_interval.py | 10 +++--- 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index c58a2a5d5b..6072063bbf 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -119,6 +119,7 @@ typedef struct SStreamFillInfo { int32_t delIndex; uint64_t curGroupId; bool hasNext; + SResultRowData* pNonFillRow; } SStreamFillInfo; int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 3b6f77ad41..385d1e5776 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1541,14 +1541,12 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* } pFillInfo->pLinearInfo->winIndex = 0; + pFillInfo->pNonFillRow = NULL; pFillInfo->pResRow = NULL; if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F || pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) { pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData)); - if (!pFillInfo->pResRow) { - code = terrno; - QUERY_CHECK_CODE(code, lino, _end); - } + QUERY_CHECK_NULL(pFillInfo->pResRow, code, lino, _end, terrno); pFillInfo->pResRow->key = INT64_MIN; pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); @@ -1568,6 +1566,12 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pCell->bytes = pColData->info.bytes; pCell->type = pColData->info.type; } + + pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData)); + QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno); + pFillInfo->pNonFillRow->key = INT64_MIN; + pFillInfo->pNonFillRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); + memcpy(pFillInfo->pNonFillRow->pRowVal, pFillInfo->pResRow->pRowVal, pFillSup->rowSize); } pFillInfo->type = pFillSup->type; diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index b49b42f0b2..f61c731523 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -30,6 +30,8 @@ #define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState" #define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint" +#define IS_FILL_CONST_VALUE(type) ((type == TSDB_FILL_NULL || type == TSDB_FILL_NULL_F || type == TSDB_FILL_SET_VALUE || type == TSDB_FILL_SET_VALUE_F)) + int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) { return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0); } @@ -371,7 +373,17 @@ SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) { return pCell; } -static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock, +static bool isGroupKeyFunc(SExprInfo* pExprInfo) { + int32_t functionType = pExprInfo->pExpr->_function.functionType; + return (functionType == FUNCTION_TYPE_GROUP_KEY); +} + +static bool isSelectGroupConstValueFunc(SExprInfo* pExprInfo) { + int32_t functionType = pExprInfo->pExpr->_function.functionType; + return (functionType == FUNCTION_TYPE_GROUP_CONST_VALUE); +} + +static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, SResultRowData* pNonFillRow, TSKEY ts, SSDataBlock* pBlock, bool* pRes, bool isFilled) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -401,7 +413,12 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p QUERY_CHECK_CODE(code, lino, _end); } else { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pCell = getSliceResultCell(pResRow->pRowVal, srcSlot); + SResultCellData* pCell = NULL; + if (IS_FILL_CONST_VALUE(pFillSup->type) && (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr)) ) { + pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot); + } else { + pCell = getSliceResultCell(pResRow->pRowVal, srcSlot); + } code = setRowCell(pDstCol, pBlock->info.rows, pCell); QUERY_CHECK_CODE(code, lino, _end); } @@ -424,7 +441,7 @@ static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; // if (inWinRange(&pFillSup->winRange, &st)) { bool res = true; - code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res, true); + code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->pNonFillRow, pFillInfo->current, pBlock, &res, true); QUERY_CHECK_CODE(code, lino, _end); // } pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, @@ -533,13 +550,13 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p int32_t lino = 0; bool res = true; if (pFillInfo->needFill == false && pFillInfo->pos != FILL_POS_INVALID) { - code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); + code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false); QUERY_CHECK_CODE(code, lino, _end); return; } if (pFillInfo->pos == FILL_POS_START) { - code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); + code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false); QUERY_CHECK_CODE(code, lino, _end); if (res) { pFillInfo->pos = FILL_POS_INVALID; @@ -549,7 +566,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p fillNormalRange(pFillSup, pFillInfo, pRes); if (pFillInfo->pos == FILL_POS_MID) { - code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); + code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false); QUERY_CHECK_CODE(code, lino, _end); if (res) { pFillInfo->pos = FILL_POS_INVALID; @@ -567,7 +584,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p fillLinearRange(pFillSup, pFillInfo, pRes); if (pFillInfo->pos == FILL_POS_MID) { - code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); + code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false); QUERY_CHECK_CODE(code, lino, _end); if (res) { pFillInfo->pos = FILL_POS_INVALID; @@ -583,7 +600,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p } } if (pFillInfo->pos == FILL_POS_END) { - code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); + code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false); QUERY_CHECK_CODE(code, lino, _end); if (res) { pFillInfo->pos = FILL_POS_INVALID; @@ -929,6 +946,7 @@ _end: return code; } +// partition key static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; @@ -936,7 +954,7 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo !isIsfilledPseudoColumn(pFillCol->pExpr)) { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot); - SResultCellData* pDestCell = getResultCell(pFillInfo->pResRow, srcSlot); + SResultCellData* pDestCell = getResultCell(pFillInfo->pNonFillRow, srcSlot); pDestCell->isNull = pSrcCell->isNull; if (!pDestCell->isNull) { memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes); diff --git a/tests/system-test/8-stream/force_window_close_interval.py b/tests/system-test/8-stream/force_window_close_interval.py index 34e03dbc70..b97aa8ff81 100644 --- a/tests/system-test/8-stream/force_window_close_interval.py +++ b/tests/system-test/8-stream/force_window_close_interval.py @@ -532,13 +532,13 @@ class TDTestCase: fill_value=fill_value, ) self.tdCom.check_query_data( - f'select irowts, table_name, c_c1, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column1_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c1, irowts', - f'select _irowts as irowts ,tbname as table_name, c1 as c_c1, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} and ts >= {start_force_ts} partition by {partition},c1 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c1, irowts', + f'select irowts, c_c1, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column1_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c1, irowts', + f'select _irowts as irowts , c1 as c_c1, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} and ts >= {start_force_ts} partition by {partition},c1 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c1, irowts', fill_value=fill_value, ) self.tdCom.check_query_data( - f'select irowts, table_name, c_c2, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column2_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c2, irowts', - f'select _irowts as irowts ,tbname as table_name, c2 as c_c2, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} and ts >= {start_force_ts} partition by {partition},c2 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c2, irowts', + f'select irowts, c_c2, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column2_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c2, irowts', + f'select _irowts as irowts , c2 as c_c2, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} and ts >= {start_force_ts} partition by {partition},c2 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c2, irowts', fill_value=fill_value, ) else: @@ -622,7 +622,7 @@ class TDTestCase: ) def run(self): - for fill_value in ["PREV", "NEXT", "VALUE"]: + for fill_value in ["PREV", "VALUE"]: self.force_window_close( interval=10, partition="tbname",