add stream event notify

This commit is contained in:
54liuyao 2024-12-26 14:33:14 +08:00 committed by Jinqing Kuang
parent 86f1c843d4
commit f3b38edb98
7 changed files with 79 additions and 8 deletions

View File

@ -160,6 +160,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
STREAM_EVENT_OPEN_WINDOW,
} EStreamType;
#pragma pack(push, 1)

View File

@ -450,8 +450,10 @@ typedef struct STimeWindowAggSupp {
} STimeWindowAggSupp;
typedef struct SSteamOpBasicInfo {
int32_t primaryPkIndex;
bool updateOperatorInfo;
int32_t primaryPkIndex;
bool updateOperatorInfo;
SSDataBlock* pEventRes;
SArray* pEventInfo;
} SSteamOpBasicInfo;
typedef struct SStreamFillSupporter {

View File

@ -57,7 +57,8 @@ typedef struct SSlicePoint {
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption);
void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins);

View File

@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cmdnodes.h"
#include "executorInt.h"
#include "filter.h"
#include "function.h"
@ -53,6 +55,8 @@ void destroyStreamEventOperatorInfo(void* param) {
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
}
destroyStreamBasicInfo(&pInfo->basic);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
@ -121,7 +125,7 @@ void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
}
int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd,
int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t winCode = TSDB_CODE_SUCCESS;
@ -143,6 +147,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin);
if (inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag)) {
pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
(*pWinCode) = TSDB_CODE_SUCCESS;
goto _end;
}
}
@ -156,6 +161,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) {
setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin);
pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
(*pWinCode) = TSDB_CODE_SUCCESS;
goto _end;
}
}
@ -163,6 +169,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId};
code = pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len);
QUERY_CHECK_CODE(code, lino, _error);
(*pWinCode) = TSDB_CODE_FAILED;
setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin);
pCurWin->pWinFlag->startFlag = start;
@ -303,6 +310,14 @@ void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SS
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
}
static int32_t setEventData(SSteamOpBasicInfo* pBasicInfo, SSessionKey* pWinKey) {
void* pRes = taosArrayPush(pBasicInfo->pEventInfo, pWinKey);
if (pRes != NULL) {
return TSDB_CODE_SUCCESS;
}
return terrno;
}
static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
SSHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -373,10 +388,16 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
bool allEqual = true;
SEventWindowInfo curWin = {0};
SSessionKey nextWinKey = {0};
int32_t winCode = TSDB_CODE_SUCCESS;
code = setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin,
&nextWinKey);
&nextWinKey, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && winCode != TSDB_CODE_SUCCESS) {
code = setEventData(&pInfo->basic, &curWin.winInfo.sessionWin);
QUERY_CHECK_CODE(code, lino, _end);
}
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
bool rebuild = false;
code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
@ -561,12 +582,42 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
}
}
static void buildEventNotifyResult(SSteamOpBasicInfo* pBasicInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
blockDataCleanup(pBasicInfo->pEventRes);
int32_t size = taosArrayGetSize(pBasicInfo->pEventInfo);
code = blockDataEnsureCapacity(pBasicInfo->pEventRes, size);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < size; i++) {
SSessionKey* pKey = taosArrayGet(pBasicInfo->pEventInfo, i);
uint64_t uid = 0;
code = appendDataToSpecialBlock(pBasicInfo->pEventRes, &pKey->win.skey, &pKey->win.ekey, &uid, &pKey->groupId, NULL);
QUERY_CHECK_CODE(code, lino, _end);
}
taosArrayClear(pBasicInfo->pEventInfo);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
}
}
static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
buildEventNotifyResult(&pInfo->basic);
if (pInfo->basic.pEventRes->info.rows > 0) {
printDataBlock(pInfo->basic.pEventRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->basic.pEventRes;
return code;
}
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@ -957,6 +1008,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimaryKey;
initStreamBasicInfo(&pInfo->basic);
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,

View File

@ -14,6 +14,7 @@
*/
#include "executorInt.h"
#include "tdatablock.h"
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
@ -29,7 +30,19 @@ void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->updateOperatorInfo = false;
}
void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->primaryPkIndex = -1;
pBasicInfo->updateOperatorInfo = false;
pBasicInfo->pEventInfo = taosArrayInit(4, sizeof(SSessionKey));
if (pBasicInfo->pEventInfo == NULL) {
return terrno;
}
return createSpecialDataBlock(STREAM_EVENT_OPEN_WINDOW, &pBasicInfo->pEventRes);
}
void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
blockDataDestroy(pBasicInfo->pEventRes);
pBasicInfo->pEventRes = NULL;
taosArrayDestroy(pBasicInfo->pEventInfo);
pBasicInfo->pEventInfo = NULL;
}

View File

@ -651,7 +651,8 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState);
initStreamBasicInfo(&pInfo->basic);
code = initStreamBasicInfo(&pInfo->basic);
QUERY_CHECK_CODE(code, lino, _error);
if (downstream) {
code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc);

View File

@ -2201,7 +2201,8 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState);
initStreamBasicInfo(&pInfo->basic);
code = initStreamBasicInfo(&pInfo->basic);
QUERY_CHECK_CODE(code, lino, _error);
if (downstream) {
code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup);