opt stream row buff

This commit is contained in:
54liuyao 2024-11-26 15:23:57 +08:00
parent 39564a41ae
commit f37ae57a87
6 changed files with 88 additions and 44 deletions

View File

@ -475,6 +475,7 @@ typedef struct SStreamFillSupporter {
STimeWindow winRange; STimeWindow winRange;
int32_t pkColBytes; int32_t pkColBytes;
__compar_fn_t comparePkColFn; __compar_fn_t comparePkColFn;
int32_t* pOffsetInfo;
} SStreamFillSupporter; } SStreamFillSupporter;
typedef struct SStreamScanInfo { typedef struct SStreamScanInfo {
@ -884,6 +885,7 @@ typedef struct SStreamIntervalSliceOperatorInfo {
struct SOperatorInfo* pOperator; struct SOperatorInfo* pOperator;
bool hasFill; bool hasFill;
bool hasInterpoFunc; bool hasInterpoFunc;
int32_t* pOffsetInfo;
} SStreamIntervalSliceOperatorInfo; } SStreamIntervalSliceOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)

View File

@ -90,19 +90,20 @@ int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap);
int winPosCmprImpl(const void* pKey1, const void* pKey2); int winPosCmprImpl(const void* pKey1, const void* pKey2);
void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index); SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo);
int32_t getDownstreamRes(struct SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol); int32_t getDownstreamRes(struct SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol);
void destroyFlusedppPos(void* ppRes); void destroyFlusedppPos(void* ppRes);
void doBuildStreamIntervalResult(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, void doBuildStreamIntervalResult(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal,
int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol); int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo);
int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, bool ignoreNull); int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, bool ignoreNull);
int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode, int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
struct SOperatorInfo** ppOptInfo); struct SOperatorInfo** ppOptInfo);
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated); int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated);
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -100,6 +100,8 @@ void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
taosMemoryFree(pFillSup->next.pRowVal); taosMemoryFree(pFillSup->next.pRowVal);
taosMemoryFree(pFillSup->nextNext.pRowVal); taosMemoryFree(pFillSup->nextNext.pRowVal);
taosMemoryFree(pFillSup->pOffsetInfo);
taosMemoryFree(pFillSup); taosMemoryFree(pFillSup);
} }
@ -1573,10 +1575,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pResRow->key = INT64_MIN; pFillInfo->pResRow->key = INT64_MIN;
pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
if (!pFillInfo->pResRow->pRowVal) { QUERY_CHECK_NULL(pFillInfo->pResRow->pRowVal, code, lino, _end, terrno);
code = terrno;
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
@ -1590,6 +1589,21 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pCell->type = pColData->info.type; pCell->type = pColData->info.type;
} }
int32_t numOfResCol = taosArrayGetSize(pRes->pDataBlock);
if (numOfResCol < pFillSup->numOfAllCols) {
int32_t* pTmpBuf = (int32_t*)taosMemoryRealloc(pFillSup->pOffsetInfo, pFillSup->numOfAllCols * sizeof(int32_t));
QUERY_CHECK_NULL(pTmpBuf, code, lino, _end, terrno);
pFillSup->pOffsetInfo = pTmpBuf;
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, numOfResCol - 1);
int32_t preLength = pFillSup->pOffsetInfo[numOfResCol - 1] + pCell->bytes + sizeof(SResultCellData);
for (int32_t i = numOfResCol; i < pFillSup->numOfAllCols; i++) {
pFillSup->pOffsetInfo[i] = preLength;
pCell = getResultCell(pFillInfo->pResRow, i);
preLength += pCell->bytes + sizeof(SResultCellData);
}
}
pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData)); pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData));
QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno); QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno);
pFillInfo->pNonFillRow->key = INT64_MIN; pFillInfo->pNonFillRow->key = INT64_MIN;

View File

@ -74,6 +74,7 @@ void destroyStreamIntervalSliceOperatorInfo(void* param) {
blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pCheckpointRes); blockDataDestroy(pInfo->pCheckpointRes);
taosMemoryFreeClear(pInfo->pOffsetInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -163,7 +164,7 @@ _end:
} }
void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock, void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock,
int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type) { int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type, int32_t* pOffsetInfo) {
SqlFunctionCtx* pCtx = pSup->pCtx; SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t k = 0; k < pSup->numOfExprs; ++k) { for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) { if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
@ -175,7 +176,7 @@ void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId); SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId);
double prevVal = 0, curVal = 0, winVal = 0; double prevVal = 0, curVal = 0, winVal = 0;
SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId); SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId, pOffsetInfo);
GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData); GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData);
GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex)); GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
@ -278,7 +279,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp); doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp);
doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END); doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END, pInfo->pOffsetInfo);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
0, pBlock->info.rows, numOfOutput); 0, pBlock->info.rows, numOfOutput);
@ -294,7 +295,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) { if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START); doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START, pInfo->pOffsetInfo);
} }
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC); TSDB_ORDER_ASC);
@ -302,7 +303,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) { if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) {
int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false); int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
TSKEY endRowTs = tsCols[endRowId]; TSKEY endRowTs = tsCols[endRowId];
transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL); transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo);
} }
SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId}; SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId};
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
@ -596,6 +597,9 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN
code = getDownstreamRes(downstream, &pDownRes, &pPkCol); code = getDownstreamRes(downstream, &pDownRes, &pPkCol);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
code = initOffsetInfo(&pInfo->pOffsetInfo, pDownRes);
QUERY_CHECK_CODE(code, lino, _error);
int32_t keyBytes = sizeof(TSKEY); int32_t keyBytes = sizeof(TSKEY);
keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool); keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool);
if (pPkCol) { if (pPkCol) {

View File

@ -281,8 +281,34 @@ static int32_t initTimeSliceResultBuf(SStreamFillSupporter* pFillSup, SExprSupp*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, SColumnInfo* pPkCol, int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes) {
SStreamFillSupporter** ppResFillSup) { int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t numOfCol = taosArrayGetSize(pRes->pDataBlock);
int32_t preLength = 0;
int32_t* pOffsetInfo = taosMemoryCalloc(numOfCol, sizeof(int32_t));
QUERY_CHECK_NULL(pOffsetInfo, code, lino, _end, lino);
for (int32_t i = 0; i < numOfCol; i++) {
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, i);
pOffsetInfo[i] = preLength;
int32_t bytes = 1;
if (pColInfo != NULL) {
bytes = pColInfo->info.bytes;
}
preLength += bytes + sizeof(SResultCellData);
}
(*ppOffset) = pOffsetInfo;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs,
SSDataBlock* pInputRes, SColumnInfo* pPkCol, SStreamFillSupporter** ppResFillSup) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
@ -320,6 +346,9 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE
pFillSup->comparePkColFn = NULL; pFillSup->comparePkColFn = NULL;
} }
code = initOffsetInfo(&pFillSup->pOffsetInfo, pInputRes);
QUERY_CHECK_CODE(code, lino, _end);
(*ppResFillSup) = pFillSup; (*ppResFillSup) = pFillSup;
_end: _end:
@ -359,17 +388,11 @@ _end:
} }
} }
SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) { SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo) {
if (!pRowVal) { if (!pRowVal) {
return NULL; return NULL;
} }
char* pData = (char*)pRowVal; return POINTER_SHIFT(pRowVal, pCellOffsetInfo[index]);
SResultCellData* pCell = pRowVal;
for (int32_t i = 0; i < index; i++) {
pData += (pCell->bytes + sizeof(SResultCellData));
pCell = (SResultCellData*)pData;
}
return pCell;
} }
static bool isGroupKeyFunc(SExprInfo* pExprInfo) { static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
@ -414,9 +437,9 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = NULL; SResultCellData* pCell = NULL;
if (IS_FILL_CONST_VALUE(pFillSup->type) && (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr)) ) { if (IS_FILL_CONST_VALUE(pFillSup->type) && (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr)) ) {
pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot); pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
} else { } else {
pCell = getSliceResultCell(pResRow->pRowVal, srcSlot); pCell = getSliceResultCell(pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
} }
code = setRowCell(pDstCol, pBlock->info.rows, pCell); code = setRowCell(pDstCol, pBlock->info.rows, pCell);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -475,7 +498,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} else if (isInterpFunc(pFillCol->pExpr)) { } else if (isInterpFunc(pFillCol->pExpr)) {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
colDataSetNULL(pDstCol, index); colDataSetNULL(pDstCol, index);
continue; continue;
@ -498,7 +521,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
destroySPoint(&cur); destroySPoint(&cur);
} else { } else {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
code = setRowCell(pDstCol, pBlock->info.rows, pCell); code = setRowCell(pDstCol, pBlock->info.rows, pCell);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
@ -952,8 +975,8 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo
if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) && if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) &&
!isIsfilledPseudoColumn(pFillCol->pExpr)) { !isIsfilledPseudoColumn(pFillCol->pExpr)) {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot); SResultCellData* pSrcCell = getSliceResultCell(pFillSup->cur.pRowVal, srcSlot, pFillSup->pOffsetInfo);
SResultCellData* pDestCell = getResultCell(pFillInfo->pNonFillRow, srcSlot); SResultCellData* pDestCell = getSliceResultCell(pFillInfo->pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
pDestCell->isNull = pSrcCell->isNull; pDestCell->isNull = pSrcCell->isNull;
if (!pDestCell->isNull) { if (!pDestCell->isNull) {
memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes); memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes);
@ -962,11 +985,11 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo
} }
} }
static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol, int32_t* pOffsetInfo) {
for (int32_t i = 0; i < numOfCol; i++) { for (int32_t i = 0; i < numOfCol; i++) {
if (isInterpFunc(pFillCol[i].pExpr)) { if (isInterpFunc(pFillCol[i].pExpr)) {
int32_t slotId = pFillCol[i].pExpr->base.pParam[0].pCol->slotId; int32_t slotId = pFillCol[i].pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pECell = getResultCell(pEndRow, slotId); SResultCellData* pECell = getSliceResultCell(pEndRow->pRowVal, slotId, pOffsetInfo);
SPoint* pPoint = taosArrayGet(pEndPoins, slotId); SPoint* pPoint = taosArrayGet(pEndPoins, slotId);
pPoint->key = pEndRow->key; pPoint->key = pEndRow->key;
memcpy(pPoint->val, pECell->pData, pECell->bytes); memcpy(pPoint->val, pECell->pData, pECell->bytes);
@ -1108,7 +1131,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
pFillSup->next.key = pFillSup->nextOriginKey; pFillSup->next.key = pFillSup->nextOriginKey;
copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols); pFillSup->numOfAllCols, pFillSup->pOffsetInfo);
pFillSup->prev.key = pFillSup->prevOriginKey; pFillSup->prev.key = pFillSup->prevOriginKey;
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
@ -1117,7 +1140,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols); pFillSup->numOfAllCols, pFillSup->pOffsetInfo);
pFillSup->prev.key = pFillSup->prevOriginKey; pFillSup->prev.key = pFillSup->prevOriginKey;
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
@ -1128,7 +1151,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
pFillSup->next.key = pFillSup->nextOriginKey; pFillSup->next.key = pFillSup->nextOriginKey;
copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols); pFillSup->numOfAllCols, pFillSup->pOffsetInfo);
pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pResRow = &pFillSup->cur;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} }
@ -1249,11 +1272,11 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, void* pPkVal, SStream
} }
void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal,
int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol) { int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SResultCellData* pCell = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, i); SResultCellData* pCell = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, i, pCellOffsetInfo);
if (!colDataIsNull_s(pColData, rowId)) { if (!colDataIsNull_s(pColData, rowId)) {
pCell->isNull = false; pCell->isNull = false;
pCell->type = pColData->info.type; pCell->type = pColData->info.type;
@ -1374,7 +1397,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
} }
right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type);
if (right) { if (right) {
transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap); pInfo->pDeletedMap);
@ -1393,7 +1416,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
} }
left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type); left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type);
if (left) { if (left) {
transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap, code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap,
needDel, pInfo->pDeletedMap); needDel, pInfo->pDeletedMap);
@ -1418,7 +1441,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
} }
right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type);
if (right) { if (right) {
transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap); pInfo->pDeletedMap);
@ -1946,7 +1969,7 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
continue; continue;
} }
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot); SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
SFillColInfo* pValueCol = pFillSup->pAllColInfo + valueIndex; SFillColInfo* pValueCol = pFillSup->pAllColInfo + valueIndex;
SVariant* pVar = &(pValueCol->fillVal); SVariant* pVar = &(pValueCol->fillVal);
if (pCell->type == TSDB_DATA_TYPE_FLOAT) { if (pCell->type == TSDB_DATA_TYPE_FLOAT) {
@ -1970,7 +1993,7 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
int32_t slotId = GET_DEST_SLOT_ID(pFillCol); int32_t slotId = GET_DEST_SLOT_ID(pFillCol);
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId); SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, slotId, pFillSup->pOffsetInfo);
pCell->isNull = true; pCell->isNull = true;
} }
} }
@ -2090,7 +2113,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillSup = NULL; pInfo->pFillSup = NULL;
code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pPkCol, &pInfo->pFillSup); code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pDownRes, pPkCol, &pInfo->pFillSup);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
int32_t ratio = 1; int32_t ratio = 1;

View File

@ -10931,10 +10931,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
if (pSelect->hasInterpFunc) { if (pSelect->hasInterpFunc) {
// Temporary code // Temporary code
if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { // if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, // return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream interp function only support force window close"); // "Stream interp function only support force window close");
} // }
if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (pStmt->pOptions->fillHistory) { if (pStmt->pOptions->fillHistory) {