fix: add filter to session/state/interval operator
This commit is contained in:
parent
eef1fc035e
commit
90b3912061
|
@ -586,6 +586,7 @@ typedef struct SSessionAggOperatorInfo {
|
|||
int64_t gap; // session window gap
|
||||
int32_t tsSlotId; // primary timestamp slot id
|
||||
STimeWindowAggSupp twAggSup;
|
||||
SNode *pCondition;
|
||||
} SSessionAggOperatorInfo;
|
||||
|
||||
typedef struct SResultWindowInfo {
|
||||
|
@ -646,6 +647,7 @@ typedef struct SStateWindowOperatorInfo {
|
|||
int32_t tsSlotId; // primary timestamp column slot id
|
||||
STimeWindowAggSupp twAggSup;
|
||||
// bool reptScan;
|
||||
const SNode *pCondition;
|
||||
} SStateWindowOperatorInfo;
|
||||
|
||||
typedef struct SStreamStateAggOperatorInfo {
|
||||
|
@ -803,7 +805,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
|||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
||||
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||
|
@ -817,7 +819,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
|
||||
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId,
|
||||
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
|
|
@ -4452,7 +4452,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||
|
||||
pOptr =
|
||||
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
|
||||
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pPhyNode->pConditions, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
|
||||
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
|
||||
|
@ -4474,7 +4474,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
|
||||
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
|
||||
SColumn col = extractColumnFromColumnNode(pColNode);
|
||||
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo);
|
||||
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
|
||||
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
|
||||
|
|
|
@ -302,14 +302,21 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
while(1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pRes);
|
||||
|
||||
size_t rows = pRes->info.rows;
|
||||
if (rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
pOperator->resultInfo.totalRows += pRes->info.rows;
|
||||
return (pRes->info.rows == 0)? NULL:pRes;
|
||||
}
|
||||
|
||||
|
|
|
@ -1146,13 +1146,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
while(1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes);
|
||||
|
||||
return pBInfo->pRes;
|
||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
|
||||
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
|
||||
}
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
|
@ -1178,15 +1187,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
while(1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes);
|
||||
|
||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
size_t rows = pBInfo->pRes->info.rows;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
|
||||
return (rows == 0) ? NULL : pBInfo->pRes;
|
||||
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
|
||||
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
||||
|
@ -1880,12 +1896,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
while(1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes);
|
||||
|
||||
return pBInfo->pRes->info.rows > 0 ? pBInfo->pRes : NULL;
|
||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
|
||||
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -1914,15 +1940,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
while(1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBInfo->pRes);
|
||||
|
||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBInfo->pRes->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
size_t rows = pBInfo->pRes->info.rows;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
|
||||
return (rows == 0) ? NULL : pBInfo->pRes;
|
||||
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
|
||||
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
|
||||
}
|
||||
|
||||
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
|
@ -2235,7 +2268,7 @@ _error:
|
|||
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId,
|
||||
SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo) {
|
||||
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
||||
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -2246,6 +2279,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
pInfo->stateKey.type = pInfo->stateCol.type;
|
||||
pInfo->stateKey.bytes = pInfo->stateCol.bytes;
|
||||
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
|
||||
pInfo->pCondition = pCondition;
|
||||
if (pInfo->stateKey.pData == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -2289,7 +2323,7 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||
STimeWindowAggSupp* pTwAggSupp, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
||||
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -2315,6 +2349,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->winSup.prevTs = INT64_MIN;
|
||||
pInfo->reptScan = false;
|
||||
pInfo->pCondition = pCondition;
|
||||
pOperator->name = "SessionWindowAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
|
||||
pOperator->blocking = true;
|
||||
|
|
|
@ -890,7 +890,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptDealLogicNode(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||
static int32_t pushDownCondOptTrivialPushDown(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||
if (NULL == pLogicNode->pConditions ||
|
||||
OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -921,7 +921,7 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
|
|||
break;
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||
code = pushDownCondOptDealLogicNode(pCxt, pLogicNode);
|
||||
code = pushDownCondOptTrivialPushDown(pCxt, pLogicNode);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue