Merge pull request #17248 from taosdata/feature/3_liaohj
enh(query): support scalar expressions in the state window aggregate.
This commit is contained in:
commit
9eab350e90
|
@ -818,6 +818,7 @@ typedef struct SStateWindowOperatorInfo {
|
|||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||
SOptrBasicInfo binfo;
|
||||
SAggSupporter aggSup;
|
||||
SExprSupp scalarSup;
|
||||
|
||||
SGroupResInfo groupResInfo;
|
||||
SWindowRowsSup winSup;
|
||||
|
|
|
@ -366,7 +366,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
}
|
||||
|
||||
// set the tag value for final result
|
||||
// setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
|
||||
SInterval* pInterval = &pFillInfo->interval;
|
||||
pFillInfo->currentKey =
|
||||
taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision);
|
||||
|
@ -523,14 +522,6 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order) {
|
||||
if (pFillInfo == NULL || (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC)) {
|
||||
return;
|
||||
}
|
||||
|
||||
pFillInfo->order = order;
|
||||
}
|
||||
|
||||
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
|
||||
if (pFillInfo->type == TSDB_FILL_NONE) {
|
||||
return;
|
||||
|
|
|
@ -1218,37 +1218,15 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
pBlock->info.rows, numOfOutput);
|
||||
}
|
||||
|
||||
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
|
||||
if (OPTR_IS_OPENED(pOperator)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStateWindowOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||
|
||||
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
|
||||
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
|
||||
}
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
|
@ -1262,13 +1240,40 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
||||
|
||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||
if (pInfo->scalarSup.pExprInfo != NULL) {
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
|
||||
pInfo->scalarSup.numOfExprs, NULL);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
}
|
||||
|
||||
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
||||
}
|
||||
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SStateWindowOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
|
||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
|
@ -1284,6 +1289,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
|
||||
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
|
||||
}
|
||||
|
@ -1716,6 +1722,7 @@ static void destroyStateWindowOperatorInfo(void* param) {
|
|||
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
taosMemoryFreeClear(pInfo->stateKey.pData);
|
||||
cleanupExprSupp(&pInfo->scalarSup);
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
@ -2747,6 +2754,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
|||
|
||||
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
|
||||
|
||||
if (pStateNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
|
||||
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
|
||||
pInfo->stateKey.type = pInfo->stateCol.type;
|
||||
pInfo->stateKey.bytes = pInfo->stateCol.bytes;
|
||||
|
@ -2769,7 +2785,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
|||
|
||||
pInfo->twAggSup =
|
||||
(STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};
|
||||
;
|
||||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->tsSlotId = tsSlotId;
|
||||
|
@ -2780,7 +2796,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL,
|
||||
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL,
|
||||
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
Loading…
Reference in New Issue