fix:add monitor result for show cluster variables

This commit is contained in:
wangmm0220 2024-06-23 18:17:50 +08:00
parent a6d7a0703b
commit d0ca79ddca
7 changed files with 196 additions and 24 deletions

View File

@ -11,7 +11,8 @@
SRWLatch monitorLock; SRWLatch monitorLock;
void* monitorTimer; void* monitorTimer;
SHashObj* monitorCounterHash; SHashObj* monitorCounterHash;
int32_t monitorStop = -1; int32_t slowLogFlag = -1;
int32_t monitorFlag = -1;
tsem2_t monitorSem; tsem2_t monitorSem;
STaosQueue* monitorQueue; STaosQueue* monitorQueue;
SHashObj* monitorSlowLogHash; SHashObj* monitorSlowLogHash;
@ -99,12 +100,12 @@ static int32_t tscMonitortInit() {
} }
static void tscMonitorStop() { static void tscMonitorStop() {
if (atomic_val_compare_exchange_32(&monitorStop, 0, 1)) { if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) {
uDebug("monitor thread already stopped"); uDebug("monitor thread already stopped");
return; return;
} }
while (atomic_load_32(&monitorStop) > 0) { while (atomic_load_32(&slowLogFlag) > 0) {
taosMsleep(100); taosMsleep(100);
} }
} }
@ -230,6 +231,11 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr
static void reportSendProcess(void* param, void* tmrId) { static void reportSendProcess(void* param, void* tmrId) {
taosRLockLatch(&monitorLock); taosRLockLatch(&monitorLock);
if (atomic_load_32(&monitorFlag) == 1) {
taosRUnLockLatch(&monitorLock);
return;
}
MonitorClient* pMonitor = (MonitorClient*)param; MonitorClient* pMonitor = (MonitorClient*)param;
SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId); SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
if(pInst == NULL){ if(pInst == NULL){
@ -244,7 +250,6 @@ static void reportSendProcess(void* param, void* tmrId) {
} }
static void sendAllSlowLog(){ static void sendAllSlowLog(){
taosRLockLatch(&monitorLock);
void* data = taosHashIterate(monitorSlowLogHash, NULL); void* data = taosHashIterate(monitorSlowLogHash, NULL);
while (data != NULL) { while (data != NULL) {
TdFilePtr pFile = (*(SlowLogClient**)data)->pFile; TdFilePtr pFile = (*(SlowLogClient**)data)->pFile;
@ -261,8 +266,6 @@ static void sendAllSlowLog(){
data = taosHashIterate(monitorSlowLogHash, data); data = taosHashIterate(monitorSlowLogHash, data);
} }
uDebug("[monitor] sendAllSlowLog when client close"); uDebug("[monitor] sendAllSlowLog when client close");
taosRUnLockLatch(&monitorLock);
} }
void monitorSendAllSlowLogFromTempDir(void* inst){ void monitorSendAllSlowLogFromTempDir(void* inst){
@ -331,7 +334,6 @@ END:
} }
static void sendAllCounter(){ static void sendAllCounter(){
taosRLockLatch(&monitorLock);
MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL); MonitorClient** ppMonitor = (MonitorClient**)taosHashIterate(monitorCounterHash, NULL);
while (ppMonitor != NULL) { while (ppMonitor != NULL) {
MonitorClient* pMonitor = *ppMonitor; MonitorClient* pMonitor = *ppMonitor;
@ -346,7 +348,6 @@ static void sendAllCounter(){
} }
ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor); ppMonitor = taosHashIterate(monitorCounterHash, ppMonitor);
} }
taosRUnLockLatch(&monitorLock);
} }
void monitorInit() { void monitorInit() {
@ -374,12 +375,18 @@ void monitorInit() {
void monitorClose() { void monitorClose() {
uInfo("[monitor] tscMonitor close"); uInfo("[monitor] tscMonitor close");
taosRLockLatch(&monitorLock);
if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) {
uDebug("[monitor] monitorFlag is not 0");
}
tscMonitorStop(); tscMonitorStop();
sendAllSlowLog(); sendAllSlowLog();
sendAllCounter(); sendAllCounter();
taosHashCleanup(monitorCounterHash); taosHashCleanup(monitorCounterHash);
taosHashCleanup(monitorSlowLogHash); taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer); taosTmrCleanUp(monitorTimer);
taosRUnLockLatch(&monitorLock);
} }
void monitorCreateClient(int64_t clusterId) { void monitorCreateClient(int64_t clusterId) {
@ -436,6 +443,9 @@ void monitorCreateClient(int64_t clusterId) {
uInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor); uInfo("[monitor] monitorCreateClient for %"PRIx64 "finished %p.", clusterId, pMonitor);
} }
taosWUnLockLatch(&monitorLock); taosWUnLockLatch(&monitorLock);
if (-1 != atomic_val_compare_exchange_32(&monitorFlag, -1, 0)) {
uDebug("[monitor] monitorFlag already is 0");
}
return; return;
fail: fail:
@ -499,10 +509,14 @@ void monitorFreeSlowLogData(MonitorSlowLogData* pData) {
taosMemoryFree(pData->value); taosMemoryFree(pData->value);
} }
void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&monitorStop, -1); } void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
void reportSlowLog(void* param, void* tmrId) { void reportSlowLog(void* param, void* tmrId) {
taosRLockLatch(&monitorLock); taosRLockLatch(&monitorLock);
if (atomic_load_32(&monitorFlag) == 1) {
taosRUnLockLatch(&monitorLock);
return;
}
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)param); SAppInstInfo* pInst = getAppInstByClusterId((int64_t)param);
if(pInst == NULL){ if(pInst == NULL){
uError("failed to get app inst, clusterId:%"PRIx64, (int64_t)param); uError("failed to get app inst, clusterId:%"PRIx64, (int64_t)param);
@ -592,7 +606,7 @@ void* monitorThreadFunc(void *param){
} }
#endif #endif
if (-1 != atomic_val_compare_exchange_32(&monitorStop, -1, 0)) { if (-1 != atomic_val_compare_exchange_32(&slowLogFlag, -1, 0)) {
return NULL; return NULL;
} }
@ -618,7 +632,7 @@ void* monitorThreadFunc(void *param){
return NULL; return NULL;
} }
while (1) { while (1) {
if (monitorStop > 0) break; if (slowLogFlag > 0) break;
MonitorSlowLogData* slowLogData = NULL; MonitorSlowLogData* slowLogData = NULL;
taosReadQitem(monitorQueue, (void**)&slowLogData); taosReadQitem(monitorQueue, (void**)&slowLogData);
@ -633,6 +647,6 @@ void* monitorThreadFunc(void *param){
taosCloseQueue(monitorQueue); taosCloseQueue(monitorQueue);
tsem2_destroy(&monitorSem); tsem2_destroy(&monitorSem);
monitorStop = -2; slowLogFlag = -2;
return NULL; return NULL;
} }

View File

@ -704,7 +704,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "slowLogThresholdTest", tsSlowLogThresholdTest, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "slowLogThresholdTest", tsSlowLogThresholdTest, 0, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 1, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 1, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogMaxLen", tsSlowLogMaxLen, 0, 16384, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "slowLogMaxLen", tsSlowLogMaxLen, 1, 16384, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "slowLogScope", tsSlowLogScopeString, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; if (cfgAddString(pCfg, "slowLogScope", tsSlowLogScopeString, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;

View File

@ -1488,6 +1488,32 @@ static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) {
return 0; return 0;
} }
void getSlowLogScopeString(int32_t scope, char* result){
if(scope == SLOW_LOG_TYPE_NULL) {
strcat(result, "NONE");
return;
}
while(scope > 0){
if(scope & SLOW_LOG_TYPE_QUERY) {
strcat(result, "QUERY");
scope &= ~SLOW_LOG_TYPE_QUERY;
} else if(scope & SLOW_LOG_TYPE_INSERT) {
strcat(result, "INSERT");
scope &= ~SLOW_LOG_TYPE_INSERT;
} else if(scope & SLOW_LOG_TYPE_OTHERS) {
strcat(result, "OTHERS");
scope &= ~SLOW_LOG_TYPE_OTHERS;
} else{
printf("invalid slow log scope:%d", scope);
return;
}
if(scope > 0) {
strcat(result, "|");
}
}
}
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t totalRows = 0; int32_t totalRows = 0;
@ -1513,6 +1539,28 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset); snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
totalRows++; totalRows++;
cfgOpts[totalRows] = "monitor";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsEnableMonitor);
totalRows++;
cfgOpts[totalRows] = "monitorInterval";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsMonitorInterval);
totalRows++;
cfgOpts[totalRows] = "slowLogThreshold";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogThreshold);
totalRows++;
char scopeStr[64] = {0};
getSlowLogScopeString(tsSlowLogScope, scopeStr);
cfgOpts[totalRows] = "slowLogScope";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%s", scopeStr);
totalRows++;
cfgOpts[totalRows] = "slowLogMaxLen";
snprintf(cfgVals[totalRows], TSDB_CONFIG_VALUE_LEN, "%d", tsSlowLogMaxLen);
totalRows++;
char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; char bufVal[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};

View File

@ -16,3 +16,4 @@ add_subdirectory(stb)
add_subdirectory(topic) add_subdirectory(topic)
add_subdirectory(trans) add_subdirectory(trans)
#add_subdirectory(user) #add_subdirectory(user)
#add_subdirectory(mnode)

View File

@ -1,11 +1,11 @@
# aux_source_directory(. MNODE_MNODE_TEST_SRC) aux_source_directory(. MNODE_MNODE_TEST_SRC)
# add_executable(mmnodeTest ${MNODE_MNODE_TEST_SRC}) add_executable(mmnodeTest ${MNODE_MNODE_TEST_SRC})
# target_link_libraries( target_link_libraries(
# mmnodeTest mmnodeTest
# PUBLIC sut PUBLIC sut
# ) )
# add_test( add_test(
# NAME mmnodeTest NAME mmnodeTest
# COMMAND mmnodeTest COMMAND mmnodeTest
# ) )

View File

@ -281,3 +281,113 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) {
ASSERT_NE(retry, retryMax); ASSERT_NE(retry, retryMax);
} }
} }
#define SLOW_LOG_TYPE_NULL 0x0
#define SLOW_LOG_TYPE_QUERY 0x1
#define SLOW_LOG_TYPE_INSERT 0x2
#define SLOW_LOG_TYPE_OTHERS 0x4
#define SLOW_LOG_TYPE_ALL 0x7
void getSlowLogScopeString(int32_t scope, char* result){
if(scope == SLOW_LOG_TYPE_NULL) {
strcat(result, "NONE");
return;
}
while(scope > 0){
if(scope & SLOW_LOG_TYPE_QUERY) {
strcat(result, "QUERY");
scope &= ~SLOW_LOG_TYPE_QUERY;
} else if(scope & SLOW_LOG_TYPE_INSERT) {
strcat(result, "INSERT");
scope &= ~SLOW_LOG_TYPE_INSERT;
} else if(scope & SLOW_LOG_TYPE_OTHERS) {
strcat(result, "OTHERS");
scope &= ~SLOW_LOG_TYPE_OTHERS;
} else{
printf("invalid slow log scope:%d", scope);
return;
}
if(scope > 0) {
strcat(result, "|");
}
}
}
// Define test cases
TEST_F(MndTestMnode, ScopeIsNull) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_NULL, result);
// Assert
EXPECT_STREQ(result, "NONE");
}
TEST_F(MndTestMnode, ScopeIsQuery) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_QUERY, result);
// Assert
EXPECT_STREQ(result, "QUERY");
}
TEST_F(MndTestMnode, ScopeIsInsert) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_INSERT, result);
// Assert
EXPECT_STREQ(result, "INSERT");
}
TEST_F(MndTestMnode, ScopeIsOthers) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_OTHERS, result);
// Assert
EXPECT_STREQ(result, "OTHERS");
}
TEST_F(MndTestMnode, ScopeIsMixed) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_OTHERS|SLOW_LOG_TYPE_INSERT, result);
// Assert
EXPECT_STREQ(result, "INSERT|OTHERS");
}
TEST_F(MndTestMnode, ScopeIsMixed1) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(SLOW_LOG_TYPE_ALL, result);
// Assert
EXPECT_STREQ(result, "QUERY|INSERT|OTHERS");
}
TEST_F(MndTestMnode, ScopeIsInvalid) {
// Arrange
char result[256] = {0};
// Act
getSlowLogScopeString(0xF000, result);
// Assert
EXPECT_STREQ(result, ""); // Expect an empty string since the scope is invalid
// You may also want to check if the error message is correctly logged
}

View File

@ -70,7 +70,6 @@ class TDTestCase(TBase):
"smlTagName tagname", "smlTagName tagname",
"smlTsDefaultName tsdef", "smlTsDefaultName tsdef",
"serverPort 6030", "serverPort 6030",
"slowLogScope insert",
"timezone tz", "timezone tz",
"tempDir /var/tmp" "tempDir /var/tmp"
] ]