[td-13039] refactor.

This commit is contained in:
Haojun Liao 2022-03-02 10:34:29 +08:00
parent 804e525697
commit 3d12401119
4 changed files with 77 additions and 79 deletions

View File

@ -234,8 +234,8 @@ typedef struct STaskAttr {
} STaskAttr; } STaskAttr;
typedef int32_t (*__optr_open_fn_t)(void* param); typedef int32_t (*__optr_open_fn_t)(void* param);
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); typedef SSDataBlock* (*__optr_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); typedef void (*__optr_close_fn_t)(void* param, int32_t num);
struct SOperatorInfo; struct SOperatorInfo;
@ -306,21 +306,21 @@ enum {
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
uint8_t operatorType; uint8_t operatorType;
bool blockingOptr; // block operator or not bool blockingOptr; // block operator or not
uint8_t status; // denote if current operator is completed uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results int32_t numOfOutput; // number of columns of the current operator results
char* name; // name, used to show the query execution plan char* name; // name, used to show the query execution plan
void* info; // extension attribution void* info; // extension attribution
SExprInfo* pExpr; SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t prepareFn; __optr_open_fn_t openFn;
__operator_fn_t exec; __optr_fn_t nextDataFn;
__optr_cleanup_fn_t cleanupFn; __optr_close_fn_t closeFn;
} SOperatorInfo; } SOperatorInfo;
typedef struct { typedef struct {
@ -654,8 +654,6 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOf
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
// SSDataBlock* doSLimit(void* param, bool* newgroup); // SSDataBlock* doSLimit(void* param, bool* newgroup);
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); // int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);

View File

@ -158,7 +158,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t st = 0; int64_t st = 0;
st = taosGetTimestampUs(); st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); *pRes = pTaskInfo->pRoot->nextDataFn(pTaskInfo->pRoot, &newgroup);
uint64_t el = (taosGetTimestampUs() - st); uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el; pTaskInfo->cost.elapsedTime += el;

View File

