From 2020ea0a7683d96bedb6361d44737518707abcaf Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Sun, 20 Nov 2022 10:15:26 +0800 Subject: [PATCH] fix:stream fill crash --- source/common/src/tdatablock.c | 1 + source/libs/executor/inc/executorimpl.h | 17 ++++++++++++++ source/libs/executor/inc/tfill.h | 16 ------------- source/libs/executor/src/executorimpl.c | 2 ++ source/libs/executor/src/tfill.c | 31 +++++++++++-------------- 5 files changed, 34 insertions(+), 33 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 3c2a5377e3..c2cdfc056a 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1962,6 +1962,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) memset(pBuf, 0, sizeof(pBuf)); char* pData = colDataGetVarData(pColInfoData, j); int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); + dataSize = TMIN(dataSize, 50); memcpy(pBuf, varDataVal(pData), dataSize); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); if (len >= size - 1) return dumpBuf; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f179c7bd41..7dc9e9ab03 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -754,6 +754,23 @@ typedef struct SStreamPartitionOperatorInfo { SSDataBlock* pDelRes; } SStreamPartitionOperatorInfo; +typedef struct SStreamFillSupporter { + int32_t type; // fill type + SInterval interval; + SResultRowData prev; + SResultRowData cur; + SResultRowData next; + SResultRowData nextNext; + SFillColInfo* pAllColInfo; // fill exprs and not fill exprs + SExprSupp notFillExprSup; + int32_t numOfAllCols; // number of all exprs, including the tags columns + int32_t numOfFillCols; + int32_t numOfNotFillCols; + int32_t rowSize; + SSHashObj* pResMap; + bool hasDelete; +} SStreamFillSupporter; + typedef struct SStreamFillOperatorInfo { SStreamFillSupporter* pFillSup; SSDataBlock* pRes; diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 2d8df81dbd..b0017fef50 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -111,22 +111,6 @@ typedef struct SStreamFillInfo { int32_t delIndex; } SStreamFillInfo; -typedef struct SStreamFillSupporter { - int32_t type; // fill type - SInterval interval; - SResultRowData prev; - SResultRowData cur; - SResultRowData next; - SResultRowData nextNext; - SFillColInfo* pAllColInfo; // fill exprs and not fill exprs - int32_t numOfAllCols; // number of all exprs, including the tags columns - int32_t numOfFillCols; - int32_t numOfNotFillCols; - int32_t rowSize; - SSHashObj* pResMap; - bool hasDelete; -} SStreamFillSupporter; - int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a7e955100c..20c39ec921 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2056,6 +2056,8 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) { taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol); + } else if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) { + taosVariantDestroy(&pExprInfo->base.pParam[j].param); } } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index c41376b2dc..9908f35818 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -709,6 +709,7 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) { pFillSup->pResMap = NULL; releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal); pFillSup->cur.pRowVal = NULL; + cleanupExprSupp(&pFillSup->notFillExprSup); taosMemoryFree(pFillSup); return NULL; @@ -1417,25 +1418,13 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows); setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false); projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL); + + pDstBlock->info.rows = 0; + pSup = &pInfo->pFillSup->notFillExprSup; + setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false); + projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL); pDstBlock->info.groupId = pSrcBlock->info.groupId; - SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, pInfo->primaryTsCol); - SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, pInfo->primarySrcSlotId); - colDataAssign(pDst, pSrc, pDstBlock->info.rows, &pDstBlock->info); - - int32_t numOfNotFill = pInfo->pFillSup->numOfAllCols - pInfo->pFillSup->numOfFillCols; - for (int32_t i = 0; i < numOfNotFill; ++i) { - SFillColInfo* pCol = &pInfo->pFillSup->pAllColInfo[i + pInfo->pFillSup->numOfFillCols]; - ASSERT(pCol->notFillCol); - - SExprInfo* pExpr = pCol->pExpr; - int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; - int32_t dstSlotId = pExpr->base.resSchema.slotId; - - SColumnInfoData* pDst1 = taosArrayGet(pDstBlock->pDataBlock, dstSlotId); - SColumnInfoData* pSrc1 = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId); - colDataAssign(pDst1, pSrc1, pDstBlock->info.rows, &pDstBlock->info); - } blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol); } @@ -1577,6 +1566,14 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod destroyStreamFillSupporter(pFillSup); return NULL; } + + SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); + code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamFillSupporter(pFillSup); + return NULL; + } + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pFillSup->pResMap = tSimpleHashInit(16, hashFn); pFillSup->hasDelete = false;