diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 0450766535..3f76239ce5 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -160,6 +160,7 @@ typedef enum EStreamType { STREAM_PARTITION_DELETE_DATA, STREAM_GET_RESULT, STREAM_DROP_CHILD_TABLE, + STREAM_EVENT_OPEN_WINDOW, } EStreamType; #pragma pack(push, 1) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 48afa78251..04e7884020 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -450,8 +450,10 @@ typedef struct STimeWindowAggSupp { } STimeWindowAggSupp; typedef struct SSteamOpBasicInfo { - int32_t primaryPkIndex; - bool updateOperatorInfo; + int32_t primaryPkIndex; + bool updateOperatorInfo; + SSDataBlock* pEventRes; + SArray* pEventInfo; } SSteamOpBasicInfo; typedef struct SStreamFillSupporter { diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 0a69080314..0c0ea0d6fc 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -57,7 +57,8 @@ typedef struct SSlicePoint { void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); -void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo); +int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo); +void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo); int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption); void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index fa6008eba7..a9a47580dc 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -12,6 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +#include "cmdnodes.h" #include "executorInt.h" #include "filter.h" #include "function.h" @@ -53,6 +55,8 @@ void destroyStreamEventOperatorInfo(void* param) { &pInfo->groupResInfo); pInfo->pOperator = NULL; } + + destroyStreamBasicInfo(&pInfo->basic); destroyStreamAggSupporter(&pInfo->streamAggSup); clearGroupResInfo(&pInfo->groupResInfo); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); @@ -121,7 +125,7 @@ void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { } int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, - int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) { + int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t winCode = TSDB_CODE_SUCCESS; @@ -143,6 +147,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin); if (inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag)) { pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin); + (*pWinCode) = TSDB_CODE_SUCCESS; goto _end; } } @@ -156,6 +161,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) { setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin); pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin); + (*pWinCode) = TSDB_CODE_SUCCESS; goto _end; } } @@ -163,6 +169,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId}; code = pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len); QUERY_CHECK_CODE(code, lino, _error); + (*pWinCode) = TSDB_CODE_FAILED; setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin); pCurWin->pWinFlag->startFlag = start; @@ -303,6 +310,14 @@ void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SS removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey); } +static int32_t setEventData(SSteamOpBasicInfo* pBasicInfo, SSessionKey* pWinKey) { + void* pRes = taosArrayPush(pBasicInfo->pEventInfo, pWinKey); + if (pRes != NULL) { + return TSDB_CODE_SUCCESS; + } + return terrno; +} + static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated, SSHashObj* pStDeleted) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -373,10 +388,16 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl bool allEqual = true; SEventWindowInfo curWin = {0}; SSessionKey nextWinKey = {0}; + int32_t winCode = TSDB_CODE_SUCCESS; code = setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, - &nextWinKey); + &nextWinKey, &winCode); QUERY_CHECK_CODE(code, lino, _end); + if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && winCode != TSDB_CODE_SUCCESS) { + code = setEventData(&pInfo->basic, &curWin.winInfo.sessionWin); + QUERY_CHECK_CODE(code, lino, _end); + } + setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); bool rebuild = false; code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, @@ -561,12 +582,42 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { } } +static void buildEventNotifyResult(SSteamOpBasicInfo* pBasicInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + blockDataCleanup(pBasicInfo->pEventRes); + int32_t size = taosArrayGetSize(pBasicInfo->pEventInfo); + code = blockDataEnsureCapacity(pBasicInfo->pEventRes, size); + QUERY_CHECK_CODE(code, lino, _end); + for (int32_t i = 0; i < size; i++) { + SSessionKey* pKey = taosArrayGet(pBasicInfo->pEventInfo, i); + uint64_t uid = 0; + code = appendDataToSpecialBlock(pBasicInfo->pEventRes, &pKey->win.skey, &pKey->win.ekey, &uid, &pKey->groupId, NULL); + QUERY_CHECK_CODE(code, lino, _end); + } + taosArrayClear(pBasicInfo->pEventInfo); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code)); + } +} + + static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SStreamEventAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + buildEventNotifyResult(&pInfo->basic); + if (pInfo->basic.pEventRes->info.rows > 0) { + printDataBlock(pInfo->basic.pEventRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->basic.pEventRes; + return code; + } + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); @@ -957,6 +1008,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno); pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimaryKey; + initStreamBasicInfo(&pInfo->basic); pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED, diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c index b94798934c..1e7fbfa446 100644 --- a/source/libs/executor/src/streamexecutorInt.c +++ b/source/libs/executor/src/streamexecutorInt.c @@ -14,6 +14,7 @@ */ #include "executorInt.h" +#include "tdatablock.h" void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) { if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) { @@ -29,7 +30,19 @@ void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) { pBasicInfo->updateOperatorInfo = false; } -void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { +int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { pBasicInfo->primaryPkIndex = -1; pBasicInfo->updateOperatorInfo = false; + pBasicInfo->pEventInfo = taosArrayInit(4, sizeof(SSessionKey)); + if (pBasicInfo->pEventInfo == NULL) { + return terrno; + } + return createSpecialDataBlock(STREAM_EVENT_OPEN_WINDOW, &pBasicInfo->pEventRes); +} + +void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) { + blockDataDestroy(pBasicInfo->pEventRes); + pBasicInfo->pEventRes = NULL; + taosArrayDestroy(pBasicInfo->pEventInfo); + pBasicInfo->pEventInfo = NULL; } diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index d038e4d82c..45707e670e 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -651,7 +651,8 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState); - initStreamBasicInfo(&pInfo->basic); + code = initStreamBasicInfo(&pInfo->basic); + QUERY_CHECK_CODE(code, lino, _error); if (downstream) { code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc); diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 44004a4c6b..9ec6063486 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -2201,7 +2201,8 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState); - initStreamBasicInfo(&pInfo->basic); + code = initStreamBasicInfo(&pInfo->basic); + QUERY_CHECK_CODE(code, lino, _error); if (downstream) { code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup);