From f37ae57a87963a5d214f471cd312710ed6d29e9d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 26 Nov 2024 15:23:57 +0800 Subject: [PATCH 01/10] opt stream row buff --- source/libs/executor/inc/executorInt.h | 2 + source/libs/executor/inc/streamexecutorInt.h | 5 +- source/libs/executor/src/streamfilloperator.c | 22 ++++- .../src/streamintervalsliceoperator.c | 14 ++-- .../executor/src/streamtimesliceoperator.c | 81 ++++++++++++------- source/libs/parser/src/parTranslater.c | 8 +- 6 files changed, 88 insertions(+), 44 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 271e9c91a1..89ab8ead21 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -475,6 +475,7 @@ typedef struct SStreamFillSupporter { STimeWindow winRange; int32_t pkColBytes; __compar_fn_t comparePkColFn; + int32_t* pOffsetInfo; } SStreamFillSupporter; typedef struct SStreamScanInfo { @@ -884,6 +885,7 @@ typedef struct SStreamIntervalSliceOperatorInfo { struct SOperatorInfo* pOperator; bool hasFill; bool hasInterpoFunc; + int32_t* pOffsetInfo; } SStreamIntervalSliceOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 27686b0081..4bd931c567 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -90,19 +90,20 @@ int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap); int winPosCmprImpl(const void* pKey1, const void* pKey2); void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); -SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index); +SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo); int32_t getDownstreamRes(struct SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol); void destroyFlusedppPos(void* ppRes); void doBuildStreamIntervalResult(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo); void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, - int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol); + int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo); int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, bool ignoreNull); int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, struct SOperatorInfo** ppOptInfo); int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated); +int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes); #ifdef __cplusplus } diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index ccf1f7c9e5..d00f95e5a9 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -100,6 +100,8 @@ void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) { taosMemoryFree(pFillSup->next.pRowVal); taosMemoryFree(pFillSup->nextNext.pRowVal); + taosMemoryFree(pFillSup->pOffsetInfo); + taosMemoryFree(pFillSup); } @@ -1573,10 +1575,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pFillInfo->pResRow->key = INT64_MIN; pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); - if (!pFillInfo->pResRow->pRowVal) { - code = terrno; - QUERY_CHECK_CODE(code, lino, _end); - } + QUERY_CHECK_NULL(pFillInfo->pResRow->pRowVal, code, lino, _end, terrno); for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i); @@ -1590,6 +1589,21 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pCell->type = pColData->info.type; } + int32_t numOfResCol = taosArrayGetSize(pRes->pDataBlock); + if (numOfResCol < pFillSup->numOfAllCols) { + int32_t* pTmpBuf = (int32_t*)taosMemoryRealloc(pFillSup->pOffsetInfo, pFillSup->numOfAllCols * sizeof(int32_t)); + QUERY_CHECK_NULL(pTmpBuf, code, lino, _end, terrno); + pFillSup->pOffsetInfo = pTmpBuf; + + SResultCellData* pCell = getResultCell(pFillInfo->pResRow, numOfResCol - 1); + int32_t preLength = pFillSup->pOffsetInfo[numOfResCol - 1] + pCell->bytes + sizeof(SResultCellData); + for (int32_t i = numOfResCol; i < pFillSup->numOfAllCols; i++) { + pFillSup->pOffsetInfo[i] = preLength; + pCell = getResultCell(pFillInfo->pResRow, i); + preLength += pCell->bytes + sizeof(SResultCellData); + } + } + pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData)); QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno); pFillInfo->pNonFillRow->key = INT64_MIN; diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index bc35b58a99..f26cbce119 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -74,6 +74,7 @@ void destroyStreamIntervalSliceOperatorInfo(void* param) { blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pCheckpointRes); + taosMemoryFreeClear(pInfo->pOffsetInfo); taosMemoryFreeClear(param); } @@ -163,7 +164,7 @@ _end: } void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock, - int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type) { + int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type, int32_t* pOffsetInfo) { SqlFunctionCtx* pCtx = pSup->pCtx; for (int32_t k = 0; k < pSup->numOfExprs; ++k) { if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) { @@ -175,7 +176,7 @@ void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId); double prevVal = 0, curVal = 0, winVal = 0; - SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId); + SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId, pOffsetInfo); GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData); GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex)); @@ -278,7 +279,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp); - doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END); + doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END, pInfo->pOffsetInfo); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1); code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfOutput); @@ -294,7 +295,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) { - doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START); + doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START, pInfo->pOffsetInfo); } forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); @@ -302,7 +303,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) { int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false); TSKEY endRowTs = tsCols[endRowId]; - transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL); + transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo); } SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId}; if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { @@ -596,6 +597,9 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN code = getDownstreamRes(downstream, &pDownRes, &pPkCol); QUERY_CHECK_CODE(code, lino, _error); + code = initOffsetInfo(&pInfo->pOffsetInfo, pDownRes); + QUERY_CHECK_CODE(code, lino, _error); + int32_t keyBytes = sizeof(TSKEY); keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool); if (pPkCol) { diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index efc5dd6d6a..29c91a0763 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -281,8 +281,34 @@ static int32_t initTimeSliceResultBuf(SStreamFillSupporter* pFillSup, SExprSupp* return TSDB_CODE_SUCCESS; } -static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, SColumnInfo* pPkCol, - SStreamFillSupporter** ppResFillSup) { +int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t numOfCol = taosArrayGetSize(pRes->pDataBlock); + int32_t preLength = 0; + int32_t* pOffsetInfo = taosMemoryCalloc(numOfCol, sizeof(int32_t)); + QUERY_CHECK_NULL(pOffsetInfo, code, lino, _end, lino); + + for (int32_t i = 0; i < numOfCol; i++) { + SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, i); + pOffsetInfo[i] = preLength; + int32_t bytes = 1; + if (pColInfo != NULL) { + bytes = pColInfo->info.bytes; + } + preLength += bytes + sizeof(SResultCellData); + } + + (*ppOffset) = pOffsetInfo; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} +static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, + SSDataBlock* pInputRes, SColumnInfo* pPkCol, SStreamFillSupporter** ppResFillSup) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); @@ -320,6 +346,9 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE pFillSup->comparePkColFn = NULL; } + code = initOffsetInfo(&pFillSup->pOffsetInfo, pInputRes); + QUERY_CHECK_CODE(code, lino, _end); + (*ppResFillSup) = pFillSup; _end: @@ -359,17 +388,11 @@ _end: } } -SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) { +SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo) { if (!pRowVal) { return NULL; } - char* pData = (char*)pRowVal; - SResultCellData* pCell = pRowVal; - for (int32_t i = 0; i < index; i++) { - pData += (pCell->bytes + sizeof(SResultCellData)); - pCell = (SResultCellData*)pData; - } - return pCell; + return POINTER_SHIFT(pRowVal, pCellOffsetInfo[index]); } static bool isGroupKeyFunc(SExprInfo* pExprInfo) { @@ -414,9 +437,9 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; SResultCellData* pCell = NULL; if (IS_FILL_CONST_VALUE(pFillSup->type) && (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr)) ) { - pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot); + pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); } else { - pCell = getSliceResultCell(pResRow->pRowVal, srcSlot); + pCell = getSliceResultCell(pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); } code = setRowCell(pDstCol, pBlock->info.rows, pCell); QUERY_CHECK_CODE(code, lino, _end); @@ -475,7 +498,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi QUERY_CHECK_CODE(code, lino, _end); } else if (isInterpFunc(pFillCol->pExpr)) { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { colDataSetNULL(pDstCol, index); continue; @@ -498,7 +521,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi destroySPoint(&cur); } else { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); code = setRowCell(pDstCol, pBlock->info.rows, pCell); QUERY_CHECK_CODE(code, lino, _end); } @@ -952,8 +975,8 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) && !isIsfilledPseudoColumn(pFillCol->pExpr)) { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot); - SResultCellData* pDestCell = getResultCell(pFillInfo->pNonFillRow, srcSlot); + SResultCellData* pSrcCell = getSliceResultCell(pFillSup->cur.pRowVal, srcSlot, pFillSup->pOffsetInfo); + SResultCellData* pDestCell = getSliceResultCell(pFillInfo->pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); pDestCell->isNull = pSrcCell->isNull; if (!pDestCell->isNull) { memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes); @@ -962,11 +985,11 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo } } -static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { +static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol, int32_t* pOffsetInfo) { for (int32_t i = 0; i < numOfCol; i++) { if (isInterpFunc(pFillCol[i].pExpr)) { int32_t slotId = pFillCol[i].pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pECell = getResultCell(pEndRow, slotId); + SResultCellData* pECell = getSliceResultCell(pEndRow->pRowVal, slotId, pOffsetInfo); SPoint* pPoint = taosArrayGet(pEndPoins, slotId); pPoint->key = pEndRow->key; memcpy(pPoint->val, pECell->pData, pECell->bytes); @@ -1108,7 +1131,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); pFillSup->next.key = pFillSup->nextOriginKey; copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, - pFillSup->numOfAllCols); + pFillSup->numOfAllCols, pFillSup->pOffsetInfo); pFillSup->prev.key = pFillSup->prevOriginKey; pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; @@ -1117,7 +1140,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo pFillInfo->pos = FILL_POS_END; SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, - pFillSup->numOfAllCols); + pFillSup->numOfAllCols, pFillSup->pOffsetInfo); pFillSup->prev.key = pFillSup->prevOriginKey; pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; @@ -1128,7 +1151,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); pFillSup->next.key = pFillSup->nextOriginKey; copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, - pFillSup->numOfAllCols); + pFillSup->numOfAllCols, pFillSup->pOffsetInfo); pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pLinearInfo->hasNext = false; } @@ -1249,11 +1272,11 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, void* pPkVal, SStream } void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, - int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol) { + int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); - SResultCellData* pCell = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, i); + SResultCellData* pCell = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, i, pCellOffsetInfo); if (!colDataIsNull_s(pColData, rowId)) { pCell->isNull = false; pCell->type = pColData->info.type; @@ -1374,7 +1397,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); if (right) { - transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1393,7 +1416,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type); if (left) { - transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1418,7 +1441,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); if (right) { - transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1946,7 +1969,7 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p continue; } int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot); + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo); SFillColInfo* pValueCol = pFillSup->pAllColInfo + valueIndex; SVariant* pVar = &(pValueCol->fillVal); if (pCell->type == TSDB_DATA_TYPE_FLOAT) { @@ -1970,7 +1993,7 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; int32_t slotId = GET_DEST_SLOT_ID(pFillCol); - SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId); + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, slotId, pFillSup->pOffsetInfo); pCell->isNull = true; } } @@ -2090,7 +2113,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); pInfo->pFillSup = NULL; - code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pPkCol, &pInfo->pFillSup); + code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pDownRes, pPkCol, &pInfo->pFillSup); QUERY_CHECK_CODE(code, lino, _error); int32_t ratio = 1; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0ee8f9be33..6facaed78d 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10931,10 +10931,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pSelect->hasInterpFunc) { // Temporary code - if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Stream interp function only support force window close"); - } + // if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + // return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + // "Stream interp function only support force window close"); + // } if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (pStmt->pOptions->fillHistory) { From ba569d3df30215d07434147a22e487e2f1480c79 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 26 Nov 2024 15:27:03 +0800 Subject: [PATCH 02/10] adj code --- source/libs/parser/src/parTranslater.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 6facaed78d..0ee8f9be33 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10931,10 +10931,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pSelect->hasInterpFunc) { // Temporary code - // if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - // return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - // "Stream interp function only support force window close"); - // } + if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream interp function only support force window close"); + } if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (pStmt->pOptions->fillHistory) { From 22023575ba52abc53a81bdd1cf91b4c76914519c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 26 Nov 2024 19:46:36 +0800 Subject: [PATCH 03/10] opt stream force fill op --- source/libs/executor/inc/executorInt.h | 3 +- source/libs/executor/inc/operator.h | 2 +- source/libs/executor/inc/streamexecutorInt.h | 3 +- source/libs/executor/inc/tfill.h | 3 +- source/libs/executor/src/operator.c | 2 +- source/libs/executor/src/streamfilloperator.c | 152 ++++++++++++++---- .../src/streamintervalsliceoperator.c | 8 +- .../executor/src/streamtimesliceoperator.c | 2 +- .../executor/src/streamtimewindowoperator.c | 2 +- source/libs/executor/test/queryPlanTests.cpp | 2 +- source/libs/stream/src/tstreamFileState.c | 5 - 11 files changed, 130 insertions(+), 54 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 89ab8ead21..9c2df5de2c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -823,10 +823,11 @@ typedef struct SStreamFillOperatorInfo { int32_t primaryTsCol; int32_t primarySrcSlotId; SStreamFillInfo* pFillInfo; - SStreamAggSupporter* pStreamAggSup; SArray* pCloseTs; SArray* pUpdated; SGroupResInfo groupResInfo; + SStreamState* pState; + SStateStore stateStore; } SStreamFillOperatorInfo; typedef struct SStreamTimeSliceOperatorInfo { diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 91aef93452..2cf2959924 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -149,7 +149,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo); -int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); +int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo); int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo); diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 4bd931c567..0a69080314 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -102,8 +102,9 @@ int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, struct SOperatorInfo** ppOptInfo); -int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated); +int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated); int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes); +TSKEY compareTs(void* pKey); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 6072063bbf..d20742bf45 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -119,7 +119,8 @@ typedef struct SStreamFillInfo { int32_t delIndex; uint64_t curGroupId; bool hasNext; - SResultRowData* pNonFillRow; + SResultRowData* pNonFillRow; + void* pTempBuff; } SStreamFillInfo; int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 3b10dce63f..a31d3d50e1 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -614,7 +614,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) { - code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr); + code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, pHandle, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index d00f95e5a9..b90729e3b5 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -131,6 +131,7 @@ void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) { pFillInfo->pLinearInfo = NULL; taosArrayDestroy(pFillInfo->delRanges); + taosMemoryFreeClear(pFillInfo->pTempBuff); taosMemoryFree(pFillInfo); } @@ -150,6 +151,14 @@ static void destroyStreamFillOperatorInfo(void* param) { clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroy(pInfo->pCloseTs); + if (pInfo->stateStore.streamFileStateDestroy != NULL) { + pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + } + + if (pInfo->pState != NULL) { + taosMemoryFreeClear(pInfo->pState); + } + taosMemoryFree(pInfo); } @@ -1159,14 +1168,19 @@ _end: return code; } +static void resetForceFillWindow(SResultRowData* pRowData) { + pRowData->key = INT64_MIN; + pRowData->pRowVal = NULL; +} + void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - void* pState = pOperator->pTaskInfo->streamInfo.pState; - bool res = false; - int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + + SStreamFillOperatorInfo* pInfo = pOperator->info; + bool res = false; + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); if (pBlock->info.id.groupId == 0) { @@ -1174,25 +1188,30 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* } else if (pBlock->info.id.groupId != pKey->groupId) { break; } - void* val = NULL; - int32_t len = 0; - int32_t winCode = pAPI->stateStore.streamStateFillGet(pOperator->pTaskInfo->streamInfo.pState, pKey, (void**)&val, &len, NULL); + + SRowBuffPos* pValPos = NULL; + int32_t len = 0; + int32_t winCode = TSDB_CODE_SUCCESS; + code = pInfo->stateStore.streamStateFillGet(pInfo->pState, pKey, (void**)&pValPos, &len, &winCode); + QUERY_CHECK_CODE(code, lino, _end); qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode); if (winCode == TSDB_CODE_SUCCESS) { pFillSup->cur.key = pKey->ts; - pFillSup->cur.pRowVal = val; + pFillSup->cur.pRowVal = pValPos->pRowBuff; code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); QUERY_CHECK_CODE(code, lino, _end); - resetFillWindow(&pFillSup->cur); + resetForceFillWindow(&pFillSup->cur); + releaseOutputBuf(pInfo->pState, pValPos, &pInfo->stateStore); } else { - SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey); - SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId}; - void* preVal = NULL; - int32_t preVLen = 0; - winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen); + SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId}; + SRowBuffPos* prePos = NULL; + int32_t preVLen = 0; + code = pInfo->stateStore.streamStateFillGetPrev(pInfo->pState, pKey, &preKey, + (void**)&prePos, &preVLen, &winCode); + QUERY_CHECK_CODE(code, lino, _end); if (winCode == TSDB_CODE_SUCCESS) { pFillSup->cur.key = pKey->ts; - pFillSup->cur.pRowVal = preVal; + pFillSup->cur.pRowVal = prePos->pRowBuff; if (pFillInfo->type == TSDB_FILL_PREV) { code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); QUERY_CHECK_CODE(code, lino, _end); @@ -1202,9 +1221,9 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res); QUERY_CHECK_CODE(code, lino, _end); } - resetFillWindow(&pFillSup->cur); + resetForceFillWindow(&pFillSup->cur); } - pAPI->stateStore.streamStateFreeCur(pCur); + releaseOutputBuf(pInfo->pState, prePos, &pInfo->stateStore); } } @@ -1249,6 +1268,45 @@ _end: return code; } +static void keepResultInStateBuf(SStreamFillOperatorInfo* pInfo, uint64_t groupId, SResultRowData* pRow) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + SWinKey key = {.groupId = groupId, .ts = pRow->key}; + int32_t curVLen = 0; + SRowBuffPos* pStatePos = NULL; + int32_t winCode = TSDB_CODE_SUCCESS; + code = pInfo->stateStore.streamStateFillAddIfNotExist(pInfo->pState, &key, (void**)&pStatePos, + &curVLen, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + memcpy(pStatePos->pRowBuff, pRow->pRowVal, pInfo->pFillSup->rowSize); + qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } +} + +int32_t keepBlockRowInStateBuf(SStreamFillOperatorInfo* pInfo, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, + int32_t rowId, uint64_t groupId, int32_t rowSize) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + TSKEY ts = tsCol[rowId]; + pFillInfo->nextRowKey = ts; + TAOS_MEMSET(pFillInfo->pTempBuff, 0, rowSize); + SResultRowData tmpNextRow = {.key = ts, .pRowVal = pFillInfo->pTempBuff}; + + transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow); + keepResultInStateBuf(pInfo, groupId, &tmpNextRow); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + // force window close impl static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; @@ -1259,11 +1317,10 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { SStreamFillInfo* pFillInfo = pInfo->pFillInfo; SSDataBlock* pBlock = pInfo->pSrcBlock; uint64_t groupId = pBlock->info.id.groupId; - SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup; SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); TSKEY* tsCol = (TSKEY*)pTsCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++){ - code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); + code = keepBlockRowInStateBuf(pInfo, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); QUERY_CHECK_CODE(code, lino, _end); int32_t size = taosArrayGetSize(pInfo->pCloseTs); @@ -1283,7 +1340,7 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { } } } - code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0); + code = pInfo->stateStore.streamStateGroupPut(pInfo->pState, groupId, NULL, 0); QUERY_CHECK_CODE(code, lino, _end); _end: @@ -1293,13 +1350,13 @@ _end: return code; } -int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) { +int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int64_t groupId = 0; - SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState); + SStreamStateCur* pCur = pStateStore->streamStateGroupGetCur(pState); while (1) { - int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL); + int32_t winCode = pStateStore->streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL); if (winCode != TSDB_CODE_SUCCESS) { break; } @@ -1307,14 +1364,14 @@ int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdat void* pPushRes = taosArrayPush(pUpdated, &key); QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno); - pAggSup->stateStore.streamStateGroupCurNext(pCur); + pStateStore->streamStateGroupCurNext(pCur); } - pAggSup->stateStore.streamStateFreeCur(pCur); + pStateStore->streamStateFreeCur(pCur); pCur = NULL; _end: if (code != TSDB_CODE_SUCCESS) { - pAggSup->stateStore.streamStateFreeCur(pCur); + pStateStore->streamStateFreeCur(pCur); pCur = NULL; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } @@ -1347,7 +1404,8 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR (*ppRes) = resBlock; goto _end; } - pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + + pInfo->stateStore.streamStateClearExpiredState(pInfo->pState); setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; goto _end; @@ -1395,7 +1453,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) { TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); - code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated); + code = buildAllResultKey(&pInfo->stateStore, pInfo->pState, ts, pInfo->pUpdated); QUERY_CHECK_CODE(code, lino, _end); } taosArrayClear(pInfo->pCloseTs); @@ -1414,7 +1472,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); if ((*ppRes) == NULL) { - pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + pInfo->stateStore.streamStateClearExpiredState(pInfo->pState); setStreamOperatorCompleted(pOperator); } @@ -1621,6 +1679,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pFillInfo->delIndex = 0; pFillInfo->curGroupId = 0; pFillInfo->hasNext = false; + pFillInfo->pTempBuff = taosMemoryCalloc(1, pFillSup->rowSize); return pFillInfo; _end: @@ -1664,21 +1723,18 @@ static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* } } -int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, SStreamAggSupporter** ppAggSup) { +int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (IS_NORMAL_INTERVAL_OP(downstream)) { SStreamIntervalOperatorInfo* pInfo = downstream->info; *triggerType = pInfo->twAggSup.calTrigger; *pInterval = pInfo->interval; - (*ppAggSup) = NULL; } else if (IS_CONTINUE_INTERVAL_OP(downstream)) { SStreamIntervalSliceOperatorInfo* pInfo = downstream->info; *triggerType = pInfo->twAggSup.calTrigger; *pInterval = pInfo->interval; pInfo->hasFill = true; - (*ppAggSup) = &pInfo->streamAggSup; - pInfo->streamAggSup.stateStore.streamStateSetFillInfo(pInfo->streamAggSup.pState); } else { code = TSDB_CODE_STREAM_INTERNAL_ERROR; } @@ -1691,8 +1747,31 @@ _end: return code; } +int32_t initFillOperatorStateBuff(SStreamFillOperatorInfo* pInfo, SStreamState* pState, SStateStore* pStore, + SReadHandle* pHandle, const char* taskIdStr, SStorageAPI* pApi) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + pInfo->stateStore = *pStore; + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + QUERY_CHECK_NULL(pInfo->pState, code, lino, _end, terrno); + + *(pInfo->pState) = *pState; + pInfo->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsCol); + code = pInfo->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->pFillSup->rowSize, 0, compareTs, + pInfo->pState, INT64_MAX, taskIdStr, pHandle->checkpointId, + STREAM_STATE_BUFF_HASH_SORT, &pInfo->pState->pFileState); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, - SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -1718,7 +1797,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi int8_t triggerType = 0; SInterval interval = {0}; - code = getDownStreamInfo(downstream, &triggerType, &interval, &pInfo->pStreamAggSup); + code = getDownStreamInfo(downstream, &triggerType, &interval); QUERY_CHECK_CODE(code, lino, _error); pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI, @@ -1773,9 +1852,12 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pTaskInfo); if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle, + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } else { + pInfo->pState != NULL; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index f26cbce119..fcf7d6ef10 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -360,9 +360,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** return code; } - if (pInfo->hasFill == false) { - pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); - } + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; return code; @@ -452,9 +450,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** QUERY_CHECK_CODE(code, lino, _end); if ((*ppRes) == NULL) { - if (pInfo->hasFill == false) { - pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); - } + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); setStreamOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 66793ca5a0..8611678e5a 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -1909,7 +1909,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR qDebug("===stream===build stream result, ts count:%d", size); for (int32_t i = 0; i < size; i++) { TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); - code = buildAllResultKey(&pInfo->streamAggSup, ts, pInfo->pUpdated); + code = buildAllResultKey(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, ts, pInfo->pUpdated); QUERY_CHECK_CODE(code, lino, _end); } qDebug("===stream===build stream result, res count:%ld", taosArrayGetSize(pInfo->pUpdated)); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8fd00e9313..8bf7323d63 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1834,7 +1834,7 @@ int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption) { return deleteMark; } -static TSKEY compareTs(void* pKey) { +TSKEY compareTs(void* pKey) { SWinKey* pWinKey = (SWinKey*)pKey; return pWinKey->ts; } diff --git a/source/libs/executor/test/queryPlanTests.cpp b/source/libs/executor/test/queryPlanTests.cpp index 6710435aba..69097ce755 100755 --- a/source/libs/executor/test/queryPlanTests.cpp +++ b/source/libs/executor/test/queryPlanTests.cpp @@ -3094,7 +3094,7 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf qptCtx.result.code = createFillOperatorInfo(NULL, (SFillPhysiNode*)pNode, pTaskInfo, ppOperaotr); break; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: - qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, ppOperaotr); + qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr); break; case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: qptCtx.result.code = createSessionAggOperatorInfo(NULL, (SSessionWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index dc4ca7c0e5..564f0d3cb0 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -1232,11 +1232,6 @@ void clearExpiredState(SStreamFileState* pFileState) { int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file); } - - if (pFileState->hasFillCatch == false) { - int32_t code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); - qTrace("force clear expired file, ts:%" PRId64 ". %s at line %d res %d", pKey->ts, __func__, __LINE__, code_file); - } } taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL); } From a6cd2b16fe54600d6a0fef93bffdcbe504ec73c6 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 08:40:19 +0800 Subject: [PATCH 04/10] adj function name --- include/libs/stream/tstreamFileState.h | 2 +- source/libs/stream/src/tstreamFileState.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 4a696d9798..f1f5b00e38 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -93,7 +93,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen); SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen); -int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId); +int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId); void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateCleanup(void* pBuff); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 564f0d3cb0..8c8aee8c7d 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -259,7 +259,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) { code = recoverSnapshot(pFileState, checkpointId); } else if (type == STREAM_STATE_BUFF_SORT) { - code = recoverSesssion(pFileState, checkpointId); + code = recoverSession(pFileState, checkpointId); } else if (type == STREAM_STATE_BUFF_HASH_SORT) { code = recoverFillSnapshot(pFileState, checkpointId); } @@ -914,7 +914,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { return code; } -int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { +int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t winRes = TSDB_CODE_SUCCESS; From 0669910da544801512764d8f5fd93e8167579638 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 08:58:39 +0800 Subject: [PATCH 05/10] fix issue --- source/libs/executor/src/streamfilloperator.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index b90729e3b5..6bce2e54a1 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1852,8 +1852,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pTaskInfo); if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle, + code = initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + QUERY_CHECK_CODE(code, lino, _error); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } else { From e63b8a0ccb1648be6328aafa681213e237e48836 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 10:55:52 +0800 Subject: [PATCH 06/10] fix issue --- source/libs/executor/src/streamfilloperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 6bce2e54a1..7b364de09d 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1858,7 +1858,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } else { - pInfo->pState != NULL; + pInfo->pState = NULL; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } From 9d668150d51cd58f41eb0f8bd661812e2cb65377 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 16:24:46 +0800 Subject: [PATCH 07/10] fix stream issue --- source/libs/executor/src/streamfilloperator.c | 6 +- .../src/streamintervalsliceoperator.c | 15 +- source/libs/parser/src/parTranslater.c | 8 +- source/libs/stream/src/streamBackendRocksdb.c | 2 +- source/libs/stream/src/streamSliceState.c | 75 +++--- .../stream/streamFwcIntervalCheckpoint.sim | 67 ----- .../script/tsim/stream/streamTwaInterpFwc.sim | 247 ++++++++++++++++++ .../stream/streamTwaInterpFwcCheckpoint.sim | 180 +++++++++++++ 8 files changed, 487 insertions(+), 113 deletions(-) delete mode 100644 tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim create mode 100644 tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 7b364de09d..c992bd15b7 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1432,7 +1432,11 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pInfo->srcRowIndex = -1; } break; - case STREAM_CHECKPOINT: + case STREAM_CHECKPOINT: { + pInfo->stateStore.streamStateCommit(pInfo->pState); + (*ppRes) = pBlock; + goto _end; + } break; case STREAM_CREATE_CHILD_TABLE: { (*ppRes) = pBlock; goto _end; diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index fcf7d6ef10..e7a9c58710 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -360,6 +360,13 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** return code; } + if (pInfo->recvCkBlock) { + pInfo->recvCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pCheckpointRes; + return code; + } + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -391,8 +398,6 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** case STREAM_CHECKPOINT: { pInfo->recvCkBlock = true; pAggSup->stateStore.streamStateCommit(pAggSup->pState); - // doStreamIntervalSliceSaveCheckpoint(pOperator); - pInfo->recvCkBlock = true; code = copyDataBlock(pInfo->pCheckpointRes, pBlock); QUERY_CHECK_CODE(code, lino, _end); continue; @@ -450,6 +455,12 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** QUERY_CHECK_CODE(code, lino, _end); if ((*ppRes) == NULL) { + if (pInfo->recvCkBlock) { + pInfo->recvCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pCheckpointRes; + return code; + } pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); setStreamOperatorCompleted(pOperator); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0ee8f9be33..6facaed78d 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10931,10 +10931,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pSelect->hasInterpFunc) { // Temporary code - if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Stream interp function only support force window close"); - } + // if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + // return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + // "Stream interp function only support force window close"); + // } if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (pStmt->pOptions->fillHistory) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 09f4e95376..68c99aa1b3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -4130,7 +4130,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) { SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX}; - return streamStateFillSeekKeyNext_rocksdb(pState, &key); + return streamStateFillSeekKeyPrev_rocksdb(pState, &key); } #ifdef BUILD_NO_CALL diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index 238bff8afc..c23537d018 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -23,6 +23,33 @@ #define NUM_OF_CACHE_WIN 64 #define MAX_NUM_OF_CACHE_WIN 128 +int32_t recoverSearchBuff(SStreamFileState* pFileState, SArray* pWinStates, uint64_t groupId) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + SWinKey start = {.groupId = groupId, .ts = INT64_MAX}; + void* pState = getStateFileStore(pFileState); + SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); + for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) { + SWinKey tmpKey = {.groupId = groupId}; + int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); + if (tmpRes != TSDB_CODE_SUCCESS) { + break; + } + void* tmp = taosArrayPush(pWinStates, &tmpKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + streamStateCurPrev_rocksdb(pCur); + } + taosArraySort(pWinStates, winKeyCmprImpl); + streamStateFreeCur(pCur); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; @@ -38,22 +65,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo // recover if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { - TSKEY ts = getFlushMark(pFileState); - SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; - void* pState = getStateFileStore(pFileState); - SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); - for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) { - SWinKey tmpKey = {.groupId = pKey->groupId}; - int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); - if (tmpRes != TSDB_CODE_SUCCESS) { - break; - } - void* tmp = taosArrayPush(pWinStates, &tmpKey); - QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); - streamStateCurPrev_rocksdb(pCur); - } - taosArraySort(pWinStates, winKeyCmprImpl); - streamStateFreeCur(pCur); + recoverSearchBuff(pFileState, pWinStates, pKey->groupId); } code = addSearchItem(pFileState, pWinStates, pKey); @@ -203,29 +215,16 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); void* pState = getStateFileStore(pFileState); - void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); - if (ppBuff) { - pWinStates = (SArray*)(*ppBuff); - } else { - qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId); - SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); - void* tmpVal = NULL; - int32_t len = 0; - (*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); - if ((*pWinCode) == TSDB_CODE_SUCCESS) { - SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - if (!pNewPos || !pNewPos->pRowBuff) { - code = TSDB_CODE_OUT_OF_MEMORY; - QUERY_CHECK_CODE(code, lino, _end); - } - memcpy(pNewPos->pRowBuff, tmpVal, len); - taosMemoryFreeClear(tmpVal); - *pVLen = getRowStateRowSize(pFileState); - (*ppVal) = pNewPos; - } - streamStateFreeCur(pCur); - return code; + // void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + + code = addArrayBuffIfNotExist(pSearchBuff, pKey->groupId, &pWinStates); + QUERY_CHECK_CODE(code, lino, _end); + + // recover + if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { + recoverSearchBuff(pFileState, pWinStates, pKey->groupId); } + int32_t size = taosArrayGetSize(pWinStates); int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); if (index >= 0) { diff --git a/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim b/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim deleted file mode 100644 index ed72d87e9a..0000000000 --- a/tests/script/tsim/stream/streamFwcIntervalCheckpoint.sim +++ /dev/null @@ -1,67 +0,0 @@ -system sh/stop_dnodes.sh -system sh/deploy.sh -n dnode1 -i 1 - -system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60 - -system sh/exec.sh -n dnode1 -s start -sleep 50 -sql connect - -print step1 -print =============== create database -sql create database test vgroups 4; -sql use test; - -sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); -sql create table t1 using st tags(1,1,1); -sql create table t2 using st tags(2,2,2); - -sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s); -sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s); - -run tsim/stream/checkTaskStatus.sim - -sleep 70000 - - -print restart taosd 01 ...... - -system sh/stop_dnodes.sh - -system sh/exec.sh -n dnode1 -s start - -run tsim/stream/checkTaskStatus.sim - -sql insert into t1 values(now + 3000a,1,1,1); - -$loop_count = 0 -loop0: - -sleep 2000 - -$loop_count = $loop_count + 1 -if $loop_count == 20 then - return -1 -endi - -print select * from streamt1; -sql select * from streamt1; - -print $data00 $data01 $data02 - -if $rows == 0 then - goto loop0 -endi - -print select * from streamt2; -sql select * from streamt2; - -print $data00 $data01 $data02 - -if $rows == 0 then - goto loop0 -endi - -print end - -system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamTwaInterpFwc.sim b/tests/script/tsim/stream/streamTwaInterpFwc.sim index ce76387c91..050e0282e2 100644 --- a/tests/script/tsim/stream/streamTwaInterpFwc.sim +++ b/tests/script/tsim/stream/streamTwaInterpFwc.sim @@ -304,6 +304,253 @@ if $rows != 1 then return -1 endi +print step3 +print =============== create database +sql create database test4 vgroups 4; +sql use test4; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1234567890t1 using st tags(1,1,1); +sql create table t1234567890t2 using st tags(2,2,2); + +sql create stable streamt9(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int); +sql create stable streamt10(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int); +sql create stable streamt11(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int); + +sql create stream streams9 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt9 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1, interp(b) from st partition by tbname as ta, b as cc every(2s) fill(value, 100000,200000); +sql create stream streams10 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _wstart, twa(a), sum(b),max(c) from st partition by tbname as ta, b as cc interval(2s) fill(NULL); +sql create stream streams11 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt11 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _wstart, count(a),avg(c),min(b) from st partition by tbname as ta, b as cc interval(2s); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1234567890t1 values(now + 3s,100000,1,1); + +$loop_count = 0 +loop9: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,ta, * from streamt9; +sql select cc,ta, * from streamt9; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop9 +endi + +if $data00 != 1 then + return -1 +endi + +if $data01 != @t12@ then + return -1 +endi + +if $data03 != @100000@ then + return -1 +endi + +if $data04 != 1 then + return -1 +endi + +if $data05 != 64 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt9"; +sql select * from information_schema.ins_tables where stable_name = "streamt9"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt9" and table_name like "tbn-t1234567890t1_1%"; +sql select * from information_schema.ins_tables where stable_name = "streamt9" and table_name like "tbn-t1234567890t1_1%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +$loop_count = 0 +loop10: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,ta, * from streamt10; +sql select cc,ta, * from streamt10; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop10 +endi + +if $data00 != 1 then + return -1 +endi + +if $data01 != @t12@ then + return -1 +endi + +if $data03 != @100000.000@ then + return -1 +endi + +if $data04 != 1 then + return -1 +endi + +if $data05 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt10"; +sql select * from information_schema.ins_tables where stable_name = "streamt10"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt10" and table_name like "tbn-t1234567890t1_2%"; +sql select * from information_schema.ins_tables where stable_name = "streamt10" and table_name like "tbn-t1234567890t1_2%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +$loop_count = 0 +loop11: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select cc,ta,* from streamt11; +sql select cc,ta,* from streamt11; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 1 then + print ======rows=$rows + goto loop11 +endi + +if $data00 != 1 then + return -1 +endi + +if $data01 != @t12@ then + return -1 +endi + +if $data03 != @1@ then + return -1 +endi + +if $data04 != 1 then + return -1 +endi + +if $data05 != 1 then + return -1 +endi + +print 3 sql select * from information_schema.ins_tables where stable_name = "streamt11"; +sql select * from information_schema.ins_tables where stable_name = "streamt11"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + +print 4 sql select * from information_schema.ins_tables where stable_name = "streamt11" and table_name like "tbn-t1234567890t1_3%"; +sql select * from information_schema.ins_tables where stable_name = "streamt11" and table_name like "tbn-t1234567890t1_3%"; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 1 then + return -1 +endi + + print end system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim b/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim new file mode 100644 index 0000000000..f983dd3ab5 --- /dev/null +++ b/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim @@ -0,0 +1,180 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60 +system sh/cfg.sh -n dnode1 -c ratioOfVnodeStreamThreads -v 4 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a),max(b) from st partition by tbname interval(5s); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a), max(b) from st interval(5s); +sql create stream streams3 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt3 as select _wstart, count(a), twa(b) from st partition by tbname interval(5s) fill(prev); +sql create stream streams4 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt4 as select _irowts, interp(a), interp(b) from st partition by tbname every(5s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now + 3000a,1,1,1); + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print select * from streamt3; +sql select * from streamt3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows == 0 then + goto loop0 +endi + + +print select * from streamt4; +sql select * from streamt4; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows == 0 then + goto loop0 +endi + + +sleep 70000 + +$loop_count = 0 +loop0_1: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print sql select * from information_schema.ins_stream_tasks where checkpoint_time is null; +sql select * from information_schema.ins_stream_tasks where checkpoint_time is null; + + +sleep 10000 + +if $rows > 0 then + print wait checkpoint.rows = $rows + goto loop0_1 +endi + +print restart taosd 01 ...... + +system sh/stop_dnodes.sh + +system sh/exec.sh -n dnode1 -s start + +run tsim/stream/checkTaskStatus.sim + +print select * from streamt3; +sql select * from streamt3; + +$streamt3_rows = $rows +print =====streamt3_rows=$streamt3_rows + +print select * from streamt4; +sql select * from streamt4; + +$streamt4_rows = $rows +print =====streamt4_rows=$streamt4_rows + +$loop_count = 0 +loop1: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print select * from streamt3; +sql select * from streamt3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows <= $streamt3_rows then + print =====rows=$rows + print =====streamt3_rows=$streamt3_rows + goto loop1 +endi + +print select * from streamt4; +sql select * from streamt4; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows <= $streamt4_rows then + print =====rows=$rows + print =====streamt4_rows=$streamt4_rows + goto loop1 +endi + +sql insert into t1 values(now + 3000a,10,10,10); + +$loop_count = 0 +loop2: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $data02 != 10 then + goto loop2 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From 23d67933cdcb3c47047fb0a95b737e561c4e5d97 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 16:52:01 +0800 Subject: [PATCH 08/10] add ci --- source/libs/stream/src/streamSliceState.c | 6 ++++-- tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim | 6 +++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index c23537d018..eefe046878 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -65,7 +65,8 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo // recover if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { - recoverSearchBuff(pFileState, pWinStates, pKey->groupId); + code = recoverSearchBuff(pFileState, pWinStates, pKey->groupId); + QUERY_CHECK_CODE(code, lino, _end); } code = addSearchItem(pFileState, pWinStates, pKey); @@ -222,7 +223,8 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW // recover if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { - recoverSearchBuff(pFileState, pWinStates, pKey->groupId); + code = recoverSearchBuff(pFileState, pWinStates, pKey->groupId); + QUERY_CHECK_CODE(code, lino, _end); } int32_t size = taosArrayGetSize(pWinStates); diff --git a/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim b/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim index f983dd3ab5..26be5018c1 100644 --- a/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim +++ b/tests/script/tsim/stream/streamTwaInterpFwcCheckpoint.sim @@ -161,8 +161,8 @@ if $loop_count == 20 then return -1 endi -print select * from streamt1; -sql select * from streamt1; +print select * from streamt1 order by 1; +sql select * from streamt1 order by 1; print $data00 $data01 $data02 $data03 $data04 print $data10 $data11 $data12 $data13 $data14 @@ -171,7 +171,7 @@ print $data30 $data31 $data32 $data33 $data34 print $data40 $data41 $data42 $data43 $data44 print $data50 $data51 $data52 $data53 $data54 -if $data02 != 10 then +if $data12 != 10 then goto loop2 endi From 9d384bc438c09260ddd66d08f8855234ffdbb7f8 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 18:51:50 +0800 Subject: [PATCH 09/10] fix issue --- source/libs/parser/src/parTranslater.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 6facaed78d..0ee8f9be33 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10931,10 +10931,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pSelect->hasInterpFunc) { // Temporary code - // if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - // return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - // "Stream interp function only support force window close"); - // } + if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream interp function only support force window close"); + } if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (pStmt->pOptions->fillHistory) { From 1b285f8a1d00ad987febc806ba53d5a79b5d75a2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 18:52:04 +0800 Subject: [PATCH 10/10] fix issue --- source/libs/stream/src/tstreamFileState.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 8c8aee8c7d..dcabadb8bd 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -991,6 +991,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { } winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen); + qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__); if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); @@ -1007,6 +1008,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; + qDebug("===stream=== read checkpoint state from disc. %s", __func__); code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos); @@ -1077,6 +1079,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t vlen = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); + qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__); if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); @@ -1085,9 +1088,17 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { break; } + if (vlen != pFileState->rowSize) { + qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen); + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + taosMemoryFreeClear(pVal); + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; + qDebug("===stream=== read checkpoint state from disc. %s", __func__); winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (winRes != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos);