From a48095d467e36faf431856d33a5a3c489fee7e33 Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Tue, 18 Feb 2025 15:30:21 +0800 Subject: [PATCH] enh(streams): add options to control message and frame size Add options to allow users to specify message and frame size limits for event notifications. --- include/common/tglobal.h | 2 ++ source/common/src/tglobal.c | 14 +++++++++++++- source/dnode/vnode/src/tq/tqStreamNotify.c | 7 ++----- .../libs/executor/src/streamtimewindowoperator.c | 1 - 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index bb3f0964de..1b59f1de8a 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -296,6 +296,8 @@ extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; extern bool tsStreamCoverage; extern int8_t tsS3EpNum; +extern int32_t tsStreamNotifyMessageSize; +extern int32_t tsStreamNotifyFrameSize; extern bool tsExperimental; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 223418feed..05898ea26e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -356,6 +356,9 @@ int32_t tsMaxTsmaCalcDelay = 600; int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d void *pTimezoneNameMap = NULL; +int32_t tsStreamNotifyMessageSize = 8 * 1024; // KB, default 8MB +int32_t tsStreamNotifyFrameSize = 256; // KB, default 256KB + int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len); #define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \ @@ -965,6 +968,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyMessageSize", tsStreamNotifyMessageSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyFrameSize", tsStreamNotifyFrameSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL)); + // clang-format on // GRANT_CFG_ADD; @@ -1850,6 +1856,12 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "minReservedMemorySize"); tsMinReservedMemorySize = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamNotifyMessageSize"); + tsStreamNotifyMessageSize = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamNotifyFrameSize"); + tsStreamNotifyFrameSize = pItem->i32; + // GRANT_CFG_GET; TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -3241,4 +3253,4 @@ int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len TAOS_RETURN(TSDB_CODE_INVALID_CFG_VALUE); } TAOS_RETURN(TSDB_CODE_SUCCESS); -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tq/tqStreamNotify.c b/source/dnode/vnode/src/tq/tqStreamNotify.c index 61200b189b..1c64c404e6 100644 --- a/source/dnode/vnode/src/tq/tqStreamNotify.c +++ b/source/dnode/vnode/src/tq/tqStreamNotify.c @@ -21,9 +21,6 @@ #endif #define STREAM_EVENT_NOTIFY_RETRY_MS 50 // 50 ms -#define STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB 8 * 1024 // 8 MB -#define STREAM_EVENT_NOTIFY_FRAME_SIZE 256 * 1024 // 256 KB - typedef struct SStreamNotifyHandle { TdThreadMutex mutex; #ifndef WINDOWS @@ -296,7 +293,7 @@ static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBl msgLen += varDataLen(val) + 1; } *nNotifyEvents += pDataBlock->info.rows; - if (msgLen >= STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB * 1024) { + if (msgLen >= tsStreamNotifyMessageSize * 1024) { break; } } @@ -381,7 +378,7 @@ static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) { } sentLen = 0; while (sentLen < totalLen) { - size_t chunkSize = TMIN(totalLen - sentLen, STREAM_EVENT_NOTIFY_FRAME_SIZE); + size_t chunkSize = TMIN(totalLen - sentLen, tsStreamNotifyFrameSize * 1024); if (sentLen == 0) { res = curl_ws_send(pHandle->curl, msg, chunkSize, &nbytes, totalLen, CURLWS_TEXT | CURLWS_OFFSET); TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 3b799eea23..fbb55301cd 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2977,7 +2977,6 @@ static int32_t rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, if (winCode == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) { if (num == 0) { - int32_t winCode = TSDB_CODE_SUCCESS; code = setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin, &winCode); QUERY_CHECK_CODE(code, lino, _end);