fix(query): fix the crash caused by the complex having clause in which an scalar function is nested with an aggregate function in group by query.
This commit is contained in:
parent
b122538268
commit
f25a72e5c3
|
@ -490,6 +490,7 @@ typedef struct SGroupbyOperatorInfo {
|
||||||
SExprInfo* pScalarExprInfo;
|
SExprInfo* pScalarExprInfo;
|
||||||
int32_t numOfScalarExpr; // the number of scalar expression in group operator
|
int32_t numOfScalarExpr; // the number of scalar expression in group operator
|
||||||
SqlFunctionCtx* pScalarFuncCtx;
|
SqlFunctionCtx* pScalarFuncCtx;
|
||||||
|
int32_t* rowCellInfoOffset; // offset value for each row result cell info
|
||||||
} SGroupbyOperatorInfo;
|
} SGroupbyOperatorInfo;
|
||||||
|
|
||||||
typedef struct SDataGroupInfo {
|
typedef struct SDataGroupInfo {
|
||||||
|
|
|
@ -1173,6 +1173,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
||||||
pResult->info.groupId = pSrcBlock->info.groupId;
|
pResult->info.groupId = pSrcBlock->info.groupId;
|
||||||
|
|
||||||
|
// if the source equals to the destination, it is to create a new column as the result of scalar function or some operators.
|
||||||
|
bool createNewColModel = (pResult == pSrcBlock);
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
|
|
||||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
|
@ -1181,7 +1184,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
|
|
||||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
if (pResult->info.rows > 0) {
|
if (pResult->info.rows > 0 && !createNewColModel) {
|
||||||
colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
||||||
} else {
|
} else {
|
||||||
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
||||||
|
@ -1191,7 +1194,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
|
|
||||||
int32_t offset = pResult->info.rows;
|
int32_t offset = createNewColModel? 0: pResult->info.rows;
|
||||||
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
|
||||||
colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
|
colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
|
||||||
}
|
}
|
||||||
|
@ -1207,7 +1210,8 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
SScalarParam dest = {.columnData = &idata};
|
SScalarParam dest = {.columnData = &idata};
|
||||||
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
||||||
|
|
||||||
colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows);
|
int32_t startOffset = createNewColModel? 0:pResult->info.rows;
|
||||||
|
colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows);
|
||||||
|
|
||||||
numOfRows = dest.numOfRows;
|
numOfRows = dest.numOfRows;
|
||||||
taosArrayDestroy(pBlockList);
|
taosArrayDestroy(pBlockList);
|
||||||
|
@ -1224,7 +1228,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
pfCtx->fpSet.init(&pCtx[k], pResInfo);
|
pfCtx->fpSet.init(&pCtx[k], pResInfo);
|
||||||
|
|
||||||
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
pfCtx->offset = pResult->info.rows; // set the start offset
|
pfCtx->offset = createNewColModel? 0:pResult->info.rows; // set the start offset
|
||||||
|
|
||||||
// set the timestamp(_rowts) output buffer
|
// set the timestamp(_rowts) output buffer
|
||||||
if (taosArrayGetSize(pPseudoList) > 0) {
|
if (taosArrayGetSize(pPseudoList) > 0) {
|
||||||
|
@ -1242,7 +1246,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
|
|
||||||
SScalarParam dest = {.columnData = &idata};
|
SScalarParam dest = {.columnData = &idata};
|
||||||
scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
||||||
colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows);
|
|
||||||
|
int32_t startOffset = createNewColModel? 0:pResult->info.rows;
|
||||||
|
colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows);
|
||||||
|
|
||||||
numOfRows = dest.numOfRows;
|
numOfRows = dest.numOfRows;
|
||||||
taosArrayDestroy(pBlockList);
|
taosArrayDestroy(pBlockList);
|
||||||
|
@ -1252,7 +1258,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pResult->info.rows += numOfRows;
|
if (!createNewColModel) {
|
||||||
|
pResult->info.rows += numOfRows;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
|
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
|
||||||
|
|
|
@ -341,7 +341,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
|
|
||||||
pInfo->pScalarExprInfo = pScalarExprInfo;
|
pInfo->pScalarExprInfo = pScalarExprInfo;
|
||||||
pInfo->numOfScalarExpr = numOfScalarExpr;
|
pInfo->numOfScalarExpr = numOfScalarExpr;
|
||||||
pInfo->pScalarFuncCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
|
pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
|
||||||
|
|
||||||
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue