From f51c9859142cd825f7743a88af8ef2919a57ee41 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 25 Oct 2023 17:50:16 +0800 Subject: [PATCH] stream event window --- include/libs/executor/storageapi.h | 8 +- include/libs/stream/streamState.h | 3 + include/libs/stream/tstreamFileState.h | 4 +- source/dnode/snode/src/snodeInitApi.c | 1 + source/dnode/vnode/src/vnd/vnodeInitApi.c | 1 + source/libs/executor/inc/executorInt.h | 70 +- source/libs/executor/inc/operator.h | 2 + source/libs/executor/src/operator.c | 2 + source/libs/executor/src/scanoperator.c | 4 +- .../executor/src/streameventwindowoperator.c | 733 ++++++++++++++++++ .../executor/src/streamtimewindowoperator.c | 159 ++-- source/libs/parser/src/parTranslater.c | 2 +- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 7 +- source/libs/stream/src/streamSessionState.c | 49 +- source/libs/stream/src/streamState.c | 8 + .../stream/distributeIntervalRetrive0.sim | 2 + tests/script/tsim/stream/event0.sim | 322 ++++++++ tests/script/tsim/stream/event1.sim | 49 ++ 19 files changed, 1339 insertions(+), 89 deletions(-) create mode 100644 source/libs/executor/src/streameventwindowoperator.c create mode 100644 tests/script/tsim/stream/event0.sim create mode 100644 tests/script/tsim/stream/event1.sim diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index f5392f02b1..77ff326f55 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -387,11 +387,13 @@ typedef struct SStateStore { int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); + int32_t (*streamStateSessionAllocWinBuffByNextPosition)(SStreamState* pState, SStreamStateCur* pCur, + const SSessionKey* pKey, void** pVal, int32_t* pVLen); SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp); - TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); - bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); - bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); + TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); + bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); + bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); void (*updateInfoDestroy)(SUpdateInfo* pInfo); void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count); void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index b7f100733b..d0a2b311ee 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -57,6 +57,8 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key); int32_t streamStateSessionClear(SStreamState* pState); int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); +int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, + const SSessionKey* pKey, void** pVal, int32_t* pVLen); SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key); @@ -66,6 +68,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); +// fill int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index c1974df7de..2a129c1830 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -76,8 +76,10 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState); int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen); int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); -int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen); +int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen); int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos); +int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, + const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen); SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen); int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId); diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 389137f630..570feffc14 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -71,6 +71,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur; pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist; pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange; + pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition; pStore->updateInfoInit = updateInfoInit; pStore->updateInfoFillBlockData = updateInfoFillBlockData; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index a6dae429c3..a6673917bf 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -180,6 +180,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur; pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist; pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange; + pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition; pStore->updateInfoInit = updateInfoInit; pStore->updateInfoFillBlockData = updateInfoFillBlockData; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 288919d709..04c413e941 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -605,6 +605,30 @@ typedef struct SStreamStateAggOperatorInfo { SSDataBlock* pCheckpointRes; } SStreamStateAggOperatorInfo; +typedef struct SStreamEventAggOperatorInfo { + SOptrBasicInfo binfo; + SStreamAggSupporter streamAggSup; + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; + int32_t primaryTsIndex; // primary timestamp slot id + STimeWindowAggSupp twAggSup; + SSDataBlock* pDelRes; + SSHashObj* pSeDeleted; + void* pDelIterator; + SArray* pChildren; // cache for children's result; + bool ignoreExpiredData; + bool ignoreExpiredDataSaved; + SArray* pUpdated; + SSHashObj* pSeUpdated; + int64_t dataVersion; + bool isHistoryOp; + SArray* historyWins; + bool reCkBlock; + SSDataBlock* pCheckpointRes; + SFilterInfo* pStartCondInfo; + SFilterInfo* pEndCondInfo; +} SStreamEventAggOperatorInfo; + typedef struct SStreamPartitionOperatorInfo { SOptrBasicInfo binfo; SPartitionBySupporter partitionSup; @@ -757,9 +781,51 @@ void doClearBufferedBlocks(SStreamScanInfo* pInfo); uint64_t calcGroupId(char* pData, int32_t len); void streamOpReleaseState(struct SOperatorInfo* pOperator); void streamOpReloadState(struct SOperatorInfo* pOperator); +void destroyStreamAggSupporter(SStreamAggSupporter* pSup); +void clearGroupResInfo(SGroupResInfo* pGroupResInfo); +int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SFunctionStateStore* pStore); +int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, + SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, + SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, + SStorageAPI* pApi); +void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, + STimeWindowAggSupp* pTwSup); +void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); +void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); +void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey); +void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, SSHashObj* pMapDelete); +int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated); +int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed); +int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar); +int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2); +void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins); +int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult, + int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput, + struct SOperatorInfo* pOperator, int64_t winDelta); +int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo); +int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo); +int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated); +void saveDeleteRes(SSHashObj* pStDelete, SSessionKey key); +void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey); +void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite); +void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock); +void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo); +void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin, + SResultWindowInfo* pNextWin); +void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, + SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin, + SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap); +int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); +void resetWinRange(STimeWindow* winRange); + +int32_t encodeSSessionKey(void** buf, SSessionKey* key); +void* decodeSSessionKey(void* buf, SSessionKey* key); +int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen); +void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen); +int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup); +void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup); -int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup); -void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup); void destroyOperatorParamValue(void* pValues); int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc); int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq); diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 13da9f7238..0858a7e595 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -152,6 +152,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); + SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 6f9aac7595..69a8acb3d7 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -519,6 +519,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT == type) { + pOptr = createStreamEventAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index efbc978323..38f7adf261 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1127,7 +1127,8 @@ static bool isSessionWindow(SStreamScanInfo* pInfo) { } static bool isStateWindow(SStreamScanInfo* pInfo) { - return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; + return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE || + pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT; } static bool isIntervalWindow(SStreamScanInfo* pInfo) { @@ -2243,6 +2244,7 @@ FETCH_NEXT_BLOCK: blockDataCleanup(pSup->pScanBlock); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; + printSpecDataBlock(pInfo->pUpdateRes, getStreamOpName(pOperator->operatorType), "rebuild", GET_TASKID(pTaskInfo)); return pInfo->pUpdateRes; } diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c new file mode 100644 index 0000000000..a820271705 --- /dev/null +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -0,0 +1,733 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "operator.h" +#include "querytask.h" +#include "tchecksum.h" +#include "tcommon.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "tfill.h" +#include "tglobal.h" +#include "tlog.h" +#include "ttime.h" + +#define IS_FINAL_EVENT_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_EVENT) +#define STREAM_EVENT_OP_STATE_NAME "StreamEventHistoryState" +#define STREAM_EVENT_OP_CHECKPOINT_NAME "StreamEventOperator_Checkpoint" + +typedef struct SEventWinfowFlag { + bool startFlag; + bool endFlag; +} SEventWinfowFlag; + +typedef struct SEventWindowInfo { + SResultWindowInfo winInfo; + SEventWinfowFlag* pWinFlag; +} SEventWindowInfo; + +void destroyStreamEventOperatorInfo(void* param) { + SStreamEventAggOperatorInfo* pInfo = (SStreamEventAggOperatorInfo*)param; + cleanupBasicInfo(&pInfo->binfo); + destroyStreamAggSupporter(&pInfo->streamAggSup); + clearGroupResInfo(&pInfo->groupResInfo); + cleanupExprSupp(&pInfo->scalarSupp); + if (pInfo->pChildren != NULL) { + int32_t size = taosArrayGetSize(pInfo->pChildren); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); + destroyOperator(pChild); + } + taosArrayDestroy(pInfo->pChildren); + } + colDataDestroy(&pInfo->twAggSup.timeWindowData); + blockDataDestroy(pInfo->pDelRes); + tSimpleHashCleanup(pInfo->pSeUpdated); + tSimpleHashCleanup(pInfo->pSeDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); + cleanupGroupResInfo(&pInfo->groupResInfo); + + taosArrayDestroy(pInfo->historyWins); + blockDataDestroy(pInfo->pCheckpointRes); + + if (pInfo->pStartCondInfo != NULL) { + filterFreeInfo(pInfo->pStartCondInfo); + pInfo->pStartCondInfo = NULL; + } + + if (pInfo->pEndCondInfo != NULL) { + filterFreeInfo(pInfo->pEndCondInfo); + pInfo->pEndCondInfo = NULL; + } + + taosMemoryFreeClear(param); +} + +void setEventWindowFlag(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo) { + char* pFlagInfo = (char*)pWinInfo->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize); + pWinInfo->pWinFlag = (SEventWinfowFlag*) pFlagInfo; +} + +void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos, SEventWindowInfo* pWinInfo) { + pWinInfo->winInfo.sessionWin = *pKey; + pWinInfo->winInfo.pStatePos = pPos; + setEventWindowFlag(pAggSup, pWinInfo); +} + +int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) { + for (int32_t i = start; i < rows; i++) { + if (pEnd[i]) { + return i; + } + } + return -1; +} + +void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t size = pAggSup->resultRowSize; + TSKEY ts = pTs [index]; + bool start = pStart[index]; + bool end = pEnd[index]; + pCurWin->winInfo.sessionWin.groupId = groupId; + pCurWin->winInfo.sessionWin.win.skey = ts; + pCurWin->winInfo.sessionWin.win.ekey = ts; + SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin); + SSessionKey leftWinKey = {0}; + void* pVal = NULL; + int32_t len = 0; + code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &leftWinKey, &pVal, &len); + if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &leftWinKey.win) ) { + bool inWin = isInTimeWindow(&leftWinKey.win, ts, 0); + setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin); + if(inWin || !pCurWin->pWinFlag->endFlag) { + pCurWin->winInfo.isOutput = true; + goto _end; + } + } + pAggSup->stateStore.streamStateFreeCur(pCur); + pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); + SSessionKey rightWinKey = {0}; + code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len); + if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && start && !end) { + int32_t endi = getEndCondIndex(pEnd, index, rows); + if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) { + setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin); + pCurWin->winInfo.isOutput = true; + goto _end; + } + } + + SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId}; + pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len); + setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin); + pCurWin->pWinFlag->startFlag = start; + pCurWin->pWinFlag->endFlag = end; + pCurWin->winInfo.isOutput = false; + +_end: + pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur); + code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0); + if (code != TSDB_CODE_SUCCESS) { + SET_SESSION_WIN_KEY_INVALID(pNextWinKey); + } + if(pCurWin->winInfo.pStatePos->needFree) { + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); + } + pAggSup->stateStore.streamStateFreeCur(pCur); + qDebug("===stream===set event next win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, + pCurWin->winInfo.sessionWin.win.ekey); +} + +int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo, SSessionKey* pNextWinKey, + TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows, + SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) { + *pRebuild = false; + TSKEY maxTs = INT64_MAX; + STimeWindow* pWin = &pWinInfo->winInfo.sessionWin.win; + if (pWinInfo->pWinFlag->endFlag) { + maxTs = pWin->ekey + 1; + } + if (!IS_INVALID_SESSION_WIN_KEY(*pNextWinKey)) { + maxTs = TMIN(maxTs, pNextWinKey->win.skey); + } + + for (int32_t i = start; i < rows; ++i) { + if (pTsData[i] >= maxTs) { + return i - 1 - start; + } + + if (pWin->skey > pTsData[i]) { + if (pStDeleted && pWinInfo->winInfo.isOutput) { + saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin); + } + removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->winInfo.sessionWin); + pWin->skey = pTsData[i]; + pWinInfo->pWinFlag->startFlag = starts[i]; + } else if (pWin->skey == pTsData[i]) { + pWinInfo->pWinFlag->startFlag |= starts[i]; + } + + if (pWin->ekey < pTsData[i]) { + pWin->ekey = pTsData[i]; + pWinInfo->pWinFlag->endFlag = ends[i]; + } else if (pWin->ekey == pTsData[i]) { + pWinInfo->pWinFlag->endFlag |= ends[i]; + } + + memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); + + if (ends[i]) { + if (pWinInfo->pWinFlag->endFlag && pWin->skey <= pTsData[i] && pTsData[i] < pWin->ekey ) { + *pRebuild = true; + } + return i + 1 - start; + } + } + return rows - start; +} + +static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pCurWin, SSHashObj* pStUpdated, + SSHashObj* pStDeleted, bool addGap) { + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + int32_t winNum = 0; + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + while (1) { + if (!pCurWin->pWinFlag->startFlag || pCurWin->pWinFlag->endFlag) { + break; + } + SEventWindowInfo nextWinInfo = {0}; + getNextSessionWinInfo(pAggSup, pStUpdated, &pCurWin->winInfo, &nextWinInfo.winInfo); + if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) || !inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) { + releaseOutputBuf(pAggSup->pState, nextWinInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); + break; + } + setEventWindowFlag(pAggSup, &nextWinInfo); + compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo, pStUpdated, pStDeleted, false); + pCurWin->pWinFlag->endFlag = nextWinInfo.pWinFlag->endFlag; + winNum++; + } + return winNum; +} + +static bool isWindowIncomplete(SEventWindowInfo* pWinInfo) { + return !(pWinInfo->pWinFlag->startFlag && pWinInfo->pWinFlag->endFlag); +} + +bool doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SSessionKey* pKey) { + pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, pKey); + removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey); + return true; +} + +static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated, + SSHashObj* pStDeleted) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + uint64_t groupId = pSDataBlock->info.id.groupId; + int64_t code = TSDB_CODE_SUCCESS; + TSKEY* tsCols = NULL; + SResultRow* pResult = NULL; + int32_t winRows = 0; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SColumnInfoData* pColStart = NULL; + SColumnInfoData* pColEnd = NULL; + + pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); + pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow; + if (pAggSup->winRange.ekey <= 0) { + pAggSup->winRange.ekey = INT64_MAX; + } + + if (pSDataBlock->pDataBlock != NULL) { + SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); + tsCols = (int64_t*)pColDataInfo->pData; + } else { + return; + } + + SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock}; + code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶mStart); + if (code != TSDB_CODE_SUCCESS) { + qError("set data from start slotId error."); + goto _end; + } + int32_t statusStart = 0; + filterExecute(pInfo->pStartCondInfo, pSDataBlock, &pColStart, NULL, paramStart.numOfCols, &statusStart); + + SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock}; + code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶mEnd); + if (code != TSDB_CODE_SUCCESS) { + qError("set data from end slotId error."); + goto _end; + } + + int32_t statusEnd = 0; + filterExecute(pInfo->pEndCondInfo, pSDataBlock, &pColEnd, NULL, paramEnd.numOfCols, &statusEnd); + + int32_t rows = pSDataBlock->info.rows; + blockDataEnsureCapacity(pAggSup->pScanBlock, rows); + for (int32_t i = 0; i < rows; i += winRows) { + if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) { + i++; + continue; + } + int32_t winIndex = 0; + bool allEqual = true; + SEventWindowInfo curWin = {0}; + SSessionKey nextWinKey; + setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, &nextWinKey); + setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); + bool rebuild = false; + winRows = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, rows, i, + pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild); + ASSERT(winRows >= 1); + if (rebuild) { + uint64_t uid = 0; + appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, + &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL); + tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey)); + doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin); + releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore); + continue; + } + code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput, + pOperator, 0); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + } + compactEventWindow(pOperator, &curWin, pInfo->pSeUpdated, pInfo->pSeDeleted, false); + saveSessionOutputBuf(pAggSup, &curWin.winInfo); + + if (isWindowIncomplete(&curWin)) { + continue; + } + + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + code = saveResult(curWin.winInfo, pSeUpdated); + } + + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + SSessionKey key = {0}; + getSessionHashKey(&curWin.winInfo.sessionWin, &key); + tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); + } + } + +_end: + colDataDestroy(pColStart); + taosMemoryFree(pColStart); + colDataDestroy(pColEnd); + taosMemoryFree(pColEnd); +} + +int32_t doStreamEventEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) { + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return 0; + } + + void* pData = (buf == NULL) ? NULL : *buf; + + // 1.streamAggSup.pResultRows + int32_t tlen = 0; + int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); + tlen += taosEncodeFixedI32(buf, mapSize); + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { + void* key = tSimpleHashGetKey(pIte, &keyLen); + tlen += encodeSSessionKey(buf, key); + tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); + } + + // 2.twAggSup + tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.dataVersion + tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + + // 4.checksum + if (buf) { + uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t)); + tlen += taosEncodeFixedU32(buf, cksum); + } else { + tlen += sizeof(uint32_t); + } + + return tlen; +} + +void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) { + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return buf; + } + + // 4.checksum + int32_t dataLen = len - sizeof(uint32_t); + void* pCksum = POINTER_SHIFT(buf, dataLen); + if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { + ASSERT(0); // debug + qError("stream event state is invalid"); + return buf; + } + + // 1.streamAggSup.pResultRows + int32_t mapSize = 0; + buf = taosDecodeFixedI32(buf, &mapSize); + for (int32_t i = 0; i < mapSize; i++) { + SSessionKey key = {0}; + SResultWindowInfo winfo = {0}; + buf = decodeSSessionKey(buf, &key); + buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + } + + // 2.twAggSup + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.dataVersion + buf = taosDecodeFixedI64(buf, &pInfo->dataVersion); + return buf; +} + +void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamEventEncodeOpState(&pBuf, len, pOperator); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, + strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); +} + +static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) { + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); + if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + + doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); + if (pBInfo->pRes->info.rows > 0) { + printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pBInfo->pRes; + } + return NULL; +} + +static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExprSupp* pSup = &pOperator->exprSupp; + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + qDebug("===stream=== stream event agg"); + if (pOperator->status == OP_RES_TO_RETURN) { + SSDataBlock* resBlock = buildEventResult(pOperator); + if (resBlock != NULL) { + return resBlock; + } + + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pCheckpointRes; + } + + setOperatorCompleted(pOperator); + return NULL; + } + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (!pInfo->pUpdated) { + pInfo->pUpdated = taosArrayInit(16, sizeof(SEventWindowInfo)); + } + if (!pInfo->pSeUpdated) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + } + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + + if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || + pBlock->info.type == STREAM_CLEAR) { + deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted); + continue; + } else if (pBlock->info.type == STREAM_GET_ALL) { + getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); + continue; + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + doStreamEventSaveCheckpoint(pOperator); + pInfo->reCkBlock = true; + copyDataBlock(pInfo->pCheckpointRes, pBlock); + continue; + } else { + ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); + } + + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + doStreamEventAggImpl(pOperator, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); + } + // restore the value + pOperator->status = OP_RES_TO_RETURN; + + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated); + copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc); + removeSessionDeleteResults(pInfo->pSeDeleted, pInfo->pUpdated); + + if (pInfo->isHistoryOp) { + getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); + } + + initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = NULL; + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + + SSDataBlock* resBlock = buildEventResult(pOperator); + if (resBlock != NULL) { + return resBlock; + } + setOperatorCompleted(pOperator); + return NULL; +} + +void streamEventReleaseState(SOperatorInfo* pOperator) { + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + int32_t resSize = winSize + sizeof(TSKEY); + char* pBuff = taosMemoryCalloc(1, resSize); + memcpy(pBuff, pInfo->historyWins->pData, winSize); + memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); + qDebug("===stream=== event window operator relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins)); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_STATE_NAME, + strlen(STREAM_EVENT_OP_STATE_NAME), pBuff, resSize); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + taosMemoryFreeClear(pBuff); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.releaseStreamStateFn) { + downstream->fpSet.releaseStreamStateFn(downstream); + } +} + +void streamEventReloadState(SOperatorInfo* pOperator) { + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + resetWinRange(&pAggSup->winRange); + + SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_EVENT_OP_STATE_NAME, + strlen(STREAM_EVENT_OP_STATE_NAME), &pBuf, &size); + int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); + qDebug("===stream=== event window operator reload state. get result count:%d", num); + SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; + ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); + + TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); + pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts); + + if (!pInfo->pSeUpdated && num > 0) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); + } + if (!pInfo->pSeDeleted && num > 0) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + } + for (int32_t i = 0; i < num; i++) { + SEventWindowInfo curInfo = {0}; + qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, + pSeKeyBuf[i].groupId, i); + getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo); + setEventWindowFlag(pAggSup, &curInfo); + compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false); + qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, + curInfo.winInfo.sessionWin.groupId); + if (isWindowIncomplete(&curInfo)) { + if (curInfo.winInfo.isOutput) { + ASSERT(0); + saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin); + } + continue; + } + + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + saveResult(curInfo.winInfo, pInfo->pSeUpdated); + } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + if (!isCloseWindow(&curInfo.winInfo.sessionWin.win, &pInfo->twAggSup)) { + saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin); + } + SSessionKey key = {0}; + getSessionHashKey(&curInfo.winInfo.sessionWin, &key); + tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); + } + + if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { + saveSessionOutputBuf(pAggSup, &curInfo.winInfo); + } + } + taosMemoryFree(pBuf); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} + +SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { + SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode; + int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId; + int32_t code = TSDB_CODE_SUCCESS; + SStreamEventAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamEventAggOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + initResultSizeInfo(&pOperator->resultInfo, 4096); + if (pEventNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pEventNode->window.pExprs, NULL, &numOfScalar); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + pInfo->twAggSup = (STimeWindowAggSupp){ + .waterMark = pEventNode->window.watermark, + .calTrigger = pEventNode->window.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + }; + + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + + SExprSupp* pExpSup = &pOperator->exprSupp; + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, + sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->primaryTsIndex = tsSlotId; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); + pInfo->pDelIterator = NULL; + pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + pInfo->pChildren = NULL; + pInfo->ignoreExpiredData = pEventNode->window.igExpired; + pInfo->ignoreExpiredDataSaved = false; + pInfo->pUpdated = NULL; + pInfo->pSeUpdated = NULL; + pInfo->dataVersion = 0; + pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey)); + if (!pInfo->historyWins) { + goto _error; + } + if (pHandle) { + pInfo->isHistoryOp = pHandle->fillHistory; + } + + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + pInfo->reCkBlock = false; + + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = + pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME, + strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamEventDecodeOpState(buff, len, pOperator); + taosMemoryFree(buff); + } + + setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED, + pInfo, pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = filterInitFromNode((SNode*)pEventNode->pStartCond, &pInfo->pStartCondInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + destroyStreamEventOperatorInfo(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8bfa8e1a5d..167eb4b689 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -146,7 +146,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { return TSDB_CODE_SUCCESS; } -static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { +int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey; return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } @@ -845,6 +845,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore); pResult = (SResultRow*)pResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + qError("%s set interval output buff error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } if (IS_FINAL_INTERVAL_OP(pOperator)) { @@ -1073,7 +1074,6 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t dataLen = len - sizeof(uint32_t); void* pCksum = POINTER_SHIFT(buf, dataLen); if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { - ASSERT(0); // debug qError("stream interval state is invalid"); return; } @@ -1156,7 +1156,7 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { return NULL; } -static int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) { +int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) { void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(*ppWinUpdated, pIte, &iter)) != NULL) { @@ -1234,7 +1234,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; } - + qInfo("%s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -1765,8 +1765,9 @@ int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { return TSDB_CODE_SUCCESS; } -static void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) { - key.win.ekey = key.win.skey; +void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) { + SSessionKey key = {0}; + getSessionHashKey(pKey, &key); void* pVal = tSimpleHashGet(pHashMap, &key, sizeof(SSessionKey)); if (pVal) { releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore); @@ -1775,12 +1776,12 @@ static void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMa tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); } -static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) { +void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) { *pHashKey = *pKey; pHashKey->win.ekey = pKey->win.skey; } -static void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) { +void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) { if (tSimpleHashGetSize(pHashMap) == 0) { return; } @@ -1794,7 +1795,7 @@ static void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) { } } -static void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins) { +void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins) { if (tSimpleHashGetSize(pHashMap) == 0) { return; } @@ -1823,7 +1824,7 @@ int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo* if (pStDeleted && pWinInfo->isOutput) { saveDeleteRes(pStDeleted, pWinInfo->sessionWin); } - removeSessionResult(pAggSup, pStUpdated, pResultRows, pWinInfo->sessionWin); + removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->sessionWin); pWinInfo->sessionWin.win.skey = pStartTs[i]; } pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]); @@ -1845,7 +1846,7 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR return TSDB_CODE_SUCCESS; } -static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult, +int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult, int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput, SOperatorInfo* pOperator, int64_t winDelta) { SExprSupp* pSup = &pOperator->exprSupp; @@ -1867,7 +1868,7 @@ static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKe return true; } -static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) { +int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) { void* pVal = tSimpleHashGet(pStUpdated, &pWinInfo->sessionWin, sizeof(SSessionKey)); if (pVal) { SResultWindowInfo* pWin = pVal; @@ -1891,6 +1892,34 @@ void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, pAggSup->stateStore.streamStateFreeCur(pCur); } +void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, + SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin, + SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap) { + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pSup->numOfExprs; + initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + SResultRow* pWinResult = NULL; + initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); + pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, pNextWin->sessionWin.win.ekey); + memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey)); + + int64_t winDelta = 0; + if (addGap) { + winDelta = pAggSup->gap; + } + updateTimeWindowInfo(&pTwAggSup->timeWindowData, &pCurWin->sessionWin.win, winDelta); + compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pTwAggSup->timeWindowData); + tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey)); + if (pNextWin->isOutput && pStDeleted) { + qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, + pNextWin->sessionWin.groupId); + saveDeleteRes(pStDeleted, pNextWin->sessionWin); + } + removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, &pNextWin->sessionWin); + doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin); + releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore); +} + static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap) { SExprSupp* pSup = &pOperator->exprSupp; @@ -1902,7 +1931,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* SResultRow* pCurResult = NULL; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; - initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + // initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); // Just look for the window behind StartIndex while (1) { SResultWindowInfo winInfo = {0}; @@ -1912,23 +1941,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); break; } - SResultRow* pWinResult = NULL; - initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); - pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); - memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey)); - int64_t winDelta = 0; - if (addGap) { - winDelta = pAggSup->gap; - } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta); - compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); - tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey)); - if (winInfo.isOutput && pStDeleted) { - saveDeleteRes(pStDeleted, winInfo.sessionWin); - } - removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, winInfo.sessionWin); - doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); - releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); + compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, pCurWin, &winInfo, pStUpdated, pStDeleted, true); winNum++; } return winNum; @@ -2014,6 +2027,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput, pOperator, winDelta); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + qError("%s do stream session aggregate impl error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted, addGap); @@ -2022,6 +2036,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) { code = saveResult(winInfo, pStUpdated); if (code != TSDB_CODE_SUCCESS) { + qError("%s do stream session aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } } @@ -2035,7 +2050,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } } -static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) { +void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) { SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startDatas = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -2057,7 +2072,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc } } -static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) { +inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) { SResultWindowInfo* pWinInfo1 = (SResultWindowInfo*)pKey1; SResultWindowInfo* pWinInfo2 = (SResultWindowInfo*)pKey2; SSessionKey* pWin1 = &pWinInfo1->sessionWin; @@ -2487,8 +2502,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t dataLen = len - sizeof(uint32_t); void* pCksum = POINTER_SHIFT(buf, dataLen); if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { - ASSERT(0); // debug - qError("stream interval state is invalid"); + qError("stream session state is invalid"); return buf; } } @@ -2546,6 +2560,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (opRes) { return opRes; } + + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pCheckpointRes; + } + setOperatorCompleted(pOperator); return NULL; } @@ -2612,6 +2633,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0, NULL); if (!pChildOp) { + qError("%s create stream child of final session error", GET_TASKID(pTaskInfo)); T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } taosArrayPush(pInfo->pChildren, &pChildOp); @@ -2876,6 +2898,14 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) { pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState); } +void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, SSHashObj* pMapDelete) { + SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); + doDeleteTimeWindows(pAggSup, pBlock, pWins); + removeSessionResults(pAggSup, pMapUpdate, pWins); + copyDeleteWindowInfo(pWins, pMapDelete); + taosArrayDestroy(pWins); +} + static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; @@ -2902,6 +2932,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } if (pOperator->status == OP_RES_TO_RETURN) { + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pCheckpointRes; + } clearFunctionContext(&pOperator->exprSupp); // semi session operator clear disk buffer clearStreamSessionOperator(pInfo); @@ -2930,11 +2965,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { // gap must be 0 - SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); - doDeleteTimeWindows(pAggSup, pBlock, pWins); - removeSessionResults(pAggSup, pInfo->pStUpdated, pWins); - copyDeleteWindowInfo(pWins, pInfo->pStDeleted); - taosArrayDestroy(pWins); + deleteSessionWinState(pAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted); pInfo->clearState = true; break; } else if (pBlock->info.type == STREAM_GET_ALL) { @@ -3227,7 +3258,7 @@ int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pW if (pSeDeleted && pWinInfo->winInfo.isOutput) { saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin); } - removeSessionResult(pAggSup, pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin); + removeSessionResult(pAggSup, pSeUpdated, pResultRows, &pWinInfo->winInfo.sessionWin); pWinInfo->winInfo.sessionWin.win.skey = pTs[i]; } pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]); @@ -3297,6 +3328,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput, pOperator, 0); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + qError("%s do one window aggregate impl error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } saveSessionOutputBuf(pAggSup, &curWin.winInfo); @@ -3304,6 +3336,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { code = saveResult(curWin.winInfo, pSeUpdated); if (code != TSDB_CODE_SUCCESS) { + qError("%s do stream state aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } } @@ -3375,8 +3408,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato int32_t dataLen = len - sizeof(uint32_t); void* pCksum = POINTER_SHIFT(buf, dataLen); if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { - ASSERT(0); // debug - qError("stream interval state is invalid"); + qError("stream state_window state is invalid"); return buf; } } @@ -3454,6 +3486,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return resBlock; } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pCheckpointRes; + } + setOperatorCompleted(pOperator); return NULL; } @@ -3475,11 +3513,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { - SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); - doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins); - removeSessionResults(&pInfo->streamAggSup, pInfo->pSeUpdated, pWins); - copyDeleteWindowInfo(pWins, pInfo->pSeDeleted); - taosArrayDestroy(pWins); + deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); @@ -3488,7 +3522,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return pBlock; } else if (pBlock->info.type == STREAM_CHECKPOINT) { pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); - doStreamSessionSaveCheckpoint(pOperator); + doStreamStateSaveCheckpoint(pOperator); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; } else { @@ -3550,29 +3584,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur SSHashObj* pStUpdated, SSHashObj* pStDeleted) { SExprSupp* pSup = &pOperator->exprSupp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - SResultRow* pCurResult = NULL; - int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; - initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); - SResultRow* pWinResult = NULL; - initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); - pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, pNextWin->sessionWin.win.ekey); - memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey)); - - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1); - compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); - tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey)); - if (pNextWin->isOutput && pStDeleted) { - qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, - pNextWin->sessionWin.groupId); - saveDeleteRes(pStDeleted, pNextWin->sessionWin); - } - removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); - doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin); - releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore); + compactTimeWindow(pSup, &pInfo->streamAggSup, &pInfo->twAggSup, pTaskInfo, pCurWin, pNextWin, pStUpdated, pStDeleted, false); } void streamStateReloadState(SOperatorInfo* pOperator) { @@ -3743,12 +3756,6 @@ _error: return NULL; } -static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) { - pRes->info.id.groupId = pMiaInfo->groupId; - pMiaInfo->curTs = INT64_MIN; - pMiaInfo->groupId = 0; -} - static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { for (int i = 0; i < num; i++) { if (type == STREAM_INVERT) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 72293e2f8c..d84cdbc032 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6883,7 +6883,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || - crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect) || hasJsonTypeProjection(pSelect)) { + crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index b34b3420fe..95031505dc 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -122,7 +122,7 @@ int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** p int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); void* streamDefaultIterCreate_rocksdb(SStreamState* pState); -int32_t streamDefaultIterValid_rocksdb(void* iter); +bool streamDefaultIterValid_rocksdb(void* iter); void streamDefaultIterSeek_rocksdb(void* iter, const char* key); void streamDefaultIterNext_rocksdb(void* iter); char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ce4feb38eb..422db0f591 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2850,9 +2850,12 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { pCur->number = pState->number; return pCur; } -int32_t streamDefaultIterValid_rocksdb(void* iter) { +bool streamDefaultIterValid_rocksdb(void* iter) { + if (iter) { + return false; + } SStreamStateCur* pCur = iter; - return rocksdb_iter_valid(pCur->iter) ? 1 : 0; + return (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) ? true : false; } void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { SStreamStateCur* pCur = iter; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index cc96778762..45d656c456 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -72,7 +72,7 @@ bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) { return false; } -static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey) { +static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); ASSERT(pNewPos->pRowBuff); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); @@ -80,7 +80,7 @@ static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pW return pNewPos; } -static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey, int32_t index) { +static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey, int32_t index) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); ASSERT(pNewPos->pRowBuff); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); @@ -270,6 +270,51 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP return TSDB_CODE_SUCCESS; } +int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) { + SRowBuffPos* pNewPos = NULL; + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + SArray* pWinStates = NULL; + if (!ppBuff) { + pWinStates = taosArrayInit(16, POINTER_BYTES); + tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + } else { + pWinStates = (SArray*)(*ppBuff); + } + if (!pCur) { + pNewPos = addNewSessionWindow(pFileState, pWinStates, pWinKey); + goto _end; + } + + int32_t size = taosArrayGetSize(pWinStates); + if (pCur->buffIndex >= 0) { + if (pCur->buffIndex >= size) { + pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, size); + goto _end; + } + pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex); + goto _end; + } else { + if (size > 0) { + SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0); + if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >=0) { + // pCur is invalid + SSessionKey pTmpKey = *pWinKey; + int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen); + ASSERT(code == TSDB_CODE_FAILED); + goto _end; + } + } + pNewPos = getNewRowPosForWrite(pFileState); + pNewPos->needFree = true; + } + +_end: + memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); + (*ppVal) = pNewPos; + return TSDB_CODE_SUCCESS; +} + void sessionWinStateClear(SStreamFileState* pFileState) { int32_t buffSize = getRowStateRowSize(pFileState); void* pIte = NULL; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4a056563ee..e9960a0059 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -738,6 +738,14 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void #endif } +int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, const SSessionKey* pKey, void** pVal, int32_t* pVLen) { +#ifdef USE_ROCKSDB + return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen); +#else + return TSDB_CODE_FAILED; +#endif +} + int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen); diff --git a/tests/script/tsim/stream/distributeIntervalRetrive0.sim b/tests/script/tsim/stream/distributeIntervalRetrive0.sim index 052bf441d5..194b5b1ae5 100644 --- a/tests/script/tsim/stream/distributeIntervalRetrive0.sim +++ b/tests/script/tsim/stream/distributeIntervalRetrive0.sim @@ -267,6 +267,8 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 delete_mark 20s into streamt1 as select _wstart as c0, count(*) c1, count(a) c2 from st interval(10s) ; +sleep 1000 + sql insert into t1 values(1648791211000,1,2,3); sql insert into t1 values(1262275200000,2,2,3); diff --git a/tests/script/tsim/stream/event0.sim b/tests/script/tsim/stream/event0.sim new file mode 100644 index 0000000000..57116bbc79 --- /dev/null +++ b/tests/script/tsim/stream/event0.sim @@ -0,0 +1,322 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with a = 9; +sleep 1000 +sql insert into t1 values(1648791213000,0,1,1,1.0); +sql insert into t1 values(1648791223001,9,2,2,1.1); +sql insert into t1 values(1648791213009,0,3,3,1.0); + +$loop_count = 0 +loop0: + +sleep 300 +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 3 then + print ======data01=$data01 + goto loop0 +endi + +if $data02 != 6 then + print ======data02=$data02 + goto loop0 +endi + +if $data03 != 3 then + print ======data03=$data03 + goto loop0 +endi + +sql insert into t1 values(1648791243006,1,1,1,1.1); +sql insert into t1 values(1648791253000,2,2,2,1.1); + + +$loop_count = 0 +loop1: + +sleep 300 +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +sql insert into t1 values(1648791243000,0,3,3,1.1); + +$loop_count = 0 +loop2: + +sleep 300 +print 3 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop2 +endi + +sql insert into t1 values(1648791253009,9,4,4,1.1); + +$loop_count = 0 +loop3: + +sleep 300 +print 4 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 2 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 3 then + print ======data01=$data01 + goto loop3 +endi + +if $data02 != 6 then + print ======data02=$data02 + goto loop3 +endi + +if $data03 != 3 then + print ======data03=$data03 + goto loop3 +endi + +# row 1 +if $data11 != 4 then + print ======data11=$data11 + goto loop3 +endi + +if $data12 != 10 then + print ======data12=$data12 + goto loop3 +endi + +if $data13 != 4 then + print ======data13=$data13 + goto loop3 +endi + +print step2 +print =============== create database test2 +sql create database test2 vgroups 1; +sql use test2; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with b = 9; +sleep 1000 +sql insert into t1 values(1648791213000,0,1,1,1.0); +sql insert into t1 values(1648791213009,1,2,2,2.1); +sql insert into t1 values(1648791223000,0,9,9,9.0); + + +sql insert into t1 values(1648791233000,0,9,9,9.0); + + +$loop_count = 0 +loop4: + +sleep 300 +print sql select * from streamt2; +sql select * from streamt2; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 2 then + print ======rows=$rows + goto loop4 +endi + +# row 0 +if $data01 != 3 then + print ======data01=$data01 + goto loop4 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop4 +endi + +print step3 +print =============== create database test3 +sql create database test3 vgroups 1; +sql use test3; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with b = 9; +sleep 1000 + +sql insert into t1 values(1648791233009,1,2,2,2.1); + +sql insert into t1 values(1648791233000,0,1,1,1.0); +sql insert into t1 values(1648791243000,0,9,9,9.0); + +$loop_count = 0 +loop5: + +sleep 300 +print 1 sql select * from streamt3; +sql select * from streamt3; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop5 +endi + +# row 0 +if $data01 != 3 then + print ======data01=$data01 + goto loop5 +endi + +sql insert into t1 values(1648791223000,0,9,9,9.0); + +$loop_count = 0 +loop6: + +sleep 300 +print 2 sql select * from streamt3; +sql select * from streamt3; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print ======rows=$rows + goto loop6 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop6 +endi + +# row 1 +if $data11 != 3 then + print ======data11=$data11 + goto loop6 +endi + +sql insert into t1 values(1648791213000,0,1,1,1.0); + +sleep 300 + +sql insert into t1 values(1648791213001,1,9,9,9.0); + +$loop_count = 0 +loop7: + +sleep 300 +print 3 sql select * from streamt3; +sql select * from streamt3; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 3 then + print ======rows=$rows + goto loop7 +endi + +# row 0 +if $data01 != 2 then + print ======data01=$data01 + goto loop7 +endi + +# row 1 +if $data11 != 1 then + print ======data11=$data11 + goto loop7 +endi + +# row 2 +if $data21 != 3 then + print ======data21=$data21 + goto loop7 +endi + +print event0 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/event1.sim b/tests/script/tsim/stream/event1.sim new file mode 100644 index 0000000000..d84cc9c5d3 --- /dev/null +++ b/tests/script/tsim/stream/event1.sim @@ -0,0 +1,49 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database test1 +sql create database test1 vgroups 1; +sql use test1; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with b = 9; +sleep 1000 + +sql insert into t1 values(1648791233000,0,1,1,1.0); +sql insert into t1 values(1648791243000,1,9,9,9.0); + + +sql insert into t1 values(1648791223000,3,3,3,3.0); + +$loop_count = 0 +loop0: + +sleep 300 +print 1 sql select * from streamt1; +sql select * from streamt1; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print ======data01=$data01 + goto loop0 +endi + +print event1 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT