diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 271e9c91a1..89ab8ead21 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -475,6 +475,7 @@ typedef struct SStreamFillSupporter { STimeWindow winRange; int32_t pkColBytes; __compar_fn_t comparePkColFn; + int32_t* pOffsetInfo; } SStreamFillSupporter; typedef struct SStreamScanInfo { @@ -884,6 +885,7 @@ typedef struct SStreamIntervalSliceOperatorInfo { struct SOperatorInfo* pOperator; bool hasFill; bool hasInterpoFunc; + int32_t* pOffsetInfo; } SStreamIntervalSliceOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 27686b0081..4bd931c567 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -90,19 +90,20 @@ int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap); int winPosCmprImpl(const void* pKey1, const void* pKey2); void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); -SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index); +SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo); int32_t getDownstreamRes(struct SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol); void destroyFlusedppPos(void* ppRes); void doBuildStreamIntervalResult(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo); void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, - int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol); + int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo); int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, bool ignoreNull); int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, struct SOperatorInfo** ppOptInfo); int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated); +int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes); #ifdef __cplusplus } diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index ccf1f7c9e5..d00f95e5a9 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -100,6 +100,8 @@ void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) { taosMemoryFree(pFillSup->next.pRowVal); taosMemoryFree(pFillSup->nextNext.pRowVal); + taosMemoryFree(pFillSup->pOffsetInfo); + taosMemoryFree(pFillSup); } @@ -1573,10 +1575,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pFillInfo->pResRow->key = INT64_MIN; pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); - if (!pFillInfo->pResRow->pRowVal) { - code = terrno; - QUERY_CHECK_CODE(code, lino, _end); - } + QUERY_CHECK_NULL(pFillInfo->pResRow->pRowVal, code, lino, _end, terrno); for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i); @@ -1590,6 +1589,21 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pCell->type = pColData->info.type; } + int32_t numOfResCol = taosArrayGetSize(pRes->pDataBlock); + if (numOfResCol < pFillSup->numOfAllCols) { + int32_t* pTmpBuf = (int32_t*)taosMemoryRealloc(pFillSup->pOffsetInfo, pFillSup->numOfAllCols * sizeof(int32_t)); + QUERY_CHECK_NULL(pTmpBuf, code, lino, _end, terrno); + pFillSup->pOffsetInfo = pTmpBuf; + + SResultCellData* pCell = getResultCell(pFillInfo->pResRow, numOfResCol - 1); + int32_t preLength = pFillSup->pOffsetInfo[numOfResCol - 1] + pCell->bytes + sizeof(SResultCellData); + for (int32_t i = numOfResCol; i < pFillSup->numOfAllCols; i++) { + pFillSup->pOffsetInfo[i] = preLength; + pCell = getResultCell(pFillInfo->pResRow, i); + preLength += pCell->bytes + sizeof(SResultCellData); + } + } + pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData)); QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno); pFillInfo->pNonFillRow->key = INT64_MIN; diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index bc35b58a99..f26cbce119 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -74,6 +74,7 @@ void destroyStreamIntervalSliceOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pCheckpointRes); + taosMemoryFreeClear(pInfo->pOffsetInfo); taosMemoryFreeClear(param); } @@ -163,7 +164,7 @@ _end: } void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock, - int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type) { + int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type, int32_t* pOffsetInfo) { SqlFunctionCtx* pCtx = pSup->pCtx; for (int32_t k = 0; k < pSup->numOfExprs; ++k) { if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) { @@ -175,7 +176,7 @@ void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId); double prevVal = 0, curVal = 0, winVal = 0; - SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId); + SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId, pOffsetInfo); GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData); GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex)); @@ -278,7 +279,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp); - doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END); + doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END, pInfo->pOffsetInfo); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1); code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfOutput); @@ -294,7 +295,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) { - doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START); + doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START, pInfo->pOffsetInfo); } forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); @@ -302,7 +303,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) { int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false); TSKEY endRowTs = tsCols[endRowId]; - transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL); + transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo); } SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId}; if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { @@ -596,6 +597,9 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN code = getDownstreamRes(downstream, &pDownRes, &pPkCol); QUERY_CHECK_CODE(code, lino, _error); + code = initOffsetInfo(&pInfo->pOffsetInfo, pDownRes); + QUERY_CHECK_CODE(code, lino, _error); + int32_t keyBytes = sizeof(TSKEY); keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool); if (pPkCol) { diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index efc5dd6d6a..29c91a0763 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -281,8 +281,34 @@ static int32_t initTimeSliceResultBuf(SStreamFillSupporter* pFillSup, SExprSupp* return TSDB_CODE_SUCCESS; } -static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, SColumnInfo* pPkCol, - SStreamFillSupporter** ppResFillSup) { +int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t numOfCol = taosArrayGetSize(pRes->pDataBlock); + int32_t preLength = 0; + int32_t* pOffsetInfo = taosMemoryCalloc(numOfCol, sizeof(int32_t)); + QUERY_CHECK_NULL(pOffsetInfo, code, lino, _end, lino); + + for (int32_t i = 0; i < numOfCol; i++) { + SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, i); + pOffsetInfo[i] = preLength; + int32_t bytes = 1; + if (pColInfo != NULL) { + bytes = pColInfo->info.bytes; + } + preLength += bytes + sizeof(SResultCellData); + } + + (*ppOffset) = pOffsetInfo; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} +static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, + SSDataBlock* pInputRes, SColumnInfo* pPkCol, SStreamFillSupporter** ppResFillSup) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); @@ -320,6 +346,9 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE pFillSup->comparePkColFn = NULL; } + code = initOffsetInfo(&pFillSup->pOffsetInfo, pInputRes); + QUERY_CHECK_CODE(code, lino, _end); + (*ppResFillSup) = pFillSup; _end: @@ -359,17 +388,11 @@ _end: } } -SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) { +SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo) { if (!pRowVal) { return NULL; } - char* pData = (char*)pRowVal; - SResultCellData* pCell = pRowVal; - for (int32_t i = 0; i < index; i++) { - pData += (pCell->bytes + sizeof(SResultCellData)); - pCell = (SResultCellData*)pData; - } - return pCell; + return POINTER_SHIFT(pRowVal, pCellOffsetInfo[index]); } static bool isGroupKeyFunc(SExprInfo* pExprInfo) { @@ -414,9 +437,9 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; SResultCellData* pCell = NULL; if (IS_FILL_CONST_VALUE(pFillSup->type) && (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr)) ) { - pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot); + pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); } else { - pCell = getSliceResultCell(pResRow->pRowVal, srcSlot); + pCell = getSliceResultCell(pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); } code = setRowCell(pDstCol, pBlock->info.rows, pCell); QUERY_CHECK_CODE(code, lino, _end); @@ -475,7 +498,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi QUERY_CHECK_CODE(code, lino, _end); } else if (isInterpFunc(pFillCol->pExpr)) { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { colDataSetNULL(pDstCol, index); continue; @@ -498,7 +521,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi destroySPoint(&cur); } else { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); code = setRowCell(pDstCol, pBlock->info.rows, pCell); QUERY_CHECK_CODE(code, lino, _end); } @@ -952,8 +975,8 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) && !isIsfilledPseudoColumn(pFillCol->pExpr)) { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot); - SResultCellData* pDestCell = getResultCell(pFillInfo->pNonFillRow, srcSlot); + SResultCellData* pSrcCell = getSliceResultCell(pFillSup->cur.pRowVal, srcSlot, pFillSup->pOffsetInfo); + SResultCellData* pDestCell = getSliceResultCell(pFillInfo->pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); pDestCell->isNull = pSrcCell->isNull; if (!pDestCell->isNull) { memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes); @@ -962,11 +985,11 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo } } -static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { +static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol, int32_t* pOffsetInfo) { for (int32_t i = 0; i < numOfCol; i++) { if (isInterpFunc(pFillCol[i].pExpr)) { int32_t slotId = pFillCol[i].pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pECell = getResultCell(pEndRow, slotId); + SResultCellData* pECell = getSliceResultCell(pEndRow->pRowVal, slotId, pOffsetInfo); SPoint* pPoint = taosArrayGet(pEndPoins, slotId); pPoint->key = pEndRow->key; memcpy(pPoint->val, pECell->pData, pECell->bytes); @@ -1108,7 +1131,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); pFillSup->next.key = pFillSup->nextOriginKey; copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, - pFillSup->numOfAllCols); + pFillSup->numOfAllCols, pFillSup->pOffsetInfo); pFillSup->prev.key = pFillSup->prevOriginKey; pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; @@ -1117,7 +1140,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pos = FILL_POS_END; SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, - pFillSup->numOfAllCols); + pFillSup->numOfAllCols, pFillSup->pOffsetInfo); pFillSup->prev.key = pFillSup->prevOriginKey; pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; @@ -1128,7 +1151,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); pFillSup->next.key = pFillSup->nextOriginKey; copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, - pFillSup->numOfAllCols); + pFillSup->numOfAllCols, pFillSup->pOffsetInfo); pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pLinearInfo->hasNext = false; } @@ -1249,11 +1272,11 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, void* pPkVal, SStream } void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, - int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol) { + int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); - SResultCellData* pCell = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, i); + SResultCellData* pCell = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, i, pCellOffsetInfo); if (!colDataIsNull_s(pColData, rowId)) { pCell->isNull = false; pCell->type = pColData->info.type; @@ -1374,7 +1397,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); if (right) { - transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1393,7 +1416,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type); if (left) { - transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1418,7 +1441,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); if (right) { - transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1946,7 +1969,7 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p continue; } int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot); + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); SFillColInfo* pValueCol = pFillSup->pAllColInfo + valueIndex; SVariant* pVar = &(pValueCol->fillVal); if (pCell->type == TSDB_DATA_TYPE_FLOAT) { @@ -1970,7 +1993,7 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; int32_t slotId = GET_DEST_SLOT_ID(pFillCol); - SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId); + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, slotId, pFillSup->pOffsetInfo); pCell->isNull = true; } } @@ -2090,7 +2113,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); pInfo->pFillSup = NULL; - code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pPkCol, &pInfo->pFillSup); + code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pDownRes, pPkCol, &pInfo->pFillSup); QUERY_CHECK_CODE(code, lino, _error); int32_t ratio = 1; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0ee8f9be33..6facaed78d 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10931,10 +10931,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pSelect->hasInterpFunc) { // Temporary code - if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Stream interp function only support force window close"); - } + // if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + // return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + // "Stream interp function only support force window close"); + // } if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (pStmt->pOptions->fillHistory) {