From 781fbce942a79f47a4a3b5ddab1e3446d7f608ae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Apr 2022 12:07:04 +0800 Subject: [PATCH] fix(query): fix bug caused by new completed query function. --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 34 +++++++++++++----------- source/libs/executor/src/groupoperator.c | 2 +- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fe230f04b4..5ef1754913 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -674,7 +674,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); -void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); +void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol); void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 425696ced0..ed2454faae 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1031,7 +1031,7 @@ static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows, return ts; } -static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); +static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol); static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { @@ -1041,11 +1041,11 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC } } -void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { +void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol) { if (pBlock->pBlockAgg != NULL) { doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); } else { - doSetInputDataBlock(pOperator, pCtx, pBlock, order); + doSetInputDataBlock(pOperator, pCtx, pBlock, order, createDummyCol); } } @@ -1082,7 +1082,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc return TSDB_CODE_SUCCESS; } -static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { +static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { @@ -1103,9 +1103,11 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pInput->startRowIndex = 0; ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { - code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - return code; + if (createDummyCol) { + code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -4472,7 +4474,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { break; } - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true); // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, // pOperator->pRuntimeEnv, true); doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock); @@ -4745,7 +4747,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo); - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true); doAggregateImpl(pOperator, 0, pInfo->pCtx); #if 0 // test for encode/decode result info @@ -4971,7 +4973,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) // } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo); @@ -5057,7 +5059,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); @@ -5152,7 +5154,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } @@ -5197,7 +5199,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order); + setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true); // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); } @@ -5247,7 +5249,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup // the pDataBlock are always the same one, no need to call this again // setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC, true); STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); @@ -5375,7 +5377,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { break; } - setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true); doStateWindowAggImpl(pOperator, pInfo, pBlock); } @@ -5423,7 +5425,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true); doSessionWindowAggImpl(pOperator, pInfo, pBlock); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d58bcd1162..8739371dd9 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -284,7 +284,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); // there is an scalar expression that needs to be calculated right before apply the group aggregation. if (pInfo->pScalarExprInfo != NULL) {