fix(query): support scalar function before applying interval aggregate.
This commit is contained in:
parent
916adb82c6
commit
ff72e66dba
|
@ -400,7 +400,7 @@ typedef struct SIntervalAggOperatorInfo {
|
||||||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||||
SOptrBasicInfo binfo; // basic info
|
SOptrBasicInfo binfo; // basic info
|
||||||
SAggSupporter aggSup; // aggregate supporter
|
SAggSupporter aggSup; // aggregate supporter
|
||||||
|
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||||
SGroupResInfo groupResInfo; // multiple results build supporter
|
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||||
SInterval interval; // interval info
|
SInterval interval; // interval info
|
||||||
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
|
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
|
||||||
|
@ -730,7 +730,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream);
|
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
|
||||||
|
|
||||||
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
|
|
|
@ -4216,8 +4216,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
|
|
||||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||||
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
|
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
|
||||||
pOptr =
|
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
|
||||||
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
|
pTaskInfo, isStream);
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
|
||||||
SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
|
SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
|
||||||
|
|
|
@ -448,7 +448,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
||||||
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
}
|
}
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
|
@ -467,7 +467,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
||||||
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pTableScanInfo->scanTimes < total) {
|
while (pTableScanInfo->scanTimes < total) {
|
||||||
|
@ -492,7 +492,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
||||||
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
}
|
}
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
pTableScanInfo->curTWinIdx = 0;
|
||||||
|
|
|
@ -956,6 +956,12 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
getTableScanInfo(pOperator, &pInfo->order, &scanFlag);
|
getTableScanInfo(pOperator, &pInfo->order, &scanFlag);
|
||||||
|
|
||||||
|
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||||
|
SExprSupp* pExprSup =& pInfo->scalarSupp;
|
||||||
|
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx,
|
||||||
|
pExprSup->numOfExprs, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true);
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true);
|
||||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
|
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
|
||||||
|
@ -1324,6 +1330,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||||
|
SExprSupp* pExprSup = &pInfo->scalarSupp;
|
||||||
|
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx,
|
||||||
|
pExprSup->numOfExprs, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||||
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock,
|
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock,
|
||||||
NULL);
|
NULL);
|
||||||
|
@ -1446,18 +1458,27 @@ void increaseTs(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream) {
|
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream) {
|
||||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->win = pTaskInfo->window;
|
pInfo->win = pTaskInfo->window;
|
||||||
pInfo->order = TSDB_ORDER_ASC;
|
pInfo->order = TSDB_ORDER_ASC;
|
||||||
pInfo->interval = *pInterval;
|
pInfo->interval = *pInterval;
|
||||||
pInfo->execModel = pTaskInfo->execModel;
|
pInfo->execModel = pTaskInfo->execModel;
|
||||||
pInfo->twAggSup = *pTwAggSupp;
|
pInfo->twAggSup = *pTwAggSupp;
|
||||||
|
|
||||||
|
if (pPhyNode->window.pExprs != NULL) {
|
||||||
|
int32_t numOfScalar = 0;
|
||||||
|
SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||||
|
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue