Merge pull request #11913 from taosdata/feature/3.0_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
60db2c1c2e
|
@ -27,9 +27,14 @@ static int32_t getSchemaBytes(const SSchema* pSchema) {
|
|||
}
|
||||
}
|
||||
|
||||
// todo : to convert data according to SSDatablock
|
||||
static void buildRspData(const STableMeta* pMeta, char* pData) {
|
||||
int32_t* pColSizes = (int32_t*)pData;
|
||||
pData += DESCRIBE_RESULT_COLS * sizeof(int32_t);
|
||||
int32_t* payloadLen = (int32_t*) pData;
|
||||
uint64_t* groupId = (uint64_t*)(pData + sizeof(int32_t));
|
||||
|
||||
int32_t* pColSizes = (int32_t*)(pData + sizeof(int32_t) + sizeof(uint64_t));
|
||||
pData = (char*) pColSizes + DESCRIBE_RESULT_COLS * sizeof(int32_t);
|
||||
|
||||
int32_t numOfRows = TABLE_TOTAL_COL_NUM(pMeta);
|
||||
|
||||
// Field
|
||||
|
@ -79,6 +84,9 @@ static void buildRspData(const STableMeta* pMeta, char* pData) {
|
|||
for (int32_t i = 0; i < DESCRIBE_RESULT_COLS; ++i) {
|
||||
pColSizes[i] = htonl(pColSizes[i]);
|
||||
}
|
||||
|
||||
|
||||
*payloadLen = (int32_t)(pData - (char*)payloadLen);
|
||||
}
|
||||
|
||||
static int32_t calcRspSize(const STableMeta* pMeta) {
|
||||
|
@ -87,7 +95,8 @@ static int32_t calcRspSize(const STableMeta* pMeta) {
|
|||
(numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_FIELD_LEN) +
|
||||
(numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_TYPE_LEN) +
|
||||
(BitmapLen(numOfRows) + numOfRows * sizeof(int32_t)) +
|
||||
(numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_NOTE_LEN);
|
||||
(numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_NOTE_LEN) +
|
||||
sizeof(int32_t) + sizeof(uint64_t);
|
||||
}
|
||||
|
||||
static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) {
|
||||
|
|
|
@ -254,6 +254,17 @@ enum {
|
|||
OP_EXEC_DONE = 0x9,
|
||||
};
|
||||
|
||||
typedef struct SOperatorFpSet {
|
||||
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
||||
__optr_fn_t getNextFn;
|
||||
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it
|
||||
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
||||
__optr_close_fn_t closeFn;
|
||||
__optr_encode_fn_t encodeResultRow;
|
||||
__optr_decode_fn_t decodeResultRow;
|
||||
__optr_get_explain_fn_t getExplainFn;
|
||||
} SOperatorFpSet;
|
||||
|
||||
typedef struct SOperatorInfo {
|
||||
uint8_t operatorType;
|
||||
bool blockingOptr; // block operator or not
|
||||
|
@ -267,15 +278,7 @@ typedef struct SOperatorInfo {
|
|||
SResultInfo resultInfo;
|
||||
struct SOperatorInfo** pDownstream; // downstram pointer list
|
||||
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
||||
// todo extract struct
|
||||
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
||||
__optr_fn_t getNextFn;
|
||||
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model.
|
||||
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
||||
__optr_close_fn_t closeFn;
|
||||
__optr_encode_fn_t encodeResultRow;
|
||||
__optr_decode_fn_t decodeResultRow;
|
||||
__optr_get_explain_fn_t getExplainFn;
|
||||
SOperatorFpSet fpSet;
|
||||
} SOperatorInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -609,6 +612,10 @@ typedef struct SJoinOperatorInfo {
|
|||
SNode *pOnCondition;
|
||||
} SJoinOperatorInfo;
|
||||
|
||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
|
||||
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
|
||||
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain);
|
||||
|
||||
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
||||
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||
|
@ -653,6 +660,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
|||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
|
|
|
@ -159,7 +159,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
int64_t st = 0;
|
||||
|
||||
st = taosGetTimestampUs();
|
||||
*pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup);
|
||||
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup);
|
||||
|
||||
uint64_t el = (taosGetTimestampUs() - st);
|
||||
pTaskInfo->cost.elapsedTime += el;
|
||||
|
|
|
@ -226,6 +226,23 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
|
||||
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
|
||||
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain) {
|
||||
SOperatorFpSet fpSet = {
|
||||
._openFn = openFn,
|
||||
.getNextFn = nextFn,
|
||||
.getStreamResFn = streamFn,
|
||||
.cleanupFn = cleanup,
|
||||
.closeFn = closeFn,
|
||||
.encodeResultRow = encode,
|
||||
.decodeResultRow = decode,
|
||||
.getExplainFn = explain,
|
||||
};
|
||||
|
||||
return fpSet;
|
||||
}
|
||||
|
||||
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
||||
|
||||
static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||
|
@ -4081,7 +4098,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
SExchangeInfo* pExchangeInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
pTaskInfo->code = pOperator->_openFn(pOperator);
|
||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -4176,9 +4193,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
|
|||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = pBlock->info.numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function.
|
||||
pOperator->getNextFn = doLoadRemoteData;
|
||||
pOperator->closeFn = destroyExchangeOperatorInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
#if 1
|
||||
{ // todo refactor
|
||||
|
@ -4289,7 +4306,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
|||
SSDataBlock* loadNextDataBlock(void* param) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||
bool newgroup = false;
|
||||
return pOperator->getNextFn(pOperator, &newgroup);
|
||||
return pOperator->fpSet.getNextFn(pOperator, &newgroup);
|
||||
}
|
||||
|
||||
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
|
||||
|
@ -4586,9 +4603,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
|||
pOperator->pExpr = pExprInfo;
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->getNextFn = doSortedMerge;
|
||||
pOperator->closeFn = destroySortedMergeOperatorInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
code = appendDownstream(pOperator, downstream, numOfDownstream);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -4667,8 +4684,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
|||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->getNextFn = doSort;
|
||||
pOperator->closeFn = destroyOrderOperatorInfo;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
@ -4710,7 +4727,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
bool newgroup = true;
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
|
@ -4765,7 +4782,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
|
|||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
pTaskInfo->code = pOperator->_openFn(pOperator);
|
||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -5007,7 +5024,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
|
|||
|
||||
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
|
@ -5070,7 +5087,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
|
@ -5121,9 +5138,9 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
|
|||
SSDataBlock* pBlock = pInfo->binfo.pRes;
|
||||
|
||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||
return pOperator->getStreamResFn(pOperator, newgroup);
|
||||
return pOperator->fpSet.getStreamResFn(pOperator, newgroup);
|
||||
} else {
|
||||
pTaskInfo->code = pOperator->_openFn(pOperator);
|
||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -5165,7 +5182,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
|
|||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
|
@ -5216,7 +5233,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
|
@ -5265,7 +5282,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
|
|||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
|
@ -5400,7 +5417,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
|
@ -5451,7 +5468,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
|
|||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
|
@ -5531,7 +5548,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = pDownstream->getNextFn(pDownstream, newgroup);
|
||||
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream, newgroup);
|
||||
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (*newgroup) {
|
||||
|
@ -5603,8 +5620,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (pOperator->closeFn != NULL) {
|
||||
pOperator->closeFn(pOperator->info, pOperator->numOfOutput);
|
||||
if (pOperator->fpSet.closeFn != NULL) {
|
||||
pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput);
|
||||
}
|
||||
|
||||
if (pOperator->pDownstream != NULL) {
|
||||
|
@ -5739,12 +5756,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->_openFn = doOpenAggregateOptr;
|
||||
pOperator->getNextFn = getAggregateResult;
|
||||
pOperator->closeFn = destroyAggOperatorInfo;
|
||||
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5871,9 +5885,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = num;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = doProjectOperation;
|
||||
pOperator->closeFn = destroyProjectOperatorInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
@ -5929,12 +5943,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->_openFn = doOpenIntervalAgg;
|
||||
pOperator->getNextFn = doBuildIntervalResult;
|
||||
pOperator->getStreamResFn = doStreamIntervalAgg;
|
||||
pOperator->closeFn = destroyIntervalOperatorInfo;
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo,
|
||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5951,6 +5962,65 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
||||
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
initResultSizeInfo(pOperator, numOfRows);
|
||||
int32_t code =
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||
|
||||
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
||||
|
||||
pOperator->name = "StreamTimeIntervalAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo,
|
||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
||||
}
|
||||
|
||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo) {
|
||||
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
|
||||
|
@ -5969,8 +6039,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->getNextFn = doAllIntervalAgg;
|
||||
pOperator->closeFn = destroyBasicOperatorInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doAllIntervalAgg, NULL, NULL, destroyBasicOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
@ -6010,10 +6081,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->getNextFn = doStateWindowAgg;
|
||||
pOperator->closeFn = destroyStateWindowOperatorInfo;
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo,
|
||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
@ -6057,10 +6127,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->getNextFn = doSessionWindowAgg;
|
||||
pOperator->closeFn = destroySWindowOperatorInfo;
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo,
|
||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
@ -6147,12 +6216,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = doFill;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->closeFn = destroySFillOperatorInfo;
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
|
@ -7017,8 +7084,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
|||
(*pRes)[*resNum].startupCost = operatorInfo->cost.openCost;
|
||||
(*pRes)[*resNum].totalCost = operatorInfo->cost.totalCost;
|
||||
|
||||
if (operatorInfo->getExplainFn) {
|
||||
int32_t code = (*operatorInfo->getExplainFn)(operatorInfo, &(*pRes)->verboseInfo);
|
||||
if (operatorInfo->fpSet.getExplainFn) {
|
||||
int32_t code = (*operatorInfo->fpSet.getExplainFn)(operatorInfo, &(*pRes)->verboseInfo);
|
||||
if (code) {
|
||||
qError("operator getExplainFn failed, error:%s", tstrerror(code));
|
||||
return code;
|
||||
|
@ -7055,7 +7122,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup)
|
|||
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
||||
SOperatorInfo* ds1 = pOperator->pDownstream[0];
|
||||
publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
pJoinInfo->pLeft = ds1->getNextFn(ds1, newgroup);
|
||||
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1, newgroup);
|
||||
publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
pJoinInfo->leftPos = 0;
|
||||
|
@ -7068,7 +7135,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup)
|
|||
if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
||||
SOperatorInfo* ds2 = pOperator->pDownstream[1];
|
||||
publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
pJoinInfo->pRight = ds2->getNextFn(ds2, newgroup);
|
||||
pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2, newgroup);
|
||||
publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
pJoinInfo->rightPos = 0;
|
||||
|
@ -7161,9 +7228,9 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
|
|||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->getNextFn = doMergeJoin;
|
||||
pOperator->closeFn = destroyBasicOperatorInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||
return pOperator;
|
||||
|
||||
|
|
|
@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
|||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
|
@ -360,12 +360,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = hashGroupbyAggregate;
|
||||
pOperator->closeFn = destroyGroupOperatorInfo;
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
|
@ -562,7 +558,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
|
@ -618,14 +614,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||
|
||||
pInfo->binfo.pRes = pResultBlock;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = hashPartition;
|
||||
pOperator->closeFn = destroyPartitionOperatorInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
|
|
@ -405,7 +405,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->getNextFn = doTableScan;
|
||||
pOperator->fpSet.getNextFn = doTableScan;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
static int32_t cost = 0;
|
||||
|
@ -429,7 +429,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
|
|||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->getNextFn = doTableScanImpl;
|
||||
pOperator->fpSet.getNextFn = doTableScanImpl;
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
@ -502,8 +502,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
|
|||
// pOperator->operatorType = OP_TableBlockInfoScan;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = doBlockInfoScan;
|
||||
pOperator->fpSet._openFn = operatorDummyOpenFn;
|
||||
pOperator->fpSet.getNextFn = doBlockInfoScan;
|
||||
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
@ -532,7 +532,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
|
||||
pTaskInfo->code = pOperator->_openFn(pOperator);
|
||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -659,9 +659,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = doStreamBlockScan;
|
||||
pOperator->closeFn = operatorDummyCloseFn;
|
||||
pOperator->fpSet._openFn = operatorDummyOpenFn;
|
||||
pOperator->fpSet.getNextFn = doStreamBlockScan;
|
||||
pOperator->fpSet.closeFn = operatorDummyCloseFn;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
return pOperator;
|
||||
|
@ -981,8 +981,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||
pOperator->getNextFn = doSysTableScan;
|
||||
pOperator->closeFn = destroySysScanOperator;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator,
|
||||
NULL, NULL, NULL);
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
return pOperator;
|
||||
|
@ -1139,11 +1139,12 @@ SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr,
|
|||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->getNextFn = doTagScan;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->closeFn = destroyTagScanOperatorInfo;
|
||||
|
||||
return pOperator;
|
||||
_error:
|
||||
|
|
|
@ -199,9 +199,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
|
|||
pOperator->name = "dummyInputOpertor4Test";
|
||||
|
||||
if (numOfCols == 1) {
|
||||
pOperator->getNextFn = getDummyBlock;
|
||||
pOperator->fpSet.getNextFn = getDummyBlock;
|
||||
} else {
|
||||
pOperator->getNextFn = get2ColsDummyBlock;
|
||||
pOperator->fpSet.getNextFn = get2ColsDummyBlock;
|
||||
}
|
||||
|
||||
SDummyInputInfo *pInfo = (SDummyInputInfo*) taosMemoryCalloc(1, sizeof(SDummyInputInfo));
|
||||
|
|
Loading…
Reference in New Issue