@ -5281,7 +5281,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = size; pOperator->numOfOutput = size;
pOperator->exec = doLoadRemoteData; pOperator->nextDataFn = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
#if 1 #if 1
@ -5361,7 +5361,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->exec = doTableScan; pOperator->nextDataFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
return pOperator; return pOperator;
@ -5386,7 +5386,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doTableScanImpl; pOperator->nextDataFn = doTableScanImpl;
return pOperator; return pOperator;
} }
@ -5410,7 +5410,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
// pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; // pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->exec = doBlockInfoScan; pOperator->nextDataFn = doBlockInfoScan;
return pOperator; return pOperator;
} }
@ -5452,7 +5452,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->exec = doStreamBlockScan; pOperator->nextDataFn = doStreamBlockScan;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
return pOperator; return pOperator;
} }
@ -5663,7 +5663,7 @@ SSDataBlock* loadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
bool newgroup = false; bool newgroup = false;
return pOperator->exec(pOperator, &newgroup); return pOperator->nextDataFn(pOperator, &newgroup);
} }
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) { static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) {
@ -5983,8 +5983,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doSortedMerge; pOperator->nextDataFn = doSortedMerge;
pOperator->cleanupFn = destroySortedMergeOperatorInfo; pOperator->closeFn = destroySortedMergeOperatorInfo;
code = appendDownstream(pOperator, downstream, numOfDownstream); code = appendDownstream(pOperator, downstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -6079,8 +6079,8 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doSort; pOperator->nextDataFn = doSort;
pOperator->cleanupFn = destroyOrderOperatorInfo; pOperator->closeFn = destroyOrderOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -6105,7 +6105,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6155,7 +6155,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6241,7 +6241,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// The downstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6299,7 +6299,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (1) { while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6350,7 +6350,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
while (1) { while (1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); SSDataBlock *pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6393,7 +6393,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6453,7 +6453,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6516,7 +6516,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6571,7 +6571,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6706,7 +6706,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -6768,7 +6768,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
@ -6821,7 +6821,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->exec(downstream, newgroup); SSDataBlock* pBlock = downstream->nextDataFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
@ -6906,7 +6906,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); SSDataBlock* pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) { if (*newgroup) {
@ -6979,8 +6979,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
return; return;
} }
if (pOperator->cleanupFn != NULL) { if (pOperator->closeFn != NULL) {
pOperator->cleanupFn(pOperator->info, pOperator->numOfOutput); pOperator->closeFn(pOperator->info, pOperator->numOfOutput);
} }
if (pOperator->pDownstream != NULL) { if (pOperator->pDownstream != NULL) {
@ -7067,8 +7067,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doAggregate; pOperator->nextDataFn = doAggregate;
pOperator->cleanupFn = destroyAggOperatorInfo; pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7164,8 +7164,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->exec = doMultiTableAggregate; pOperator->nextDataFn = doMultiTableAggregate;
pOperator->cleanupFn = destroyAggOperatorInfo; pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7194,8 +7194,8 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doProjectOperation; pOperator->nextDataFn = doProjectOperation;
pOperator->cleanupFn = destroyProjectOperatorInfo; pOperator->closeFn = destroyProjectOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7250,10 +7250,10 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->exec = doFilter; pOperator->nextDataFn = doFilter;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanupFn = destroyConditionOperatorInfo; pOperator->closeFn = destroyConditionOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7269,7 +7269,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
// pOperator->operatorType = OP_Limit; // pOperator->operatorType = OP_Limit;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->exec = doLimit; pOperator->nextDataFn = doLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7311,8 +7311,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exec = doIntervalAgg; pOperator->nextDataFn = doIntervalAgg;
pOperator->cleanupFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7336,8 +7336,8 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAllIntervalAgg; pOperator->nextDataFn = doAllIntervalAgg;
pOperator->cleanupFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7360,8 +7360,8 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doStateWindowAgg; pOperator->nextDataFn = doStateWindowAgg;
pOperator->cleanupFn = destroyStateWindowOperatorInfo; pOperator->closeFn = destroyStateWindowOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7385,8 +7385,8 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSessionWindowAgg; pOperator->nextDataFn = doSessionWindowAgg;
pOperator->cleanupFn = destroySWindowOperatorInfo; pOperator->closeFn = destroySWindowOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7409,8 +7409,8 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSTableIntervalAgg; pOperator->nextDataFn = doSTableIntervalAgg;
pOperator->cleanupFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7433,8 +7433,8 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAllSTableIntervalAgg; pOperator->nextDataFn = doAllSTableIntervalAgg;
pOperator->cleanupFn = destroyBasicOperatorInfo; pOperator->closeFn = destroyBasicOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
@ -7465,8 +7465,8 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = hashGroupbyAggregate; pOperator->nextDataFn = hashGroupbyAggregate;
pOperator->cleanupFn = destroyGroupbyOperatorInfo; pOperator->closeFn = destroyGroupbyOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7504,8 +7504,8 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doFill; pOperator->nextDataFn = doFill;
pOperator->cleanupFn = destroySFillOperatorInfo; pOperator->closeFn = destroySFillOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7553,7 +7553,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
// pOperator->exec = doSLimit; // pOperator->exec = doSLimit;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanupFn = destroySlimitOperatorInfo; pOperator->closeFn = destroySlimitOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -7707,11 +7707,11 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exec = doTagScan; pOperator->nextDataFn = doTagScan;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanupFn = destroyTagScanOperatorInfo; pOperator->closeFn = destroyTagScanOperatorInfo;
return pOperator; return pOperator;
} }
@ -7777,7 +7777,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
while(1) { while(1) {
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); pBlock = pOperator->pDownstream[0]->nextDataFn(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -7849,9 +7849,9 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = hashDistinct; pOperator->nextDataFn = hashDistinct;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->cleanupFn = destroyDistinctOperatorInfo; pOperator->closeFn = destroyDistinctOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;

View File

@ -201,9 +201,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
pOperator->name = "dummyInputOpertor4Test"; pOperator->name = "dummyInputOpertor4Test";
if (numOfCols == 1) { if (numOfCols == 1) {
pOperator->exec = getDummyBlock; pOperator->nextDataFn = getDummyBlock;
} else { } else {
pOperator->exec = get2ColsDummyBlock; pOperator->nextDataFn = get2ColsDummyBlock;
} }
SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo));
@ -435,7 +435,7 @@ TEST(testCase, external_sort_Test) {
int64_t s2 = taosGetTimestampUs(); int64_t s2 = taosGetTimestampUs();
printf("total:%ld\n", s2 - s1); printf("total:%ld\n", s2 - s1);
pOperator->cleanupFn(pOperator->info, 2); pOperator->closeFn(pOperator->info, 2);
tfree(exp); tfree(exp);
tfree(exp1); tfree(exp1);
taosArrayDestroy(pExprInfo); taosArrayDestroy(pExprInfo);
@ -507,7 +507,7 @@ TEST(testCase, sorted_merge_Test) {
int64_t s2 = taosGetTimestampUs(); int64_t s2 = taosGetTimestampUs();
printf("total:%ld\n", s2 - s1); printf("total:%ld\n", s2 - s1);
pOperator->cleanupFn(pOperator->info, 2); pOperator->closeFn(pOperator->info, 2);
tfree(exp); tfree(exp);
tfree(exp1); tfree(exp1);
taosArrayDestroy(pExprInfo); taosArrayDestroy(pExprInfo);
@ -560,7 +560,7 @@ TEST(testCase, time_interval_Operator_Test) {
while(1) { while(1) {
int64_t s = taosGetTimestampUs(); int64_t s = taosGetTimestampUs();
pRes = pOperator->exec(pOperator, &newgroup); pRes = pOperator->nextDataFn(pOperator, &newgroup);
int64_t e = taosGetTimestampUs(); int64_t e = taosGetTimestampUs();
if (t++ == 1) { if (t++ == 1) {
@ -583,7 +583,7 @@ TEST(testCase, time_interval_Operator_Test) {
int64_t s2 = taosGetTimestampUs(); int64_t s2 = taosGetTimestampUs();
printf("total:%ld\n", s2 - s1); printf("total:%ld\n", s2 - s1);
pOperator->cleanupFn(pOperator->info, 2); pOperator->closeFn(pOperator->info, 2);
tfree(exp); tfree(exp);
tfree(exp1); tfree(exp1);
taosArrayDestroy(pExprInfo); taosArrayDestroy(pExprInfo);