enh(streams): add options to control message and frame size

Add options to allow users to specify message and frame size limits for
event notifications.
This commit is contained in:
Jinqing Kuang 2025-02-18 15:30:21 +08:00
parent 87040f1822
commit a48095d467
4 changed files with 17 additions and 7 deletions

View File

@ -296,6 +296,8 @@ extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold;
extern bool tsStreamCoverage;
extern int8_t tsS3EpNum;
extern int32_t tsStreamNotifyMessageSize;
extern int32_t tsStreamNotifyFrameSize;
extern bool tsExperimental;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -356,6 +356,9 @@ int32_t tsMaxTsmaCalcDelay = 600;
int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d
void *pTimezoneNameMap = NULL;
int32_t tsStreamNotifyMessageSize = 8 * 1024; // KB, default 8MB
int32_t tsStreamNotifyFrameSize = 256; // KB, default 256KB
int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len);
#define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \
@ -965,6 +968,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyMessageSize", tsStreamNotifyMessageSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyFrameSize", tsStreamNotifyFrameSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL));
// clang-format on
// GRANT_CFG_ADD;
@ -1850,6 +1856,12 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "minReservedMemorySize");
tsMinReservedMemorySize = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamNotifyMessageSize");
tsStreamNotifyMessageSize = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamNotifyFrameSize");
tsStreamNotifyFrameSize = pItem->i32;
// GRANT_CFG_GET;
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -3241,4 +3253,4 @@ int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len
TAOS_RETURN(TSDB_CODE_INVALID_CFG_VALUE);
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
}

View File

@ -21,9 +21,6 @@
#endif
#define STREAM_EVENT_NOTIFY_RETRY_MS 50 // 50 ms
#define STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB 8 * 1024 // 8 MB
#define STREAM_EVENT_NOTIFY_FRAME_SIZE 256 * 1024 // 256 KB
typedef struct SStreamNotifyHandle {
TdThreadMutex mutex;
#ifndef WINDOWS
@ -296,7 +293,7 @@ static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBl
msgLen += varDataLen(val) + 1;
}
*nNotifyEvents += pDataBlock->info.rows;
if (msgLen >= STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB * 1024) {
if (msgLen >= tsStreamNotifyMessageSize * 1024) {
break;
}
}
@ -381,7 +378,7 @@ static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) {
}
sentLen = 0;
while (sentLen < totalLen) {
size_t chunkSize = TMIN(totalLen - sentLen, STREAM_EVENT_NOTIFY_FRAME_SIZE);
size_t chunkSize = TMIN(totalLen - sentLen, tsStreamNotifyFrameSize * 1024);
if (sentLen == 0) {
res = curl_ws_send(pHandle->curl, msg, chunkSize, &nbytes, totalLen, CURLWS_TEXT | CURLWS_OFFSET);
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);

View File

@ -2977,7 +2977,6 @@ static int32_t rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray,
if (winCode == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) {
if (num == 0) {
int32_t winCode = TSDB_CODE_SUCCESS;
code = setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin,
&winCode);
QUERY_CHECK_CODE(code, lino, _end);