fix(query): do perform arithmetic operator before apply the sort procedure.
This commit is contained in:
parent
eb18d518ed
commit
c1659805b6
|
@ -3279,7 +3279,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
|
|||
|
||||
pDataBlockInfo->rows = cur->rows;
|
||||
pDataBlockInfo->window = cur->win;
|
||||
pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
|
||||
// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -277,7 +277,7 @@ typedef struct SOperatorInfo {
|
|||
uint8_t operatorType;
|
||||
bool blocking; // block operator or not
|
||||
uint8_t status; // denote if current operator is completed
|
||||
int32_t numOfOutput; // number of columns of the current operator results
|
||||
int32_t numOfExprs; // number of columns of the current operator results
|
||||
char* name; // name, used to show the query execution plan
|
||||
void* info; // extension attribution
|
||||
SExprInfo* pExpr;
|
||||
|
@ -415,8 +415,6 @@ typedef struct SOptrBasicInfo {
|
|||
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
|
||||
typedef struct SAggSupporter {
|
||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||
// SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
||||
// SArray* pResultRowArrayList; // The array list that contains the Result rows
|
||||
char* keyBuf; // window key buffer
|
||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||
|
@ -577,13 +575,13 @@ typedef struct SSortedMergeOperatorInfo {
|
|||
} SSortedMergeOperatorInfo;
|
||||
|
||||
typedef struct SSortOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||
SSDataBlock* pDataBlock;
|
||||
SArray* pSortInfo;
|
||||
SSortHandle* pSortHandle;
|
||||
SArray* inputSlotMap; // for index map from table scan output
|
||||
int32_t bufPageSize;
|
||||
int32_t numOfRowsInRes;
|
||||
// int32_t numOfRowsInRes;
|
||||
|
||||
// TODO extact struct
|
||||
int64_t startTs; // sort start time
|
||||
|
@ -645,10 +643,13 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
|
||||
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
|
||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||
|
||||
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
|
||||
int32_t* rowCellInfoOffset);
|
||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity);
|
||||
SSDataBlock* loadNextDataBlock(void* param);
|
||||
|
||||
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||
|
||||
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
|
||||
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
||||
|
@ -663,7 +664,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
|
|
@ -89,7 +89,7 @@ int32_t tsortClose(SSortHandle* pHandle);
|
|||
*
|
||||
* @return
|
||||
*/
|
||||
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp);
|
||||
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param);
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -202,9 +202,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData idata = {{0}};
|
||||
SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i);
|
||||
if (!pDescNode->output) {
|
||||
continue;
|
||||
}
|
||||
// if (!pDescNode->output) { // todo disable it temporarily
|
||||
// continue;
|
||||
// }
|
||||
|
||||
idata.info.type = pDescNode->dataType.type;
|
||||
idata.info.bytes = pDescNode->dataType.bytes;
|
||||
|
@ -651,7 +651,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
|
||||
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
||||
int32_t order) {
|
||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
||||
pCtx[i].order = order;
|
||||
pCtx[i].size = pBlock->info.rows;
|
||||
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
|
||||
|
@ -713,7 +713,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
bool createDummyCol) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
||||
pCtx[i].order = order;
|
||||
pCtx[i].size = pBlock->info.rows;
|
||||
pCtx[i].pSrcBlock = pBlock;
|
||||
|
@ -798,7 +798,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
}
|
||||
|
||||
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
|
||||
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
|
||||
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
|
||||
if (functionNeedToExecute(&pCtx[k])) {
|
||||
pCtx[k].startTs = startTs; // this can be set during create the struct
|
||||
pCtx[k].fpSet.process(&pCtx[k]);
|
||||
|
@ -2815,7 +2815,6 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
|
|||
// NOTE: sources columns are more than the destination SSDatablock columns.
|
||||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) {
|
||||
size_t numOfSrcCols = taosArrayGetSize(pCols);
|
||||
ASSERT(numOfSrcCols >= pBlock->info.numOfCols);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
|
||||
|
@ -3287,7 +3286,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
|
|||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = pBlock->info.numOfCols;
|
||||
pOperator->numOfExprs = pBlock->info.numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
||||
|
@ -3345,49 +3344,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
cleanupAggSup(&pInfo->aggSup);
|
||||
}
|
||||
|
||||
// TODO merge aggregate super table
|
||||
static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
bool isNull = tsortIsNullVal(pTupleHandle, i);
|
||||
if (isNull) {
|
||||
colDataAppend(pColInfo, pBlock->info.rows, NULL, true);
|
||||
} else {
|
||||
char* pData = tsortGetValue(pTupleHandle, i);
|
||||
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->info.rows += 1;
|
||||
}
|
||||
|
||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
|
||||
blockDataCleanup(pDataBlock);
|
||||
blockDataEnsureCapacity(pDataBlock, capacity);
|
||||
|
||||
blockDataEnsureCapacity(pDataBlock, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
|
||||
if (pDataBlock->info.rows >= capacity) {
|
||||
return pDataBlock;
|
||||
}
|
||||
}
|
||||
|
||||
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* loadNextDataBlock(void* param) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
return pOperator->fpSet.getNextFn(pOperator);
|
||||
}
|
||||
|
||||
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
|
||||
size_t size = taosArrayGetSize(groupInfo);
|
||||
if (size == 0) {
|
||||
|
@ -3490,8 +3446,8 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
|
|||
doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
|
||||
} else {
|
||||
doFinalizeResultImpl(pCtx, numOfExpr);
|
||||
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL);
|
||||
// setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
|
||||
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL);
|
||||
// setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows);
|
||||
|
||||
// TODO check for available buffer;
|
||||
|
||||
|
@ -3541,13 +3497,13 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true);
|
||||
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
||||
// pOperator->pRuntimeEnv, true);
|
||||
doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock);
|
||||
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
|
||||
// flush to tuple store, and after all data have been handled, return to upstream node or sink node
|
||||
}
|
||||
|
||||
doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL);
|
||||
// setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
|
||||
doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfExprs);
|
||||
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL);
|
||||
// setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows);
|
||||
|
||||
// TODO check for available buffer;
|
||||
|
||||
|
@ -3571,7 +3527,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
|||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
||||
numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
||||
|
||||
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
|
@ -3678,7 +3634,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = num;
|
||||
pOperator->numOfExprs = num;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
@ -3703,79 +3659,6 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSortOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||
}
|
||||
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT,
|
||||
pInfo->bufPageSize, numOfBufPage, pInfo->pDataBlock, pTaskInfo->id.str);
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
ps->param = pOperator->pDownstream[0];
|
||||
tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||
taosMemoryFreeClear(ps);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||
}
|
||||
|
||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo,
|
||||
SArray* pIndexMap, SExecTaskInfo* pTaskInfo) {
|
||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
int32_t rowSize = pResBlock->info.rowSize;
|
||||
|
||||
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
|
||||
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
|
||||
pInfo->numOfRowsInRes = 1024;
|
||||
pInfo->pDataBlock = pResBlock;
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->inputSlotMap = pIndexMap;
|
||||
|
||||
pOperator->name = "SortOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t getTableScanOrder(SOperatorInfo* pOperator) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
|
||||
|
@ -3813,7 +3696,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
// if (pAggInfo->current != NULL) {
|
||||
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||
// }
|
||||
|
||||
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||
|
@ -3827,7 +3710,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
||||
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true);
|
||||
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
||||
|
||||
|
@ -3848,7 +3731,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
|
||||
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf,
|
||||
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf,
|
||||
&pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false);
|
||||
|
@ -4092,17 +3975,17 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
|
||||
// todo dynamic set tags
|
||||
// if (pTableQueryInfo != NULL) {
|
||||
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||
// }
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
|
||||
|
||||
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
|
||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs);
|
||||
if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) {
|
||||
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
||||
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
|
||||
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs);
|
||||
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfExprs);
|
||||
return pRes;
|
||||
}
|
||||
}
|
||||
|
@ -4127,14 +4010,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
pProjectInfo->existDataBlock = pBlock;
|
||||
break;
|
||||
} else { // init output buffer for a new group data
|
||||
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput);
|
||||
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs);
|
||||
}
|
||||
}
|
||||
|
||||
// todo dynamic set tags
|
||||
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||
// if (pTableQueryInfo != NULL) {
|
||||
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||
// }
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
|
@ -4143,7 +4026,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||
|
||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput,
|
||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
||||
pProjectInfo->pPseudoColInfo);
|
||||
|
||||
int32_t status = handleLimitOffset(pOperator, pBlock);
|
||||
|
@ -4156,7 +4039,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
|
||||
pProjectInfo->curOutput += pInfo->pRes->info.rows;
|
||||
|
||||
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
|
||||
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs);
|
||||
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
|
||||
}
|
||||
|
||||
|
@ -4289,7 +4172,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pOperator->fpSet.closeFn != NULL) {
|
||||
pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput);
|
||||
pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfExprs);
|
||||
}
|
||||
|
||||
if (pOperator->pDownstream != NULL) {
|
||||
|
@ -4425,7 +4308,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
||||
|
@ -4477,14 +4360,6 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
}
|
||||
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
|
||||
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
|
||||
|
||||
taosArrayDestroy(pInfo->pSortInfo);
|
||||
taosArrayDestroy(pInfo->inputSlotMap);
|
||||
}
|
||||
|
||||
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
|
||||
taosArrayDestroy(pExInfo->pSources);
|
||||
|
@ -4538,7 +4413,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = num;
|
||||
pOperator->numOfExprs = num;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
||||
destroyProjectOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -4621,7 +4496,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
|
@ -4979,7 +4854,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
|
||||
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
|
||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
if (pSortPhyNode->pExprs != NULL) {
|
||||
pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||
}
|
||||
|
||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, slotMap, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||
|
||||
|
@ -5566,7 +5448,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
|
|||
// only the timestamp match support for ordinary table
|
||||
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
|
||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
|
||||
|
||||
SExprInfo* pExprInfo = &pOperator->pExpr[i];
|
||||
|
@ -5633,7 +5515,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
|
|||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
|
|
|
@ -227,16 +227,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||
int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||
int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
int32_t rowIndex = j - num;
|
||||
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
|
||||
|
||||
// assign the group keys or user input constant values if required
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
|
||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
|
||||
num = 1;
|
||||
}
|
||||
|
@ -244,15 +244,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
if (num > 0) {
|
||||
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||
int32_t ret =
|
||||
setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
||||
setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
||||
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
int32_t rowIndex = pBlock->info.rows - num;
|
||||
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||
doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
|
||||
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -291,19 +291,19 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
|
||||
}
|
||||
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
|
||||
doHashGroupbyAgg(pOperator, pBlock);
|
||||
}
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
||||
|
||||
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
|
||||
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
// if (!stableQuery) { // finalize include the update of result rows
|
||||
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
|
||||
// } else {
|
||||
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo,
|
||||
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo,
|
||||
// pInfo->binfo.rowCellInfoOffset);
|
||||
// }
|
||||
|
||||
|
@ -357,7 +357,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
// pOperator->operatorType = OP_Groupby;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
|
@ -392,7 +392,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
|
||||
int32_t* rows = (int32_t*) pPage;
|
||||
|
||||
size_t numOfCols = pOperator->numOfOutput;
|
||||
size_t numOfCols = pOperator->numOfExprs;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SExprInfo* pExpr = &pOperator->pExpr[i];
|
||||
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
|
@ -565,7 +565,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
|
||||
doHashPartition(pOperator, pBlock);
|
||||
}
|
||||
|
||||
|
@ -616,7 +616,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||
pInfo->binfo.pRes = pResultBlock;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
|
|
|
@ -386,7 +386,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
|
|||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->numOfExprs = numOfOutput;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||
|
@ -646,7 +646,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
|
|||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||
pOperator->numOfExprs = pResBlock->info.numOfCols;
|
||||
pOperator->fpSet._openFn = operatorDummyOpenFn;
|
||||
pOperator->fpSet.getNextFn = doStreamBlockScan;
|
||||
pOperator->fpSet.closeFn = operatorDummyCloseFn;
|
||||
|
@ -1020,7 +1020,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
|
||||
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
|
||||
pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols);
|
||||
pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
|
||||
|
||||
// todo log the filter info
|
||||
doFilterResult(pInfo);
|
||||
|
@ -1150,7 +1150,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
|
|||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||
pOperator->numOfExprs = pResBlock->info.numOfCols;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
|
||||
NULL, NULL, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
@ -1247,7 +1247,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
|||
|
||||
char *data = NULL, *dst = NULL;
|
||||
int16_t type = 0, bytes = 0;
|
||||
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||
for(int32_t j = 0; j < pOperator->numOfExprs; ++j) {
|
||||
// not assign value in case of user defined constant output column
|
||||
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
|
||||
continue;
|
||||
|
@ -1308,7 +1308,7 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->numOfExprs = numOfOutput;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
|
|
|
@ -325,7 +325,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
|
|||
|
||||
SqlFunctionCtx* pCtx = pInfo->pCtx;
|
||||
|
||||
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
|
||||
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
|
||||
int32_t functionId = pCtx[k].functionId;
|
||||
if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) {
|
||||
pCtx[k].start.key = INT64_MIN;
|
||||
|
@ -406,12 +406,12 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlF
|
|||
// start exactly from this point, no need to do interpolation
|
||||
TSKEY key = ascQuery ? win->skey : win->ekey;
|
||||
if (key == curTs) {
|
||||
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
|
||||
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))) {
|
||||
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
|
||||
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -427,7 +427,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun
|
|||
SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
|
||||
STimeWindow* win) {
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfOutput;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||
|
||||
TSKEY actualEndKey = tsCols[endRowIndex];
|
||||
TSKEY key = order ? win->ekey : win->skey;
|
||||
|
@ -572,7 +572,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
|
|||
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
|
||||
}
|
||||
} else {
|
||||
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
|
||||
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
|
||||
}
|
||||
|
||||
// point interpolation does not require the end key time window interpolation.
|
||||
|
@ -592,7 +592,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
|
|||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||
}
|
||||
} else {
|
||||
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_END_INTERP);
|
||||
setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_END_INTERP);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -612,7 +612,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfOutput;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||
|
||||
SArray* pUpdated = NULL;
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
|
@ -683,7 +683,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
|
||||
|
||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP);
|
||||
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
|
||||
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
@ -773,7 +773,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
||||
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||
|
@ -798,7 +798,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
||||
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
|
||||
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf,
|
||||
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
|
||||
|
@ -813,7 +813,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
int64_t gid = pBlock->info.groupId;
|
||||
|
||||
bool masterScan = true;
|
||||
int32_t numOfOutput = pOperator->numOfOutput;
|
||||
int32_t numOfOutput = pOperator->numOfExprs;
|
||||
int16_t bytes = pStateColInfoData->info.bytes;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
|
||||
|
@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pBInfo->resultRowInfo);
|
||||
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
|
||||
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
|
||||
|
@ -1013,13 +1013,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
|
||||
// caller. Note that all the time window are not close till now.
|
||||
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
||||
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||
}
|
||||
|
||||
finalizeUpdatedResult(pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
@ -1082,7 +1082,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
|
||||
|
@ -1141,7 +1141,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL,
|
||||
|
@ -1169,7 +1169,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
|
||||
|
||||
bool masterScan = true;
|
||||
int32_t numOfOutput = pOperator->numOfOutput;
|
||||
int32_t numOfOutput = pOperator->numOfExprs;
|
||||
int64_t gid = pBlock->info.groupId;
|
||||
|
||||
int64_t gap = pInfo->gap;
|
||||
|
@ -1270,7 +1270,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pBInfo->resultRowInfo);
|
||||
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
|
||||
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
|
||||
|
@ -1309,7 +1309,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true);
|
||||
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
||||
|
@ -1319,7 +1319,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pSliceInfo->binfo.resultRowInfo);
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs);
|
||||
|
||||
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
|
||||
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
|
||||
|
@ -1346,7 +1346,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
|
@ -1388,7 +1388,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
|
@ -1440,7 +1440,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
|
||||
|
|
|
@ -42,11 +42,7 @@ struct SSortHandle {
|
|||
|
||||
_sort_fetch_block_fn_t fetchfp;
|
||||
_sort_merge_compar_fn_t comparFn;
|
||||
|
||||
void *pParam;
|
||||
SMultiwayMergeTreeInfo *pMergeTree;
|
||||
int32_t numOfCols;
|
||||
|
||||
int64_t startTs;
|
||||
uint64_t sortElapsed;
|
||||
uint64_t totalElapsed;
|
||||
|
@ -61,6 +57,9 @@ struct SSortHandle {
|
|||
bool inMemSort;
|
||||
bool needAdjust;
|
||||
STupleHandle tupleHandle;
|
||||
|
||||
void *param;
|
||||
void (*beforeFp)(SSDataBlock* pBlock, void* param);
|
||||
};
|
||||
|
||||
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
|
||||
|
@ -533,6 +532,13 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
|||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||
}
|
||||
|
||||
// perform the scalar function calculation before apply the sort
|
||||
if (pHandle->beforeFp != NULL) {
|
||||
pHandle->beforeFp(pBlock, pHandle->param);
|
||||
}
|
||||
|
||||
// todo relocate the columns
|
||||
|
||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
|
@ -623,8 +629,10 @@ int32_t tsortClose(SSortHandle* pHandle) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) {
|
||||
pHandle->fetchfp = fp;
|
||||
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) {
|
||||
pHandle->fetchfp = fetchFp;
|
||||
pHandle->beforeFp = fp;
|
||||
pHandle->param = param;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue