refactor: do some internal refactor.
This commit is contained in:
parent
9c6a9f1c92
commit
1c52b59344
|
@ -831,10 +831,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
||||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
|
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
|
||||||
|
|
||||||
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
|
||||||
|
SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
|
||||||
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
|
|
||||||
bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
|
||||||
* @param type
|
* @param type
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
|
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
|
|
@ -3073,7 +3073,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
||||||
numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
|
numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
||||||
|
@ -4162,18 +4162,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
static int32_t convertFillType(int32_t mode) {
|
||||||
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock,
|
|
||||||
int32_t fillType, SNodeListNode* pValueNode, bool multigroupResult,
|
|
||||||
SExecTaskInfo* pTaskInfo) {
|
|
||||||
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
|
||||||
|
|
||||||
pInfo->pRes = pResBlock;
|
|
||||||
pInfo->multigroupResult = multigroupResult;
|
|
||||||
|
|
||||||
int32_t type = TSDB_FILL_NONE;
|
int32_t type = TSDB_FILL_NONE;
|
||||||
switch (fillType) {
|
switch (mode) {
|
||||||
case FILL_MODE_PREV:
|
case FILL_MODE_PREV:
|
||||||
type = TSDB_FILL_PREV;
|
type = TSDB_FILL_PREV;
|
||||||
break;
|
break;
|
||||||
|
@ -4196,26 +4187,46 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
||||||
type = TSDB_FILL_NONE;
|
type = TSDB_FILL_NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t num = 0;
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
|
||||||
|
SInterval* pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||||
|
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||||
|
|
||||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
||||||
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity, pTaskInfo->id.str,
|
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
|
||||||
pInterval, type);
|
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->name = "FillOperator";
|
pInfo->pRes = pResBlock;
|
||||||
pOperator->blocking = false;
|
pInfo->multigroupResult = multigroupResult;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->name = "FillOperator";
|
||||||
|
pOperator->blocking = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||||
pOperator->pExpr = pExpr;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->numOfExprs = num;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
@ -4590,6 +4601,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo,
|
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo,
|
||||||
pScanPhyNode->node.pConditions);
|
pScanPhyNode->node.pConditions);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pTaskInfo->code = terrno;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4763,13 +4775,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
|
||||||
pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
|
pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
|
||||||
SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode;
|
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num);
|
|
||||||
|
|
||||||
SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval;
|
|
||||||
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode,
|
|
||||||
(SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
|
||||||
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -2133,7 +2133,7 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) {
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
|
||||||
pInfo->pSortHandle =
|
pInfo->pSortHandle =
|
||||||
tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
||||||
numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str);
|
numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);
|
||||||
|
|
|
@ -154,7 +154,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
|
||||||
pInfo->startTs = taosGetTimestampUs();
|
pInfo->startTs = taosGetTimestampUs();
|
||||||
|
|
||||||
// pInfo->binfo.pRes is not equalled to the input datablock.
|
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT, -1, -1,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
|
||||||
NULL, pTaskInfo->id.str);
|
NULL, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
||||||
|
@ -248,7 +248,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE,
|
||||||
pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str);
|
pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
||||||
|
|
|
@ -71,7 +71,7 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
|
||||||
* @param type
|
* @param type
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
|
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
|
||||||
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
||||||
|
|
||||||
pSortHandle->type = type;
|
pSortHandle->type = type;
|
||||||
|
|
Loading…
Reference in New Issue