From 87040f1822a2da07ef5c378cf1344614b07109f1 Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Tue, 18 Feb 2025 17:13:35 +0800 Subject: [PATCH 1/2] enh(stream): replace magic numbers in u64toaFastLut with macros Replace magic numbers in the u64toaFastLut function with macros for better readability and maintainability. Add unit test to ensure the correctness of the refactor. --- source/util/src/tlog.c | 41 +++++++++++++++++++++++++++------------- source/util/test/log.cpp | 25 ++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 0e3e236b36..2460c7c650 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -1502,31 +1502,46 @@ bool taosAssertRelease(bool condition) { } #endif +#define NUM_BASE 100 +#define DIGIT_LENGTH 2 +#define MAX_DIGITS 24 + char* u64toaFastLut(uint64_t val, char* buf) { + // Look-up table for 2-digit numbers static const char* lut = "0001020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455" "5657585960616263646566676869707172737475767778798081828384858687888990919293949596979899"; - char temp[24]; - char* p = temp; + char temp[MAX_DIGITS]; + char* p = temp + tListLen(temp); - while (val >= 100) { - strncpy(p, lut + (val % 100) * 2, 2); - val /= 100; - p += 2; + // Process the digits greater than or equal to 100 + while (val >= NUM_BASE) { + // Get the last 2 digits from the look-up table and add to the buffer + p -= DIGIT_LENGTH; + strncpy(p, lut + (val % NUM_BASE) * DIGIT_LENGTH, DIGIT_LENGTH); + val /= NUM_BASE; } + // Process the remaining 1 or 2 digits if (val >= 10) { - strncpy(p, lut + val * 2, 2); - p += 2; + // If the number is 10 or more, get the 2 digits from the look-up table + p -= DIGIT_LENGTH; + strncpy(p, lut + val * DIGIT_LENGTH, DIGIT_LENGTH); } else if (val > 0 || p == temp) { - *(p++) = val + '0'; + // If the number is less than 10, add the single digit to the buffer + p -= 1; + *p = val + '0'; } - while (p != temp) { - *buf++ = *--p; + int64_t len = temp + tListLen(temp) - p; + if (len > 0) { + memcpy(buf, p, len); + } else { + buf[0] = '0'; + len = 1; } + buf[len] = '\0'; - *buf = '\0'; - return buf; + return buf + len; } diff --git a/source/util/test/log.cpp b/source/util/test/log.cpp index 1899aac2c4..ae1be94e40 100644 --- a/source/util/test/log.cpp +++ b/source/util/test/log.cpp @@ -139,3 +139,28 @@ TEST(log, misc) { taosCloseLog(); } + +TEST(log, test_u64toa) { + char buf[64] = {0}; + char *p = buf; + + p = u64toaFastLut(0, buf); + EXPECT_EQ(p, buf + 1); + EXPECT_EQ(strcmp(buf, "0"), 0); + + p = u64toaFastLut(1, buf); + EXPECT_EQ(p, buf + 1); + EXPECT_EQ(strcmp(buf, "1"), 0); + + p = u64toaFastLut(12, buf); + EXPECT_EQ(p, buf + 2); + EXPECT_EQ(strcmp(buf, "12"), 0); + + p = u64toaFastLut(12345, buf); + EXPECT_EQ(p, buf + 5); + EXPECT_EQ(strcmp(buf, "12345"), 0); + + p = u64toaFastLut(1234567890, buf); + EXPECT_EQ(p, buf + 10); + EXPECT_EQ(strcmp(buf, "1234567890"), 0); +} From a48095d467e36faf431856d33a5a3c489fee7e33 Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Tue, 18 Feb 2025 15:30:21 +0800 Subject: [PATCH 2/2] enh(streams): add options to control message and frame size Add options to allow users to specify message and frame size limits for event notifications. --- include/common/tglobal.h | 2 ++ source/common/src/tglobal.c | 14 +++++++++++++- source/dnode/vnode/src/tq/tqStreamNotify.c | 7 ++----- .../libs/executor/src/streamtimewindowoperator.c | 1 - 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index bb3f0964de..1b59f1de8a 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -296,6 +296,8 @@ extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; extern bool tsStreamCoverage; extern int8_t tsS3EpNum; +extern int32_t tsStreamNotifyMessageSize; +extern int32_t tsStreamNotifyFrameSize; extern bool tsExperimental; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 223418feed..05898ea26e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -356,6 +356,9 @@ int32_t tsMaxTsmaCalcDelay = 600; int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d void *pTimezoneNameMap = NULL; +int32_t tsStreamNotifyMessageSize = 8 * 1024; // KB, default 8MB +int32_t tsStreamNotifyFrameSize = 256; // KB, default 256KB + int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len); #define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \ @@ -965,6 +968,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyMessageSize", tsStreamNotifyMessageSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyFrameSize", tsStreamNotifyFrameSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL)); + // clang-format on // GRANT_CFG_ADD; @@ -1850,6 +1856,12 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "minReservedMemorySize"); tsMinReservedMemorySize = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamNotifyMessageSize"); + tsStreamNotifyMessageSize = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamNotifyFrameSize"); + tsStreamNotifyFrameSize = pItem->i32; + // GRANT_CFG_GET; TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -3241,4 +3253,4 @@ int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len TAOS_RETURN(TSDB_CODE_INVALID_CFG_VALUE); } TAOS_RETURN(TSDB_CODE_SUCCESS); -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tq/tqStreamNotify.c b/source/dnode/vnode/src/tq/tqStreamNotify.c index 61200b189b..1c64c404e6 100644 --- a/source/dnode/vnode/src/tq/tqStreamNotify.c +++ b/source/dnode/vnode/src/tq/tqStreamNotify.c @@ -21,9 +21,6 @@ #endif #define STREAM_EVENT_NOTIFY_RETRY_MS 50 // 50 ms -#define STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB 8 * 1024 // 8 MB -#define STREAM_EVENT_NOTIFY_FRAME_SIZE 256 * 1024 // 256 KB - typedef struct SStreamNotifyHandle { TdThreadMutex mutex; #ifndef WINDOWS @@ -296,7 +293,7 @@ static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBl msgLen += varDataLen(val) + 1; } *nNotifyEvents += pDataBlock->info.rows; - if (msgLen >= STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB * 1024) { + if (msgLen >= tsStreamNotifyMessageSize * 1024) { break; } } @@ -381,7 +378,7 @@ static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) { } sentLen = 0; while (sentLen < totalLen) { - size_t chunkSize = TMIN(totalLen - sentLen, STREAM_EVENT_NOTIFY_FRAME_SIZE); + size_t chunkSize = TMIN(totalLen - sentLen, tsStreamNotifyFrameSize * 1024); if (sentLen == 0) { res = curl_ws_send(pHandle->curl, msg, chunkSize, &nbytes, totalLen, CURLWS_TEXT | CURLWS_OFFSET); TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 3b799eea23..fbb55301cd 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2977,7 +2977,6 @@ static int32_t rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, if (winCode == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) { if (num == 0) { - int32_t winCode = TSDB_CODE_SUCCESS; code = setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin, &winCode); QUERY_CHECK_CODE(code, lino, _end);