enh(query): add a new operator.
This commit is contained in:
parent
3e5ab6b54c
commit
0244c4ff94
|
@ -493,11 +493,11 @@ typedef struct SAggOperatorInfo {
|
||||||
|
|
||||||
typedef struct SProjectOperatorInfo {
|
typedef struct SProjectOperatorInfo {
|
||||||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo; // todo remove it
|
||||||
SAggSupporter aggSup;
|
SAggSupporter aggSup; // todo remove it
|
||||||
|
|
||||||
SSDataBlock* existDataBlock;
|
SSDataBlock* existDataBlock; // todo remove it
|
||||||
SArray* pPseudoColInfo;
|
SArray* pPseudoColInfo; // todo remove it
|
||||||
SLimit limit;
|
SLimit limit;
|
||||||
SLimit slimit;
|
SLimit slimit;
|
||||||
|
|
||||||
|
@ -509,6 +509,17 @@ typedef struct SProjectOperatorInfo {
|
||||||
int64_t curOutput;
|
int64_t curOutput;
|
||||||
} SProjectOperatorInfo;
|
} SProjectOperatorInfo;
|
||||||
|
|
||||||
|
typedef struct SIndefOperatorInfo {
|
||||||
|
SOptrBasicInfo binfo;
|
||||||
|
SAggSupporter aggSup;
|
||||||
|
SArray* pPseudoColInfo;
|
||||||
|
|
||||||
|
SExprInfo* pScalarExpr;
|
||||||
|
int32_t numOfScalarExpr;
|
||||||
|
SqlFunctionCtx* pScalarCtx;
|
||||||
|
int32_t* rowCellInfoOffset;
|
||||||
|
} SIndefOperatorInfo;
|
||||||
|
|
||||||
typedef struct SFillOperatorInfo {
|
typedef struct SFillOperatorInfo {
|
||||||
struct SFillInfo* pFillInfo;
|
struct SFillInfo* pFillInfo;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
|
@ -731,6 +742,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
|
||||||
|
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
|
|
@ -3750,18 +3750,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
// Return result of the previous group in the firstly.
|
|
||||||
if (false) {
|
|
||||||
if (pRes->info.rows > 0) {
|
|
||||||
pProjectInfo->existDataBlock = pBlock;
|
|
||||||
break;
|
|
||||||
} else { // init output buffer for a new group data
|
|
||||||
initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -3917,6 +3905,17 @@ static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
||||||
|
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||||
|
SExprInfo* pExprInfo = &pExpr[i];
|
||||||
|
if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) {
|
||||||
|
taosMemoryFree(pExprInfo->base.pParam[0].pCol);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pExprInfo->base.pParam);
|
||||||
|
taosMemoryFree(pExprInfo->pExpr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
if (pOperator == NULL) {
|
if (pOperator == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -3936,14 +3935,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->pExpr != NULL) {
|
if (pOperator->pExpr != NULL) {
|
||||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
destroyExprInfo(pOperator->pExpr, pOperator->numOfExprs);
|
||||||
SExprInfo* pExprInfo = &pOperator->pExpr[i];
|
|
||||||
if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) {
|
|
||||||
taosMemoryFree(pExprInfo->base.pParam[0].pCol);
|
|
||||||
}
|
|
||||||
taosMemoryFree(pExprInfo->base.pParam);
|
|
||||||
taosMemoryFree(pExprInfo->pExpr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pOperator->pExpr);
|
taosMemoryFreeClear(pOperator->pExpr);
|
||||||
|
@ -4132,6 +4124,19 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
taosArrayDestroy(pInfo->pPseudoColInfo);
|
taosArrayDestroy(pInfo->pPseudoColInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*) param;
|
||||||
|
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||||
|
|
||||||
|
taosArrayDestroy(pInfo->pPseudoColInfo);
|
||||||
|
cleanupAggSup(&pInfo->aggSup);
|
||||||
|
|
||||||
|
destroySqlFunctionCtx(pInfo->pScalarCtx, numOfOutput);
|
||||||
|
destroyExprInfo(pInfo->pScalarExpr, pInfo->numOfScalarExpr);
|
||||||
|
|
||||||
|
taosMemoryFree(pInfo->rowCellInfoOffset);
|
||||||
|
}
|
||||||
|
|
||||||
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
|
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
|
||||||
taosArrayDestroy(pExInfo->pSources);
|
taosArrayDestroy(pExInfo->pSources);
|
||||||
|
@ -4209,6 +4214,133 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||||
|
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||||
|
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||||
|
|
||||||
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
|
blockDataCleanup(pRes);
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t st = 0;
|
||||||
|
int32_t order = 0;
|
||||||
|
int32_t scanFlag = 0;
|
||||||
|
|
||||||
|
if (pOperator->cost.openCost == 0) {
|
||||||
|
st = taosGetTimestampUs();
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
// The downstream exec may change the value of the newgroup, so use a local variable instead.
|
||||||
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
|
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||||
|
if (pIndefInfo->pScalarExpr != NULL) {
|
||||||
|
code = projectApplyFunctions(pIndefInfo->pScalarExpr, pBlock, pBlock, pIndefInfo->pScalarCtx,
|
||||||
|
pIndefInfo->numOfScalarExpr, pIndefInfo->pPseudoColInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||||
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
|
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pIndefInfo->pPseudoColInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t rows = pInfo->pRes->info.rows;
|
||||||
|
pOperator->resultInfo.totalRows += rows;
|
||||||
|
|
||||||
|
if (pOperator->cost.openCost == 0) {
|
||||||
|
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (rows > 0) ? pInfo->pRes : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo) {
|
||||||
|
SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
||||||
|
|
||||||
|
int32_t numOfExpr = 0;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pVectorFuncs, NULL, &numOfExpr);
|
||||||
|
|
||||||
|
int32_t numOfScalarExpr = 0;
|
||||||
|
if (pPhyNode->pExprs != NULL) {
|
||||||
|
pInfo->pScalarExpr = createExprInfo(pPhyNode->pExprs, NULL, &numOfScalarExpr);
|
||||||
|
pInfo->pScalarCtx = createSqlFunctionCtx(pInfo->pScalarExpr, numOfScalarExpr, &pInfo->rowCellInfoOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);;
|
||||||
|
|
||||||
|
int32_t numOfRows = 4096;
|
||||||
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
|
|
||||||
|
// Make sure the size of SSDataBlock will never exceed the size of 2MB.
|
||||||
|
int32_t TWOMB = 2 * 1024 * 1024;
|
||||||
|
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
|
||||||
|
numOfRows = TWOMB / pResBlock->info.rowSize;
|
||||||
|
}
|
||||||
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
|
||||||
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfExpr, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
|
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr, pTaskInfo);
|
||||||
|
|
||||||
|
pInfo->binfo.pRes = pResBlock;
|
||||||
|
pInfo->numOfScalarExpr = numOfScalarExpr;
|
||||||
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfExpr);
|
||||||
|
|
||||||
|
pOperator->name = "IndefinitOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||||
|
pOperator->blocking = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->pExpr = pExprInfo;
|
||||||
|
pOperator->numOfExprs = numOfExpr;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||||
|
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
taosMemoryFree(pOperator);
|
||||||
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
|
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
|
||||||
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
|
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
|
||||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
|
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
|
||||||
|
@ -4710,6 +4842,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval;
|
SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval;
|
||||||
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode,
|
pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode,
|
||||||
(SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
|
(SNodeListNode*)pFillNode->pValues, false, pTaskInfo);
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
|
||||||
|
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue