stream event window
This commit is contained in:
parent
3ff331b8b7
commit
f51c985914
|
@ -387,11 +387,13 @@ typedef struct SStateStore {
|
||||||
int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||||
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
||||||
int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
|
int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
|
||||||
|
int32_t (*streamStateSessionAllocWinBuffByNextPosition)(SStreamState* pState, SStreamStateCur* pCur,
|
||||||
|
const SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
|
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
|
||||||
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
|
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
|
||||||
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
|
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
|
||||||
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
|
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
|
||||||
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
|
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
|
||||||
void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count);
|
void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count);
|
||||||
void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count);
|
void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count);
|
||||||
|
|
|
@ -57,6 +57,8 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
|
||||||
int32_t streamStateSessionClear(SStreamState* pState);
|
int32_t streamStateSessionClear(SStreamState* pState);
|
||||||
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
|
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
|
||||||
|
int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur,
|
||||||
|
const SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
|
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
|
||||||
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
|
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
|
||||||
|
@ -66,6 +68,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
|
||||||
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||||
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
|
// fill
|
||||||
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
||||||
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key);
|
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key);
|
||||||
|
|
|
@ -76,8 +76,10 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState);
|
||||||
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen);
|
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen);
|
||||||
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
|
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
|
||||||
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen);
|
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen);
|
||||||
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos);
|
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos);
|
||||||
|
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
|
||||||
|
const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen);
|
||||||
|
|
||||||
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen);
|
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen);
|
||||||
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId);
|
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId);
|
||||||
|
|
|
@ -71,6 +71,7 @@ void initStateStoreAPI(SStateStore* pStore) {
|
||||||
pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur;
|
pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur;
|
||||||
pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist;
|
pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist;
|
||||||
pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange;
|
pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange;
|
||||||
|
pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition;
|
||||||
|
|
||||||
pStore->updateInfoInit = updateInfoInit;
|
pStore->updateInfoInit = updateInfoInit;
|
||||||
pStore->updateInfoFillBlockData = updateInfoFillBlockData;
|
pStore->updateInfoFillBlockData = updateInfoFillBlockData;
|
||||||
|
|
|
@ -180,6 +180,7 @@ void initStateStoreAPI(SStateStore* pStore) {
|
||||||
pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur;
|
pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur;
|
||||||
pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist;
|
pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist;
|
||||||
pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange;
|
pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange;
|
||||||
|
pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition;
|
||||||
|
|
||||||
pStore->updateInfoInit = updateInfoInit;
|
pStore->updateInfoInit = updateInfoInit;
|
||||||
pStore->updateInfoFillBlockData = updateInfoFillBlockData;
|
pStore->updateInfoFillBlockData = updateInfoFillBlockData;
|
||||||
|
|
|
@ -605,6 +605,30 @@ typedef struct SStreamStateAggOperatorInfo {
|
||||||
SSDataBlock* pCheckpointRes;
|
SSDataBlock* pCheckpointRes;
|
||||||
} SStreamStateAggOperatorInfo;
|
} SStreamStateAggOperatorInfo;
|
||||||
|
|
||||||
|
typedef struct SStreamEventAggOperatorInfo {
|
||||||
|
SOptrBasicInfo binfo;
|
||||||
|
SStreamAggSupporter streamAggSup;
|
||||||
|
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||||
|
SGroupResInfo groupResInfo;
|
||||||
|
int32_t primaryTsIndex; // primary timestamp slot id
|
||||||
|
STimeWindowAggSupp twAggSup;
|
||||||
|
SSDataBlock* pDelRes;
|
||||||
|
SSHashObj* pSeDeleted;
|
||||||
|
void* pDelIterator;
|
||||||
|
SArray* pChildren; // cache for children's result;
|
||||||
|
bool ignoreExpiredData;
|
||||||
|
bool ignoreExpiredDataSaved;
|
||||||
|
SArray* pUpdated;
|
||||||
|
SSHashObj* pSeUpdated;
|
||||||
|
int64_t dataVersion;
|
||||||
|
bool isHistoryOp;
|
||||||
|
SArray* historyWins;
|
||||||
|
bool reCkBlock;
|
||||||
|
SSDataBlock* pCheckpointRes;
|
||||||
|
SFilterInfo* pStartCondInfo;
|
||||||
|
SFilterInfo* pEndCondInfo;
|
||||||
|
} SStreamEventAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct SStreamPartitionOperatorInfo {
|
typedef struct SStreamPartitionOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
SPartitionBySupporter partitionSup;
|
SPartitionBySupporter partitionSup;
|
||||||
|
@ -757,9 +781,51 @@ void doClearBufferedBlocks(SStreamScanInfo* pInfo);
|
||||||
uint64_t calcGroupId(char* pData, int32_t len);
|
uint64_t calcGroupId(char* pData, int32_t len);
|
||||||
void streamOpReleaseState(struct SOperatorInfo* pOperator);
|
void streamOpReleaseState(struct SOperatorInfo* pOperator);
|
||||||
void streamOpReloadState(struct SOperatorInfo* pOperator);
|
void streamOpReloadState(struct SOperatorInfo* pOperator);
|
||||||
|
void destroyStreamAggSupporter(SStreamAggSupporter* pSup);
|
||||||
|
void clearGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||||
|
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
SSDataBlock* pResultBlock, SFunctionStateStore* pStore);
|
||||||
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
||||||
|
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
||||||
|
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||||
|
SStorageAPI* pApi);
|
||||||
|
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||||
|
STimeWindowAggSupp* pTwSup);
|
||||||
|
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
||||||
|
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||||
|
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
|
||||||
|
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, SSHashObj* pMapDelete);
|
||||||
|
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
|
||||||
|
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
|
||||||
|
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
|
||||||
|
int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2);
|
||||||
|
void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins);
|
||||||
|
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
|
||||||
|
int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
|
||||||
|
struct SOperatorInfo* pOperator, int64_t winDelta);
|
||||||
|
int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo);
|
||||||
|
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo);
|
||||||
|
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated);
|
||||||
|
void saveDeleteRes(SSHashObj* pStDelete, SSessionKey key);
|
||||||
|
void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey);
|
||||||
|
void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite);
|
||||||
|
void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock);
|
||||||
|
void getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo);
|
||||||
|
void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
|
||||||
|
SResultWindowInfo* pNextWin);
|
||||||
|
void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup,
|
||||||
|
SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
|
||||||
|
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap);
|
||||||
|
int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
|
||||||
|
void resetWinRange(STimeWindow* winRange);
|
||||||
|
|
||||||
|
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
|
||||||
|
void* decodeSSessionKey(void* buf, SSessionKey* key);
|
||||||
|
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen);
|
||||||
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen);
|
||||||
|
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup);
|
||||||
|
void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup);
|
||||||
|
|
||||||
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup);
|
|
||||||
void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup);
|
|
||||||
void destroyOperatorParamValue(void* pValues);
|
void destroyOperatorParamValue(void* pValues);
|
||||||
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc);
|
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc);
|
||||||
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq);
|
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq);
|
||||||
|
|
|
@ -152,6 +152,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle);
|
||||||
|
|
||||||
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
|
@ -519,6 +519,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
|
||||||
pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
|
pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
|
||||||
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle);
|
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle);
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT == type) {
|
||||||
|
pOptr = createStreamEventAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
|
||||||
pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
|
pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN == type) {
|
||||||
|
|
|
@ -1127,7 +1127,8 @@ static bool isSessionWindow(SStreamScanInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isStateWindow(SStreamScanInfo* pInfo) {
|
static bool isStateWindow(SStreamScanInfo* pInfo) {
|
||||||
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE ||
|
||||||
|
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
|
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
|
||||||
|
@ -2243,6 +2244,7 @@ FETCH_NEXT_BLOCK:
|
||||||
blockDataCleanup(pSup->pScanBlock);
|
blockDataCleanup(pSup->pScanBlock);
|
||||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
|
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
|
||||||
|
printSpecDataBlock(pInfo->pUpdateRes, getStreamOpName(pOperator->operatorType), "rebuild", GET_TASKID(pTaskInfo));
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,733 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include "executorInt.h"
|
||||||
|
#include "filter.h"
|
||||||
|
#include "function.h"
|
||||||
|
#include "functionMgt.h"
|
||||||
|
#include "operator.h"
|
||||||
|
#include "querytask.h"
|
||||||
|
#include "tchecksum.h"
|
||||||
|
#include "tcommon.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tfill.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
|
||||||
|
#define IS_FINAL_EVENT_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_EVENT)
|
||||||
|
#define STREAM_EVENT_OP_STATE_NAME "StreamEventHistoryState"
|
||||||
|
#define STREAM_EVENT_OP_CHECKPOINT_NAME "StreamEventOperator_Checkpoint"
|
||||||
|
|
||||||
|
typedef struct SEventWinfowFlag {
|
||||||
|
bool startFlag;
|
||||||
|
bool endFlag;
|
||||||
|
} SEventWinfowFlag;
|
||||||
|
|
||||||
|
typedef struct SEventWindowInfo {
|
||||||
|
SResultWindowInfo winInfo;
|
||||||
|
SEventWinfowFlag* pWinFlag;
|
||||||
|
} SEventWindowInfo;
|
||||||
|
|
||||||
|
void destroyStreamEventOperatorInfo(void* param) {
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = (SStreamEventAggOperatorInfo*)param;
|
||||||
|
cleanupBasicInfo(&pInfo->binfo);
|
||||||
|
destroyStreamAggSupporter(&pInfo->streamAggSup);
|
||||||
|
clearGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
cleanupExprSupp(&pInfo->scalarSupp);
|
||||||
|
if (pInfo->pChildren != NULL) {
|
||||||
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i);
|
||||||
|
destroyOperator(pChild);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pInfo->pChildren);
|
||||||
|
}
|
||||||
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
blockDataDestroy(pInfo->pDelRes);
|
||||||
|
tSimpleHashCleanup(pInfo->pSeUpdated);
|
||||||
|
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||||
|
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
|
||||||
|
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||||
|
|
||||||
|
taosArrayDestroy(pInfo->historyWins);
|
||||||
|
blockDataDestroy(pInfo->pCheckpointRes);
|
||||||
|
|
||||||
|
if (pInfo->pStartCondInfo != NULL) {
|
||||||
|
filterFreeInfo(pInfo->pStartCondInfo);
|
||||||
|
pInfo->pStartCondInfo = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->pEndCondInfo != NULL) {
|
||||||
|
filterFreeInfo(pInfo->pEndCondInfo);
|
||||||
|
pInfo->pEndCondInfo = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(param);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setEventWindowFlag(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo) {
|
||||||
|
char* pFlagInfo = (char*)pWinInfo->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize);
|
||||||
|
pWinInfo->pWinFlag = (SEventWinfowFlag*) pFlagInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos, SEventWindowInfo* pWinInfo) {
|
||||||
|
pWinInfo->winInfo.sessionWin = *pKey;
|
||||||
|
pWinInfo->winInfo.pStatePos = pPos;
|
||||||
|
setEventWindowFlag(pAggSup, pWinInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) {
|
||||||
|
for (int32_t i = start; i < rows; i++) {
|
||||||
|
if (pEnd[i]) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t size = pAggSup->resultRowSize;
|
||||||
|
TSKEY ts = pTs [index];
|
||||||
|
bool start = pStart[index];
|
||||||
|
bool end = pEnd[index];
|
||||||
|
pCurWin->winInfo.sessionWin.groupId = groupId;
|
||||||
|
pCurWin->winInfo.sessionWin.win.skey = ts;
|
||||||
|
pCurWin->winInfo.sessionWin.win.ekey = ts;
|
||||||
|
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||||
|
SSessionKey leftWinKey = {0};
|
||||||
|
void* pVal = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &leftWinKey, &pVal, &len);
|
||||||
|
if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &leftWinKey.win) ) {
|
||||||
|
bool inWin = isInTimeWindow(&leftWinKey.win, ts, 0);
|
||||||
|
setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin);
|
||||||
|
if(inWin || !pCurWin->pWinFlag->endFlag) {
|
||||||
|
pCurWin->winInfo.isOutput = true;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||||
|
pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||||
|
SSessionKey rightWinKey = {0};
|
||||||
|
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len);
|
||||||
|
if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && start && !end) {
|
||||||
|
int32_t endi = getEndCondIndex(pEnd, index, rows);
|
||||||
|
if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) {
|
||||||
|
setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin);
|
||||||
|
pCurWin->winInfo.isOutput = true;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId};
|
||||||
|
pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len);
|
||||||
|
setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin);
|
||||||
|
pCurWin->pWinFlag->startFlag = start;
|
||||||
|
pCurWin->pWinFlag->endFlag = end;
|
||||||
|
pCurWin->winInfo.isOutput = false;
|
||||||
|
|
||||||
|
_end:
|
||||||
|
pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
|
||||||
|
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
SET_SESSION_WIN_KEY_INVALID(pNextWinKey);
|
||||||
|
}
|
||||||
|
if(pCurWin->winInfo.pStatePos->needFree) {
|
||||||
|
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
|
||||||
|
}
|
||||||
|
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||||
|
qDebug("===stream===set event next win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
|
||||||
|
pCurWin->winInfo.sessionWin.win.ekey);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo, SSessionKey* pNextWinKey,
|
||||||
|
TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows,
|
||||||
|
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) {
|
||||||
|
*pRebuild = false;
|
||||||
|
TSKEY maxTs = INT64_MAX;
|
||||||
|
STimeWindow* pWin = &pWinInfo->winInfo.sessionWin.win;
|
||||||
|
if (pWinInfo->pWinFlag->endFlag) {
|
||||||
|
maxTs = pWin->ekey + 1;
|
||||||
|
}
|
||||||
|
if (!IS_INVALID_SESSION_WIN_KEY(*pNextWinKey)) {
|
||||||
|
maxTs = TMIN(maxTs, pNextWinKey->win.skey);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = start; i < rows; ++i) {
|
||||||
|
if (pTsData[i] >= maxTs) {
|
||||||
|
return i - 1 - start;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWin->skey > pTsData[i]) {
|
||||||
|
if (pStDeleted && pWinInfo->winInfo.isOutput) {
|
||||||
|
saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
|
||||||
|
}
|
||||||
|
removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->winInfo.sessionWin);
|
||||||
|
pWin->skey = pTsData[i];
|
||||||
|
pWinInfo->pWinFlag->startFlag = starts[i];
|
||||||
|
} else if (pWin->skey == pTsData[i]) {
|
||||||
|
pWinInfo->pWinFlag->startFlag |= starts[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWin->ekey < pTsData[i]) {
|
||||||
|
pWin->ekey = pTsData[i];
|
||||||
|
pWinInfo->pWinFlag->endFlag = ends[i];
|
||||||
|
} else if (pWin->ekey == pTsData[i]) {
|
||||||
|
pWinInfo->pWinFlag->endFlag |= ends[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
|
||||||
|
|
||||||
|
if (ends[i]) {
|
||||||
|
if (pWinInfo->pWinFlag->endFlag && pWin->skey <= pTsData[i] && pTsData[i] < pWin->ekey ) {
|
||||||
|
*pRebuild = true;
|
||||||
|
}
|
||||||
|
return i + 1 - start;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rows - start;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pCurWin, SSHashObj* pStUpdated,
|
||||||
|
SSHashObj* pStDeleted, bool addGap) {
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
int32_t winNum = 0;
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SResultRow* pCurResult = NULL;
|
||||||
|
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
while (1) {
|
||||||
|
if (!pCurWin->pWinFlag->startFlag || pCurWin->pWinFlag->endFlag) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
SEventWindowInfo nextWinInfo = {0};
|
||||||
|
getNextSessionWinInfo(pAggSup, pStUpdated, &pCurWin->winInfo, &nextWinInfo.winInfo);
|
||||||
|
if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) || !inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) {
|
||||||
|
releaseOutputBuf(pAggSup->pState, nextWinInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
setEventWindowFlag(pAggSup, &nextWinInfo);
|
||||||
|
compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo, pStUpdated, pStDeleted, false);
|
||||||
|
pCurWin->pWinFlag->endFlag = nextWinInfo.pWinFlag->endFlag;
|
||||||
|
winNum++;
|
||||||
|
}
|
||||||
|
return winNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isWindowIncomplete(SEventWindowInfo* pWinInfo) {
|
||||||
|
return !(pWinInfo->pWinFlag->startFlag && pWinInfo->pWinFlag->endFlag);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SSessionKey* pKey) {
|
||||||
|
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, pKey);
|
||||||
|
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
|
||||||
|
SSHashObj* pStDeleted) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||||
|
uint64_t groupId = pSDataBlock->info.id.groupId;
|
||||||
|
int64_t code = TSDB_CODE_SUCCESS;
|
||||||
|
TSKEY* tsCols = NULL;
|
||||||
|
SResultRow* pResult = NULL;
|
||||||
|
int32_t winRows = 0;
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
SColumnInfoData* pColStart = NULL;
|
||||||
|
SColumnInfoData* pColEnd = NULL;
|
||||||
|
|
||||||
|
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
|
||||||
|
pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
|
||||||
|
if (pAggSup->winRange.ekey <= 0) {
|
||||||
|
pAggSup->winRange.ekey = INT64_MAX;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSDataBlock->pDataBlock != NULL) {
|
||||||
|
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||||
|
tsCols = (int64_t*)pColDataInfo->pData;
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock};
|
||||||
|
code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶mStart);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("set data from start slotId error.");
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
int32_t statusStart = 0;
|
||||||
|
filterExecute(pInfo->pStartCondInfo, pSDataBlock, &pColStart, NULL, paramStart.numOfCols, &statusStart);
|
||||||
|
|
||||||
|
SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock};
|
||||||
|
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶mEnd);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("set data from end slotId error.");
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t statusEnd = 0;
|
||||||
|
filterExecute(pInfo->pEndCondInfo, pSDataBlock, &pColEnd, NULL, paramEnd.numOfCols, &statusEnd);
|
||||||
|
|
||||||
|
int32_t rows = pSDataBlock->info.rows;
|
||||||
|
blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
|
||||||
|
for (int32_t i = 0; i < rows; i += winRows) {
|
||||||
|
if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) {
|
||||||
|
i++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
int32_t winIndex = 0;
|
||||||
|
bool allEqual = true;
|
||||||
|
SEventWindowInfo curWin = {0};
|
||||||
|
SSessionKey nextWinKey;
|
||||||
|
setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, &nextWinKey);
|
||||||
|
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
|
||||||
|
bool rebuild = false;
|
||||||
|
winRows = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, rows, i,
|
||||||
|
pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild);
|
||||||
|
ASSERT(winRows >= 1);
|
||||||
|
if (rebuild) {
|
||||||
|
uint64_t uid = 0;
|
||||||
|
appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
|
||||||
|
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
|
||||||
|
tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
|
||||||
|
doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin);
|
||||||
|
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
|
||||||
|
pOperator, 0);
|
||||||
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
compactEventWindow(pOperator, &curWin, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
|
||||||
|
saveSessionOutputBuf(pAggSup, &curWin.winInfo);
|
||||||
|
|
||||||
|
if (isWindowIncomplete(&curWin)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||||
|
code = saveResult(curWin.winInfo, pSeUpdated);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
SSessionKey key = {0};
|
||||||
|
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
||||||
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
colDataDestroy(pColStart);
|
||||||
|
taosMemoryFree(pColStart);
|
||||||
|
colDataDestroy(pColEnd);
|
||||||
|
taosMemoryFree(pColEnd);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t doStreamEventEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) {
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pData = (buf == NULL) ? NULL : *buf;
|
||||||
|
|
||||||
|
// 1.streamAggSup.pResultRows
|
||||||
|
int32_t tlen = 0;
|
||||||
|
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
|
||||||
|
tlen += taosEncodeFixedI32(buf, mapSize);
|
||||||
|
void* pIte = NULL;
|
||||||
|
size_t keyLen = 0;
|
||||||
|
int32_t iter = 0;
|
||||||
|
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
||||||
|
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
||||||
|
tlen += encodeSSessionKey(buf, key);
|
||||||
|
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.dataVersion
|
||||||
|
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
|
||||||
|
|
||||||
|
// 4.checksum
|
||||||
|
if (buf) {
|
||||||
|
uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
|
||||||
|
tlen += taosEncodeFixedU32(buf, cksum);
|
||||||
|
} else {
|
||||||
|
tlen += sizeof(uint32_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4.checksum
|
||||||
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
|
ASSERT(0); // debug
|
||||||
|
qError("stream event state is invalid");
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1.streamAggSup.pResultRows
|
||||||
|
int32_t mapSize = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
|
SSessionKey key = {0};
|
||||||
|
SResultWindowInfo winfo = {0};
|
||||||
|
buf = decodeSSessionKey(buf, &key);
|
||||||
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.dataVersion
|
||||||
|
buf = taosDecodeFixedI64(buf, &pInfo->dataVersion);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t len = doStreamEventEncodeOpState(NULL, 0, pOperator);
|
||||||
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
|
void* pBuf = buf;
|
||||||
|
len = doStreamEventEncodeOpState(&pBuf, len, pOperator);
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) {
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||||
|
if (pInfo->pDelRes->info.rows > 0) {
|
||||||
|
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
return pInfo->pDelRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
|
||||||
|
if (pBInfo->pRes->info.rows > 0) {
|
||||||
|
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
return pBInfo->pRes;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
qDebug("===stream=== stream event agg");
|
||||||
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
|
SSDataBlock* resBlock = buildEventResult(pOperator);
|
||||||
|
if (resBlock != NULL) {
|
||||||
|
return resBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->reCkBlock) {
|
||||||
|
pInfo->reCkBlock = false;
|
||||||
|
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
return pInfo->pCheckpointRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
if (!pInfo->pUpdated) {
|
||||||
|
pInfo->pUpdated = taosArrayInit(16, sizeof(SEventWindowInfo));
|
||||||
|
}
|
||||||
|
if (!pInfo->pSeUpdated) {
|
||||||
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||||
|
}
|
||||||
|
while (1) {
|
||||||
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
|
deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
|
||||||
|
continue;
|
||||||
|
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||||
|
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
|
||||||
|
continue;
|
||||||
|
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
|
return pBlock;
|
||||||
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
||||||
|
doStreamEventSaveCheckpoint(pOperator);
|
||||||
|
pInfo->reCkBlock = true;
|
||||||
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||||
|
SExprSupp* pExprSup = &pInfo->scalarSupp;
|
||||||
|
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||||
|
}
|
||||||
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
|
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||||
|
doStreamEventAggImpl(pOperator, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
|
||||||
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||||
|
}
|
||||||
|
// restore the value
|
||||||
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
|
||||||
|
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
|
||||||
|
copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
|
||||||
|
removeSessionDeleteResults(pInfo->pSeDeleted, pInfo->pUpdated);
|
||||||
|
|
||||||
|
if (pInfo->isHistoryOp) {
|
||||||
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
|
}
|
||||||
|
|
||||||
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||||
|
pInfo->pUpdated = NULL;
|
||||||
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
SSDataBlock* resBlock = buildEventResult(pOperator);
|
||||||
|
if (resBlock != NULL) {
|
||||||
|
return resBlock;
|
||||||
|
}
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamEventReleaseState(SOperatorInfo* pOperator) {
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||||
|
int32_t resSize = winSize + sizeof(TSKEY);
|
||||||
|
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||||
|
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||||
|
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||||
|
qDebug("===stream=== event window operator relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_EVENT_OP_STATE_NAME), pBuff, resSize);
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
||||||
|
taosMemoryFreeClear(pBuff);
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
if (downstream->fpSet.releaseStreamStateFn) {
|
||||||
|
downstream->fpSet.releaseStreamStateFn(downstream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamEventReloadState(SOperatorInfo* pOperator) {
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
resetWinRange(&pAggSup->winRange);
|
||||||
|
|
||||||
|
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
|
||||||
|
int32_t size = 0;
|
||||||
|
void* pBuf = NULL;
|
||||||
|
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_EVENT_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_EVENT_OP_STATE_NAME), &pBuf, &size);
|
||||||
|
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
|
||||||
|
qDebug("===stream=== event window operator reload state. get result count:%d", num);
|
||||||
|
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
|
||||||
|
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
|
||||||
|
|
||||||
|
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
|
||||||
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
|
||||||
|
pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts);
|
||||||
|
|
||||||
|
if (!pInfo->pSeUpdated && num > 0) {
|
||||||
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||||
|
}
|
||||||
|
if (!pInfo->pSeDeleted && num > 0) {
|
||||||
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < num; i++) {
|
||||||
|
SEventWindowInfo curInfo = {0};
|
||||||
|
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey,
|
||||||
|
pSeKeyBuf[i].groupId, i);
|
||||||
|
getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo);
|
||||||
|
setEventWindowFlag(pAggSup, &curInfo);
|
||||||
|
compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
|
||||||
|
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey,
|
||||||
|
curInfo.winInfo.sessionWin.groupId);
|
||||||
|
if (isWindowIncomplete(&curInfo)) {
|
||||||
|
if (curInfo.winInfo.isOutput) {
|
||||||
|
ASSERT(0);
|
||||||
|
saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||||
|
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
|
||||||
|
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
if (!isCloseWindow(&curInfo.winInfo.sessionWin.win, &pInfo->twAggSup)) {
|
||||||
|
saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin);
|
||||||
|
}
|
||||||
|
SSessionKey key = {0};
|
||||||
|
getSessionHashKey(&curInfo.winInfo.sessionWin, &key);
|
||||||
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
|
||||||
|
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||||
|
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) {
|
||||||
|
SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode;
|
||||||
|
int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SStreamEventAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamEventAggOperatorInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
|
if (pEventNode->window.pExprs != NULL) {
|
||||||
|
int32_t numOfScalar = 0;
|
||||||
|
SExprInfo* pScalarExprInfo = createExprInfo(pEventNode->window.pExprs, NULL, &numOfScalar);
|
||||||
|
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->twAggSup = (STimeWindowAggSupp){
|
||||||
|
.waterMark = pEventNode->window.watermark,
|
||||||
|
.calTrigger = pEventNode->window.triggerType,
|
||||||
|
.maxTs = INT64_MIN,
|
||||||
|
.minTs = INT64_MAX,
|
||||||
|
};
|
||||||
|
|
||||||
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
|
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||||
|
int32_t numOfCols = 0;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols);
|
||||||
|
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
|
||||||
|
sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||||
|
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->primaryTsIndex = tsSlotId;
|
||||||
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||||
|
pInfo->pDelIterator = NULL;
|
||||||
|
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
|
||||||
|
pInfo->pChildren = NULL;
|
||||||
|
pInfo->ignoreExpiredData = pEventNode->window.igExpired;
|
||||||
|
pInfo->ignoreExpiredDataSaved = false;
|
||||||
|
pInfo->pUpdated = NULL;
|
||||||
|
pInfo->pSeUpdated = NULL;
|
||||||
|
pInfo->dataVersion = 0;
|
||||||
|
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
|
||||||
|
if (!pInfo->historyWins) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
if (pHandle) {
|
||||||
|
pInfo->isHistoryOp = pHandle->fillHistory;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||||
|
pInfo->reCkBlock = false;
|
||||||
|
|
||||||
|
// for stream
|
||||||
|
void* buff = NULL;
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t res =
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_EVENT_OP_CHECKPOINT_NAME), &buff, &len);
|
||||||
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
|
doStreamEventDecodeOpState(buff, len, pOperator);
|
||||||
|
taosMemoryFree(buff);
|
||||||
|
}
|
||||||
|
|
||||||
|
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
||||||
|
pInfo, pTaskInfo);
|
||||||
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
||||||
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
||||||
|
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
||||||
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = filterInitFromNode((SNode*)pEventNode->pStartCond, &pInfo->pStartCondInfo, 0);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
destroyStreamEventOperatorInfo(pInfo);
|
||||||
|
taosMemoryFreeClear(pOperator);
|
||||||
|
pTaskInfo->code = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
|
@ -146,7 +146,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
|
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
|
||||||
winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;
|
winInfo.sessionWin.win.ekey = winInfo.sessionWin.win.skey;
|
||||||
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
|
@ -845,6 +845,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
||||||
pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore);
|
pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore);
|
||||||
pResult = (SResultRow*)pResPos->pRowBuff;
|
pResult = (SResultRow*)pResPos->pRowBuff;
|
||||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
qError("%s set interval output buff error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||||
|
@ -1073,7 +1074,6 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
int32_t dataLen = len - sizeof(uint32_t);
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
ASSERT(0); // debug
|
|
||||||
qError("stream interval state is invalid");
|
qError("stream interval state is invalid");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1156,7 +1156,7 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) {
|
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) {
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(*ppWinUpdated, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(*ppWinUpdated, pIte, &iter)) != NULL) {
|
||||||
|
@ -1234,7 +1234,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
}
|
}
|
||||||
|
qInfo("%s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
||||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1765,8 +1765,9 @@ int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) {
|
void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
|
||||||
key.win.ekey = key.win.skey;
|
SSessionKey key = {0};
|
||||||
|
getSessionHashKey(pKey, &key);
|
||||||
void* pVal = tSimpleHashGet(pHashMap, &key, sizeof(SSessionKey));
|
void* pVal = tSimpleHashGet(pHashMap, &key, sizeof(SSessionKey));
|
||||||
if (pVal) {
|
if (pVal) {
|
||||||
releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore);
|
releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore);
|
||||||
|
@ -1775,12 +1776,12 @@ static void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMa
|
||||||
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
|
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
|
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
|
||||||
*pHashKey = *pKey;
|
*pHashKey = *pKey;
|
||||||
pHashKey->win.ekey = pKey->win.skey;
|
pHashKey->win.ekey = pKey->win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) {
|
void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) {
|
||||||
if (tSimpleHashGetSize(pHashMap) == 0) {
|
if (tSimpleHashGetSize(pHashMap) == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1794,7 +1795,7 @@ static void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins) {
|
void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins) {
|
||||||
if (tSimpleHashGetSize(pHashMap) == 0) {
|
if (tSimpleHashGetSize(pHashMap) == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1823,7 +1824,7 @@ int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo*
|
||||||
if (pStDeleted && pWinInfo->isOutput) {
|
if (pStDeleted && pWinInfo->isOutput) {
|
||||||
saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
|
saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
|
||||||
}
|
}
|
||||||
removeSessionResult(pAggSup, pStUpdated, pResultRows, pWinInfo->sessionWin);
|
removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->sessionWin);
|
||||||
pWinInfo->sessionWin.win.skey = pStartTs[i];
|
pWinInfo->sessionWin.win.skey = pStartTs[i];
|
||||||
}
|
}
|
||||||
pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]);
|
pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]);
|
||||||
|
@ -1845,7 +1846,7 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
|
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
|
||||||
int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
|
int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
|
||||||
SOperatorInfo* pOperator, int64_t winDelta) {
|
SOperatorInfo* pOperator, int64_t winDelta) {
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
@ -1867,7 +1868,7 @@ static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKe
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) {
|
int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) {
|
||||||
void* pVal = tSimpleHashGet(pStUpdated, &pWinInfo->sessionWin, sizeof(SSessionKey));
|
void* pVal = tSimpleHashGet(pStUpdated, &pWinInfo->sessionWin, sizeof(SSessionKey));
|
||||||
if (pVal) {
|
if (pVal) {
|
||||||
SResultWindowInfo* pWin = pVal;
|
SResultWindowInfo* pWin = pVal;
|
||||||
|
@ -1891,6 +1892,34 @@ void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated,
|
||||||
pAggSup->stateStore.streamStateFreeCur(pCur);
|
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup,
|
||||||
|
SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
|
||||||
|
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap) {
|
||||||
|
SResultRow* pCurResult = NULL;
|
||||||
|
int32_t numOfOutput = pSup->numOfExprs;
|
||||||
|
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||||
|
SResultRow* pWinResult = NULL;
|
||||||
|
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||||
|
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, pNextWin->sessionWin.win.ekey);
|
||||||
|
memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey));
|
||||||
|
|
||||||
|
int64_t winDelta = 0;
|
||||||
|
if (addGap) {
|
||||||
|
winDelta = pAggSup->gap;
|
||||||
|
}
|
||||||
|
updateTimeWindowInfo(&pTwAggSup->timeWindowData, &pCurWin->sessionWin.win, winDelta);
|
||||||
|
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pTwAggSup->timeWindowData);
|
||||||
|
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
|
||||||
|
if (pNextWin->isOutput && pStDeleted) {
|
||||||
|
qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey,
|
||||||
|
pNextWin->sessionWin.groupId);
|
||||||
|
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
|
||||||
|
}
|
||||||
|
removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, &pNextWin->sessionWin);
|
||||||
|
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
|
||||||
|
releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
|
static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
|
||||||
SSHashObj* pStDeleted, bool addGap) {
|
SSHashObj* pStDeleted, bool addGap) {
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
@ -1902,7 +1931,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
|
||||||
SResultRow* pCurResult = NULL;
|
SResultRow* pCurResult = NULL;
|
||||||
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
// initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||||
// Just look for the window behind StartIndex
|
// Just look for the window behind StartIndex
|
||||||
while (1) {
|
while (1) {
|
||||||
SResultWindowInfo winInfo = {0};
|
SResultWindowInfo winInfo = {0};
|
||||||
|
@ -1912,23 +1941,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
|
||||||
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
|
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
SResultRow* pWinResult = NULL;
|
compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, pCurWin, &winInfo, pStUpdated, pStDeleted, true);
|
||||||
initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
|
||||||
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
|
|
||||||
memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey));
|
|
||||||
int64_t winDelta = 0;
|
|
||||||
if (addGap) {
|
|
||||||
winDelta = pAggSup->gap;
|
|
||||||
}
|
|
||||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta);
|
|
||||||
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
|
||||||
tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey));
|
|
||||||
if (winInfo.isOutput && pStDeleted) {
|
|
||||||
saveDeleteRes(pStDeleted, winInfo.sessionWin);
|
|
||||||
}
|
|
||||||
removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, winInfo.sessionWin);
|
|
||||||
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
|
|
||||||
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
|
|
||||||
winNum++;
|
winNum++;
|
||||||
}
|
}
|
||||||
return winNum;
|
return winNum;
|
||||||
|
@ -2014,6 +2027,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
||||||
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
|
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
|
||||||
pOperator, winDelta);
|
pOperator, winDelta);
|
||||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
qError("%s do stream session aggregate impl error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted, addGap);
|
compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted, addGap);
|
||||||
|
@ -2022,6 +2036,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
|
||||||
code = saveResult(winInfo, pStUpdated);
|
code = saveResult(winInfo, pStUpdated);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s do stream session aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2035,7 +2050,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
|
void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
|
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
|
||||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
@ -2057,7 +2072,7 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
|
inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
|
||||||
SResultWindowInfo* pWinInfo1 = (SResultWindowInfo*)pKey1;
|
SResultWindowInfo* pWinInfo1 = (SResultWindowInfo*)pKey1;
|
||||||
SResultWindowInfo* pWinInfo2 = (SResultWindowInfo*)pKey2;
|
SResultWindowInfo* pWinInfo2 = (SResultWindowInfo*)pKey2;
|
||||||
SSessionKey* pWin1 = &pWinInfo1->sessionWin;
|
SSessionKey* pWin1 = &pWinInfo1->sessionWin;
|
||||||
|
@ -2487,8 +2502,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
int32_t dataLen = len - sizeof(uint32_t);
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
ASSERT(0); // debug
|
qError("stream session state is invalid");
|
||||||
qError("stream interval state is invalid");
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2546,6 +2560,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
if (opRes) {
|
if (opRes) {
|
||||||
return opRes;
|
return opRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pInfo->reCkBlock) {
|
||||||
|
pInfo->reCkBlock = false;
|
||||||
|
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
return pInfo->pCheckpointRes;
|
||||||
|
}
|
||||||
|
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2612,6 +2633,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
SOperatorInfo* pChildOp =
|
SOperatorInfo* pChildOp =
|
||||||
createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0, NULL);
|
createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0, NULL);
|
||||||
if (!pChildOp) {
|
if (!pChildOp) {
|
||||||
|
qError("%s create stream child of final session error", GET_TASKID(pTaskInfo));
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||||
|
@ -2876,6 +2898,14 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
|
||||||
pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState);
|
pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, SSHashObj* pMapDelete) {
|
||||||
|
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||||
|
doDeleteTimeWindows(pAggSup, pBlock, pWins);
|
||||||
|
removeSessionResults(pAggSup, pMapUpdate, pWins);
|
||||||
|
copyDeleteWindowInfo(pWins, pMapDelete);
|
||||||
|
taosArrayDestroy(pWins);
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||||
|
@ -2902,6 +2932,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
|
if (pInfo->reCkBlock) {
|
||||||
|
pInfo->reCkBlock = false;
|
||||||
|
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
return pInfo->pCheckpointRes;
|
||||||
|
}
|
||||||
clearFunctionContext(&pOperator->exprSupp);
|
clearFunctionContext(&pOperator->exprSupp);
|
||||||
// semi session operator clear disk buffer
|
// semi session operator clear disk buffer
|
||||||
clearStreamSessionOperator(pInfo);
|
clearStreamSessionOperator(pInfo);
|
||||||
|
@ -2930,11 +2965,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
// gap must be 0
|
// gap must be 0
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
deleteSessionWinState(pAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted);
|
||||||
doDeleteTimeWindows(pAggSup, pBlock, pWins);
|
|
||||||
removeSessionResults(pAggSup, pInfo->pStUpdated, pWins);
|
|
||||||
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
|
|
||||||
taosArrayDestroy(pWins);
|
|
||||||
pInfo->clearState = true;
|
pInfo->clearState = true;
|
||||||
break;
|
break;
|
||||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||||
|
@ -3227,7 +3258,7 @@ int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pW
|
||||||
if (pSeDeleted && pWinInfo->winInfo.isOutput) {
|
if (pSeDeleted && pWinInfo->winInfo.isOutput) {
|
||||||
saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
|
saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
|
||||||
}
|
}
|
||||||
removeSessionResult(pAggSup, pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin);
|
removeSessionResult(pAggSup, pSeUpdated, pResultRows, &pWinInfo->winInfo.sessionWin);
|
||||||
pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
|
pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
|
||||||
}
|
}
|
||||||
pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
|
pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
|
||||||
|
@ -3297,6 +3328,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
|
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
|
||||||
pOperator, 0);
|
pOperator, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
qError("%s do one window aggregate impl error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
saveSessionOutputBuf(pAggSup, &curWin.winInfo);
|
saveSessionOutputBuf(pAggSup, &curWin.winInfo);
|
||||||
|
@ -3304,6 +3336,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||||
code = saveResult(curWin.winInfo, pSeUpdated);
|
code = saveResult(curWin.winInfo, pSeUpdated);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s do stream state aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3375,8 +3408,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
||||||
int32_t dataLen = len - sizeof(uint32_t);
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
ASSERT(0); // debug
|
qError("stream state_window state is invalid");
|
||||||
qError("stream interval state is invalid");
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3454,6 +3486,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
return resBlock;
|
return resBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pInfo->reCkBlock) {
|
||||||
|
pInfo->reCkBlock = false;
|
||||||
|
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
|
return pInfo->pCheckpointRes;
|
||||||
|
}
|
||||||
|
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -3475,11 +3513,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted);
|
||||||
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
|
|
||||||
removeSessionResults(&pInfo->streamAggSup, pInfo->pSeUpdated, pWins);
|
|
||||||
copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
|
|
||||||
taosArrayDestroy(pWins);
|
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
|
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
|
||||||
|
@ -3488,7 +3522,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
||||||
doStreamSessionSaveCheckpoint(pOperator);
|
doStreamStateSaveCheckpoint(pOperator);
|
||||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3550,29 +3584,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
|
||||||
SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
|
SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
|
||||||
|
|
||||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SResultRow* pCurResult = NULL;
|
compactTimeWindow(pSup, &pInfo->streamAggSup, &pInfo->twAggSup, pTaskInfo, pCurWin, pNextWin, pStUpdated, pStDeleted, false);
|
||||||
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
|
||||||
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
|
||||||
SResultRow* pWinResult = NULL;
|
|
||||||
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
|
||||||
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, pNextWin->sessionWin.win.ekey);
|
|
||||||
memcpy(pCurWin->pStatePos->pKey, &pCurWin->sessionWin, sizeof(SSessionKey));
|
|
||||||
|
|
||||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1);
|
|
||||||
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
|
||||||
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
|
|
||||||
if (pNextWin->isOutput && pStDeleted) {
|
|
||||||
qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey,
|
|
||||||
pNextWin->sessionWin.groupId);
|
|
||||||
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
|
|
||||||
}
|
|
||||||
removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
|
|
||||||
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
|
|
||||||
releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamStateReloadState(SOperatorInfo* pOperator) {
|
void streamStateReloadState(SOperatorInfo* pOperator) {
|
||||||
|
@ -3743,12 +3756,6 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
|
|
||||||
pRes->info.id.groupId = pMiaInfo->groupId;
|
|
||||||
pMiaInfo->curTs = INT64_MIN;
|
|
||||||
pMiaInfo->groupId = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
|
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
|
||||||
for (int i = 0; i < num; i++) {
|
for (int i = 0; i < num; i++) {
|
||||||
if (type == STREAM_INVERT) {
|
if (type == STREAM_INVERT) {
|
||||||
|
|
|
@ -6883,7 +6883,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
||||||
!isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
|
!isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
|
||||||
crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect) || hasJsonTypeProjection(pSelect)) {
|
crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
}
|
}
|
||||||
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
|
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
|
||||||
|
|
|
@ -122,7 +122,7 @@ int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** p
|
||||||
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
|
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
|
||||||
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
|
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
|
||||||
void* streamDefaultIterCreate_rocksdb(SStreamState* pState);
|
void* streamDefaultIterCreate_rocksdb(SStreamState* pState);
|
||||||
int32_t streamDefaultIterValid_rocksdb(void* iter);
|
bool streamDefaultIterValid_rocksdb(void* iter);
|
||||||
void streamDefaultIterSeek_rocksdb(void* iter, const char* key);
|
void streamDefaultIterSeek_rocksdb(void* iter, const char* key);
|
||||||
void streamDefaultIterNext_rocksdb(void* iter);
|
void streamDefaultIterNext_rocksdb(void* iter);
|
||||||
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len);
|
char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len);
|
||||||
|
|
|
@ -2850,9 +2850,12 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
|
||||||
pCur->number = pState->number;
|
pCur->number = pState->number;
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
int32_t streamDefaultIterValid_rocksdb(void* iter) {
|
bool streamDefaultIterValid_rocksdb(void* iter) {
|
||||||
|
if (iter) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
SStreamStateCur* pCur = iter;
|
SStreamStateCur* pCur = iter;
|
||||||
return rocksdb_iter_valid(pCur->iter) ? 1 : 0;
|
return (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) ? true : false;
|
||||||
}
|
}
|
||||||
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
|
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
|
||||||
SStreamStateCur* pCur = iter;
|
SStreamStateCur* pCur = iter;
|
||||||
|
|
|
@ -72,7 +72,7 @@ bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey) {
|
static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey) {
|
||||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||||
ASSERT(pNewPos->pRowBuff);
|
ASSERT(pNewPos->pRowBuff);
|
||||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
|
@ -80,7 +80,7 @@ static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pW
|
||||||
return pNewPos;
|
return pNewPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, SSessionKey* pKey, int32_t index) {
|
static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey, int32_t index) {
|
||||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||||
ASSERT(pNewPos->pRowBuff);
|
ASSERT(pNewPos->pRowBuff);
|
||||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
|
@ -270,6 +270,51 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
|
||||||
|
SRowBuffPos* pNewPos = NULL;
|
||||||
|
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||||
|
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||||
|
SArray* pWinStates = NULL;
|
||||||
|
if (!ppBuff) {
|
||||||
|
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
||||||
|
tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
|
||||||
|
} else {
|
||||||
|
pWinStates = (SArray*)(*ppBuff);
|
||||||
|
}
|
||||||
|
if (!pCur) {
|
||||||
|
pNewPos = addNewSessionWindow(pFileState, pWinStates, pWinKey);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t size = taosArrayGetSize(pWinStates);
|
||||||
|
if (pCur->buffIndex >= 0) {
|
||||||
|
if (pCur->buffIndex >= size) {
|
||||||
|
pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, size);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex);
|
||||||
|
goto _end;
|
||||||
|
} else {
|
||||||
|
if (size > 0) {
|
||||||
|
SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
|
||||||
|
if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >=0) {
|
||||||
|
// pCur is invalid
|
||||||
|
SSessionKey pTmpKey = *pWinKey;
|
||||||
|
int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen);
|
||||||
|
ASSERT(code == TSDB_CODE_FAILED);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pNewPos = getNewRowPosForWrite(pFileState);
|
||||||
|
pNewPos->needFree = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
|
||||||
|
(*ppVal) = pNewPos;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
void sessionWinStateClear(SStreamFileState* pFileState) {
|
void sessionWinStateClear(SStreamFileState* pFileState) {
|
||||||
int32_t buffSize = getRowStateRowSize(pFileState);
|
int32_t buffSize = getRowStateRowSize(pFileState);
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
|
|
|
@ -738,6 +738,14 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamStateSessionAllocWinBuffByNextPosition(SStreamState* pState, SStreamStateCur* pCur, const SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||||
|
#ifdef USE_ROCKSDB
|
||||||
|
return allocSessioncWinBuffByNextPosition(pState->pFileState, pCur, pKey, pVal, pVLen);
|
||||||
|
#else
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen);
|
return getSessionFlushedBuff(pState->pFileState, key, pVal, pVLen);
|
||||||
|
|
|
@ -267,6 +267,8 @@ sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 delete_mark 20s into streamt1 as select _wstart as c0, count(*) c1, count(a) c2 from st interval(10s) ;
|
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 delete_mark 20s into streamt1 as select _wstart as c0, count(*) c1, count(a) c2 from st interval(10s) ;
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791211000,1,2,3);
|
sql insert into t1 values(1648791211000,1,2,3);
|
||||||
|
|
||||||
sql insert into t1 values(1262275200000,2,2,3);
|
sql insert into t1 values(1262275200000,2,2,3);
|
||||||
|
|
|
@ -0,0 +1,322 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print step1
|
||||||
|
print =============== create database
|
||||||
|
sql create database test vgroups 1;
|
||||||
|
sql use test;
|
||||||
|
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with a = 9;
|
||||||
|
sleep 1000
|
||||||
|
sql insert into t1 values(1648791213000,0,1,1,1.0);
|
||||||
|
sql insert into t1 values(1648791223001,9,2,2,1.1);
|
||||||
|
sql insert into t1 values(1648791213009,0,3,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop0:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 1 sql select * from streamt;
|
||||||
|
sql select * from streamt;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 6 then
|
||||||
|
print ======data02=$data02
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != 3 then
|
||||||
|
print ======data03=$data03
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791243006,1,1,1,1.1);
|
||||||
|
sql insert into t1 values(1648791253000,2,2,2,1.1);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop1:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 2 sql select * from streamt;
|
||||||
|
sql select * from streamt;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 1 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791243000,0,3,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop2:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 3 sql select * from streamt;
|
||||||
|
sql select * from streamt;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 1 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791253009,9,4,4,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop3:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 4 sql select * from streamt;
|
||||||
|
sql select * from streamt;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 6 then
|
||||||
|
print ======data02=$data02
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != 3 then
|
||||||
|
print ======data03=$data03
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 1
|
||||||
|
if $data11 != 4 then
|
||||||
|
print ======data11=$data11
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 10 then
|
||||||
|
print ======data12=$data12
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data13 != 4 then
|
||||||
|
print ======data13=$data13
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step2
|
||||||
|
print =============== create database test2
|
||||||
|
sql create database test2 vgroups 1;
|
||||||
|
sql use test2;
|
||||||
|
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with b = 9;
|
||||||
|
sleep 1000
|
||||||
|
sql insert into t1 values(1648791213000,0,1,1,1.0);
|
||||||
|
sql insert into t1 values(1648791213009,1,2,2,2.1);
|
||||||
|
sql insert into t1 values(1648791223000,0,9,9,9.0);
|
||||||
|
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233000,0,9,9,9.0);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print sql select * from streamt2;
|
||||||
|
sql select * from streamt2;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 1 then
|
||||||
|
print ======data11=$data11
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step3
|
||||||
|
print =============== create database test3
|
||||||
|
sql create database test3 vgroups 1;
|
||||||
|
sql use test3;
|
||||||
|
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with b = 9;
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233009,1,2,2,2.1);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233000,0,1,1,1.0);
|
||||||
|
sql insert into t1 values(1648791243000,0,9,9,9.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop5:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 1 sql select * from streamt3;
|
||||||
|
sql select * from streamt3;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 1 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791223000,0,9,9,9.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop6:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 2 sql select * from streamt3;
|
||||||
|
sql select * from streamt3;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 1 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 1
|
||||||
|
if $data11 != 3 then
|
||||||
|
print ======data11=$data11
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,0,1,1,1.0);
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213001,1,9,9,9.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop7:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 3 sql select * from streamt3;
|
||||||
|
sql select * from streamt3;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
print $data20 $data21 $data22 $data23
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 2 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 1
|
||||||
|
if $data11 != 1 then
|
||||||
|
print ======data11=$data11
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 2
|
||||||
|
if $data21 != 3 then
|
||||||
|
print ======data21=$data21
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
print event0 end
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,49 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print step1
|
||||||
|
print =============== create database test1
|
||||||
|
sql create database test1 vgroups 1;
|
||||||
|
sql use test1;
|
||||||
|
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with b = 9;
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233000,0,1,1,1.0);
|
||||||
|
sql insert into t1 values(1648791243000,1,9,9,9.0);
|
||||||
|
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791223000,3,3,3,3.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop0:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 1 sql select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 2 then
|
||||||
|
print ======data01=$data01
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
print event1 end
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue