fix(query): handle crash of taosd caused by a complex expression in GROUP BY clauses.
This commit is contained in:
parent
fac649403a
commit
b9bf2515f5
|
@ -222,7 +222,7 @@ typedef struct SFunctParam {
|
||||||
// the structure for sql function in select clause
|
// the structure for sql function in select clause
|
||||||
typedef struct SResSchame {
|
typedef struct SResSchame {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int32_t colId;
|
int32_t slotId;
|
||||||
int32_t bytes;
|
int32_t bytes;
|
||||||
int32_t precision;
|
int32_t precision;
|
||||||
int32_t scale;
|
int32_t scale;
|
||||||
|
|
|
@ -411,7 +411,7 @@ typedef struct STableScanInfo {
|
||||||
SResultRowInfo* pResultRowInfo;
|
SResultRowInfo* pResultRowInfo;
|
||||||
int32_t* rowCellInfoOffset;
|
int32_t* rowCellInfoOffset;
|
||||||
SExprInfo* pExpr;
|
SExprInfo* pExpr;
|
||||||
SSDataBlock block;
|
SSDataBlock* pResBlock;
|
||||||
SArray* pColMatchInfo;
|
SArray* pColMatchInfo;
|
||||||
int32_t numOfOutput;
|
int32_t numOfOutput;
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
|
@ -569,6 +569,9 @@ typedef struct SGroupbyOperatorInfo {
|
||||||
int32_t groupKeyLen; // total group by column width
|
int32_t groupKeyLen; // total group by column width
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
SAggSupporter aggSup;
|
SAggSupporter aggSup;
|
||||||
|
SExprInfo* pScalarExprInfo;
|
||||||
|
int32_t numOfScalarExpr;// the number of scalar expression in group operator
|
||||||
|
SqlFunctionCtx*pScalarFuncCtx;
|
||||||
} SGroupbyOperatorInfo;
|
} SGroupbyOperatorInfo;
|
||||||
|
|
||||||
typedef struct SDataGroupInfo {
|
typedef struct SDataGroupInfo {
|
||||||
|
@ -674,7 +677,7 @@ void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
||||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
||||||
void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
|
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
|
||||||
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
||||||
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
|
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
|
||||||
int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type,
|
int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type,
|
||||||
|
@ -685,12 +688,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
uint64_t* total, SArray* pColList);
|
uint64_t* total, SArray* pColList);
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity);
|
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
||||||
SSDataBlock* loadNextDataBlock(void* param);
|
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
|
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
|
||||||
int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
|
@ -704,8 +706,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
|
||||||
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
|
||||||
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
@ -729,6 +731,8 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOf
|
||||||
int32_t numOfOutput);
|
int32_t numOfOutput);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList);
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||||
|
|
||||||
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
|
@ -234,9 +234,8 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
||||||
|
|
||||||
static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType,
|
static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||||
SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
|
int32_t orderType, int32_t* rowCellOffset);
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst,
|
static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst,
|
||||||
int64_t keyLast, STimeWindow* win);
|
int64_t keyLast, STimeWindow* win);
|
||||||
|
@ -1068,9 +1067,9 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx,
|
||||||
pCtx[i].size = pBlock->info.rows;
|
pCtx[i].size = pBlock->info.rows;
|
||||||
pCtx[i].currentStage = MAIN_SCAN;
|
pCtx[i].currentStage = MAIN_SCAN;
|
||||||
|
|
||||||
SExprInfo expr = pOperator->pExpr[i];
|
SExprInfo* pOneExpr = &pOperator->pExpr[i];
|
||||||
for (int32_t j = 0; j < expr.base.numOfParams; ++j) {
|
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
|
||||||
SFunctParam *pFuncParam = &expr.base.pParam[j];
|
SFunctParam *pFuncParam = &pOneExpr->base.pParam[j];
|
||||||
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||||
int32_t slotId = pFuncParam->pCol->slotId;
|
int32_t slotId = pFuncParam->pCol->slotId;
|
||||||
pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
|
pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
@ -1145,19 +1144,21 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList) {
|
||||||
int32_t numOfOutput, SArray* pPseudoList) {
|
|
||||||
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
|
||||||
pResult->info.groupId = pSrcBlock->info.groupId;
|
pResult->info.groupId = pSrcBlock->info.groupId;
|
||||||
|
|
||||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
|
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||||
|
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||||
|
|
||||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows);
|
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows);
|
||||||
|
|
||||||
pResult->info.rows = pSrcBlock->info.rows;
|
pResult->info.rows = pSrcBlock->info.rows;
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
|
||||||
colDataAppend(pColInfoData, i, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
|
colDataAppend(pColInfoData, i, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
|
||||||
}
|
}
|
||||||
|
@ -1167,40 +1168,40 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
|
||||||
taosArrayPush(pBlockList, &pSrcBlock);
|
taosArrayPush(pBlockList, &pSrcBlock);
|
||||||
|
|
||||||
SScalarParam dest = {0};
|
SScalarParam dest = {0};
|
||||||
dest.columnData = taosArrayGet(pResult->pDataBlock, k);
|
dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
|
|
||||||
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
|
||||||
pResult->info.rows = dest.numOfRows;
|
pResult->info.rows = dest.numOfRows;
|
||||||
|
|
||||||
taosArrayDestroy(pBlockList);
|
taosArrayDestroy(pBlockList);
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||||
ASSERT(!fmIsAggFunc(pCtx[k].functionId));
|
ASSERT(!fmIsAggFunc(pfCtx->functionId));
|
||||||
|
|
||||||
if (fmIsPseudoColumnFunc(pCtx[k].functionId)) {
|
if (fmIsPseudoColumnFunc(pfCtx->functionId)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (fmIsNonstandardSQLFunc(pCtx[k].functionId)) {
|
} else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) {
|
||||||
// todo set the correct timestamp column
|
// todo set the correct timestamp column
|
||||||
pCtx[k].input.pPTS = taosArrayGet(pSrcBlock->pDataBlock, 1);
|
pfCtx->input.pPTS = taosArrayGet(pSrcBlock->pDataBlock, 1);
|
||||||
|
|
||||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]);
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]);
|
||||||
pCtx[k].fpSet.init(&pCtx[k], pResInfo);
|
pfCtx->fpSet.init(&pCtx[k], pResInfo);
|
||||||
|
|
||||||
pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k);
|
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
pCtx[k].offset = pResult->info.rows; // set the start offset
|
pfCtx->offset = pResult->info.rows; // set the start offset
|
||||||
|
|
||||||
if (taosArrayGetSize(pPseudoList) > 0) {
|
if (taosArrayGetSize(pPseudoList) > 0) {
|
||||||
int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
|
int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
|
||||||
pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
|
pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]);
|
int32_t numOfRows = pfCtx->fpSet.process(pfCtx);
|
||||||
pResult->info.rows += numOfRows;
|
pResult->info.rows += numOfRows;
|
||||||
} else {
|
} else {
|
||||||
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
|
||||||
taosArrayPush(pBlockList, &pSrcBlock);
|
taosArrayPush(pBlockList, &pSrcBlock);
|
||||||
|
|
||||||
SScalarParam dest = {0};
|
SScalarParam dest = {0};
|
||||||
dest.columnData = taosArrayGet(pResult->pDataBlock, k);
|
dest.columnData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
|
|
||||||
scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
|
||||||
pResult->info.rows = dest.numOfRows;
|
pResult->info.rows = dest.numOfRows;
|
||||||
|
@ -1810,7 +1811,7 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) {
|
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) {
|
||||||
SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
|
SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
|
||||||
if (pFuncCtx == NULL) {
|
if (pFuncCtx == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1851,12 +1852,12 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
|
||||||
pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
|
pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
|
||||||
pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
|
pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
|
||||||
|
|
||||||
pCtx->pTsOutput = NULL;//taosArrayInit(4, POINTER_BYTES);
|
pCtx->pTsOutput = NULL;
|
||||||
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
|
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
|
||||||
pCtx->resDataInfo.type = pFunct->resSchema.type;
|
pCtx->resDataInfo.type = pFunct->resSchema.type;
|
||||||
pCtx->order = TSDB_ORDER_ASC;
|
pCtx->order = TSDB_ORDER_ASC;
|
||||||
pCtx->start.key = INT64_MIN;
|
pCtx->start.key = INT64_MIN;
|
||||||
pCtx->end.key = INT64_MIN;
|
pCtx->end.key = INT64_MIN;
|
||||||
#if 0
|
#if 0
|
||||||
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
||||||
// int16_t type = pFunct->param[j].nType;
|
// int16_t type = pFunct->param[j].nType;
|
||||||
|
@ -3330,8 +3331,7 @@ void setIntervalQueryRange(STaskRuntimeEnv* pRuntimeEnv, TSKEY key) {
|
||||||
* @param pQInfo
|
* @param pQInfo
|
||||||
* @param result
|
* @param result
|
||||||
*/
|
*/
|
||||||
static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType,
|
int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset) {
|
||||||
SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
|
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
|
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
|
||||||
|
|
||||||
|
@ -3370,7 +3370,9 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI
|
||||||
pGroupResInfo->index += 1;
|
pGroupResInfo->index += 1;
|
||||||
|
|
||||||
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
|
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
|
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
|
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
|
||||||
|
|
||||||
char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
|
char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
@ -3391,8 +3393,8 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity,
|
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||||
int32_t* rowCellOffset) {
|
int32_t* rowCellOffset) {
|
||||||
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
|
@ -3401,7 +3403,7 @@ void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t orderType = TSDB_ORDER_ASC;
|
int32_t orderType = TSDB_ORDER_ASC;
|
||||||
doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset);
|
doCopyToSDataBlock(pBlock, rowCapacity, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset);
|
||||||
|
|
||||||
// add condition (pBlock->info.rows >= 1) just to runtime happy
|
// add condition (pBlock->info.rows >= 1) just to runtime happy
|
||||||
blockDataUpdateTsWindow(pBlock);
|
blockDataUpdateTsWindow(pBlock);
|
||||||
|
@ -4438,32 +4440,6 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
|
|
||||||
SSDataBlock* pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
|
||||||
if (pResBlock == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t numOfCols = taosArrayGetSize(pExprInfo);
|
|
||||||
pResBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
|
||||||
|
|
||||||
SArray* pResult = pResBlock->pDataBlock;
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData colInfoData = {0};
|
|
||||||
SExprInfo* p = taosArrayGetP(pExprInfo, i);
|
|
||||||
|
|
||||||
SResSchema* pSchema = &p->base.resSchema;
|
|
||||||
colInfoData.info.type = pSchema->type;
|
|
||||||
colInfoData.info.colId = pSchema->colId;
|
|
||||||
colInfoData.info.bytes = pSchema->bytes;
|
|
||||||
colInfoData.info.scale = pSchema->scale;
|
|
||||||
colInfoData.info.precision = pSchema->precision;
|
|
||||||
taosArrayPush(pResult, &colInfoData);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pResBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey);
|
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey);
|
||||||
static void cleanupAggSup(SAggSupporter* pAggSup);
|
static void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
|
|
||||||
|
@ -4777,7 +4753,7 @@ static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGr
|
||||||
SColumn* pCol = taosArrayGet(pGroupInfo, i);
|
SColumn* pCol = taosArrayGet(pGroupInfo, i);
|
||||||
for (int32_t j = 0; j < numOfCols; ++j) {
|
for (int32_t j = 0; j < numOfCols; ++j) {
|
||||||
SExprInfo* pe = &pExprInfo[j];
|
SExprInfo* pe = &pExprInfo[j];
|
||||||
if (pe->base.resSchema.colId == pCol->colId) {
|
if (pe->base.resSchema.slotId == pCol->colId) {
|
||||||
taosArrayPush(plist, pCol);
|
taosArrayPush(plist, pCol);
|
||||||
taosArrayPush(pInfo->groupInfo, &j);
|
taosArrayPush(pInfo->groupInfo, &j);
|
||||||
len += pCol->bytes;
|
len += pCol->bytes;
|
||||||
|
@ -4816,7 +4792,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
|
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
||||||
|
|
||||||
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) {
|
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) {
|
||||||
|
@ -5137,9 +5113,7 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo* pOperator, bool* newgro
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity,
|
toSDatablock(pInfo->pRes, pAggInfo->binfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->pResultBuf, pAggInfo->binfo.rowCellInfoOffset);
|
||||||
pAggInfo->binfo.rowCellInfoOffset);
|
|
||||||
|
|
||||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -5188,8 +5162,7 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo* pOperator, bool* newgro
|
||||||
updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
|
updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
|
||||||
|
|
||||||
initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo);
|
initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo);
|
||||||
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity,
|
toSDatablock(pInfo->pRes, pAggInfo->binfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->pResultBuf, pAggInfo->binfo.rowCellInfoOffset);
|
||||||
pAggInfo->binfo.rowCellInfoOffset);
|
|
||||||
|
|
||||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
|
@ -5204,6 +5177,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
|
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
if (pProjectInfo->existDataBlock) { // TODO refactor
|
if (pProjectInfo->existDataBlock) { // TODO refactor
|
||||||
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
|
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
|
||||||
|
@ -5392,8 +5366,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
pInfo->binfo.rowCellInfoOffset);
|
|
||||||
|
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
|
@ -5411,8 +5384,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
pInfo->binfo.rowCellInfoOffset);
|
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -5447,8 +5419,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
pInfo->binfo.rowCellInfoOffset);
|
|
||||||
|
|
||||||
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
@ -5693,7 +5664,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
|
toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5725,7 +5696,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
|
toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -5742,7 +5713,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
|
toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5774,7 +5745,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
|
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
|
toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -5953,7 +5924,7 @@ static void cleanupAggSup(SAggSupporter* pAggSup) {
|
||||||
|
|
||||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
|
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
|
||||||
pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
|
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
|
||||||
pBasicInfo->pRes = pResultBlock;
|
pBasicInfo->pRes = pResultBlock;
|
||||||
pBasicInfo->capacity = numOfRows;
|
pBasicInfo->capacity = numOfRows;
|
||||||
|
|
||||||
|
@ -6382,8 +6353,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
|
||||||
SExprInfo* pExpr, int32_t numOfOutput) {
|
SExprInfo* pExpr, int32_t numOfOutput) {
|
||||||
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
||||||
|
|
||||||
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
||||||
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
|
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||||
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
@ -6406,8 +6375,6 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
|
||||||
SExprInfo* pExpr, int32_t numOfOutput) {
|
SExprInfo* pExpr, int32_t numOfOutput) {
|
||||||
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
||||||
|
|
||||||
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
|
||||||
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
|
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||||
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
@ -6685,10 +6652,10 @@ bool validateExprColumnInfo(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr
|
||||||
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
|
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
|
||||||
const char* name) {
|
const char* name) {
|
||||||
SResSchema s = {0};
|
SResSchema s = {0};
|
||||||
s.scale = scale;
|
s.scale = scale;
|
||||||
s.type = type;
|
s.type = type;
|
||||||
s.bytes = bytes;
|
s.bytes = bytes;
|
||||||
s.colId = slotId;
|
s.slotId = slotId;
|
||||||
s.precision = precision;
|
s.precision = precision;
|
||||||
strncpy(s.name, name, tListLen(s.name));
|
strncpy(s.name, name, tListLen(s.name));
|
||||||
|
|
||||||
|
@ -6730,7 +6697,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
|
pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
|
||||||
}
|
}
|
||||||
|
|
||||||
SExprInfo* pExp = &pExprs[pTargetNode->slotId];
|
SExprInfo* pExp = &pExprs[i];
|
||||||
|
|
||||||
pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
|
pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
|
||||||
pExp->pExpr->_function.num = 1;
|
pExp->pExpr->_function.num = 1;
|
||||||
|
@ -6850,8 +6817,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||||
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
|
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
|
||||||
pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo);
|
pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
|
||||||
|
@ -6906,9 +6875,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
|
||||||
|
SExprInfo* pScalarExprInfo = NULL;
|
||||||
|
int32_t numOfScalarExpr = 0;
|
||||||
if (pAggNode->pGroupKeys != NULL) {
|
if (pAggNode->pGroupKeys != NULL) {
|
||||||
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
|
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
|
||||||
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL);
|
if (pAggNode->pExprs != NULL) {
|
||||||
|
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
|
||||||
|
}
|
||||||
|
|
||||||
|
return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL);
|
||||||
} else {
|
} else {
|
||||||
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
|
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
|
||||||
}
|
}
|
||||||
|
@ -7155,10 +7130,15 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
}
|
}
|
||||||
|
|
||||||
*numOfOutputCols = 0;
|
*numOfOutputCols = 0;
|
||||||
|
|
||||||
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
|
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
|
SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
|
||||||
|
// todo: add reserve flag check
|
||||||
|
if (pNode->slotId >= numOfCols) { // it is a column reserved for the arithmetic expression calculation
|
||||||
|
(*numOfOutputCols) += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
SColMatchInfo* info = taosArrayGet(pList, pNode->slotId);
|
SColMatchInfo* info = taosArrayGet(pList, pNode->slotId);
|
||||||
if (pNode->output) {
|
if (pNode->output) {
|
||||||
(*numOfOutputCols) += 1;
|
(*numOfOutputCols) += 1;
|
||||||
|
|
|
@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
|
toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -285,6 +285,12 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
||||||
|
|
||||||
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||||
|
if (pInfo->pScalarExprInfo != NULL) {
|
||||||
|
projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||||
doHashGroupbyAgg(pOperator, pBlock);
|
doHashGroupbyAgg(pOperator, pBlock);
|
||||||
}
|
}
|
||||||
|
@ -305,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
|
toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
doFilter(pInfo->pCondition, pRes);
|
doFilter(pInfo->pCondition, pRes);
|
||||||
|
|
||||||
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
||||||
|
@ -322,16 +328,21 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
return (pRes->info.rows == 0)? NULL:pRes;
|
return (pRes->info.rows == 0)? NULL:pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo,
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
|
||||||
const STableGroupInfo* pTableGroupInfo) {
|
SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
||||||
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
|
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pGroupCols = pGroupColList;
|
pInfo->pGroupCols = pGroupColList;
|
||||||
pInfo->pCondition = pCondition;
|
pInfo->pCondition = pCondition;
|
||||||
|
|
||||||
|
pInfo->pScalarExprInfo = pScalarExprInfo;
|
||||||
|
pInfo->numOfScalarExpr = numOfScalarExpr;
|
||||||
|
pInfo->pScalarFuncCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
|
|
||||||
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -113,7 +113,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
SSDataBlock* pBlock = &pTableScanInfo->block;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo;
|
STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo;
|
||||||
|
|
||||||
*newgroup = false;
|
*newgroup = false;
|
||||||
|
@ -218,7 +218,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
|
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
|
||||||
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo,
|
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
|
||||||
SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
||||||
assert(repeatTime > 0);
|
assert(repeatTime > 0);
|
||||||
|
|
||||||
|
@ -232,12 +232,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->block.pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
|
pInfo->pResBlock = pResBlock;
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
SColumnInfoData idata = {0};
|
|
||||||
taosArrayPush(pInfo->block.pDataBlock, &idata);
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->pFilterNode = pCondition;
|
pInfo->pFilterNode = pCondition;
|
||||||
pInfo->dataReader = pTsdbReadHandle;
|
pInfo->dataReader = pTsdbReadHandle;
|
||||||
pInfo->times = repeatTime;
|
pInfo->times = repeatTime;
|
||||||
|
@ -312,7 +307,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist);
|
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist);
|
||||||
tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);
|
tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);
|
||||||
|
|
||||||
SSDataBlock* pBlock = &pTableScanInfo->block;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
pBlock->info.rows = 1;
|
pBlock->info.rows = 1;
|
||||||
pBlock->info.numOfCols = 1;
|
pBlock->info.numOfCols = 1;
|
||||||
|
|
||||||
|
@ -343,13 +338,13 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dataReader = dataReader;
|
pInfo->dataReader = dataReader;
|
||||||
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
// pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
SColumnInfoData infoData = {0};
|
SColumnInfoData infoData = {0};
|
||||||
infoData.info.type = TSDB_DATA_TYPE_BINARY;
|
infoData.info.type = TSDB_DATA_TYPE_BINARY;
|
||||||
infoData.info.bytes = 1024;
|
infoData.info.bytes = 1024;
|
||||||
infoData.info.colId = 0;
|
infoData.info.colId = 0;
|
||||||
taosArrayPush(pInfo->block.pDataBlock, &infoData);
|
// taosArrayPush(pInfo->block.pDataBlock, &infoData);
|
||||||
|
|
||||||
pOperator->name = "DataBlockInfoScanOperator";
|
pOperator->name = "DataBlockInfoScanOperator";
|
||||||
// pOperator->operatorType = OP_TableBlockInfoScan;
|
// pOperator->operatorType = OP_TableBlockInfoScan;
|
||||||
|
|
|
@ -541,7 +541,7 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co
|
||||||
pFillCol[i].col.bytes = pExprInfo->base.resSchema.bytes;
|
pFillCol[i].col.bytes = pExprInfo->base.resSchema.bytes;
|
||||||
pFillCol[i].col.type = (int8_t)pExprInfo->base.resSchema.type;
|
pFillCol[i].col.type = (int8_t)pExprInfo->base.resSchema.type;
|
||||||
pFillCol[i].col.offset = offset;
|
pFillCol[i].col.offset = offset;
|
||||||
pFillCol[i].col.colId = pExprInfo->base.resSchema.colId;
|
pFillCol[i].col.colId = pExprInfo->base.resSchema.slotId;
|
||||||
pFillCol[i].tagIndex = -2;
|
pFillCol[i].tagIndex = -2;
|
||||||
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
|
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
|
||||||
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
|
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
|
||||||
|
|
Loading…
Reference in New Issue