fix(query): fix bug caused by new completed query function.
This commit is contained in:
parent
e91233f5eb
commit
781fbce942
|
@ -674,7 +674,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo
|
||||||
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
||||||
int32_t numOfOutput, SArray* pPseudoList);
|
int32_t numOfOutput, SArray* pPseudoList);
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
|
||||||
|
|
||||||
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
|
@ -1031,7 +1031,7 @@ static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows,
|
||||||
return ts;
|
return ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
|
||||||
|
|
||||||
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
||||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||||
|
@ -1041,11 +1041,11 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol) {
|
||||||
if (pBlock->pBlockAgg != NULL) {
|
if (pBlock->pBlockAgg != NULL) {
|
||||||
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
||||||
} else {
|
} else {
|
||||||
doSetInputDataBlock(pOperator, pCtx, pBlock, order);
|
doSetInputDataBlock(pOperator, pCtx, pBlock, order, createDummyCol);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1082,7 +1082,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||||
|
@ -1103,9 +1103,11 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
pInput->startRowIndex = 0;
|
pInput->startRowIndex = 0;
|
||||||
ASSERT(pInput->pData[j] != NULL);
|
ASSERT(pInput->pData[j] != NULL);
|
||||||
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||||
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
|
if (createDummyCol) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
|
||||||
return code;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4472,7 +4474,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true);
|
||||||
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
||||||
// pOperator->pRuntimeEnv, true);
|
// pOperator->pRuntimeEnv, true);
|
||||||
doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock);
|
doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock);
|
||||||
|
@ -4745,7 +4747,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true);
|
||||||
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
||||||
|
|
||||||
#if 0 // test for encode/decode result info
|
#if 0 // test for encode/decode result info
|
||||||
|
@ -4971,7 +4973,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo);
|
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo);
|
||||||
|
@ -5057,7 +5059,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
||||||
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||||
|
|
||||||
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
||||||
|
@ -5152,7 +5154,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
||||||
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5197,7 +5199,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true);
|
||||||
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5247,7 +5249,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC, true);
|
||||||
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||||
|
|
||||||
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
||||||
|
@ -5375,7 +5377,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
|
||||||
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5423,7 +5425,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
|
||||||
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -284,7 +284,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
if (pInfo->pScalarExprInfo != NULL) {
|
if (pInfo->pScalarExprInfo != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue