refactor(query): do some internal refactor in executor.
This commit is contained in:
parent
dc76f2e91a
commit
9864e367bb
|
@ -342,7 +342,7 @@ typedef struct STableScanInfo {
|
||||||
|
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
double sampleRatio; // data block sample ratio
|
double sampleRatio; // data block sample ratio, 1 by default
|
||||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
|
@ -395,7 +395,6 @@ typedef struct SOptrBasicInfo {
|
||||||
int32_t* rowCellInfoOffset; // offset value for each row result cell info
|
int32_t* rowCellInfoOffset; // offset value for each row result cell info
|
||||||
SqlFunctionCtx* pCtx;
|
SqlFunctionCtx* pCtx;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
int32_t capacity; // TODO remove it
|
|
||||||
} SOptrBasicInfo;
|
} SOptrBasicInfo;
|
||||||
|
|
||||||
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
|
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
|
||||||
|
@ -422,6 +421,8 @@ typedef struct STableIntervalOperatorInfo {
|
||||||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||||
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
||||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||||
|
double watermark; // water mark
|
||||||
|
int32_t trigger; // trigger model
|
||||||
} STableIntervalOperatorInfo;
|
} STableIntervalOperatorInfo;
|
||||||
|
|
||||||
typedef struct SAggOperatorInfo {
|
typedef struct SAggOperatorInfo {
|
||||||
|
@ -602,7 +603,8 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
||||||
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
||||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
||||||
|
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
||||||
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
|
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
|
||||||
SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
|
SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
|
||||||
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
|
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
|
||||||
|
|
|
@ -4449,7 +4449,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
SSortHandle* pHandle = pInfo->pSortHandle;
|
SSortHandle* pHandle = pInfo->pSortHandle;
|
||||||
|
|
||||||
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
|
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
|
||||||
blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity);
|
blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
@ -4461,7 +4461,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// build datablock for merge for one group
|
// build datablock for merge for one group
|
||||||
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
|
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
|
||||||
if (pDataBlock->info.rows >= pInfo->binfo.capacity) {
|
if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4496,7 +4496,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
@ -4603,7 +4603,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
pInfo->bufPageSize = 1024;
|
pInfo->bufPageSize = 1024;
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
|
|
||||||
pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);
|
pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);
|
||||||
|
|
||||||
pOperator->name = "SortedMerge";
|
pOperator->name = "SortedMerge";
|
||||||
// pOperator->operatorType = OP_SortedMerge;
|
// pOperator->operatorType = OP_SortedMerge;
|
||||||
|
@ -5103,8 +5103,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
|
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
|
||||||
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
|
@ -5124,7 +5124,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -5158,8 +5158,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
@ -5260,8 +5260,8 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pInfo->binfo.pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
|
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
|
||||||
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
|
@ -5352,7 +5352,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5383,8 +5383,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
||||||
|
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -5401,7 +5401,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5432,8 +5432,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
||||||
|
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
toSDatablock(pBInfo->pRes, pBInfo->capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -5611,15 +5611,23 @@ static void cleanupAggSup(SAggSupporter* pAggSup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
|
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
|
||||||
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
|
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
|
||||||
pBasicInfo->pRes = pResultBlock;
|
pBasicInfo->pRes = pResultBlock;
|
||||||
pBasicInfo->capacity = numOfRows;
|
|
||||||
|
|
||||||
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
|
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
|
||||||
|
pOperator->resultInfo.capacity = numOfRows;
|
||||||
|
pOperator->resultInfo.threshold = numOfRows * 0.75;
|
||||||
|
|
||||||
|
if (pOperator->resultInfo.threshold == 0) {
|
||||||
|
pOperator->resultInfo.capacity = numOfRows;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
|
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
|
||||||
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
||||||
if (pTableQueryInfo == NULL) {
|
if (pTableQueryInfo == NULL) {
|
||||||
|
@ -5655,7 +5663,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
|
|
||||||
int32_t numOfRows = 1;
|
int32_t numOfRows = 1;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str);
|
|
||||||
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -5805,7 +5815,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
||||||
int32_t numOfCols = num;
|
int32_t numOfCols = num;
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str);
|
|
||||||
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
|
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
|
||||||
|
|
||||||
|
@ -5854,7 +5866,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
|
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str);
|
|
||||||
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win);
|
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win);
|
||||||
|
|
||||||
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||||
|
@ -5933,7 +5947,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
||||||
|
|
||||||
pInfo->colIndex = -1;
|
pInfo->colIndex = -1;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, keyBufSize, pTaskInfo->id.str);
|
|
||||||
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||||
|
|
||||||
pOperator->name = "StateWindowOperator";
|
pOperator->name = "StateWindowOperator";
|
||||||
|
@ -5968,7 +5984,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
||||||
|
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str);
|
|
||||||
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
|
@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -307,11 +307,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
// pInfo->binfo.rowCellInfoOffset);
|
// pInfo->binfo.rowCellInfoOffset);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
blockDataEnsureCapacity(pRes, pInfo->binfo.capacity);
|
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
toSDatablock(pRes, pInfo->binfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
|
||||||
doFilter(pInfo->pCondition, pRes);
|
doFilter(pInfo->pCondition, pRes);
|
||||||
|
|
||||||
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
||||||
|
@ -348,7 +348,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||||
|
|
||||||
pOperator->name = "GroupbyAggOperator";
|
pOperator->name = "GroupbyAggOperator";
|
||||||
|
|
Loading…
Reference in New Issue