From 1c52b59344dfeb8bcaee8d287dc6527e7470aaea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Jun 2022 15:48:32 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 5 +- source/libs/executor/inc/tsort.h | 2 +- source/libs/executor/src/executorimpl.c | 62 ++++++++++++++----------- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/sortoperator.c | 4 +- source/libs/executor/src/tsort.c | 2 +- 6 files changed, 41 insertions(+), 36 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 034e2893df..6452f7cf7f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -831,10 +831,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult, + SExecTaskInfo* pTaskInfo); -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, - bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 86ee841cc2..363f379ee4 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* * @param type * @return */ -SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr); +SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr); /** * diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 40b019eb5d..6847605979 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3073,7 +3073,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); @@ -4162,18 +4162,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t } } -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, - int32_t fillType, SNodeListNode* pValueNode, bool multigroupResult, - SExecTaskInfo* pTaskInfo) { - SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - - pInfo->pRes = pResBlock; - pInfo->multigroupResult = multigroupResult; - +static int32_t convertFillType(int32_t mode) { int32_t type = TSDB_FILL_NONE; - switch (fillType) { + switch (mode) { case FILL_MODE_PREV: type = TSDB_FILL_PREV; break; @@ -4196,26 +4187,46 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp type = TSDB_FILL_NONE; } + return type; +} + +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult, + SExecTaskInfo* pTaskInfo) { + SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + int32_t num = 0; + SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); + SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num); + SInterval* pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval; + int32_t type = convertFillType(pPhyFillNode->mode); + SResultInfo* pResultInfo = &pOperator->resultInfo; initResultSizeInfo(pOperator, 4096); - int32_t code = initFillInfo(pInfo, pExpr, numOfCols, pValueNode, *pWindow, pResultInfo->capacity, pTaskInfo->id.str, - pInterval, type); + int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, + pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pOperator->name = "FillOperator"; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; + pInfo->pRes = pResBlock; + pInfo->multigroupResult = multigroupResult; + pOperator->name = "FillOperator"; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; - pOperator->pExpr = pExpr; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = num; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL); - pOperator->pTaskInfo = pTaskInfo; + code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -4590,6 +4601,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pScanPhyNode->node.pConditions); if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = terrno; return NULL; } @@ -4763,13 +4775,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num); pOptr = createMergeJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { - SFillPhysiNode* pFillNode = (SFillPhysiNode*)pPhyNode; - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - SExprInfo* pExprInfo = createExprInfo(pFillNode->pTargets, NULL, &num); - - SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval; - pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, - (SNodeListNode*)pFillNode->pValues, false, pTaskInfo); + pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 80276007de..b0325ef8d1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2133,7 +2133,7 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = - tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, + tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 77eeb24210..35e153f8c5 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -154,7 +154,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { pInfo->startTs = taosGetTimestampUs(); // pInfo->binfo.pRes is not equalled to the input datablock. - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT, -1, -1, + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); @@ -248,7 +248,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 00e07c7199..f21cad2dd6 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -71,7 +71,7 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) { * @param type * @return */ -SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) { +SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) { SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); pSortHandle->type = type;