From eda06081ffdf782a3f21415326b705b1dccc5732 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 16:55:37 +0800 Subject: [PATCH] enh(stream): refactor and serialize the attributes of history tasks. --- include/common/tcommon.h | 5 +++++ include/libs/stream/tstream.h | 11 ++++++++--- source/dnode/mnode/impl/src/mndStream.c | 4 ++-- source/dnode/vnode/src/inc/tsdb.h | 6 ------ source/libs/stream/src/streamTask.c | 14 ++++++++++++++ 5 files changed, 29 insertions(+), 11 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2f93f8c3e3..8a1e95a661 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -54,6 +54,11 @@ typedef struct SSessionKey { uint64_t groupId; } SSessionKey; +typedef struct SVersionRange { + uint64_t minVer; + uint64_t maxVer; +} SVersionRange; + static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) { SWinKey* pWin1 = (SWinKey*)pKey1; SWinKey* pWin2 = (SWinKey*)pKey2; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6d54790b2f..2e2663f87a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -276,6 +276,11 @@ typedef struct SStreamStatus { int8_t keepTaskStatus; } SStreamStatus; +typedef struct SHistoryDataRange { + SVersionRange range; + STimeWindow window; +} SHistoryDataRange; + struct SStreamTask { SStreamId id; int32_t totalLevel; @@ -290,9 +295,9 @@ struct SStreamTask { STaskExec exec; int8_t fillHistory; // fill history - int64_t ekey; // end ts key - int64_t endVer; // end version - SStreamId historyTaskId; + SHistoryDataRange dataRange; + SStreamId historyTaskId; + // children info SArray* childEpInfo; // SArray int32_t nextCheckId; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 81b985f515..1ce4ce2b7e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -476,7 +476,7 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea } } - // persistent stream task for history data + // persistent stream task for already stored ts data if (pStream->conf.fillHistory) { level = taosArrayGetSize(pStream->pHTasksList); @@ -493,7 +493,6 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea } } - return 0; } @@ -639,6 +638,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pReq->head.vgId = htonl(pTask->nodeId); pReq->taskId = pTask->id.taskId; STransAction action = {0}; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index faf550ab75..f9dd80a056 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -64,7 +64,6 @@ typedef struct STsdbReadSnap STsdbReadSnap; typedef struct SBlockInfo SBlockInfo; typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; -typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; typedef struct SDiskCol SDiskCol; typedef struct SDiskData SDiskData; @@ -376,11 +375,6 @@ struct TSDBKEY { TSKEY ts; }; -struct SVersionRange { - uint64_t minVer; - uint64_t maxVer; -}; - typedef struct SMemSkipListNode SMemSkipListNode; struct SMemSkipListNode { int8_t level; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6d8ec11f44..4f883b76e4 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -187,6 +187,13 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1; if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; + if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1; + if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; + if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; + int32_t epSz = taosArrayGetSize(pTask->childEpInfo); if (tEncodeI32(pEncoder, epSz) < 0) return -1; for (int32_t i = 0; i < epSz; i++) { @@ -240,6 +247,13 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; + if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1; + if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; + if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; + int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));