From 9d1a9d9510aa7adfb5f1ffa760bad3bbb247f122 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Apr 2022 19:27:41 +0800 Subject: [PATCH] fix(query): avoid the output result overlap within the project operator buffer. --- source/common/src/tdatablock.c | 1 + source/libs/executor/src/executorimpl.c | 41 +++++++++++++++++-------- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index beabc1b6eb..8f10fa58cc 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -203,6 +203,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co if (pSource->hasNull) { pColumnInfoData->hasNull = pSource->hasNull; } + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { // Handle the bitmap char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index beeea4f223..0fdb523e64 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1171,31 +1171,43 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.groupId = pSrcBlock->info.groupId; + int32_t numOfRows = 0; + for (int32_t k = 0; k < numOfOutput; ++k) { int32_t outputSlotId = pExpr[k].base.resSchema.slotId; SqlFunctionCtx* pfCtx = &pCtx[k]; if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); + if (pResult->info.rows > 0) { + colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows); + } else { + colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); + } - pResult->info.rows = pSrcBlock->info.rows; + numOfRows = pfCtx->input.numOfRows; } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); + + int32_t offset = pResult->info.rows; for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { - colDataAppend(pColInfoData, i, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); + colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); } - pResult->info.rows = pSrcBlock->info.rows; + + numOfRows = pSrcBlock->info.rows; } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); - SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info}; + SScalarParam dest = {.columnData = &idata}; scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - pResult->info.rows = dest.numOfRows; + colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { ASSERT(!fmIsAggFunc(pfCtx->functionId)); @@ -1212,28 +1224,33 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); pfCtx->offset = pResult->info.rows; // set the start offset + // set the timestamp(_rowts) output buffer if (taosArrayGetSize(pPseudoList) > 0) { int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; } - int32_t numOfRows = pfCtx->fpSet.process(pfCtx); - pResult->info.rows += numOfRows; + numOfRows = pfCtx->fpSet.process(pfCtx); } else { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); - SScalarParam dest = {0}; - dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); + SColumnInfoData idata = {.info = pResColData->info}; + SScalarParam dest = {.columnData = &idata}; scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - pResult->info.rows = dest.numOfRows; + colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); } } else { ASSERT(0); } } + + pResult->info.rows += numOfRows; } void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,