diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 4d4ac6c1e4..621ea7b7fc 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -27,9 +27,14 @@ static int32_t getSchemaBytes(const SSchema* pSchema) { } } +// todo : to convert data according to SSDatablock static void buildRspData(const STableMeta* pMeta, char* pData) { - int32_t* pColSizes = (int32_t*)pData; - pData += DESCRIBE_RESULT_COLS * sizeof(int32_t); + int32_t* payloadLen = (int32_t*) pData; + uint64_t* groupId = (uint64_t*)(pData + sizeof(int32_t)); + + int32_t* pColSizes = (int32_t*)(pData + sizeof(int32_t) + sizeof(uint64_t)); + pData = (char*) pColSizes + DESCRIBE_RESULT_COLS * sizeof(int32_t); + int32_t numOfRows = TABLE_TOTAL_COL_NUM(pMeta); // Field @@ -79,6 +84,9 @@ static void buildRspData(const STableMeta* pMeta, char* pData) { for (int32_t i = 0; i < DESCRIBE_RESULT_COLS; ++i) { pColSizes[i] = htonl(pColSizes[i]); } + + + *payloadLen = (int32_t)(pData - (char*)payloadLen); } static int32_t calcRspSize(const STableMeta* pMeta) { @@ -87,7 +95,8 @@ static int32_t calcRspSize(const STableMeta* pMeta) { (numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_FIELD_LEN) + (numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_TYPE_LEN) + (BitmapLen(numOfRows) + numOfRows * sizeof(int32_t)) + - (numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_NOTE_LEN); + (numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_NOTE_LEN) + + sizeof(int32_t) + sizeof(uint64_t); } static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 00ec92465f..b841224fe4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -254,6 +254,17 @@ enum { OP_EXEC_DONE = 0x9, }; +typedef struct SOperatorFpSet { + __optr_open_fn_t _openFn; // DO NOT invoke this function directly + __optr_fn_t getNextFn; + __optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_close_fn_t closeFn; + __optr_encode_fn_t encodeResultRow; + __optr_decode_fn_t decodeResultRow; + __optr_get_explain_fn_t getExplainFn; +} SOperatorFpSet; + typedef struct SOperatorInfo { uint8_t operatorType; bool blockingOptr; // block operator or not @@ -267,15 +278,7 @@ typedef struct SOperatorInfo { SResultInfo resultInfo; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - // todo extract struct - __optr_open_fn_t _openFn; // DO NOT invoke this function directly - __optr_fn_t getNextFn; - __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. - __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP - __optr_close_fn_t closeFn; - __optr_encode_fn_t encodeResultRow; - __optr_decode_fn_t decodeResultRow; - __optr_get_explain_fn_t getExplainFn; + SOperatorFpSet fpSet; } SOperatorInfo; typedef struct { @@ -609,6 +612,10 @@ typedef struct SJoinOperatorInfo { SNode *pOnCondition; } SJoinOperatorInfo; +SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, + __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, + __optr_decode_fn_t decode, __optr_get_explain_fn_t explain); + int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); @@ -653,6 +660,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 5cbda90733..516afe5553 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -159,7 +159,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int64_t st = 0; st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup); + *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3541013015..318d87e100 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -226,6 +226,23 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, + __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, + __optr_decode_fn_t decode, __optr_get_explain_fn_t explain) { + SOperatorFpSet fpSet = { + ._openFn = openFn, + .getNextFn = nextFn, + .getStreamResFn = streamFn, + .cleanupFn = cleanup, + .closeFn = closeFn, + .encodeResultRow = encode, + .decodeResultRow = decode, + .getExplainFn = explain, + }; + + return fpSet; +} + void operatorDummyCloseFn(void* param, int32_t numOfCols) {} static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, @@ -4081,7 +4098,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator, bool* newgroup) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->_openFn(pOperator); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4176,9 +4193,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->info = pInfo; pOperator->numOfOutput = pBlock->info.numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function. - pOperator->getNextFn = doLoadRemoteData; - pOperator->closeFn = destroyExchangeOperatorInfo; + + pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, + NULL, NULL, NULL); #if 1 { // todo refactor @@ -4289,7 +4306,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*)param; bool newgroup = false; - return pOperator->getNextFn(pOperator, &newgroup); + return pOperator->fpSet.getNextFn(pOperator, &newgroup); } static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { @@ -4586,9 +4603,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSortedMerge; - pOperator->closeFn = destroySortedMergeOperatorInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo, + NULL, NULL, NULL); code = appendDownstream(pOperator, downstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4667,8 +4684,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSort; - pOperator->closeFn = destroyOrderOperatorInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, + NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -4710,7 +4727,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { bool newgroup = true; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -4765,7 +4782,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) } SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->_openFn(pOperator); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } @@ -5007,7 +5024,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) // The downstream exec may change the value of the newgroup, so use a local variable instead. publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5070,7 +5087,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5121,9 +5138,9 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro SSDataBlock* pBlock = pInfo->binfo.pRes; if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - return pOperator->getStreamResFn(pOperator, newgroup); + return pOperator->fpSet.getStreamResFn(pOperator, newgroup); } else { - pTaskInfo->code = pOperator->_openFn(pOperator); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } @@ -5165,7 +5182,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5216,7 +5233,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5265,7 +5282,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5400,7 +5417,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5451,7 +5468,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5531,7 +5548,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pDownstream->getNextFn(pDownstream, newgroup); + SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream, newgroup); publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { @@ -5603,8 +5620,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { return; } - if (pOperator->closeFn != NULL) { - pOperator->closeFn(pOperator->info, pOperator->numOfOutput); + if (pOperator->fpSet.closeFn != NULL) { + pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput); } if (pOperator->pDownstream != NULL) { @@ -5739,12 +5756,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = doOpenAggregateOptr; - pOperator->getNextFn = getAggregateResult; - pOperator->closeFn = destroyAggOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5871,9 +5885,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->info = pInfo; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = num; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doProjectOperation; - pOperator->closeFn = destroyProjectOperatorInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, + NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -5929,12 +5943,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->_openFn = doOpenIntervalAgg; - pOperator->getNextFn = doBuildIntervalResult; - pOperator->getStreamResFn = doStreamIntervalAgg; - pOperator->closeFn = destroyIntervalOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5951,6 +5962,65 @@ _error: return NULL; } +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + pInfo->execModel = OPTR_EXEC_MODEL_STREAM; + pInfo->win = pTaskInfo->window; + pInfo->twAggSup = *pTwAggSupp; + pInfo->primaryTsIndex = primaryTsSlotId; + + int32_t numOfRows = 4096; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); + + // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { + goto _error; + } + + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + + pOperator->name = "StreamTimeIntervalAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; + pOperator->blockingOptr = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + + _error: + destroyIntervalOperatorInfo(pInfo, numOfCols); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; + +} + SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); @@ -5969,8 +6039,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doAllIntervalAgg; - pOperator->closeFn = destroyBasicOperatorInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doAllIntervalAgg, NULL, NULL, destroyBasicOperatorInfo, + NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6010,10 +6081,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; - pOperator->getNextFn = doStateWindowAgg; - pOperator->closeFn = destroyStateWindowOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6057,10 +6127,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->getNextFn = doSessionWindowAgg; - pOperator->closeFn = destroySWindowOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -6147,12 +6216,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doFill; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, + NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; - - pOperator->closeFn = destroySFillOperatorInfo; - code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7017,8 +7084,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo (*pRes)[*resNum].startupCost = operatorInfo->cost.openCost; (*pRes)[*resNum].totalCost = operatorInfo->cost.totalCost; - if (operatorInfo->getExplainFn) { - int32_t code = (*operatorInfo->getExplainFn)(operatorInfo, &(*pRes)->verboseInfo); + if (operatorInfo->fpSet.getExplainFn) { + int32_t code = (*operatorInfo->fpSet.getExplainFn)(operatorInfo, &(*pRes)->verboseInfo); if (code) { qError("operator getExplainFn failed, error:%s", tstrerror(code)); return code; @@ -7055,7 +7122,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { SOperatorInfo* ds1 = pOperator->pDownstream[0]; publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pJoinInfo->pLeft = ds1->getNextFn(ds1, newgroup); + pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1, newgroup); publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC); pJoinInfo->leftPos = 0; @@ -7068,7 +7135,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { SOperatorInfo* ds2 = pOperator->pDownstream[1]; publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pJoinInfo->pRight = ds2->getNextFn(ds2, newgroup); + pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2, newgroup); publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC); pJoinInfo->rightPos = 0; @@ -7161,9 +7228,9 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doMergeJoin; - pOperator->closeFn = destroyBasicOperatorInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, + NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); return pOperator; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 27c616498e..beee11ec18 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -360,12 +360,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -562,7 +558,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -618,14 +614,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pInfo->binfo.pRes = pResultBlock; pOperator->numOfOutput = numOfCols; pOperator->pExpr = pExprInfo; pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = hashPartition; - pOperator->closeFn = destroyPartitionOperatorInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo, + NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 277ab9bc6f..f4eee01007 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -405,7 +405,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->getNextFn = doTableScan; + pOperator->fpSet.getNextFn = doTableScan; pOperator->pTaskInfo = pTaskInfo; static int32_t cost = 0; @@ -429,7 +429,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) { pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->getNextFn = doTableScanImpl; + pOperator->fpSet.getNextFn = doTableScanImpl; return pOperator; } @@ -502,8 +502,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* // pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doBlockInfoScan; + pOperator->fpSet._openFn = operatorDummyOpenFn; + pOperator->fpSet.getNextFn = doBlockInfoScan; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -532,7 +532,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; - pTaskInfo->code = pOperator->_openFn(pOperator); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -659,9 +659,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doStreamBlockScan; - pOperator->closeFn = operatorDummyCloseFn; + pOperator->fpSet._openFn = operatorDummyOpenFn; + pOperator->fpSet.getNextFn = doStreamBlockScan; + pOperator->fpSet.closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -981,8 +981,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->getNextFn = doSysTableScan; - pOperator->closeFn = destroySysScanOperator; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, + NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -1139,11 +1139,12 @@ SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->getNextFn = doTagScan; + + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL); pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pTaskInfo = pTaskInfo; - pOperator->closeFn = destroyTagScanOperatorInfo; return pOperator; _error: diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 543df30686..de8df7b916 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -199,9 +199,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { - pOperator->getNextFn = getDummyBlock; + pOperator->fpSet.getNextFn = getDummyBlock; } else { - pOperator->getNextFn = get2ColsDummyBlock; + pOperator->fpSet.getNextFn = get2ColsDummyBlock; } SDummyInputInfo *pInfo = (SDummyInputInfo*) taosMemoryCalloc(1, sizeof(SDummyInputInfo));