stream count window

This commit is contained in:
54liuyao 2024-02-05 16:01:46 +08:00
parent c588770699
commit 891249191e
3 changed files with 4 additions and 2 deletions

View File

@ -66,7 +66,7 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
// 1.如果group id发生变化获取新group id上一次的window的缓存并把旧group id的信息存入缓存。
// todo(liuyao) 1.如果group id发生变化获取新group id上一次的window的缓存并把旧group id的信息存入缓存。
// 2.计算 当前需要合并的行数
// 3.做聚集计算。
// 4.达到行数将结果存入pInfo->res中。

View File

@ -541,6 +541,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT == type) {
pOptr = createStreamCountAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) {
pOptr = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else {
terrno = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = terrno;

View File

@ -1284,7 +1284,7 @@ static bool isSlidingWindow(SStreamScanInfo* pInfo) {
}
static bool isCountSlidingWindow(SStreamScanInfo* pInfo) {
return pInfo->windowSup.pStreamAggSup->windowCount != pInfo->windowSup.pStreamAggSup->windowSliding;
return pInfo->windowSup.pStreamAggSup && (pInfo->windowSup.pStreamAggSup->windowCount != pInfo->windowSup.pStreamAggSup->windowSliding);
}
static bool isCountWindow(SStreamScanInfo* pInfo) {