From 3e49f40c748057fa95f2bcccaf9748cc46598305 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 16 Oct 2024 12:16:59 +0800 Subject: [PATCH] stream twa --- include/common/tmsg.h | 1 + include/libs/executor/storageapi.h | 13 +- include/libs/nodes/querynodes.h | 1 + include/libs/stream/streamState.h | 7 +- include/libs/stream/tstreamFileState.h | 6 + source/dnode/snode/src/snodeInitApi.c | 10 +- source/dnode/vnode/src/vnd/vnodeInitApi.c | 5 +- source/libs/executor/inc/executorInt.h | 29 + source/libs/executor/inc/streamexecutorInt.h | 26 +- source/libs/executor/src/scanoperator.c | 12 +- source/libs/executor/src/streamfilloperator.c | 333 +++++++++- .../src/streamintervalsliceoperator.c | 591 ++++++++++++++++++ .../executor/src/streamtimesliceoperator.c | 27 +- .../executor/src/streamtimewindowoperator.c | 20 +- source/libs/function/src/builtins.c | 4 +- source/libs/nodes/src/nodesCodeFuncs.c | 7 + source/libs/parser/src/parTranslater.c | 29 + source/libs/stream/inc/streamBackendRocksdb.h | 2 + source/libs/stream/src/streamBackendRocksdb.c | 72 ++- source/libs/stream/src/streamData.c | 6 +- source/libs/stream/src/streamSliceState.c | 17 +- source/libs/stream/src/streamState.c | 14 +- source/libs/stream/src/tstreamFileState.c | 196 +++++- 23 files changed, 1359 insertions(+), 69 deletions(-) create mode 100644 source/libs/executor/src/streamintervalsliceoperator.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c07f250a74..4da4bf2d2d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -459,6 +459,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, + QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL, } ENodeType; typedef struct { diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index da3a63dcc4..929248f5cb 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -39,9 +39,10 @@ extern "C" { #define META_READER_LOCK 0x0 #define META_READER_NOLOCK 0x1 -#define STREAM_STATE_BUFF_HASH 1 -#define STREAM_STATE_BUFF_SORT 2 -#define STREAM_STATE_BUFF_HASH_SORT 3 +#define STREAM_STATE_BUFF_HASH 1 +#define STREAM_STATE_BUFF_SORT 2 +#define STREAM_STATE_BUFF_HASH_SORT 3 +#define STREAM_STATE_BUFF_HASH_SEARCH 4 typedef struct SMeta SMeta; typedef TSKEY (*GetTsFun)(void*); @@ -341,6 +342,8 @@ typedef struct SStateStore { void (*streamStateReleaseBuf)(SStreamState* pState, void* pVal, bool used); void (*streamStateClearBuff)(SStreamState* pState, void* pVal); void (*streamStateFreeVal)(void* val); + int32_t (*streamStateGetPrev)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, + int32_t* pVLen, int32_t* pWinCode); int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t (*streamStateGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode); @@ -372,9 +375,11 @@ typedef struct SStateStore { SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key); void (*streamStateFreeCur)(SStreamStateCur* pCur); - int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); + int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); + void (*streamStateClearExpiredState)(SStreamState* pState); + int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen, int32_t* pWinCode); int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen); diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index f5567c735e..1095dcd81c 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -448,6 +448,7 @@ typedef struct SSelectStmt { bool hasCountFunc; bool hasUdaf; bool hasStateKey; + bool hasTwaOrElapsedFunc; bool onlyHasKeepOrderFunc; bool groupSort; bool tagScan; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index be5c83b503..3b05e3abf2 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -49,6 +49,8 @@ void streamStateClear(SStreamState* pState); void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex); void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); +int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, + int32_t* pWinCode); // session window int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen, @@ -102,9 +104,12 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* void streamStateFreeCur(SStreamStateCur* pCur); void streamStateResetCur(SStreamStateCur* pCur); -int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +// twa +void streamStateClearExpiredState(SStreamState* pState); + void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); void streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 64cb0456c9..a6f55fca76 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -142,6 +142,12 @@ int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, v void streamFileStateGroupCurNext(SStreamStateCur* pCur); int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen); SSHashObj* getGroupIdCache(SStreamFileState* pFileState); +int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos); +int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, + int32_t* pVLen, int32_t* pWinCode); + +//twa +void clearExpiredState(SStreamFileState* pFileState); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 5485c000d1..628b4b392d 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -46,6 +46,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateSaveInfo = streamStateSaveInfo; pStore->streamStateGetInfo = streamStateGetInfo; pStore->streamStateSetNumber = streamStateSetNumber; + pStore->streamStateGetPrev = streamStateGetPrev; pStore->streamStateFillPut = streamStateFillPut; pStore->streamStateFillGet = streamStateFillGet; @@ -63,9 +64,11 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillSeekKeyPrev = streamStateFillSeekKeyPrev; pStore->streamStateFreeCur = streamStateFreeCur; - pStore->streamStateGetGroupKVByCur = streamStateGetGroupKVByCur; + pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; + pStore->streamStateClearExpiredState = streamStateClearExpiredState; + pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; pStore->streamStateSessionPut = streamStateSessionPut; pStore->streamStateSessionGet = streamStateSessionGet; @@ -78,11 +81,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateCountGetKeyByRange = streamStateCountGetKeyByRange; pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition; - // void initStreamStateAPI(SStorageAPI* pAPI) { - // initStateStoreAPI(&pAPI->stateStore); - // initFunctionStateStore(&pAPI->functionStore); - // } - pStore->updateInfoInit = updateInfoInit; pStore->updateInfoFillBlockData = updateInfoFillBlockData; pStore->updateInfoIsUpdated = updateInfoIsUpdated; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 02e1a59c22..3a36dacbf0 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -162,6 +162,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateSaveInfo = streamStateSaveInfo; pStore->streamStateGetInfo = streamStateGetInfo; pStore->streamStateSetNumber = streamStateSetNumber; + pStore->streamStateGetPrev = streamStateGetPrev; pStore->streamStateFillPut = streamStateFillPut; pStore->streamStateFillGet = streamStateFillGet; @@ -179,9 +180,11 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillSeekKeyPrev = streamStateFillSeekKeyPrev; pStore->streamStateFreeCur = streamStateFreeCur; - pStore->streamStateGetGroupKVByCur = streamStateGetGroupKVByCur; + pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; + pStore->streamStateClearExpiredState = streamStateClearExpiredState; + pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; pStore->streamStateSessionPut = streamStateSessionPut; pStore->streamStateSessionGet = streamStateSessionGet; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 2427a3d42a..a0ad574d54 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -536,6 +536,7 @@ typedef struct SStreamScanInfo { SSDataBlock* pCheckpointRes; int8_t pkColType; int32_t pkColLen; + bool useGetResultRange; } SStreamScanInfo; typedef struct { @@ -818,6 +819,10 @@ typedef struct SStreamFillOperatorInfo { int32_t primaryTsCol; int32_t primarySrcSlotId; SStreamFillInfo* pFillInfo; + SStreamAggSupporter* pStreamAggSup; + SArray* pCloseTs; + SArray* pUpdated; + SGroupResInfo groupResInfo; } SStreamFillOperatorInfo; typedef struct SStreamTimeSliceOperatorInfo { @@ -850,8 +855,32 @@ typedef struct SStreamTimeSliceOperatorInfo { SGroupResInfo groupResInfo; bool ignoreNull; bool isHistoryOp; + struct SOperatorInfo* pOperator; } SStreamTimeSliceOperatorInfo; +typedef struct SStreamIntervalSliceOperatorInfo { + SSteamOpBasicInfo basic; + SOptrBasicInfo binfo; + STimeWindowAggSupp twAggSup; + SStreamAggSupporter streamAggSup; + SExprSupp scalarSup; + SInterval interval; + bool recvCkBlock; + SSDataBlock* pCheckpointRes; + int32_t primaryTsIndex; + SSHashObj* pUpdatedMap; // SWinKey + SArray* pUpdated; // SWinKey + SSHashObj* pDeletedMap; + SArray* pDelWins; + SSDataBlock* pDelRes; + int32_t delIndex; + bool destHasPrimaryKey; + int64_t endTs; + SGroupResInfo groupResInfo; + struct SOperatorInfo* pOperator; + bool hasFill; +} SStreamIntervalSliceOperatorInfo; + #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 29e165631d..1acfb4d205 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -30,6 +30,16 @@ extern "C" { #define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN) #define HAS_ROW_DATA(pRowData) (pRowData && pRowData->key != INT64_MIN) +#define IS_INVALID_WIN_KEY(ts) ((ts) == INT64_MIN) +#define IS_VALID_WIN_KEY(ts) ((ts) != INT64_MIN) +#define SET_WIN_KEY_INVALID(ts) ((ts) = INT64_MIN) + +#define IS_NORMAL_INTERVAL_OP(op) \ + ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || \ + (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) + +#define IS_CONTINUE_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL) + typedef struct SSliceRowData { TSKEY key; char pRowVal[]; @@ -73,9 +83,23 @@ int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, void resetStreamFillSup(SStreamFillSupporter* pFillSup); void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup); +int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap); + int winPosCmprImpl(const void* pKey1, const void* pKey2); -void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); +void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); +SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index); +int32_t getDownstreamRes(struct SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol); +void destroyFlusedppPos(void* ppRes); +void doBuildStreamIntervalResult(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, + SGroupResInfo* pGroupResInfo); +void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, + int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol); +int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, bool ignoreNull); + +int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, + struct SOperatorInfo** ppOptInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 670956ab80..f10cb14686 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -25,6 +25,7 @@ #include "tdatablock.h" #include "tmsg.h" +#include "ttime.h" #include "operator.h" #include "query.h" @@ -3397,9 +3398,7 @@ static bool isStreamWindow(SStreamScanInfo* pInfo) { isTimeSlice(pInfo); } -static int32_t copyGetResultBlock(SSDataBlock* dest, const SSDataBlock* src) { - TSKEY start = src->info.window.skey; - TSKEY end = src->info.window.ekey; +static int32_t copyGetResultBlock(SSDataBlock* dest, TSKEY start, TSKEY end) { int32_t code = blockDataEnsureCapacity(dest, 1); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3635,7 +3634,11 @@ FETCH_NEXT_BLOCK: case STREAM_GET_RESULT: { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; - code = copyGetResultBlock(pInfo->pUpdateRes, pBlock); + TSKEY endKey = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval); + if (pInfo->useGetResultRange == true) { + endKey = pBlock->info.window.ekey; + } + code = copyGetResultBlock(pInfo->pUpdateRes, pBlock->info.window.skey, endKey); QUERY_CHECK_CODE(code, lino, _end); pInfo->pUpdateInfo->maxDataVersion = -1; prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); @@ -4445,6 +4448,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; pInfo->pFillSup = NULL; + pInfo->useGetResultRange = false; code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 8df8170f3b..d3766a5eb5 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -86,6 +86,9 @@ void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) } void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) { + if (pFillSup == NULL) { + return; + } pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols); tSimpleHashCleanup(pFillSup->pResMap); pFillSup->pResMap = NULL; @@ -139,6 +142,10 @@ static void destroyStreamFillOperatorInfo(void* param) { pInfo->pDelRes = NULL; taosArrayDestroy(pInfo->matchInfo.pList); pInfo->matchInfo.pList = NULL; + taosArrayDestroy(pInfo->pUpdated); + clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroy(pInfo->pCloseTs); + taosMemoryFree(pInfo); } @@ -183,7 +190,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId}; void* preVal = NULL; int32_t preVLen = 0; - code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen); + code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen); if (code == TSDB_CODE_SUCCESS) { pFillSup->prev.key = preKey.ts; @@ -202,7 +209,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId}; void* nextVal = NULL; int32_t nextVLen = 0; - code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen); + code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen); if (code == TSDB_CODE_SUCCESS) { pFillSup->next.key = nextKey.ts; pFillSup->next.pRowVal = nextVal; @@ -211,7 +218,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SWinKey nextNextKey = {.groupId = groupId}; void* nextNextVal = NULL; int32_t nextNextVLen = 0; - code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen); + code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen); if (code == TSDB_CODE_SUCCESS) { pFillSup->nextNext.key = nextNextKey.ts; pFillSup->nextNext.pRowVal = nextNextVal; @@ -859,7 +866,7 @@ static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_ qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId); SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key); - code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &key, (const void**)&val, &len); + code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &key, (const void**)&val, &len); pAPI->stateStore.streamStateFreeCur(pCur); qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code); } @@ -937,7 +944,7 @@ static int32_t doDeleteFillResult(SOperatorInfo* pOperator) { SWinKey delKey = {.groupId = delGroupId, .ts = delTs}; if (delTs == nextKey.ts) { pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur); - winCode = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL); + winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, NULL, NULL); // ts will be deleted later if (delTs != ts) { pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey); @@ -1148,6 +1155,251 @@ _end: return code; } +void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, + SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + void* pState = pOperator->pTaskInfo->streamInfo.pState; + bool res = false; + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { + SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pKey->groupId; + } else if (pBlock->info.id.groupId != pKey->groupId) { + break; + } + void* val = NULL; + int32_t len = 0; + int32_t winCode = pAPI->stateStore.streamStateFillGet(pOperator->pTaskInfo->streamInfo.pState, pKey, (void**)&val, &len, NULL); + qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode); + if (winCode == TSDB_CODE_SUCCESS) { + pFillSup->cur.key = pKey->ts; + pFillSup->cur.pRowVal = val; + buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); + resetFillWindow(&pFillSup->cur); + } else { + SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey); + SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId}; + void* preVal = NULL; + int32_t preVLen = 0; + winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen); + if (winCode == TSDB_CODE_SUCCESS) { + pFillSup->cur.key = pKey->ts; + pFillSup->cur.pRowVal = preVal; + if (pFillInfo->type == TSDB_FILL_PREV) { + code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); + QUERY_CHECK_CODE(code, lino, _end); + } else { + copyNotFillExpData(pFillSup, pFillInfo); + pFillInfo->pResRow->key = pKey->ts; + code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res); + QUERY_CHECK_CODE(code, lino, _end); + } + resetFillWindow(&pFillSup->cur); + } + pAPI->stateStore.streamStateFreeCur(pCur); + } + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } +} + +void doBuildForceFillResult(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, + SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { + blockDataCleanup(pBlock); + if (!hasRemainResults(pGroupResInfo)) { + return; + } + + // clear the existed group id + pBlock->info.id.groupId = 0; + doBuildForceFillResultImpl(pOperator, pFillSup, pFillInfo, pBlock, pGroupResInfo); +} + +static int32_t buildForceFillResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamFillOperatorInfo* pInfo = pOperator->info; + uint16_t opType = pOperator->operatorType; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + doBuildForceFillResult(pOperator, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); + if (pInfo->pRes->info.rows != 0) { + printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pRes; + goto _end; + } + + (*ppRes) = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +// force window close impl +static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamFillOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamFillSupporter* pFillSup = pInfo->pFillSup; + SStreamFillInfo* pFillInfo = pInfo->pFillInfo; + SSDataBlock* pBlock = pInfo->pSrcBlock; + uint64_t groupId = pBlock->info.id.groupId; + SSDataBlock* pRes = pInfo->pRes; + SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup; + SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); + TSKEY* tsCol = (TSKEY*)pTsCol->pData; + for (int32_t i = 0; i < pBlock->info.rows; i++){ + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); + } + code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } + return code; +} + +static int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int64_t groupId = 0; + SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState); + while (1) { + int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL); + if (winCode != TSDB_CODE_SUCCESS) { + break; + } + SWinKey key = {.ts = ts, .groupId = groupId}; + void* pPushRes = taosArrayPush(pUpdated, &key); + QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno); + + pAggSup->stateStore.streamStateGroupCurNext(pCur); + } + pAggSup->stateStore.streamStateFreeCur(pCur); + pCur = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + pAggSup->stateStore.streamStateFreeCur(pCur); + pCur = NULL; + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +// force window close +static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamFillOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (pOperator->status == OP_EXEC_DONE) { + (*ppRes) = NULL; + return code; + } + + if (pOperator->status == OP_RES_TO_RETURN) { + SSDataBlock* resBlock = NULL; + code = buildForceFillResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); + + if (resBlock != NULL) { + (*ppRes) = resBlock; + goto _end; + } + + pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + setStreamOperatorCompleted(pOperator); + (*ppRes) = NULL; + goto _end; + } + + SSDataBlock* fillResult = NULL; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + while (1) { + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); + if (pBlock == NULL) { + pOperator->status = OP_RES_TO_RETURN; + qDebug("===stream===return data:%s.", getStreamOpName(pOperator->operatorType)); + break; + } + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); + + switch (pBlock->info.type) { + case STREAM_NORMAL: + case STREAM_INVALID: { + code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock); + QUERY_CHECK_CODE(code, lino, _end); + + memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + pInfo->srcRowIndex = -1; + } break; + case STREAM_CHECKPOINT: + case STREAM_CREATE_CHILD_TABLE: { + (*ppRes) = pBlock; + goto _end; + } break; + case STREAM_GET_RESULT: { + void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey); + QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno); + continue; + } + default: + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + } + + doStreamForceFillImpl(pOperator); + } + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) { + TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); + code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated); + QUERY_CHECK_CODE(code, lino, _end); + } + taosArrayClear(pInfo->pCloseTs); + taosArraySort(pInfo->pUpdated, winKeyCmprImpl); + + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->groupResInfo.freeItem = false; + + pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); + + code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + QUERY_CHECK_CODE(code, lino, _end); + + code = buildForceFillResult(pOperator, ppRes); + QUERY_CHECK_CODE(code, lino, _end); + + if ((*ppRes) == NULL) { + pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + setStreamOperatorCompleted(pOperator); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + pTaskInfo->code = code; + } + return code; +} + static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) { int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock); pFillSup->rowSize = sizeof(SResultCellData) * numOfCols; @@ -1370,6 +1622,50 @@ static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* } } +int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, SStreamAggSupporter** ppAggSup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + if (IS_NORMAL_INTERVAL_OP(downstream)) { + SStreamIntervalOperatorInfo* pInfo = downstream->info; + *triggerType = pInfo->twAggSup.calTrigger; + *pInterval = pInfo->interval; + (*ppAggSup) = NULL; + } else if (IS_CONTINUE_INTERVAL_OP(downstream)) { + SStreamIntervalSliceOperatorInfo* pInfo = downstream->info; + *triggerType = pInfo->twAggSup.calTrigger; + *pInterval = pInfo->interval; + pInfo->hasFill = true; + (*ppAggSup) = &pInfo->streamAggSup; + } else { + code = TSDB_CODE_STREAM_INTERNAL_ERROR; + } + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t initForceFillDownStream(SOperatorInfo* downstream) { + SExecTaskInfo* pTaskInfo = downstream->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + if (downstream == NULL) { + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } + + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + code = initForceFillDownStream(downstream->pDownstream[0]); + return code; + } + SStreamScanInfo* pInfo = (SStreamScanInfo*) downstream->info; + pInfo->useGetResultRange = true; + return code; +} + int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); @@ -1383,7 +1679,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi QUERY_CHECK_CODE(code, lino, _error); } - SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval; int32_t numOfFillCols = 0; SExprInfo* pFillExprInfo = NULL; @@ -1396,7 +1691,12 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno); - pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI, + int8_t triggerType = 0; + SInterval interval = {0}; + code = getDownStreamInfo(downstream, &triggerType, &interval, &pInfo->pStreamAggSup); + QUERY_CHECK_CODE(code, lino, _error); + + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI, pInfo->pSrcBlock); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; @@ -1426,6 +1726,12 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _error); + pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno); + + pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY)); + QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno); + pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId; pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId; @@ -1440,8 +1746,17 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi pInfo->srcRowIndex = -1; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo, - optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + + if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + code = initForceFillDownStream(downstream); + QUERY_CHECK_CODE(code, lino, _error); + + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + } else { + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + } setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c new file mode 100644 index 0000000000..77a75754b2 --- /dev/null +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -0,0 +1,591 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "executorInt.h" +#include "functionMgt.h" +#include "operator.h" +#include "querytask.h" +#include "storageapi.h" +#include "streamexecutorInt.h" +#include "tcommon.h" +#include "tdatablock.h" +#include "ttime.h" + +#define STREAM_INTERVAL_SLICE_OP_CHECKPOINT_NAME "StreamIntervalSliceOperator_Checkpoint" + +typedef struct SInervalSlicePoint { + SSessionKey winKey; + SSliceRowData* pLastRow; + SRowBuffPos* pResPos; +} SInervalSlicePoint; + +typedef enum SIntervalSliceType { + INTERVAL_SLICE_START = 1, + INTERVAL_SLICE_END = 2, +} SIntervalSliceType; + +void streamIntervalSliceReleaseState(SOperatorInfo* pOperator) { +} + +void streamIntervalSliceReloadState(SOperatorInfo* pOperator) { +} + +void destroyStreamIntervalSliceOperatorInfo(void* param) { + SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)param; + if (param == NULL) { + return; + } + cleanupBasicInfo(&pInfo->binfo); + if (pInfo->pOperator) { + cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp, + &pInfo->groupResInfo); + pInfo->pOperator = NULL; + } + + clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + pInfo->pUpdated = NULL; + + if (pInfo->pUpdatedMap != NULL) { + tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos); + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + } + destroyStreamAggSupporter(&pInfo->streamAggSup); + + colDataDestroy(&pInfo->twAggSup.timeWindowData); + cleanupExprSupp(&pInfo->scalarSup); + + tSimpleHashCleanup(pInfo->pDeletedMap); + taosArrayDestroy(pInfo->pDelWins); + blockDataDestroy(pInfo->pDelRes); + + blockDataDestroy(pInfo->pCheckpointRes); + + taosMemoryFreeClear(param); +} + +static int32_t buildIntervalSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + + doBuildDeleteResultImpl(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, pInfo->pDelWins, &pInfo->delIndex, + pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->pDelRes; + return code; + } + + doBuildStreamIntervalResult(pOperator, pInfo->streamAggSup.pState, pInfo->binfo.pRes, &pInfo->groupResInfo); + if (pInfo->binfo.pRes->info.rows != 0) { + printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + (*ppRes) = pInfo->binfo.pRes; + goto _end; + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +// static void doStreamIntervalSliceSaveCheckpoint(SOperatorInfo* pOperator) { +// } + +void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pPoint) { + pPoint->winKey.groupId = groupId; + pPoint->winKey.win = *pTWin; + pPoint->pLastRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize); +} + +static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, STimeWindow* pTWin, int64_t groupId, + SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SWinKey curKey = {.ts = pTWin->skey, .groupId = groupId}; + int32_t curVLen = 0; + code = pAggSup->stateStore.streamStateAddIfNotExist(pAggSup->pState, &curKey, (void**)&pCurPoint->pResPos, + &curVLen, pWinCode); + QUERY_CHECK_CODE(code, lino, _end); + + qDebug("===stream=== set stream twa next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", + curKey.ts, curKey.groupId, *pWinCode); + + initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint); + + SWinKey prevKey = {.groupId = groupId}; + SET_WIN_KEY_INVALID(prevKey.ts); + int32_t prevVLen = 0; + int32_t prevWinCode = TSDB_CODE_SUCCESS; + code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey, + (void**)&pPrevPoint->pResPos, &prevVLen, &prevWinCode); + QUERY_CHECK_CODE(code, lino, _end); + + if (prevWinCode == TSDB_CODE_SUCCESS) { + STimeWindow prevSTW = {.skey = prevKey.ts}; + prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval); + initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint); + } else { + SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey); + SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock, + int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type) { + SqlFunctionCtx* pCtx = pSup->pCtx; + for (int32_t k = 0; k < pSup->numOfExprs; ++k) { + if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) { + pCtx[k].start.key = INT64_MIN; + continue; + } + + SFunctParam* pParam = &pCtx[k].param[0]; + SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId); + + double prevVal = 0, curVal = 0, winVal = 0; + SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId); + GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData); + GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex)); + + SPoint point1 = (SPoint){.key = pPrevWinVal->key, .val = &prevVal}; + SPoint point2 = (SPoint){.key = curTs, .val = &curVal}; + SPoint point = (SPoint){.key = winKey, .val = &winVal}; + + if (!fmIsElapsedFunc(pCtx[k].functionId)) { + taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE); + } + + if (type == INTERVAL_SLICE_START) { + pCtx[k].start.key = point.key; + pCtx[k].start.val = winVal; + } else { + pCtx[k].end.key = point.key; + pCtx[k].end.val = winVal; + } + } +} + +static void resetIntervalSliceFunctionKey(SqlFunctionCtx* pCtx, int32_t numOfOutput) { + for (int32_t k = 0; k < numOfOutput; ++k) { + pCtx[k].start.key = INT64_MIN; + pCtx[k].end.key = INT64_MIN; + } +} + +int32_t setIntervalSliceOutputBuf(SInervalSlicePoint* pPoint, SqlFunctionCtx* pCtx, int32_t numOfOutput, + int32_t* rowEntryInfoOffset) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SResultRow* res = pPoint->pResPos->pRowBuff; + + // set time window for current result + res->win = pPoint->winKey.win; + code = setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset); + QUERY_CHECK_CODE(code, lino, _end); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock, SSHashObj* pUpdatedMap, + SSHashObj* pDeletedMap) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info; + SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; + int32_t numOfOutput = pSup->numOfExprs; + TSKEY* tsCols = NULL; + int64_t groupId = pBlock->info.id.groupId; + SResultRow* pResult = NULL; + int32_t forwardRows = 0; + + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + tsCols = (int64_t*)pColDataInfo->pData; + + int32_t startPos = 0; + TSKEY curTs = getStartTsKey(&pBlock->info.window, tsCols); + SInervalSlicePoint curPoint = {0}; + SInervalSlicePoint prevPoint = {0}; + STimeWindow curWin = + getActiveTimeWindow(NULL, pResultRowInfo, curTs, &pInfo->interval, TSDB_ORDER_ASC); + while (1) { + if (curTs > pInfo->endTs) { + break; + } + + int32_t winCode = TSDB_CODE_SUCCESS; + code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curWin, groupId, &curPoint, &prevPoint, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + + if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) { + setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); + doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, + 0, pBlock->info.rows, numOfOutput); + QUERY_CHECK_CODE(code, lino, _end); + SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId}; + saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap); + } + + setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); + if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) { + doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START); + } + forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); + int32_t prevEndPos = (forwardRows - 1) + startPos; + if (winCode != TSDB_CODE_SUCCESS) { + int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false); + TSKEY endRowTs = tsCols[endRowId]; + transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL); + } + SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId}; + if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { + code = tSimpleHashPut(pDeletedMap, &curKey, sizeof(SWinKey), NULL, 0); + QUERY_CHECK_CODE(code, lino, _end); + } + + saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap); + + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1); + code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, + forwardRows, pBlock->info.rows, numOfOutput); + QUERY_CHECK_CODE(code, lino, _end); + + startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC); + if (startPos < 0) { + break; + } + curTs = tsCols[startPos]; + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } + return code; +} + +static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + + qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); + + if (pOperator->status == OP_EXEC_DONE) { + (*ppRes) = NULL; + goto _end; + } + + if (pOperator->status == OP_RES_TO_RETURN) { + SSDataBlock* resBlock = NULL; + code = buildIntervalSliceResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (resBlock != NULL) { + (*ppRes) = resBlock; + return code; + } + + if (pInfo->hasFill == false) { + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); + } + setStreamOperatorCompleted(pOperator); + (*ppRes) = NULL; + return code; + } + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t numOfDatapack = 0; + + while (1) { + SSDataBlock* pBlock = NULL; + code = downstream->fpSet.getNextFn(downstream, &pBlock); + QUERY_CHECK_CODE(code, lino, _end); + + if (pBlock == NULL) { + pOperator->status = OP_RES_TO_RETURN; + break; + } + + switch (pBlock->info.type) { + case STREAM_NORMAL: + case STREAM_INVALID: { + SExprSupp* pExprSup = &pInfo->scalarSup; + if (pExprSup->pExprInfo != NULL) { + code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + QUERY_CHECK_CODE(code, lino, _end); + } + } break; + case STREAM_CHECKPOINT: { + pInfo->recvCkBlock = true; + pAggSup->stateStore.streamStateCommit(pAggSup->pState); + // doStreamIntervalSliceSaveCheckpoint(pOperator); + pInfo->recvCkBlock = true; + code = copyDataBlock(pInfo->pCheckpointRes, pBlock); + QUERY_CHECK_CODE(code, lino, _end); + continue; + } break; + case STREAM_CREATE_CHILD_TABLE: { + (*ppRes) = pBlock; + goto _end; + } break; + case STREAM_GET_RESULT: { + pInfo->endTs = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval); + if (pInfo->hasFill) { + (*ppRes) = pBlock; + goto _end; + } else { + continue; + } + } + default: + ASSERTS(false, "invalid SSDataBlock type"); + } + + code = setInputDataBlock(&pOperator->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); + QUERY_CHECK_CODE(code, lino, _end); + code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, pInfo->pDeletedMap); + QUERY_CHECK_CODE(code, lino, _end); + + } + + if (!pInfo->destHasPrimaryKey) { + removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); + } + + if (pInfo->destHasPrimaryKey) { + code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins); + QUERY_CHECK_CODE(code, lino, _end); + } + + code = copyUpdateResult(&pInfo->pUpdatedMap, pInfo->pUpdated, winPosCmprImpl); + QUERY_CHECK_CODE(code, lino, _end); + + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); + + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); + + code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + QUERY_CHECK_CODE(code, lino, _end); + + (*ppRes) = NULL; + code = buildIntervalSliceResult(pOperator, ppRes); + QUERY_CHECK_CODE(code, lino, _end); + + if ((*ppRes) == NULL) { + if (pInfo->hasFill == false) { + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); + } + setStreamOperatorCompleted(pOperator); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, + int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic, + SInterval* pInterval) { + SExecTaskInfo* pTaskInfo = downstream->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { + SStreamPartitionOperatorInfo* pPartionInfo = downstream->info; + pPartionInfo->tsColIndex = tsColIndex; + pBasic->primaryPkIndex = pPartionInfo->basic.primaryPkIndex; + } + + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + code = + initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval); + return code; + } + SStreamScanInfo* pScanInfo = downstream->info; + pScanInfo->igCheckUpdate = true; + pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; + pScanInfo->pState = pAggSup->pState; + if (!pScanInfo->pUpdateInfo) { + code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, + pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, + &pScanInfo->pUpdateInfo); + QUERY_CHECK_CODE(code, lino, _end); + } + pScanInfo->twAggSup = *pTwSup; + pScanInfo->interval = *pInterval; + pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; + if (!hasSrcPrimaryKeyCol(pBasic)) { + pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex; + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } + return code; +} + +int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SReadHandle* pHandle, SOperatorInfo** ppOptInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamIntervalSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalSliceOperatorInfo)); + QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno); + + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno) + + pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno); + + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _error, terrno); + + pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); + + pInfo->delIndex = 0; + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno); + + code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); + QUERY_CHECK_CODE(code, lino, _error); + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + + code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->recvCkBlock = false; + + SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; + pOperator->pTaskInfo = pTaskInfo; + initResultSizeInfo(&pOperator->resultInfo, 4096); + SExprSupp* pExpSup = &pOperator->exprSupp; + int32_t numOfExprs = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfExprs); + QUERY_CHECK_CODE(code, lino, _error); + + code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + + pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; + + pInfo->twAggSup = + (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark, + .calTrigger = pIntervalPhyNode->window.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)}; + code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + QUERY_CHECK_CODE(code, lino, _error); + + if (pIntervalPhyNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar); + QUERY_CHECK_CODE(code, lino, _error); + + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + } + + SSDataBlock* pDownRes = NULL; + SColumnInfo* pPkCol = NULL; + code = getDownstreamRes(downstream, &pDownRes, &pPkCol); + QUERY_CHECK_CODE(code, lino, _error); + + int32_t keyBytes = sizeof(TSKEY); + keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock); + if (pPkCol) { + keyBytes += pPkCol->bytes; + } + code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, keyBytes, 0, + &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), + &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SEARCH, 1); + + pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey; + pInfo->pOperator = pOperator; + pInfo->hasFill = false; + + setOperatorInfo(pOperator, "StreamIntervalSliceOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL, true, OP_NOT_OPENED, + pInfo, pTaskInfo); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalSliceNext, NULL, destroyStreamIntervalSliceOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState); + + initStreamBasicInfo(&pInfo->basic); + if (downstream) { + code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex, + &pInfo->twAggSup, &pInfo->basic, &pInfo->interval); + QUERY_CHECK_CODE(code, lino, _error); + + code = appendDownstream(pOperator, &downstream, 1); + QUERY_CHECK_CODE(code, lino, _error); + } + + (*ppOptInfo) = pOperator; + return code; + +_error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + if (pInfo != NULL) { + destroyStreamIntervalSliceOperatorInfo(pInfo); + } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); + pTaskInfo->code = code; + (*ppOptInfo) = NULL; + return code; +} diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 57643d29e8..289c641c43 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -29,8 +29,6 @@ #define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState" #define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint" -#define IS_INVALID_WIN_KEY(ts) ((ts) == INT64_MIN) -#define SET_WIN_KEY_INVALID(ts) ((ts) = INT64_MIN) int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) { return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0); @@ -146,6 +144,11 @@ static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) { void destroyStreamTimeSliceOperatorInfo(void* param) { SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param; + if (pInfo->pOperator) { + cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp, + &pInfo->groupResInfo); + pInfo->pOperator = NULL; + } colDataDestroy(&pInfo->twAggSup.timeWindowData); destroyStreamAggSupporter(&pInfo->streamAggSup); resetPrevAndNextWindow(pInfo->pFillSup); @@ -353,7 +356,7 @@ _end: } } -static SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) { +SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) { if (!pRowVal) { return NULL; } @@ -608,8 +611,8 @@ static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, i return -1; } -static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, - bool ignoreNull) { +int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, + bool ignoreNull) { TSKEY ts = tsCols[rowId]; int32_t resRow = -1; for (; rowId >= 0; rowId--) { @@ -1219,8 +1222,8 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, void* pPkVal, SStream return false; } -static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, - int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol) { +void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, + int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); @@ -1344,7 +1347,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); if (right) { - transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1363,7 +1366,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type); if (left) { - transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1388,7 +1391,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); if (right) { - transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); + transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); @@ -1680,7 +1683,7 @@ _end: return code; } -static int32_t setAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SSHashObj* pUpdatedMap) { +int32_t setAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SSHashObj* pUpdatedMap) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int64_t groupId = 0; @@ -1793,7 +1796,6 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR pInfo->recvCkBlock = true; pAggSup->stateStore.streamStateCommit(pAggSup->pState); doStreamTimeSliceSaveCheckpoint(pOperator); - pInfo->recvCkBlock = true; code = copyDataBlock(pInfo->pCheckpointRes, pBlock); QUERY_CHECK_CODE(code, lino, _end); continue; @@ -2075,6 +2077,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } + pInfo->pOperator = pOperator; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index d68b633b29..11e7681a30 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -30,9 +30,6 @@ #define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) #define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) -#define IS_NORMAL_INTERVAL_OP(op) \ - ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || \ - (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) #define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) #define IS_NORMAL_SESSION_OP(op) \ @@ -2240,7 +2237,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, stateType, &pSup->pState->pFileState); - } else if (stateType == STREAM_STATE_BUFF_HASH_SORT) { + } else if (stateType == STREAM_STATE_BUFF_HASH_SORT || stateType == STREAM_STATE_BUFF_HASH_SEARCH) { pSup->pState->pFileState = NULL; code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pSup->resultRowSize, funResSize, compareTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr, @@ -5304,8 +5301,8 @@ _end: return code; } -int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, - SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { +static int32_t createStreamSingleIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -5458,6 +5455,17 @@ _error: return code; } +int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { + SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; + if (pIntervalPhyNode->window.triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + return createStreamIntervalSliceOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, pOptrInfo); + } else { + return createStreamSingleIntervalOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, pOptrInfo); + } + return TSDB_CODE_SUCCESS; +} + static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pUpdatedMap) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b781509556..15d88b304e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -3252,7 +3252,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "elapsed", .type = FUNCTION_TYPE_ELAPSED, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED, .dataRequiredFunc = statisDataRequired, .translateFunc = translateElapsed, @@ -3508,7 +3508,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "twa", .type = FUNCTION_TYPE_TWA, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateInNumOutDou, .dataRequiredFunc = statisDataRequired, diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 364bb160aa..f7c29c64f7 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -5748,6 +5748,7 @@ static const char* jkSelectStmtHasAggFuncs = "HasAggFuncs"; static const char* jkSelectStmtInterpFuncs = "HasInterpFuncs"; static const char* jkSelectStmtInterpFill = "InterpFill"; static const char* jkSelectStmtInterpEvery = "InterpEvery"; +static const char* jkSelectStmtTwaOrElapsedFuncs = "HasTwaOrElapsedFuncs"; static int32_t selectStmtToJson(const void* pObj, SJson* pJson) { const SSelectStmt* pNode = (const SSelectStmt*)pObj; @@ -5798,6 +5799,9 @@ static int32_t selectStmtToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkSelectStmtInterpFuncs, pNode->hasInterpFunc); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSelectStmtTwaOrElapsedFuncs, pNode->hasTwaOrElapsedFunc); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkSelectStmtInterpFill, nodeToJson, pNode->pFill); } @@ -5857,6 +5861,9 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkSelectStmtInterpFuncs, &pNode->hasInterpFunc); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSelectStmtTwaOrElapsedFuncs, &pNode->hasTwaOrElapsedFunc); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkSelectStmtInterpFill, &pNode->pFill); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9003053773..edd460db08 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2753,6 +2753,9 @@ static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc) pSelect->hasUniqueFunc = pSelect->hasUniqueFunc ? true : (FUNCTION_TYPE_UNIQUE == pFunc->funcType); pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType); pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType); + pSelect->hasTwaOrElapsedFunc = pSelect->hasTwaOrElapsedFunc ? true + : (FUNCTION_TYPE_TWA == pFunc->funcType || + FUNCTION_TYPE_ELAPSED == pFunc->funcType); pSelect->hasInterpPseudoColFunc = pSelect->hasInterpPseudoColFunc ? true : fmIsInterpPseudoColumnFunc(pFunc->funcId); pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType); @@ -10690,6 +10693,32 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm } } + if (pSelect->hasTwaOrElapsedFunc) { + if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream twa or elapsed function only support force window close"); + } + if (pSelect->pWindow->type != QUERY_NODE_INTERVAL_WINDOW) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream twa or elapsed function only support interval"); + } + + if ((SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta && + TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && + !hasTbnameFunction(pSelect->pPartitionByList)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "twa or elapsed on super table must patitioned by table name"); + } + } + + if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + if (pStmt->pOptions->fillHistory) { + return generateSyntaxErrMsgExt( + &pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "When trigger was force window close, Stream unsupported Fill history"); + } + } + if (NULL != pSelect->pGroupByList) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported Group by"); } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 871a8bdbdb..7dd07a28e3 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -172,6 +172,7 @@ void streamStateCurPrev_rocksdb(SStreamStateCur* pCur); int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState); SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key); @@ -210,6 +211,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState); +int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); // partag cf int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ae03af8083..d72a46b2d6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3359,7 +3359,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { return streamStateDel_rocksdb(pState, &tmp); } -int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { +int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { if (!pCur) { return -1; } @@ -3422,7 +3422,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); if (pCur) { - int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); + int32_t code = streamStateFillGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); if (code == 0) return pCur; streamStateFreeCur(pCur); } @@ -4239,6 +4239,16 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { rocksdb_iter_next(pCur->iter); } + + if (rocksdb_iter_valid(pCur->iter)) { + int64_t curGroupId; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + TAOS_UNUSED(parKeyDecode((void*)&curGroupId, keyStr)); + if (curGroupId > groupId) return ; + + rocksdb_iter_next(pCur->iter); + } } int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) { @@ -5148,3 +5158,61 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) { return code; } #endif + +SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { + stDebug("streamStateSeekKeyPrev_rocksdb"); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = createStreamStateCursor(); + if (pCur == NULL) { + return NULL; + } + + pCur->db = wrapper->db; + pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); + pCur->number = pState->number; + + char buf[128] = {0}; + int len = winKeyEncode((void*)key, buf); + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + streamStateFreeCur(pCur); + return NULL; + } + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_prev(pCur->iter); + } + + if (rocksdb_iter_valid(pCur->iter)) { + SWinKey curKey; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + TAOS_UNUSED(winKeyDecode((void*)&curKey, keyStr)); + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) { + return pCur; + } + rocksdb_iter_prev(pCur->iter); + return pCur; + } + + streamStateFreeCur(pCur); + return NULL; +} + +int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + if (!pCur) { + return -1; + } + uint64_t groupId = pKey->groupId; + + int32_t code = streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + if (code == 0) { + if (pKey->groupId == groupId) { + return 0; + } + if (pVal != NULL) { + taosMemoryFree((void*)*pVal); + *pVal = NULL; + } + } + return -1; +} diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 688b58622a..a315c9c726 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -330,9 +330,11 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerTyp int64_t now = taosGetTimestampMs(); STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger); - p->pBlock->info.window = window; + p->pBlock->info.window.skey = window.skey; + p->pBlock->info.window.ekey = TMAX(now, window.ekey); p->pBlock->info.type = STREAM_GET_RESULT; - stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, window.skey, window.ekey); + stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, + p->pBlock->info.window.skey, p->pBlock->info.window.ekey); } else { p->pBlock->info.type = STREAM_GET_ALL; } diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index 2ec8ef202c..0660a68ed2 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -23,11 +23,6 @@ #define NUM_OF_CACHE_WIN 64 #define MAX_NUM_OF_CACHE_WIN 128 -int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) { - SWinKey* pWin2 = taosArrayGet(pDatas, pos); - return winKeyCmprImpl((SWinKey*)pWin1, pWin2); -} - int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; @@ -57,7 +52,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) { SWinKey tmpKey = {.groupId = pKey->groupId}; - int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); + int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); if (tmpRes != TSDB_CODE_SUCCESS) { break; } @@ -134,7 +129,7 @@ int32_t getStateFromRocksdbByCur(SStreamFileState* pFileState, SStreamStateCur* int32_t lino = 0; void* tmpVal = NULL; int32_t len = 0; - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + (*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); if ((*pWinCode) == TSDB_CODE_SUCCESS) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); if (!pNewPos || !pNewPos->pRowBuff) { @@ -167,7 +162,7 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); void* tmpVal = NULL; int32_t len = 0; - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + (*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); if ((*pWinCode) == TSDB_CODE_SUCCESS && ppVal != NULL) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); if (!pNewPos || !pNewPos->pRowBuff) { @@ -188,7 +183,7 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); void* tmpVal = NULL; int32_t len = 0; - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + (*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); if ((*pWinCode) == TSDB_CODE_SUCCESS) { if (ppVal != NULL) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); @@ -241,7 +236,7 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); void* tmpVal = NULL; int32_t len = 0; - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + (*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); if ((*pWinCode) == TSDB_CODE_SUCCESS) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); if (!pNewPos || !pNewPos->pRowBuff) { @@ -270,7 +265,7 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); void* tmpVal = NULL; int32_t len = 0; - (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + (*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); if ((*pWinCode) == TSDB_CODE_SUCCESS) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); if (!pNewPos || !pNewPos->pRowBuff) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 66e7888d63..99801ab0eb 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -229,6 +229,7 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) { if (pState->pFileState) { + // todo(liuyao) 改这里 return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode); } return streamStateFillGet_rocksdb(pState, key, pVal, pVLen); @@ -316,8 +317,8 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); } -int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen); +int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + return streamStateFillGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen); } SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { @@ -582,3 +583,12 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** } return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen); } + +void streamStateClearExpiredState(SStreamState* pState) { + clearExpiredState(pState->pFileState); +} + +int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, + int32_t* pWinCode) { + return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); +} diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index a95b166fb1..ee6629704f 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -30,6 +30,8 @@ #define MIN_NUM_SEARCH_BUCKET 128 #define MAX_ARRAY_SIZE 1024 #define MAX_GROUP_ID_NUM 200000 +#define NUM_OF_CACHE_WIN 64 +#define MAX_NUM_OF_CACHE_WIN 128 #define TASK_KEY "streamFileState" #define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" @@ -68,6 +70,11 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; +int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) { + SWinKey* pWin2 = taosArrayGet(pDatas, pos); + return winKeyCmprImpl((SWinKey*)pWin1, pWin2); +} + int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) { SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen); if (pos) { @@ -111,6 +118,17 @@ void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) { return pStateKey; } +void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) { + SWinKey* pStateKey = taosMemoryCalloc(1, sizeof(SWinKey)); + if (pStateKey == NULL) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return NULL; + } + SWinKey* pWinKey = pPos->pKey; + *pStateKey = *pWinKey; + return pStateKey; +} + int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey); } @@ -171,7 +189,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); - if (type == STREAM_STATE_BUFF_HASH) { + if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) { pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn; pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn; @@ -200,7 +218,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn; pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn; pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn; - pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey; + pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey; pFileState->stateFileRemoveFn = hashSortFileRemoveFn; pFileState->stateFileGetFn = hashSortFileGetFn; @@ -213,6 +231,11 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno); QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno); + if (type == STREAM_STATE_BUFF_HASH_SEARCH) { + pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn); + QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno); + } + pFileState->keyLen = keySize; pFileState->rowSize = rowSize; pFileState->selectivityRowSize = selectRowSize; @@ -230,8 +253,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn); QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno); - // todo(liuyao) optimize - if (type == STREAM_STATE_BUFF_HASH) { + if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) { code = recoverSnapshot(pFileState, checkpointId); } else if (type == STREAM_STATE_BUFF_SORT) { code = recoverSesssion(pFileState, checkpointId); @@ -1160,6 +1182,9 @@ void streamFileStateGroupCurNext(SStreamStateCur* pCur) { return; } + int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL); + pCur->minGpId = TMAX(pCur->minGpId, gpId); + SSHashObj* pHash = pFileState->pGroupIdMap; pCur->pHashData = tSimpleHashIterate(pHash, pCur->pHashData, &pCur->hashIter); if (!pCur->pHashData) { @@ -1167,8 +1192,6 @@ void streamFileStateGroupCurNext(SStreamStateCur* pCur) { streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur); return; } - int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL); - pCur->minGpId = TMIN(pCur->minGpId, gpId); } int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) { @@ -1183,3 +1206,164 @@ int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, voi SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { return pFileState->pGroupIdMap; } + +void clearExpiredState(SStreamFileState* pFileState) { + SSHashObj* pSearchBuff = pFileState->searchBuff; + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pFileState->searchBuff, pIte, &iter)) != NULL) { + SArray* pWinStates = *((void**)pIte); + int32_t size = taosArrayGetSize(pWinStates); + for (int32_t i = 0; i < size - 1; i++) { + SWinKey* pKey = taosArrayGet(pWinStates, i); + int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey)); + qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff); + + int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); + qTrace("%s at line %d res:%d", __func__, __LINE__, code_file); + + streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); + } + taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL); + } +} + +int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, + int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode); + QUERY_CHECK_CODE(code, lino, _end); + + SArray* pWinStates = NULL; + SSHashObj* pSearchBuff = getSearchBuff(pFileState); + void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + pWinStates = taosArrayInit(16, sizeof(SWinKey)); + QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno); + + code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + QUERY_CHECK_CODE(code, lino, _end); + } + + // recover + if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { + TSKEY ts = getFlushMark(pFileState); + SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; + void* pState = getStateFileStore(pFileState); + SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, &start); + for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) { + SWinKey tmpKey = {.groupId = pKey->groupId}; + int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); + if (tmpRes != TSDB_CODE_SUCCESS) { + break; + } + void* tmp = taosArrayPush(pWinStates, &tmpKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + streamStateCurPrev_rocksdb(pCur); + } + taosArraySort(pWinStates, winKeyCmprImpl); + streamStateFreeCur(pCur); + } + + int32_t size = taosArrayGetSize(pWinStates); + int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); + if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) { + // find the first position which is smaller than the pKey + if (index >= 0) { + SWinKey* pTmpKey = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pTmpKey, pKey) == 0) { + goto _end; + } + } + index++; + void* tmp = taosArrayInsert(pWinStates, index, pKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + } + + if (size >= MAX_NUM_OF_CACHE_WIN) { + int32_t num = size - NUM_OF_CACHE_WIN; + taosArrayRemoveBatch(pWinStates, 0, num, NULL); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, + int32_t* pVLen, int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SArray* pWinStates = NULL; + SSHashObj* pSearchBuff = getSearchBuff(pFileState); + void* pState = getStateFileStore(pFileState); + void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId); + SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey); + void* tmpVal = NULL; + int32_t len = 0; + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + if ((*pWinCode) == TSDB_CODE_SUCCESS) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, tmpVal, len); + taosMemoryFreeClear(tmpVal); + *pVLen = getRowStateRowSize(pFileState); + (*ppVal) = pNewPos; + } + streamStateFreeCur(pCur); + return code; + } + int32_t size = taosArrayGetSize(pWinStates); + int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); + if (index >= 0) { + SWinKey* pCurKey = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pCurKey, pKey) == 0) { + index--; + } else { + qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__); + } + } + if (index == -1) { + SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey); + void* tmpVal = NULL; + int32_t len = 0; + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); + if ((*pWinCode) == TSDB_CODE_SUCCESS) { + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, tmpVal, len); + taosMemoryFreeClear(tmpVal); + *pVLen = getRowStateRowSize(pFileState); + (*ppVal) = pNewPos; + } + streamStateFreeCur(pCur); + return code; + } else { + SWinKey* pPrevKey = taosArrayGet(pWinStates, index); + *pResKey = *pPrevKey; + return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode); + } + (*pWinCode) = TSDB_CODE_FAILED; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +}