From 9f6a4b3ddb8ae172a5fff0cd82785ecbc4a54a69 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 21 Jun 2022 20:51:51 +0800 Subject: [PATCH] feat(stream): add delete type --- include/common/tcommon.h | 1 + source/libs/executor/src/timewindowoperator.c | 2 ++ 2 files changed, 3 insertions(+) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 60bace3f73..73ba0f47b8 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -45,6 +45,7 @@ typedef enum EStreamType { STREAM_REPROCESS, STREAM_INVALID, STREAM_GET_ALL, + STREAM_DELETE, } EStreamType; typedef struct { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ad72fd26af..7b9b56fb73 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2578,6 +2578,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; pInfo->pDelRes = createOneDataBlock(pResBlock, false); + pInfo->pDelRes->info.type = STREAM_DELETE; blockDataEnsureCapacity(pInfo->pDelRes, 64); pInfo->pChildren = NULL; pInfo->isFinal = false; @@ -3650,6 +3651,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; pInfo->pDelRes = createOneDataBlock(pResBlock, false); + pInfo->pDelRes->info.type = STREAM_DELETE; blockDataEnsureCapacity(pInfo->pDelRes, 64); pInfo->pChildren = NULL;