fix issue for value

This commit is contained in:
54liuyao 2024-10-31 17:47:54 +08:00
parent fde9eeb9ca
commit 5b9f434532
4 changed files with 41 additions and 18 deletions

View File

@ -119,6 +119,7 @@ typedef struct SStreamFillInfo {
int32_t delIndex; int32_t delIndex;
uint64_t curGroupId; uint64_t curGroupId;
bool hasNext; bool hasNext;
SResultRowData* pNonFillRow;
} SStreamFillInfo; } SStreamFillInfo;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);

View File

@ -1541,14 +1541,12 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
} }
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
pFillInfo->pNonFillRow = NULL;
pFillInfo->pResRow = NULL; pFillInfo->pResRow = NULL;
if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F || if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F ||
pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) { pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) {
pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData)); pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData));
if (!pFillInfo->pResRow) { QUERY_CHECK_NULL(pFillInfo->pResRow, code, lino, _end, terrno);
code = terrno;
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->pResRow->key = INT64_MIN; pFillInfo->pResRow->key = INT64_MIN;
pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
@ -1568,6 +1566,12 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pCell->bytes = pColData->info.bytes; pCell->bytes = pColData->info.bytes;
pCell->type = pColData->info.type; pCell->type = pColData->info.type;
} }
pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData));
QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno);
pFillInfo->pNonFillRow->key = INT64_MIN;
pFillInfo->pNonFillRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
memcpy(pFillInfo->pNonFillRow->pRowVal, pFillInfo->pResRow->pRowVal, pFillSup->rowSize);
} }
pFillInfo->type = pFillSup->type; pFillInfo->type = pFillSup->type;

View File

@ -30,6 +30,8 @@
#define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState" #define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState"
#define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint" #define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint"
#define IS_FILL_CONST_VALUE(type) ((type == TSDB_FILL_NULL || type == TSDB_FILL_NULL_F || type == TSDB_FILL_SET_VALUE || type == TSDB_FILL_SET_VALUE_F))
int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) { int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) {
return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0); return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0);
} }
@ -371,7 +373,17 @@ SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) {
return pCell; return pCell;
} }
static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock, static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
int32_t functionType = pExprInfo->pExpr->_function.functionType;
return (functionType == FUNCTION_TYPE_GROUP_KEY);
}
static bool isSelectGroupConstValueFunc(SExprInfo* pExprInfo) {
int32_t functionType = pExprInfo->pExpr->_function.functionType;
return (functionType == FUNCTION_TYPE_GROUP_CONST_VALUE);
}
static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, SResultRowData* pNonFillRow, TSKEY ts, SSDataBlock* pBlock,
bool* pRes, bool isFilled) { bool* pRes, bool isFilled) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
@ -401,7 +413,12 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} else { } else {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getSliceResultCell(pResRow->pRowVal, srcSlot); SResultCellData* pCell = NULL;
if (IS_FILL_CONST_VALUE(pFillSup->type) && (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr)) ) {
pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot);
} else {
pCell = getSliceResultCell(pResRow->pRowVal, srcSlot);
}
code = setRowCell(pDstCol, pBlock->info.rows, pCell); code = setRowCell(pDstCol, pBlock->info.rows, pCell);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
@ -424,7 +441,7 @@ static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
// if (inWinRange(&pFillSup->winRange, &st)) { // if (inWinRange(&pFillSup->winRange, &st)) {
bool res = true; bool res = true;
code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res, true); code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->pNonFillRow, pFillInfo->current, pBlock, &res, true);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
// } // }
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
@ -533,13 +550,13 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
int32_t lino = 0; int32_t lino = 0;
bool res = true; bool res = true;
if (pFillInfo->needFill == false && pFillInfo->pos != FILL_POS_INVALID) { if (pFillInfo->needFill == false && pFillInfo->pos != FILL_POS_INVALID) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
return; return;
} }
if (pFillInfo->pos == FILL_POS_START) { if (pFillInfo->pos == FILL_POS_START) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (res) { if (res) {
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
@ -549,7 +566,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
fillNormalRange(pFillSup, pFillInfo, pRes); fillNormalRange(pFillSup, pFillInfo, pRes);
if (pFillInfo->pos == FILL_POS_MID) { if (pFillInfo->pos == FILL_POS_MID) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (res) { if (res) {
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
@ -567,7 +584,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
fillLinearRange(pFillSup, pFillInfo, pRes); fillLinearRange(pFillSup, pFillInfo, pRes);
if (pFillInfo->pos == FILL_POS_MID) { if (pFillInfo->pos == FILL_POS_MID) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (res) { if (res) {
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
@ -583,7 +600,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
} }
} }
if (pFillInfo->pos == FILL_POS_END) { if (pFillInfo->pos == FILL_POS_END) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false); code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (res) { if (res) {
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
@ -929,6 +946,7 @@ _end:
return code; return code;
} }
// partition key
static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
@ -936,7 +954,7 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo
!isIsfilledPseudoColumn(pFillCol->pExpr)) { !isIsfilledPseudoColumn(pFillCol->pExpr)) {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot); SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot);
SResultCellData* pDestCell = getResultCell(pFillInfo->pResRow, srcSlot); SResultCellData* pDestCell = getResultCell(pFillInfo->pNonFillRow, srcSlot);
pDestCell->isNull = pSrcCell->isNull; pDestCell->isNull = pSrcCell->isNull;
if (!pDestCell->isNull) { if (!pDestCell->isNull) {
memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes); memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes);

View File

@ -532,13 +532,13 @@ class TDTestCase:
fill_value=fill_value, fill_value=fill_value,
) )
self.tdCom.check_query_data( self.tdCom.check_query_data(
f'select irowts, table_name, c_c1, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column1_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c1, irowts', f'select irowts, c_c1, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column1_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c1, irowts',
f'select _irowts as irowts ,tbname as table_name, c1 as c_c1, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} and ts >= {start_force_ts} partition by {partition},c1 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c1, irowts', f'select _irowts as irowts , c1 as c_c1, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} and ts >= {start_force_ts} partition by {partition},c1 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c1, irowts',
fill_value=fill_value, fill_value=fill_value,
) )
self.tdCom.check_query_data( self.tdCom.check_query_data(
f'select irowts, table_name, c_c2, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column2_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c2, irowts', f'select irowts, c_c2, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column2_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c2, irowts',
f'select _irowts as irowts ,tbname as table_name, c2 as c_c2, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} and ts >= {start_force_ts} partition by {partition},c2 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c2, irowts', f'select _irowts as irowts , c2 as c_c2, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} and ts >= {start_force_ts} partition by {partition},c2 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c2, irowts',
fill_value=fill_value, fill_value=fill_value,
) )
else: else:
@ -622,7 +622,7 @@ class TDTestCase:
) )
def run(self): def run(self):
for fill_value in ["PREV", "NEXT", "VALUE"]: for fill_value in ["PREV", "VALUE"]:
self.force_window_close( self.force_window_close(
interval=10, interval=10,
partition="tbname", partition="tbname",