Merge pull request #29822 from taosdata/kjq/enh-code-clarity
enh(stream): add options to control message and frame size
This commit is contained in:
commit
6735e5e948
|
@ -296,6 +296,8 @@ extern int32_t tsMaxStreamBackendCache;
|
||||||
extern int32_t tsPQSortMemThreshold;
|
extern int32_t tsPQSortMemThreshold;
|
||||||
extern bool tsStreamCoverage;
|
extern bool tsStreamCoverage;
|
||||||
extern int8_t tsS3EpNum;
|
extern int8_t tsS3EpNum;
|
||||||
|
extern int32_t tsStreamNotifyMessageSize;
|
||||||
|
extern int32_t tsStreamNotifyFrameSize;
|
||||||
|
|
||||||
extern bool tsExperimental;
|
extern bool tsExperimental;
|
||||||
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||||
|
|
|
@ -356,6 +356,9 @@ int32_t tsMaxTsmaCalcDelay = 600;
|
||||||
int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d
|
int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d
|
||||||
void *pTimezoneNameMap = NULL;
|
void *pTimezoneNameMap = NULL;
|
||||||
|
|
||||||
|
int32_t tsStreamNotifyMessageSize = 8 * 1024; // KB, default 8MB
|
||||||
|
int32_t tsStreamNotifyFrameSize = 256; // KB, default 256KB
|
||||||
|
|
||||||
int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len);
|
int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len);
|
||||||
|
|
||||||
#define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \
|
#define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \
|
||||||
|
@ -965,6 +968,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL));
|
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "minDiskFreeSize", tsMinDiskFreeSize, TFS_MIN_DISK_FREE_SIZE, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_LOCAL));
|
||||||
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
|
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableWhiteList", tsEnableWhiteList, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
|
||||||
|
|
||||||
|
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyMessageSize", tsStreamNotifyMessageSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL));
|
||||||
|
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "streamNotifyFrameSize", tsStreamNotifyFrameSize, 8, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL));
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
// GRANT_CFG_ADD;
|
// GRANT_CFG_ADD;
|
||||||
|
@ -1850,6 +1856,12 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "minReservedMemorySize");
|
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "minReservedMemorySize");
|
||||||
tsMinReservedMemorySize = pItem->i32;
|
tsMinReservedMemorySize = pItem->i32;
|
||||||
|
|
||||||
|
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamNotifyMessageSize");
|
||||||
|
tsStreamNotifyMessageSize = pItem->i32;
|
||||||
|
|
||||||
|
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamNotifyFrameSize");
|
||||||
|
tsStreamNotifyFrameSize = pItem->i32;
|
||||||
|
|
||||||
// GRANT_CFG_GET;
|
// GRANT_CFG_GET;
|
||||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
@ -3241,4 +3253,4 @@ int32_t taosCheckCfgStrValueLen(const char *name, const char *value, int32_t len
|
||||||
TAOS_RETURN(TSDB_CODE_INVALID_CFG_VALUE);
|
TAOS_RETURN(TSDB_CODE_INVALID_CFG_VALUE);
|
||||||
}
|
}
|
||||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,9 +21,6 @@
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define STREAM_EVENT_NOTIFY_RETRY_MS 50 // 50 ms
|
#define STREAM_EVENT_NOTIFY_RETRY_MS 50 // 50 ms
|
||||||
#define STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB 8 * 1024 // 8 MB
|
|
||||||
#define STREAM_EVENT_NOTIFY_FRAME_SIZE 256 * 1024 // 256 KB
|
|
||||||
|
|
||||||
typedef struct SStreamNotifyHandle {
|
typedef struct SStreamNotifyHandle {
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
#ifndef WINDOWS
|
#ifndef WINDOWS
|
||||||
|
@ -296,7 +293,7 @@ static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBl
|
||||||
msgLen += varDataLen(val) + 1;
|
msgLen += varDataLen(val) + 1;
|
||||||
}
|
}
|
||||||
*nNotifyEvents += pDataBlock->info.rows;
|
*nNotifyEvents += pDataBlock->info.rows;
|
||||||
if (msgLen >= STREAM_EVENT_NOTIFY_MESSAAGE_SIZE_KB * 1024) {
|
if (msgLen >= tsStreamNotifyMessageSize * 1024) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -381,7 +378,7 @@ static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) {
|
||||||
}
|
}
|
||||||
sentLen = 0;
|
sentLen = 0;
|
||||||
while (sentLen < totalLen) {
|
while (sentLen < totalLen) {
|
||||||
size_t chunkSize = TMIN(totalLen - sentLen, STREAM_EVENT_NOTIFY_FRAME_SIZE);
|
size_t chunkSize = TMIN(totalLen - sentLen, tsStreamNotifyFrameSize * 1024);
|
||||||
if (sentLen == 0) {
|
if (sentLen == 0) {
|
||||||
res = curl_ws_send(pHandle->curl, msg, chunkSize, &nbytes, totalLen, CURLWS_TEXT | CURLWS_OFFSET);
|
res = curl_ws_send(pHandle->curl, msg, chunkSize, &nbytes, totalLen, CURLWS_TEXT | CURLWS_OFFSET);
|
||||||
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
|
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
|
||||||
|
|
|
@ -2977,7 +2977,6 @@ static int32_t rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray,
|
||||||
|
|
||||||
if (winCode == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) {
|
if (winCode == TSDB_CODE_SUCCESS && inWinRange(&pWinKey->win, &childWin.sessionWin.win)) {
|
||||||
if (num == 0) {
|
if (num == 0) {
|
||||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
|
||||||
code = setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin,
|
code = setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin,
|
||||||
&winCode);
|
&winCode);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
|
@ -1502,31 +1502,46 @@ bool taosAssertRelease(bool condition) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define NUM_BASE 100
|
||||||
|
#define DIGIT_LENGTH 2
|
||||||
|
#define MAX_DIGITS 24
|
||||||
|
|
||||||
char* u64toaFastLut(uint64_t val, char* buf) {
|
char* u64toaFastLut(uint64_t val, char* buf) {
|
||||||
|
// Look-up table for 2-digit numbers
|
||||||
static const char* lut =
|
static const char* lut =
|
||||||
"0001020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455"
|
"0001020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455"
|
||||||
"5657585960616263646566676869707172737475767778798081828384858687888990919293949596979899";
|
"5657585960616263646566676869707172737475767778798081828384858687888990919293949596979899";
|
||||||
|
|
||||||
char temp[24];
|
char temp[MAX_DIGITS];
|
||||||
char* p = temp;
|
char* p = temp + tListLen(temp);
|
||||||
|
|
||||||
while (val >= 100) {
|
// Process the digits greater than or equal to 100
|
||||||
strncpy(p, lut + (val % 100) * 2, 2);
|
while (val >= NUM_BASE) {
|
||||||
val /= 100;
|
// Get the last 2 digits from the look-up table and add to the buffer
|
||||||
p += 2;
|
p -= DIGIT_LENGTH;
|
||||||
|
strncpy(p, lut + (val % NUM_BASE) * DIGIT_LENGTH, DIGIT_LENGTH);
|
||||||
|
val /= NUM_BASE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process the remaining 1 or 2 digits
|
||||||
if (val >= 10) {
|
if (val >= 10) {
|
||||||
strncpy(p, lut + val * 2, 2);
|
// If the number is 10 or more, get the 2 digits from the look-up table
|
||||||
p += 2;
|
p -= DIGIT_LENGTH;
|
||||||
|
strncpy(p, lut + val * DIGIT_LENGTH, DIGIT_LENGTH);
|
||||||
} else if (val > 0 || p == temp) {
|
} else if (val > 0 || p == temp) {
|
||||||
*(p++) = val + '0';
|
// If the number is less than 10, add the single digit to the buffer
|
||||||
|
p -= 1;
|
||||||
|
*p = val + '0';
|
||||||
}
|
}
|
||||||
|
|
||||||
while (p != temp) {
|
int64_t len = temp + tListLen(temp) - p;
|
||||||
*buf++ = *--p;
|
if (len > 0) {
|
||||||
|
memcpy(buf, p, len);
|
||||||
|
} else {
|
||||||
|
buf[0] = '0';
|
||||||
|
len = 1;
|
||||||
}
|
}
|
||||||
|
buf[len] = '\0';
|
||||||
|
|
||||||
*buf = '\0';
|
return buf + len;
|
||||||
return buf;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,3 +139,28 @@ TEST(log, misc) {
|
||||||
|
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(log, test_u64toa) {
|
||||||
|
char buf[64] = {0};
|
||||||
|
char *p = buf;
|
||||||
|
|
||||||
|
p = u64toaFastLut(0, buf);
|
||||||
|
EXPECT_EQ(p, buf + 1);
|
||||||
|
EXPECT_EQ(strcmp(buf, "0"), 0);
|
||||||
|
|
||||||
|
p = u64toaFastLut(1, buf);
|
||||||
|
EXPECT_EQ(p, buf + 1);
|
||||||
|
EXPECT_EQ(strcmp(buf, "1"), 0);
|
||||||
|
|
||||||
|
p = u64toaFastLut(12, buf);
|
||||||
|
EXPECT_EQ(p, buf + 2);
|
||||||
|
EXPECT_EQ(strcmp(buf, "12"), 0);
|
||||||
|
|
||||||
|
p = u64toaFastLut(12345, buf);
|
||||||
|
EXPECT_EQ(p, buf + 5);
|
||||||
|
EXPECT_EQ(strcmp(buf, "12345"), 0);
|
||||||
|
|
||||||
|
p = u64toaFastLut(1234567890, buf);
|
||||||
|
EXPECT_EQ(p, buf + 10);
|
||||||
|
EXPECT_EQ(strcmp(buf, "1234567890"), 0);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue