From 1c9011e82085437b490ce3dd0b879c1150583dd5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 12 Jul 2024 11:03:45 +0800 Subject: [PATCH] stream time slice --- include/common/tmsg.h | 1 + include/libs/executor/storageapi.h | 7 +- include/libs/nodes/plannodes.h | 48 +- include/libs/stream/streamState.h | 2 + include/libs/stream/tstreamFileState.h | 17 +- source/dnode/snode/src/snodeInitApi.c | 2 + source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 + source/libs/executor/inc/executorInt.h | 34 + source/libs/executor/inc/operator.h | 2 + source/libs/executor/inc/streamexecutorInt.h | 32 + source/libs/executor/src/executil.c | 2 + source/libs/executor/src/operator.c | 2 + source/libs/executor/src/scanoperator.c | 11 + .../executor/src/streamcountwindowoperator.c | 2 +- .../executor/src/streameventwindowoperator.c | 2 +- source/libs/executor/src/streamfilloperator.c | 52 +- .../executor/src/streamtimesliceoperator.c | 778 ++++++++++++++++++ .../executor/src/streamtimewindowoperator.c | 34 +- source/libs/executor/src/timesliceoperator.c | 8 +- source/libs/function/src/builtins.c | 2 +- source/libs/nodes/src/nodesCloneFuncs.c | 2 +- source/libs/nodes/src/nodesCodeFuncs.c | 15 +- source/libs/nodes/src/nodesMsgFuncs.c | 6 +- source/libs/nodes/src/nodesUtilFuncs.c | 5 +- source/libs/parser/src/parTranslater.c | 11 +- source/libs/planner/src/planLogicCreater.c | 13 + source/libs/planner/src/planPhysiCreater.c | 11 +- source/libs/planner/src/planValidator.c | 1 + source/libs/stream/inc/streamBackendRocksdb.h | 3 +- source/libs/stream/src/streamBackendRocksdb.c | 17 +- source/libs/stream/src/streamSessionState.c | 8 +- source/libs/stream/src/streamSliceState.c | 164 ++++ source/libs/stream/src/streamState.c | 11 + source/libs/stream/src/tstreamFileState.c | 84 +- 34 files changed, 1286 insertions(+), 105 deletions(-) create mode 100644 source/libs/executor/src/streamtimesliceoperator.c create mode 100644 source/libs/stream/src/streamSliceState.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a83aa4da44..4715290f72 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -458,6 +458,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, } ENodeType; typedef struct { diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 45f9f73fb1..89bc208905 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -39,8 +39,9 @@ extern "C" { #define META_READER_LOCK 0x0 #define META_READER_NOLOCK 0x1 -#define STREAM_STATE_BUFF_HASH 1 -#define STREAM_STATE_BUFF_SORT 2 +#define STREAM_STATE_BUFF_HASH 1 +#define STREAM_STATE_BUFF_SORT 2 +#define STREAM_STATE_BUFF_HASH_SORT 3 typedef struct SMeta SMeta; typedef TSKEY (*GetTsFun)(void*); @@ -349,6 +350,8 @@ typedef struct SStateStore { int32_t (*streamStateFillPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateFillDel)(SStreamState* pState, const SWinKey* key); + int32_t (*streamStateFillGetNext)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen); + int32_t (*streamStateFillGetPrev)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen); int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur); int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index a691433ee6..cc09ceb858 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -194,14 +194,25 @@ typedef struct SIndefRowsFuncLogicNode { bool isTimeLineFunc; } SIndefRowsFuncLogicNode; +typedef struct SStreamOption { + int8_t triggerType; + int64_t watermark; + int64_t deleteMark; + int8_t igExpired; + int8_t igCheckUpdate; + int8_t destHasPrimaryKey; +} SStreamOption; + typedef struct SInterpFuncLogicNode { - SLogicNode node; - SNodeList* pFuncs; - STimeWindow timeRange; - int64_t interval; - EFillMode fillMode; - SNode* pFillValues; // SNodeListNode - SNode* pTimeSeries; // SColumnNode + SLogicNode node; + SNodeList* pFuncs; + STimeWindow timeRange; + int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode + //todo(liuyao) 补充clone和json等 + SStreamOption streamOption; } SInterpFuncLogicNode; typedef struct SGroupCacheLogicNode { @@ -496,17 +507,20 @@ typedef struct SIndefRowsFuncPhysiNode { } SIndefRowsFuncPhysiNode; typedef struct SInterpFuncPhysiNode { - SPhysiNode node; - SNodeList* pExprs; - SNodeList* pFuncs; - STimeWindow timeRange; - int64_t interval; - int8_t intervalUnit; - EFillMode fillMode; - SNode* pFillValues; // SNodeListNode - SNode* pTimeSeries; // SColumnNode + SPhysiNode node; + SNodeList* pExprs; + SNodeList* pFuncs; + STimeWindow timeRange; + int64_t interval; + int8_t intervalUnit; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode + SStreamOption streamOption; //todo(liuyao) 补充clone和json等 } SInterpFuncPhysiNode; +typedef SInterpFuncPhysiNode SStreamInterpFuncPhysiNode; + typedef struct SSortMergeJoinPhysiNode { SPhysiNode node; EJoinType joinType; @@ -635,7 +649,7 @@ typedef struct SWindowPhysiNode { int64_t watermark; int64_t deleteMark; int8_t igExpired; - int8_t destHasPrimayKey; + int8_t destHasPrimaryKey; bool mergeDataBlock; } SWindowPhysiNode; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 5768160fdb..7ece16e73d 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -76,6 +76,8 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key); +int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen); +int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen); int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 68b9c4baa2..8ab935a75d 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -37,13 +37,15 @@ typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); typedef void* (*_state_buff_create_statekey_fn)(SRowBuffPos* pPos, int64_t num); typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey); -typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen); +typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen); typedef int32_t (*_state_file_clear_fn)(SStreamState* pState); typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2); +typedef int (*__session_compare_fn_t)(const void* pWin, const void* pDatas, int pos); + SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type); @@ -69,9 +71,11 @@ int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); void* getRowStateBuff(SStreamFileState* pFileState); +void* getSearchBuff(SStreamFileState* pFileState); void* getStateFileStore(SStreamFileState* pFileState); bool isDeteled(SStreamFileState* pFileState, TSKEY ts); bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap); +TSKEY getFlushMark(SStreamFileState* pFileState); SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState); int32_t getRowStateRowSize(SStreamFileState* pFileState); @@ -98,6 +102,8 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur); int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey, range_cmpr_fn cmpFn); +int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn); + // state window int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); @@ -110,6 +116,15 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); +//fill +int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen); +int32_t hashSortFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen); +int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey); +void clearSearchBuff(SStreamFileState* pFileState); +int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen); +int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen); +int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId); + #ifdef __cplusplus } #endif diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 196fa56c99..9a353a0b7c 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -50,6 +50,8 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillPut = streamStateFillPut; pStore->streamStateFillGet = streamStateFillGet; pStore->streamStateFillDel = streamStateFillDel; + pStore->streamStateFillGetNext = streamStateFillGetNext; + pStore->streamStateFillGetPrev = streamStateFillGetPrev; pStore->streamStateCurNext = streamStateCurNext; pStore->streamStateCurPrev = streamStateCurPrev; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 48852dd159..d03b04d573 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -166,6 +166,8 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillPut = streamStateFillPut; pStore->streamStateFillGet = streamStateFillGet; pStore->streamStateFillDel = streamStateFillDel; + pStore->streamStateFillGetNext = streamStateFillGetNext; + pStore->streamStateFillGetPrev = streamStateFillGetPrev; pStore->streamStateCurNext = streamStateCurNext; pStore->streamStateCurPrev = streamStateCurPrev; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 95414ff70e..4b3875cc23 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -806,6 +806,36 @@ typedef struct SStreamFillOperatorInfo { SStreamFillInfo* pFillInfo; } SStreamFillOperatorInfo; +typedef struct SStreamTimeSliceOperatorInfo { + SSteamOpBasicInfo basic; + STimeWindowAggSupp twAggSup; + SStreamAggSupporter streamAggSup; + SStreamFillSupporter* pFillSup; + SStreamFillInfo* pFillInfo; + SSDataBlock* pRes; + SSDataBlock* pDelRes; + bool recvCkBlock; + SSDataBlock* pCheckpointRes; + int32_t fillType; + SResultRowData leftRow; + SResultRowData valueRow; + SResultRowData rightRow; + int32_t primaryTsIndex; + SExprSupp scalarSup; // scalar calculation + bool ignoreExpiredData; + bool ignoreExpiredDataSaved; + bool destHasPrimaryKey; + SArray* historyPoints; + SArray* pUpdated; // SWinKey + SSHashObj* pUpdatedMap; + int32_t delIndex; + SArray* pDelWins; // SWinKey + SSHashObj* pDeletedMap; + uint64_t numOfDatapack; + SGroupResInfo groupResInfo; + bool ignoreNull; +} SStreamTimeSliceOperatorInfo; + #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) @@ -963,6 +993,8 @@ void resetUnCloseSessionWinInfo(SSHashObj* winMap); void setStreamOperatorCompleted(struct SOperatorInfo* pOperator); void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup); void destroyFlusedPos(void* pRes); +bool isIrowtsPseudoColumn(SExprInfo* pExprInfo); +bool isIsfilledPseudoColumn(SExprInfo* pExprInfo); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); @@ -995,6 +1027,8 @@ void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArr int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, TSKEY* primaryKeys, int32_t prevPosition, int32_t order); void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); +bool getIgoreNullRes(SExprSupp* pExprSup); +bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull); diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 95208545bd..dfd986271c 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -166,6 +166,8 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle); + // clang-format on SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 8b61b93601..e16d14700a 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -21,10 +21,42 @@ extern "C" { #include "executorInt.h" +#define FILL_POS_INVALID 0 +#define FILL_POS_START 1 +#define FILL_POS_MID 2 +#define FILL_POS_END 3 + void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); +int64_t getDeleteMarkFromOption(SStreamOption* pOption); +void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins); +void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins); +bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo); +void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal); +int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey, + STimeWindow* pNextWin); +int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap); +void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup); +void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index, + SSDataBlock* pBlock); + +SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index); +int32_t initResultBuf(SStreamFillSupporter* pFillSup); +void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup); +void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol); +void resetFillWindow(SResultRowData* pRowData); +bool hasPrevWindow(SStreamFillSupporter* pFillSup); +bool hasNextWindow(SStreamFillSupporter* pFillSup); +void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo); +void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo); +void setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell); +bool hasRemainCalc(SStreamFillInfo* pFillInfo); +void destroySPoint(void* ptr); + +int winPosCmprImpl(const void* pKey1, const void* pKey2); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index cb2a75407a..142fa6d861 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2317,6 +2317,8 @@ char* getStreamOpName(uint16_t opType) { return "stream event"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT: return "stream count"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: + return "stream interp"; } return ""; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 494794f8c2..f68a14bc2d 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -546,6 +546,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOptr = createStreamCountAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) { pOptr = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC == type) { + pOptr = createStreamTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); } else { terrno = TSDB_CODE_INVALID_PARA; pTaskInfo->code = terrno; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 490f6b86fa..ed70b31845 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1299,6 +1299,10 @@ static bool isCountWindow(SStreamScanInfo* pInfo) { return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT; } +static bool isTimeSlice(SStreamScanInfo* pInfo) { + return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC; +} + static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); uint64_t* groupCol = (uint64_t*)pColInfo->pData; @@ -1795,6 +1799,11 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB return TSDB_CODE_SUCCESS; } +static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) { + // todo(liuyao) add code 获取delete range的左邻居和右邻居,作为range + return TSDB_CODE_SUCCESS; +} + static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) { blockDataCleanup(pDestBlock); if (pSrcBlock->info.rows == 0) { @@ -1980,6 +1989,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock, type); } else if (isCountWindow(pInfo)) { code = generateCountScanRange(pInfo, pSrcBlock, pDestBlock, type); + } else if (isTimeSlice(pInfo)) { + code = generateTimeSliceScanRange(pInfo, pSrcBlock, pDestBlock, type); } else { code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock); } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 857d048457..869af23acd 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -715,7 +715,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); - pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey; + pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimaryKey; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true, diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 1809d0cc67..64f7f936d3 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -765,7 +765,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->reCkBlock = false; pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); - pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimayKey; + pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimaryKey; setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index c1a38b66ba..ab4d03c1cd 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -21,6 +21,7 @@ #include "ttypes.h" #include "executorInt.h" +#include "streamexecutorInt.h" #include "tcommon.h" #include "thash.h" #include "ttime.h" @@ -32,12 +33,6 @@ #include "operator.h" #include "querytask.h" - -#define FILL_POS_INVALID 0 -#define FILL_POS_START 1 -#define FILL_POS_MID 2 -#define FILL_POS_END 3 - typedef struct STimeRange { TSKEY skey; TSKEY ekey; @@ -133,12 +128,12 @@ static void destroyStreamFillOperatorInfo(void* param) { taosMemoryFree(pInfo); } -static void resetFillWindow(SResultRowData* pRowData) { +void resetFillWindow(SResultRowData* pRowData) { pRowData->key = INT64_MIN; taosMemoryFreeClear(pRowData->pRowVal); } -void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, void* pState, SStorageAPI* pAPI) { +void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) { if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) { resetFillWindow(&pFillSup->cur); } else { @@ -154,7 +149,7 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; void* pState = pOperator->pTaskInfo->streamInfo.pState; - resetPrevAndNextWindow(pFillSup, pState, pAPI); + resetPrevAndNextWindow(pFillSup); SWinKey key = {.ts = ts, .groupId = groupId}; int32_t curVLen = 0; @@ -167,7 +162,7 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) { SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; void* pState = pOperator->pTaskInfo->streamInfo.pState; - resetPrevAndNextWindow(pFillSup, pState, pAPI); + resetPrevAndNextWindow(pFillSup); SWinKey key = {.ts = ts, .groupId = groupId}; void* curVal = NULL; @@ -233,14 +228,14 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, pAPI->stateStore.streamStateFreeCur(pCur); } -static bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; } -static bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; } +bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; } +bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; } static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->nextNext.key != INT64_MIN; return false; } -static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) { +void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); @@ -262,30 +257,7 @@ static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKE pRowVal->key = ts; } -static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pRowVal, SArray* pDelta, - SFillColInfo* pFillCol, int32_t numOfCol, int32_t winCount, int32_t order) { - for (int32_t i = 0; i < numOfCol; i++) { - if (!pFillCol[i].notFillCol) { - int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i); - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - char* var = colDataGetData(pCol, rowId); - double start = 0; - GET_TYPED_DATA(start, double, pCol->info.type, var); - SResultCellData* pCell = getResultCell(pRowVal, slotId); - double end = 0; - GET_TYPED_DATA(end, double, pCell->type, pCell->pData); - double delta = 0; - if (order == TSDB_ORDER_ASC) { - delta = (end - start) / winCount; - } else { - delta = (start - end) / winCount; - } - taosArraySet(pDelta, slotId, &delta); - } - } -} - -static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, +void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { for (int32_t i = 0; i < numOfCol; i++) { if (!pFillCol[i].notFillCol) { @@ -308,7 +280,7 @@ static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFil pFillInfo->end = ts; } -static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) { +void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) { setFillInfoStart(start, pInterval, pFillInfo); pFillInfo->current = pFillInfo->start; setFillInfoEnd(end, pInterval, pFillInfo); @@ -519,7 +491,7 @@ static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill return true; } -static bool hasRemainCalc(SStreamFillInfo* pFillInfo) { +bool hasRemainCalc(SStreamFillInfo* pFillInfo) { if (pFillInfo->current != INT64_MIN && pFillInfo->current <= pFillInfo->end) { return true; } @@ -1004,7 +976,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { return pInfo->pRes; } -static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { +int32_t initResultBuf(SStreamFillSupporter* pFillSup) { pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols; for (int i = 0; i < pFillSup->numOfAllCols; i++) { SFillColInfo* pCol = &pFillSup->pAllColInfo[i]; diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c new file mode 100644 index 0000000000..5ba985c02b --- /dev/null +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -0,0 +1,778 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "executorInt.h" +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "operator.h" +#include "querytask.h" +#include "storageapi.h" +#include "streamexecutorInt.h" +#include "tcommon.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "tfill.h" +#include "ttime.h" + +#define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState" +#define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint" +#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN) + +typedef struct SSlicePoint { + SWinKey key; + SResultRowData* pLeftRow; + SResultRowData* pRightRow; + SRowBuffPos* pResPos; +} SSlicePoint; + +void streamTimeSliceReleaseState(SOperatorInfo* pOperator) { + // todo(liuyao) add +} + +void streamTimeSliceReloadState(SOperatorInfo* pOperator) { + // todo(liuyao) add +} + +void destroyStreamTimeSliceOperatorInfo(void* param) { + SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param; + clearGroupResInfo(&pInfo->groupResInfo); + taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); + pInfo->pUpdated = NULL; + + taosArrayDestroy(pInfo->pDelWins); + blockDataDestroy(pInfo->pDelRes); + destroyStreamAggSupporter(&pInfo->streamAggSup); + + colDataDestroy(&pInfo->twAggSup.timeWindowData); + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + tSimpleHashCleanup(pInfo->pDeletedMap); + + blockDataDestroy(pInfo->pCheckpointRes); + // todo(liuyao) 看是否有遗漏 + taosMemoryFreeClear(param); +} + +static void doStreamTimeSliceSaveCheckpoint(SOperatorInfo* pOperator) { + // todo(liuyao) add +} + +static bool fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock) { + if (pBlock->info.rows >= pBlock->info.capacity) { + return false; + } + for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) { + SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; + int32_t dstSlotId = GET_DEST_SLOT_ID(pFillCol); + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + + if (isIrowtsPseudoColumn(pFillCol->pExpr)) { + colDataSetVal(pDstCol, pBlock->info.rows, (char*)&ts, false); + } else if (isIsfilledPseudoColumn(pFillCol->pExpr)) { + bool isFilled = false; + colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false); + } else { + int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; + SResultCellData* pCell = getResultCell(pResRow, srcSlot); + setRowCell(pDstCol, pBlock->info.rows, pCell); + } + } + + pBlock->info.rows += 1; + return true; +} + +static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { + while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { + STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; + if (inWinRange(&pFillSup->winRange, &st)) { + fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock); + } + pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, + pFillSup->interval.precision); + } +} + +static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { + while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { + for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { + SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; + int32_t dstSlotId = GET_DEST_SLOT_ID(pFillCol); + int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + int16_t type = pDstCol->info.type; + SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot); + int32_t index = pBlock->info.rows; + if (isIrowtsPseudoColumn(pFillCol->pExpr)) { + colDataSetVal(pDstCol, pBlock->info.rows, (char*)&pFillInfo->current, false); + } else if (isIsfilledPseudoColumn(pFillCol->pExpr)) { + bool isFilled = true; + colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false); + } else { + if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { + colDataSetNULL(pDstCol, index); + continue; + } + SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, srcSlot); + double vCell = 0; + SPoint start = {0}; + start.key = pFillInfo->pResRow->key; + start.val = pCell->pData; + + SPoint cur = {0}; + cur.key = pFillInfo->current; + cur.val = taosMemoryCalloc(1, pCell->bytes); + taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type); + colDataSetVal(pDstCol, index, (const char*)cur.val, false); + destroySPoint(&cur); + } + } + pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, + pFillSup->interval.precision); + pBlock->info.rows++; + } +} + +static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, + SSDataBlock* pRes) { + if (pFillInfo->needFill == false) { + fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes); + return; + } + + if (pFillInfo->pos == FILL_POS_START) { + if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) { + pFillInfo->pos = FILL_POS_INVALID; + } + } + if (pFillInfo->type != TSDB_FILL_LINEAR) { + fillNormalRange(pFillSup, pFillInfo, pRes); + } else { + fillLinearRange(pFillSup, pFillInfo, pRes); + + if (pFillInfo->pos == FILL_POS_MID) { + if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) { + pFillInfo->pos = FILL_POS_INVALID; + } + } + + if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) { + pFillInfo->pLinearInfo->hasNext = false; + taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints); + pFillInfo->pResRow = &pFillSup->cur; + setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo); + fillLinearRange(pFillSup, pFillInfo, pRes); + } + } + if (pFillInfo->pos == FILL_POS_END) { + if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) { + pFillInfo->pos = FILL_POS_INVALID; + } + } +} + +static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId, bool ignoreNull) { + if (!ignoreNull) { + return rowId; + } + + for (int32_t i = rowId; rowId < pBlock->info.rows; i++) { + if (!checkNullRow(pExprSup, pBlock, rowId, ignoreNull)) { + return i; + } + } + return -1; +} + +static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, + bool ignoreNull) { + TSKEY ts = tsCols[rowId]; + int32_t resRow = -1; + for (; rowId >= 0; rowId--) { + if (checkNullRow(pExprSup, pBlock, rowId, ignoreNull)) { + continue; + } + + if (ts != tsCols[rowId]) { + if (resRow >= 0) { + break; + } else { + ts = tsCols[rowId]; + } + } + resRow = rowId; + } + return resRow; +} + +static void getPointRowDataFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, + SSlicePoint* pPoint) { + int32_t curVLen = 0; + int32_t code = + pAggSup->stateStore.streamStateFillGet(pAggSup->pState, &pPoint->key, (void**)&pPoint->pResPos, &curVLen); + pPoint->pLeftRow = pPoint->pResPos->pRowBuff; + if (pFillSup->type == TSDB_FILL_LINEAR) { + pPoint->pRightRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize); + } else { + pPoint->pRightRow = NULL; + } +} + +static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, + int64_t groupId) { + void* pState = pAggSup->pState; + resetPrevAndNextWindow(pFillSup); + + SWinKey key = {.ts = ts, .groupId = groupId}; + void* curVal = NULL; + int32_t curVLen = 0; + int32_t code = pAggSup->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen); + if (code == TSDB_CODE_SUCCESS) { + pFillSup->cur.key = key.ts; + pFillSup->cur.pRowVal = curVal; + } else { + qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId); + pFillSup->cur.key = ts; + pFillSup->cur.pRowVal = NULL; + } + + SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId}; + void* preVal = NULL; + int32_t preVLen = 0; + code = pAggSup->stateStore.streamStateFillGetPrev(pState, &key, &preKey, &preVal, &preVLen); + if (code == TSDB_CODE_SUCCESS) { + pFillSup->prev.key = preKey.ts; + pFillSup->prev.pRowVal = preVal; + } + + SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId}; + void* nextVal = NULL; + int32_t nextVLen = 0; + code = pAggSup->stateStore.streamStateFillGetNext(pState, &key, &nextKey, &nextVal, &nextVLen); + if (code == TSDB_CODE_SUCCESS) { + pFillSup->next.key = nextKey.ts; + pFillSup->next.pRowVal = nextVal; + } +} + +static void setTimeSliceFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { + if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) { + pFillInfo->needFill = false; + pFillInfo->pos = FILL_POS_START; + return; + } + TSKEY prevWKey = INT64_MIN; + TSKEY nextWKey = INT64_MIN; + if (hasPrevWindow(pFillSup)) { + prevWKey = pFillSup->prev.key; + } + if (hasNextWindow(pFillSup)) { + nextWKey = pFillSup->next.key; + } + + pFillInfo->needFill = true; + pFillInfo->pos = FILL_POS_INVALID; + switch (pFillInfo->type) { + case TSDB_FILL_NULL: + case TSDB_FILL_NULL_F: + case TSDB_FILL_SET_VALUE: + case TSDB_FILL_SET_VALUE_F: { + if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) { + pFillInfo->needFill = false; + pFillInfo->pos = FILL_POS_START; + } else if (hasPrevWindow(pFillSup)) { + setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_END; + } else { + setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_START; + } + copyNotFillExpData(pFillSup, pFillInfo); + } break; + case TSDB_FILL_PREV: { + if (hasNextWindow(pFillSup)) { + setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_START; + resetFillWindow(&pFillSup->prev); + pFillSup->prev.key = pFillSup->cur.key; + pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; + } else { + ASSERT(hasPrevWindow(pFillSup)); + setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_END; + pFillInfo->preRowKey = INT64_MIN; + } + pFillInfo->pResRow = &pFillSup->prev; + } break; + case TSDB_FILL_NEXT: { + if (hasPrevWindow(pFillSup)) { + setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_END; + resetFillWindow(&pFillSup->next); + pFillSup->next.key = pFillSup->cur.key; + pFillSup->next.pRowVal = pFillSup->cur.pRowVal; + pFillInfo->preRowKey = INT64_MIN; + } else { + ASSERT(hasNextWindow(pFillSup)); + setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_START; + } + pFillInfo->pResRow = &pFillSup->next; + } break; + case TSDB_FILL_LINEAR: { + if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) { + setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_MID; + pFillInfo->pLinearInfo->nextEnd = nextWKey; + calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); + pFillInfo->pResRow = &pFillSup->prev; + + calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); + pFillInfo->pLinearInfo->hasNext = true; + } else if (hasPrevWindow(pFillSup)) { + setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_END; + pFillInfo->pLinearInfo->nextEnd = INT64_MIN; + calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); + pFillInfo->pResRow = &pFillSup->prev; + pFillInfo->pLinearInfo->hasNext = false; + } else { + ASSERT(hasNextWindow(pFillSup)); + setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_START; + pFillInfo->pLinearInfo->nextEnd = INT64_MIN; + calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); + pFillInfo->pResRow = &pFillSup->cur; + pFillInfo->pLinearInfo->hasNext = false; + } + } break; + default: + ASSERT(0); + break; + } + ASSERT(pFillInfo->pos != FILL_POS_INVALID); +} + +static bool needAdjValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fillType) { + switch (fillType) { + case TSDB_FILL_NULL: + case TSDB_FILL_NULL_F: + case TSDB_FILL_SET_VALUE: + case TSDB_FILL_SET_VALUE_F: { + if (HAS_NON_ROW_DATA(pPoint->pRightRow) && HAS_NON_ROW_DATA(pPoint->pLeftRow)) { + return true; + } + } break; + case TSDB_FILL_PREV: { + if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key < ts)) { + return true; + } + } break; + case TSDB_FILL_NEXT: { + if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key > ts)) { + return true; + } + } break; + case TSDB_FILL_LINEAR: { + if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key < ts)) { + return true; + } else if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key > ts)) { + return true; + } + } break; + default: + ASSERT(0); + } + return false; +} + +static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExprSupp* pExprSup = &pOperator->exprSupp; + int32_t numOfOutput = pExprSup->numOfExprs; + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); + TSKEY* tsCols = (int64_t*)pColDataInfo->pData; + void* pPkVal = NULL; + int32_t pkLen = 0; + int64_t groupId = pBlock->info.id.groupId; + SColumnInfoData* pPkColDataInfo = NULL; + SStreamFillSupporter* pFillSup = pInfo->pFillSup; + SStreamFillInfo* pFillInfo = pInfo->pFillInfo; + if (hasSrcPrimaryKeyCol(&pInfo->basic)) { + pPkColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->basic.primaryPkIndex); + } + + pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow; + if (pFillSup->winRange.ekey <= 0) { + pFillSup->winRange.ekey = INT64_MIN; + } + + int32_t startPos = 0; + for (; startPos < pBlock->info.rows; startPos++) { + if (hasSrcPrimaryKeyCol(&pInfo->basic) && pInfo->ignoreExpiredData) { + pPkVal = colDataGetData(pPkColDataInfo, startPos); + pkLen = colDataGetRowLength(pPkColDataInfo, startPos); + } + + if (pInfo->ignoreExpiredData && checkExpiredData(&pAggSup->stateStore, pAggSup->pUpdateInfo, &pInfo->twAggSup, + pBlock->info.id.uid, tsCols[startPos], pPkVal, pkLen)) { + qDebug("===stream===ignore expired data, window end ts:%" PRId64 ", maxts - wartermak:%" PRId64, tsCols[startPos], + pInfo->twAggSup.maxTs - pInfo->twAggSup.waterMark); + continue; + } + + if (checkNullRow(pExprSup, pBlock, startPos, pInfo->ignoreNull)) { + continue; + } + } + + if (startPos >= pBlock->info.rows) { + return; + } + + SResultRowInfo dumyInfo = {0}; + dumyInfo.cur.pageId = -1; + STimeWindow curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC); + SSlicePoint point = {.key.ts = curWin.skey, .key.groupId = groupId}; + getPointRowDataFromState(pAggSup, pFillSup, &point); + if (needAdjValue(&point, tsCols[startPos], true, pFillSup->type)) { + transBlockToResultRow(pBlock, startPos, tsCols[startPos], point.pLeftRow); + saveWinResult(&point.key, point.pResPos, pInfo->pUpdatedMap); + } + + while (startPos < pBlock->info.rows) { + int32_t numOfWin = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); + startPos += numOfWin; + int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull); + startPos = getQualifiedRowNumAsc(pExprSup, pBlock, startPos, pInfo->ignoreNull); + if (startPos < 0) { + break; + } + curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC); + getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId); + bool left = needAdjValue(&point, tsCols[leftRowId], true, pFillSup->type); + if (left) { + transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], point.pLeftRow); + } + bool right = needAdjValue(&point, tsCols[startPos], false, pFillSup->type); + if (right) { + transBlockToResultRow(pBlock, startPos, tsCols[startPos], point.pRightRow); + } + + if (left || right) { + saveWinResult(&point.key, point.pResPos, pInfo->pUpdatedMap); + } + } +} + +void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, + SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { + blockDataCleanup(pBlock); + if (!hasRemainResults(pGroupResInfo)) { + return; + } + + // clear the existed group id + pBlock->info.id.groupId = 0; + int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); + for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { + SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); + // todo(liuyao) fill 增加接口,get buff from pos设置pFillSup->cur + SWinKey* pKey = (SWinKey*)pPos->pKey; + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pKey->groupId; + } else if (pBlock->info.id.groupId != pKey->groupId) { + pGroupResInfo->index--; + break; + } + getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId); + setTimeSliceFillInfo(pFillSup, pFillInfo, pKey->ts); + doStreamFillRange(pFillSup, pFillInfo, pBlock); + } +} + +static SSDataBlock* buildTimeSliceResult(SOperatorInfo* pOperator) { + SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + + doBuildDeleteResultImpl(&pAggSup->stateStore, pAggSup->pState, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + + doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); + if (pInfo->pRes->info.rows != 0) { + printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + return pInfo->pRes; + } + + return NULL; +} + +static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { + SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + if (pOperator->status == OP_RES_TO_RETURN) { + if (hasRemainCalc(pInfo->pFillInfo) || + (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) { + blockDataCleanup(pInfo->pRes); + doStreamFillRange(pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes); + if (pInfo->pRes->info.rows > 0) { + printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pRes; + } + } + + SSDataBlock* resBlock = buildTimeSliceResult(pOperator); + if (resBlock != NULL) { + return resBlock; + } + + if (pInfo->recvCkBlock) { + pInfo->recvCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pCheckpointRes; + } + + setStreamOperatorCompleted(pOperator); + return NULL; + } + + SSDataBlock* fillResult = NULL; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + while (1) { + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); + if (pBlock == NULL) { + pOperator->status = OP_RES_TO_RETURN; + qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType), + pInfo->numOfDatapack); + pInfo->numOfDatapack = 0; + break; + } + pInfo->numOfDatapack++; + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + + switch (pBlock->info.type) { + case STREAM_DELETE_RESULT: { + // todo(liuyao) add + } break; + case STREAM_NORMAL: + case STREAM_INVALID: { + SExprSupp* pExprSup = &pInfo->scalarSup; + if (pExprSup->pExprInfo != NULL) { + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } + } break; + case STREAM_CHECKPOINT: { + pInfo->recvCkBlock = true; + pAggSup->stateStore.streamStateCommit(pAggSup->pState); + doStreamTimeSliceSaveCheckpoint(pOperator); + pInfo->recvCkBlock = true; + copyDataBlock(pInfo->pCheckpointRes, pBlock); + continue; + } break; + case STREAM_CREATE_CHILD_TABLE: { + return pBlock; + } break; + default: + ASSERTS(false, "invalid SSDataBlock type"); + } + + doStreamTimeSliceImpl(pOperator, pBlock); + } + + if (!pInfo->destHasPrimaryKey) { + removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); + } else { + copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins); + } + + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { + taosArrayPush(pInfo->pUpdated, pIte); + } + taosArraySort(pInfo->pUpdated, winPosCmprImpl); + + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); + pInfo->pUpdated = NULL; + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + tSimpleHashCleanup(pInfo->pUpdatedMap); + pInfo->pUpdatedMap = NULL; + + return buildTimeSliceResult(pOperator); +} + +int32_t doStreamTimeSliceEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) { + // todo(liuyao) add + return 0; +} + +void* doStreamTimeSliceDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) { + // todo(liuyao) add + return NULL; +} + +static SStreamFillSupporter* initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprInfo* pExprInfo, + int32_t numOfExprs) { + SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); + if (!pFillSup) { + return NULL; + } + pFillSup->numOfFillCols = numOfExprs; + int32_t numOfNotFillCols = 0; + pFillSup->pAllColInfo = createFillColInfo(pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols, + (const SNodeListNode*)(pPhyFillNode->pFillValues)); + pFillSup->type = convertFillType(pPhyFillNode->fillMode); + pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols; + pFillSup->interval.interval = pPhyFillNode->interval; + // todo(liuyao) 初始化 pFillSup->interval其他属性 + pFillSup->pAPI = NULL; + + int32_t code = initResultBuf(pFillSup); + if (code != TSDB_CODE_SUCCESS) { + destroyStreamFillSupporter(pFillSup); + return NULL; + } + pFillSup->pResMap = NULL; + pFillSup->hasDelete = false; + return pFillSup; +} + +SOperatorInfo* createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { + SStreamTimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTimeSliceOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL || pInfo == NULL) { + goto _error; + } + SStreamInterpFuncPhysiNode* pInterpPhyNode = (SStreamInterpFuncPhysiNode*)pPhyNode; + pOperator->pTaskInfo = pTaskInfo; + initResultSizeInfo(&pOperator->resultInfo, 4096); + SExprSupp* pExpSup = &pOperator->exprSupp; + int32_t numOfExprs = 0; + SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); + int32_t code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (pInterpPhyNode->pExprs != NULL) { + int32_t num = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->twAggSup = (STimeWindowAggSupp){ + .waterMark = pInterpPhyNode->streamOption.watermark, + .calTrigger = pInterpPhyNode->streamOption.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + .deleteMark = getDeleteMarkFromOption(&pInterpPhyNode->streamOption), + }; + + pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId; + code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, + sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + + pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + pInfo->delIndex = 0; + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); + + pInfo->ignoreExpiredData = pInterpPhyNode->streamOption.igExpired; + pInfo->ignoreExpiredDataSaved = false; + pInfo->pUpdated = NULL; + pInfo->pUpdatedMap = NULL; + pInfo->historyPoints = taosArrayInit(4, sizeof(SWinKey)); + if (!pInfo->historyPoints) { + goto _error; + } + + pInfo->recvCkBlock = false; + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + pInfo->destHasPrimaryKey = pInterpPhyNode->streamOption.destHasPrimaryKey; + pInfo->numOfDatapack = 0; + pInfo->pFillSup = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs); + + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC; + setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, + true, OP_NOT_OPENED, pInfo, pTaskInfo); + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = pTaskInfo->storageAPI.stateStore.streamStateGetInfo( + pTaskInfo->streamInfo.pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME, strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME), + &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamTimeSliceDecodeOpState(buff, len, pOperator); + taosMemoryFree(buff); + } + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamTimeSlice, NULL, destroyStreamTimeSliceOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState); + + if (downstream) { + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ) { + SStreamScanInfo* pScanInfo = downstream->info; + pScanInfo->igCheckUpdate = true; + } + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, + &pInfo->basic); + code = appendDownstream(pOperator, &downstream, 1); + } + return pOperator; + +_error: + if (pInfo != NULL) { + destroyStreamTimeSliceOperatorInfo(pInfo); + } + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5b9c018bba..78d14578a6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -161,7 +161,7 @@ int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } -static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) { +int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) { return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES); } @@ -189,7 +189,7 @@ static int32_t compareWinKey(void* pKey, void* data, int32_t index) { return winKeyCmprImpl(pKey, pDataPos); } -static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) { +void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) { taosArraySort(pDelWins, winKeyCmprImpl); taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL); int32_t delSize = taosArrayGetSize(pDelWins); @@ -364,6 +364,10 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index, SSDataBlock* pBlock) { + doBuildDeleteResultImpl(&pInfo->stateStore, pInfo->pState, pWins, index, pBlock); +} + +void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index, SSDataBlock* pBlock) { blockDataCleanup(pBlock); int32_t size = taosArrayGetSize(pWins); if (*index == size) { @@ -376,7 +380,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin for (int32_t i = *index; i < size; i++) { SWinKey* pWin = taosArrayGet(pWins, i); void* tbname = NULL; - pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname, false); + pAPI->streamStateGetParName(pState, pWin->groupId, &tbname, false); if (tbname == NULL) { appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); } else { @@ -384,7 +388,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); } - pInfo->stateStore.streamStateFreeVal(tbname); + pAPI->streamStateFreeVal(tbname); (*index)++; } } @@ -994,7 +998,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } } -static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) { +int winPosCmprImpl(const void* pKey1, const void* pKey2) { SRowBuffPos* pos1 = *(SRowBuffPos**)pKey1; SRowBuffPos* pos2 = *(SRowBuffPos**)pKey2; SWinKey* pWin1 = (SWinKey*)pos1->pKey; @@ -1224,7 +1228,7 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { } } -static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) { +void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) { void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pMap, pIte, &iter)) != NULL) { @@ -1468,6 +1472,14 @@ int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) { return deleteMark; } +int64_t getDeleteMarkFromOption(SStreamOption* pOption) { + if (pOption->deleteMark <= 0) { + return DEAULT_DELETE_MARK; + } + int64_t deleteMark = TMAX(pOption->deleteMark, pOption->watermark); + return deleteMark; +} + static TSKEY compareTs(void* pKey) { SWinKey* pWinKey = (SWinKey*)pKey; return pWinKey->ts; @@ -1633,7 +1645,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->clearState = false; pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); - pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; + pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey; pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -3080,7 +3092,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->clearState = false; pInfo->recvGetAll = false; - pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey; + pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimaryKey; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; @@ -3999,7 +4011,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->recvGetAll = false; pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); - pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey; + pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimaryKey; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4272,7 +4284,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); - pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey; + pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey; // for stream void* buff = NULL; @@ -4599,6 +4611,6 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } void setStreamOperatorCompleted(SOperatorInfo* pOperator) { - setOperatorCompleted(pOperator); qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); + setOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index cda22fa320..c63b6aba9a 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -168,12 +168,12 @@ static FORCE_INLINE int32_t timeSliceEnsureBlockCapacity(STimeSliceOperatorInfo* return TSDB_CODE_SUCCESS; } -static bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) { +bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) { char *name = pExprInfo->pExpr->_function.functionName; return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts") == 0); } -static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) { +bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) { char *name = pExprInfo->pExpr->_function.functionName; return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0); } @@ -233,7 +233,7 @@ static bool isGroupKeyFunc(SExprInfo* pExprInfo) { return (functionType == FUNCTION_TYPE_GROUP_KEY); } -static bool getIgoreNullRes(SExprSupp* pExprSup) { +bool getIgoreNullRes(SExprSupp* pExprSup) { for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { SExprInfo* pExprInfo = &pExprSup->pExprInfo[i]; @@ -250,7 +250,7 @@ static bool getIgoreNullRes(SExprSupp* pExprSup) { return false; } -static bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) { +bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) { if (!ignoreNull) { return false; } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index e3e84ac20b..26bdf7e51e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2893,7 +2893,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "interp", .type = FUNCTION_TYPE_INTERP, .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, + FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC, .translateFunc = translateInterp, .getEnvFunc = getSelectivityFuncEnv, .initFunc = functionSetup, diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index f02fefd977..991ec8d2e5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -762,7 +762,7 @@ static int32_t physiWindowCopy(const SWindowPhysiNode* pSrc, SWindowPhysiNode* p COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(igExpired); - COPY_SCALAR_FIELD(destHasPrimayKey); + COPY_SCALAR_FIELD(destHasPrimaryKey); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index d5b36aa1a8..38a4a2d1b6 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -413,6 +413,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiIndefRowsFunc"; case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: return "PhysiInterpFunc"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: + return "PhysiStreamInterpFunc"; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return "PhysiDispatch"; case QUERY_NODE_PHYSICAL_PLAN_INSERT: @@ -2740,7 +2742,7 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddBoolToObject(pJson, jkWindowPhysiPlanMergeDataBlock, pNode->mergeDataBlock); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanDestHasPrimaryKey, pNode->destHasPrimayKey); + code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanDestHasPrimaryKey, pNode->destHasPrimaryKey); } return code; @@ -2778,7 +2780,7 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) { code = tjsonGetBoolValue(pJson, jkWindowPhysiPlanMergeDataBlock, &pNode->mergeDataBlock); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanDestHasPrimaryKey, &pNode->destHasPrimayKey); + code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanDestHasPrimaryKey, &pNode->destHasPrimaryKey); } return code; @@ -5619,6 +5621,7 @@ static const char* jkSelectStmtLimit = "Limit"; static const char* jkSelectStmtSlimit = "Slimit"; static const char* jkSelectStmtStmtName = "StmtName"; static const char* jkSelectStmtHasAggFuncs = "HasAggFuncs"; +static const char* jkSelectStmtInterpFuncs = "HasInterpFuncs"; static int32_t selectStmtToJson(const void* pObj, SJson* pJson) { const SSelectStmt* pNode = (const SSelectStmt*)pObj; @@ -5666,6 +5669,9 @@ static int32_t selectStmtToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkSelectStmtHasAggFuncs, pNode->hasAggFuncs); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSelectStmtInterpFuncs, pNode->hasInterpFunc); + } return code; } @@ -5716,6 +5722,9 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkSelectStmtHasAggFuncs, &pNode->hasAggFuncs); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSelectStmtInterpFuncs, &pNode->hasInterpFunc); + } return code; } @@ -7793,6 +7802,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: return physiIndefRowsFuncNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: return physiInterpFuncNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return physiDispatchNodeToJson(pObj, pJson); @@ -8151,6 +8161,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: return jsonToPhysiIndefRowsFuncNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: return jsonToPhysiInterpFuncNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return jsonToPhysiDispatchNode(pJson, pObj); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 54cf685235..926e5aa198 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3183,7 +3183,7 @@ static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { code = tlvEncodeBool(pEncoder, PHY_WINDOW_CODE_MERGE_DATA_BLOCK, pNode->mergeDataBlock); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY, pNode->destHasPrimayKey); + code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY, pNode->destHasPrimaryKey); } return code; @@ -3227,7 +3227,7 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) { code = tlvDecodeBool(pTlv, &pNode->mergeDataBlock); break; case PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY: - code = tlvDecodeI8(pTlv, &pNode->destHasPrimayKey); + code = tlvDecodeI8(pTlv, &pNode->destHasPrimaryKey); break; default: break; @@ -4539,6 +4539,7 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { code = physiIndefRowsFuncNodeToMsg(pObj, pEncoder); break; case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: code = physiInterpFuncNodeToMsg(pObj, pEncoder); break; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: @@ -4701,6 +4702,7 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { code = msgToPhysiIndefRowsFuncNode(pDecoder, pObj); break; case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: code = msgToPhysiInterpFuncNode(pDecoder, pObj); break; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index f118c15b7a..9d24b3512e 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -723,6 +723,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SSubplan)); case QUERY_NODE_PHYSICAL_PLAN: return makeNode(type, sizeof(SQueryPlan)); + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: + return makeNode(type, sizeof(SStreamInterpFuncPhysiNode)); default: break; } @@ -1652,7 +1654,8 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyList(pPhyNode->pFuncs); break; } - case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: { + case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: { SInterpFuncPhysiNode* pPhyNode = (SInterpFuncPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyList(pPhyNode->pExprs); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b741db3ae6..f6072be962 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5496,8 +5496,15 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { if (pSelect->pRange != NULL && QUERY_NODE_OPERATOR == nodeType(pSelect->pRange) && pSelect->pEvery == NULL) { // single point interp every can be omitted } else { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, - "Missing RANGE clause, EVERY clause or FILL clause"); + if (pCxt->createStream) { + if (NULL == pSelect->pEvery || NULL == pSelect->pFill) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, + "Missing EVERY clause or FILL clause"); + } + } else { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, + "Missing RANGE clause, EVERY clause or FILL clause"); + } } } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 66fa405a3a..ea9c5df976 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -894,6 +894,15 @@ static bool isInterpFunc(int32_t funcId) { return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId); } +static void initStreamOption(SLogicPlanContext* pCxt, SStreamOption* pOption) { + pOption->triggerType = pCxt->pPlanCxt->triggerType; + pOption->watermark = pCxt->pPlanCxt->watermark; + pOption->deleteMark = pCxt->pPlanCxt->deleteMark; + pOption->igExpired = pCxt->pPlanCxt->igExpired; + pOption->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate; + pOption->destHasPrimaryKey = pCxt->pPlanCxt->destHasPrimaryKey; +} + static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (!pSelect->hasInterpFunc) { return TSDB_CODE_SUCCESS; @@ -934,6 +943,10 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p code = createColumnByRewriteExprs(pInterpFunc->pFuncs, &pInterpFunc->node.pTargets); } + if (TSDB_CODE_SUCCESS == code) { + initStreamOption(pCxt, &pInterpFunc->streamOption); + } + if (TSDB_CODE_SUCCESS == code) { *pLogicNode = (SLogicNode*)pInterpFunc; } else { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 1b896e03c3..44fc9a3fbb 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1684,8 +1684,9 @@ static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SInterpFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) { - SInterpFuncPhysiNode* pInterpFunc = - (SInterpFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC); + SInterpFuncPhysiNode* pInterpFunc = (SInterpFuncPhysiNode*)makePhysiNode( + pCxt, (SLogicNode*)pFuncLogicNode, + pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC : QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC); if (NULL == pInterpFunc) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -1728,6 +1729,10 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pInterpFunc); } + if (pCxt->pPlanCxt->streamQuery) { + pInterpFunc->streamOption = pFuncLogicNode->streamOption; + } + if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pInterpFunc; } else { @@ -1869,7 +1874,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pWindow->deleteMark = pWindowLogicNode->deleteMark; pWindow->igExpired = pWindowLogicNode->igExpired; if (pCxt->pPlanCxt->streamQuery) { - pWindow->destHasPrimayKey = pCxt->pPlanCxt->destHasPrimaryKey; + pWindow->destHasPrimaryKey = pCxt->pPlanCxt->destHasPrimaryKey; } pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true); pWindow->node.inputTsOrder = pWindowLogicNode->node.inputTsOrder; diff --git a/source/libs/planner/src/planValidator.c b/source/libs/planner/src/planValidator.c index 4fcd064e56..6b7b46cfa7 100755 --- a/source/libs/planner/src/planValidator.c +++ b/source/libs/planner/src/planValidator.c @@ -118,6 +118,7 @@ int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: break; case QUERY_NODE_PHYSICAL_SUBPLAN: return validateSubplanNode(pCxt, (SSubplan*)pNode); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 6b81ac87ee..e0933f4fac 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -160,7 +160,7 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key); int32_t streamStateClear_rocksdb(SStreamState* pState); -int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateCurNext_rocksdb(SStreamStateCur* pCur); int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); @@ -205,6 +205,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState); // partag cf int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4f07faad25..3e0e19bdbd 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2817,7 +2817,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { return 0; } -int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { +int32_t streamStateCurNext_rocksdb(SStreamStateCur* pCur) { if (!pCur) { return -1; } @@ -2982,6 +2982,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); return pCur; } + SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { stDebug("streamStateGetCur_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; @@ -3480,6 +3481,12 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const streamStateFreeCur(pCur); return NULL; } + +SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) { + SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX}; + return streamStateFillSeekKeyNext_rocksdb(pState, &key); +} + #ifdef BUILD_NO_CALL int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { stDebug("streamStateSessionGetKeyByRange_rocksdb"); @@ -3518,7 +3525,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes } if (c > 0) { - streamStateCurNext_rocksdb(pState, pCur); + streamStateCurNext_rocksdb(pCur); code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; @@ -3565,7 +3572,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe goto _end; } taosMemoryFreeClear(*pVal); - streamStateCurNext_rocksdb(pState, pCur); + streamStateCurNext_rocksdb(pCur); } else { *key = originKey; streamStateFreeCur(pCur); @@ -3611,7 +3618,7 @@ int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { } taosMemoryFreeClear(buf); - streamStateCurNext_rocksdb(pState, pCur); + streamStateCurNext_rocksdb(pCur); } streamStateFreeCur(pCur); return -1; @@ -3642,7 +3649,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* goto _end; } - streamStateCurNext_rocksdb(pState, pCur); + streamStateCurNext_rocksdb(pCur); } else { *key = tmpKey; streamStateFreeCur(pCur); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 84db657392..feea9f048e 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -20,12 +20,10 @@ #include "tcommon.h" #include "tsimplehash.h" -typedef int (*__session_compare_fn_t)(const SSessionKey* pWin, const void* pDatas, int pos); - -int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) { +int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) { SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos); SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey; - return sessionWinKeyCmpr(pWin1, pWin2); + return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2); } int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) { @@ -595,7 +593,7 @@ int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { if (pCur && pCur->buffIndex >= 0) { pCur->buffIndex++; } else { - streamStateCurNext_rocksdb(NULL, pCur); + streamStateCurNext_rocksdb(pCur); } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c new file mode 100644 index 0000000000..001bb02622 --- /dev/null +++ b/source/libs/stream/src/streamSliceState.c @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tstreamFileState.h" + +#include "query.h" +#include "streamBackendRocksdb.h" +#include "tcommon.h" +#include "tsimplehash.h" + +#define NUM_OF_FLUSED_WIN 64 + +int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) { + SWinKey* pWin2 = taosArrayGet(pDatas, pos); + return winKeyCmprImpl((SWinKey*)pWin1, pWin2); +} + +int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen) { + int32_t code = getRowBuff(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen); + SArray* pWinStates = NULL; + SSHashObj* pSearchBuff = getSearchBuff(pFileState); + void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + pWinStates = taosArrayInit(16, sizeof(SWinKey)); + tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + } + if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { + TSKEY ts = getFlushMark(pFileState); + SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; + void* pState = getStateFileStore(pFileState); + SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); + int32_t code = TSDB_CODE_SUCCESS; + for(int32_t i = 0; i < NUM_OF_FLUSED_WIN && code == TSDB_CODE_SUCCESS; i++) { + SWinKey tmp = {.groupId = pKey->groupId}; + code = streamStateGetGroupKVByCur_rocksdb(pCur, &tmp, NULL, 0); + if (code != TSDB_CODE_SUCCESS) { + break; + } + taosArrayPush(pWinStates, &tmp); + code = streamStateCurPrev_rocksdb(pCur); + } + taosArraySort(pWinStates, winKeyCmprImpl); + streamStateFreeCur(pCur); + } + + int32_t size = taosArrayGetSize(pWinStates); + if (!isFlushedState(pFileState, pKey->ts, 0)) { + // find the first position which is smaller than the pKey + int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); + if (index == -1) { + index = 0; + } + taosArrayInsert(pWinStates, index, pKey); + } + return code; +} + +int32_t hashSortFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) { + void* pState = getStateFileStore(pFileState); + return streamStateFillGet_rocksdb(pState, pKey, data, pDataLen); +} + +int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { + void* pState = getStateFileStore(pFileState); + return streamStateFillDel_rocksdb(pState, pKey); +} + +void clearSearchBuff(SStreamFileState* pFileState) { + SSHashObj* pSearchBuff = getSearchBuff(pFileState); + if (!pSearchBuff) { + return; + } + void* pIte = NULL; + int32_t iter = 0; + void* pBuff = getRowStateBuff(pFileState); + while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { + SArray* pWinStates = *((void**)pIte); + int32_t size = taosArrayGetSize(pWinStates); + if (size > 0) { + TSKEY ts = getFlushMark(pFileState); + SWinKey key = *(SWinKey*)taosArrayGet(pWinStates, 0); + key.ts = ts; + int32_t num = binarySearch(pWinStates, size, &key, fillStateKeyCompare); + if (size > NUM_OF_FLUSED_WIN) { + num = TMIN(num, size - NUM_OF_FLUSED_WIN); + taosArrayRemoveBatch(pWinStates, 0, num, NULL); + } + } + } +} + +int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) { + SArray* pWinStates = NULL; + SSHashObj* pSearchBuff = getSearchBuff(pFileState); + void* pState = getStateFileStore(pFileState); + void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); + int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void **)pVal, pVLen); + streamStateFreeCur(pCur); + return code; + } + int32_t size = taosArrayGetSize(pWinStates); + int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); + if (index == -1) { + SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); + int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void **)pVal, pVLen); + streamStateFreeCur(pCur); + return code; + } else { + if (index == size - 1) { + return TSDB_CODE_FAILED; + } + SWinKey* pNext = taosArrayGet(pWinStates, index + 1); + *pResKey = *pNext; + return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen); + } + return TSDB_CODE_FAILED; +} + +int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) { + SArray* pWinStates = NULL; + SSHashObj* pSearchBuff = getSearchBuff(pFileState); + void* pState = getStateFileStore(pFileState); + void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); + } else { + SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); + int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); + streamStateFreeCur(pCur); + return code; + } + int32_t size = taosArrayGetSize(pWinStates); + int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); + if (index == -1 || index == 0) { + SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); + int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); + streamStateFreeCur(pCur); + return code; + } else { + SWinKey* pNext = taosArrayGet(pWinStates, index - 1); + *pResKey = *pNext; + return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen); + } + return TSDB_CODE_FAILED; +} + diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index a817070d5b..6bdfff2977 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -359,12 +359,23 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* // todo refactor int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB + if (pState->pFileState) { + return getHashSortRowBuff(pState->pFileState, key, pVal, pVLen); + } return streamStateFillGet_rocksdb(pState, key, pVal, pVLen); #else return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); #endif } +int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) { + return getHashSortNextRow(pState->pFileState, pKey, pResKey, pVal, pVLen); +} + +int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) { + return getHashSortPrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen); +} + // todo refactor int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { #ifdef USE_ROCKSDB diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index fb745f86cb..5c022c2a5b 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -27,6 +27,8 @@ #define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024) #define MIN_NUM_OF_ROW_BUFF 10240 #define MIN_NUM_OF_RECOVER_ROW_BUFF 128 +#define MIN_NUM_SEARCH_BUCKET 128 +#define MAX_ARRAY_SIZE 1024 #define TASK_KEY "streamFileState" #define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" @@ -49,6 +51,7 @@ struct SStreamFileState { GetTsFun getTs; char* id; char* cfName; + void* searchBuff; _state_buff_cleanup_fn stateBuffCleanupFn; _state_buff_remove_fn stateBuffRemoveFn; @@ -91,7 +94,7 @@ int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { return streamStateDel_rocksdb(pFileState->pFileStore, pKey); } -int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) { +int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) { return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen); } @@ -107,7 +110,7 @@ int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) { return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey); } -int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) { +int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) { return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen); } @@ -160,7 +163,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileClearFn = streamStateClear_rocksdb; pFileState->cfName = taosStrdup("state"); pFileState->stateFunctionGetFn = getRowBuff; - } else { + } else if (type == STREAM_STATE_BUFF_SORT) { pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); pFileState->stateBuffCleanupFn = sessionWinStateCleanup; pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn; @@ -172,6 +175,19 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ pFileState->stateFileClearFn = streamStateSessionClear_rocksdb; pFileState->cfName = taosStrdup("sess"); pFileState->stateFunctionGetFn = getSessionRowBuff; + } else if (type == STREAM_STATE_BUFF_HASH_SORT) { + pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn); + pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn); + pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn; + pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn; + pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn; + pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey; + + pFileState->stateFileRemoveFn = hashSortFileRemoveFn; + pFileState->stateFileGetFn = hashSortFileGetFn; + pFileState->stateFileClearFn = NULL; + pFileState->cfName = taosStrdup("fill"); + pFileState->stateFunctionGetFn = NULL; } if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) { @@ -194,8 +210,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ // todo(liuyao) optimize if (type == STREAM_STATE_BUFF_HASH) { recoverSnapshot(pFileState, checkpointId); - } else { + } else if (type == STREAM_STATE_BUFF_SORT) { recoverSesssion(pFileState, checkpointId); + } else if (type == STREAM_STATE_BUFF_HASH_SORT) { + recoverFillSnapshot(pFileState, checkpointId); } void* valBuf = NULL; @@ -361,6 +379,11 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { } } + if (pFileState->searchBuff) { + clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount); + } + + flushSnapshot(pFileState, pFlushList, false); SListIter fIter = {0}; @@ -461,7 +484,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) { int32_t len = 0; void* p = NULL; - code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len); + code = pFileState->stateFileGetFn(pFileState->pFileStore, pKey, &p, &len); qDebug("===stream===get %" PRId64 " from disc, res %d", ts, code); if (code == TSDB_CODE_SUCCESS) { memcpy(pNewPos->pRowBuff, p, len); @@ -597,6 +620,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, streamStateClearBatch(batch); + clearSearchBuff(pFileState); + int64_t elapsed = taosGetTimestampMs() - st; qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms", pFileState->id, numOfElems, BATCH_LIMIT, elapsed); @@ -755,6 +780,7 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) { } void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; } +void* getSearchBuff(SStreamFileState* pFileState) { return pFileState->searchBuff; } void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; } @@ -764,8 +790,56 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } +TSKEY getFlushMark(SStreamFileState* pFileState) {return pFileState->flushMark;}; + int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; } int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen); } + +int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { + int32_t code = TSDB_CODE_SUCCESS; + if (pFileState->maxTs != INT64_MIN) { + int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) + ? INT64_MIN + : pFileState->maxTs - pFileState->deleteMark; + deleteExpiredCheckPoint(pFileState, mark); + } + + SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore); + if (pCur == NULL) { + return -1; + } + int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount); + while (code == TSDB_CODE_SUCCESS) { + if (pFileState->curRowCount >= recoverNum) { + break; + } + + void* pVal = NULL; + int32_t vlen = 0; + SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + code = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); + if (code != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0) ) { + destroyRowBuffPos(pNewPos); + SListNode* pNode = tdListPopTail(pFileState->usedBuffs); + taosMemoryFreeClear(pNode); + taosMemoryFreeClear(pVal); + break; + } + ASSERT(vlen == pFileState->rowSize); + memcpy(pNewPos->pRowBuff, pVal, vlen); + taosMemoryFreeClear(pVal); + pNewPos->beFlushed = true; + code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); + if (code != TSDB_CODE_SUCCESS) { + destroyRowBuffPos(pNewPos); + break; + } + code = streamStateCurPrev_rocksdb(pCur); + } + streamStateFreeCur(pCur); + + return TSDB_CODE_SUCCESS; +}