refactor mgt

This commit is contained in:
yihaoDeng 2024-07-29 19:41:44 +08:00
parent b9a0164fda
commit 00cc66eb15
20 changed files with 287 additions and 243 deletions

View File

@ -84,7 +84,7 @@ int32_t taosWTryLockLatch(SRWLatch *pLatch);
int32_t old_ = atomic_add_fetch_32((x), 0); \ int32_t old_ = atomic_add_fetch_32((x), 0); \
if (old_ & 0x00000001) { \ if (old_ & 0x00000001) { \
if (i_ % 1000 == 0) { \ if (i_ % 1000 == 0) { \
sched_yield(); \ (void)sched_yield(); \
} \ } \
continue; \ continue; \
} }
@ -98,9 +98,9 @@ int32_t taosWTryLockLatch(SRWLatch *pLatch);
#define taosCorBeginWrite(x) \ #define taosCorBeginWrite(x) \
taosCorBeginRead(x) if (atomic_val_compare_exchange_32((x), old_, old_ + 1) != old_) { continue; } taosCorBeginRead(x) if (atomic_val_compare_exchange_32((x), old_, old_ + 1) != old_) { continue; }
#define taosCorEndWrite(x) \ #define taosCorEndWrite(x) \
atomic_add_fetch_32((x), 1); \ (void)atomic_add_fetch_32((x), 1); \
break; \ break; \
} }
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -32,10 +32,10 @@ static void removeEmptyDir() {
empty = false; empty = false;
} }
if (empty) taosRemoveDir(filename); if (empty) taosRemoveDir(filename);
taosCloseDir(&pDirTmp); (void)taosCloseDir(&pDirTmp);
} }
taosCloseDir(&pDir); (void)taosCloseDir(&pDir);
} }
#ifdef WINDOWS #ifdef WINDOWS
@ -92,12 +92,12 @@ static int32_t generateConfigFile(char* confDir) {
uDebug("[rsync] conf:%s", confContent); uDebug("[rsync] conf:%s", confContent);
if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0) { if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0) {
uError("[rsync] write conf file error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); uError("[rsync] write conf file error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
return code; return code;
} }
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
return 0; return 0;
} }

View File

@ -80,11 +80,11 @@ static void dmSetAssert(int32_t signum, void *sigInfo, void *context) { tsAssert
static void dmStopDnode(int signum, void *sigInfo, void *context) { static void dmStopDnode(int signum, void *sigInfo, void *context) {
// taosIgnSignal(SIGUSR1); // taosIgnSignal(SIGUSR1);
// taosIgnSignal(SIGUSR2); // taosIgnSignal(SIGUSR2);
taosIgnSignal(SIGTERM); (void)taosIgnSignal(SIGTERM);
taosIgnSignal(SIGHUP); (void)taosIgnSignal(SIGHUP);
taosIgnSignal(SIGINT); (void)taosIgnSignal(SIGINT);
taosIgnSignal(SIGABRT); (void)taosIgnSignal(SIGABRT);
taosIgnSignal(SIGBREAK); (void)taosIgnSignal(SIGBREAK);
dInfo("shut down signal is %d", signum); dInfo("shut down signal is %d", signum);
#ifndef WINDOWS #ifndef WINDOWS
@ -102,11 +102,11 @@ void dmLogCrash(int signum, void *sigInfo, void *context) {
// taosIgnSignal(SIGBREAK); // taosIgnSignal(SIGBREAK);
#ifndef WINDOWS #ifndef WINDOWS
taosIgnSignal(SIGBUS); (void)taosIgnSignal(SIGBUS);
#endif #endif
taosIgnSignal(SIGABRT); (void)taosIgnSignal(SIGABRT);
taosIgnSignal(SIGFPE); (void)taosIgnSignal(SIGFPE);
taosIgnSignal(SIGSEGV); (void)taosIgnSignal(SIGSEGV);
char *pMsg = NULL; char *pMsg = NULL;
const char *flags = "UTL FATAL "; const char *flags = "UTL FATAL ";
@ -135,23 +135,23 @@ _return:
} }
static void dmSetSignalHandle() { static void dmSetSignalHandle() {
taosSetSignal(SIGUSR1, dmSetDebugFlag); (void)taosSetSignal(SIGUSR1, dmSetDebugFlag);
taosSetSignal(SIGUSR2, dmSetAssert); (void)taosSetSignal(SIGUSR2, dmSetAssert);
taosSetSignal(SIGTERM, dmStopDnode); (void)taosSetSignal(SIGTERM, dmStopDnode);
taosSetSignal(SIGHUP, dmStopDnode); (void)taosSetSignal(SIGHUP, dmStopDnode);
taosSetSignal(SIGINT, dmStopDnode); (void)taosSetSignal(SIGINT, dmStopDnode);
taosSetSignal(SIGBREAK, dmStopDnode); (void)taosSetSignal(SIGBREAK, dmStopDnode);
#ifndef WINDOWS #ifndef WINDOWS
taosSetSignal(SIGTSTP, dmStopDnode); (void)taosSetSignal(SIGTSTP, dmStopDnode);
taosSetSignal(SIGQUIT, dmStopDnode); (void)taosSetSignal(SIGQUIT, dmStopDnode);
#endif #endif
#ifndef WINDOWS #ifndef WINDOWS
taosSetSignal(SIGBUS, dmLogCrash); (void)taosSetSignal(SIGBUS, dmLogCrash);
#endif #endif
taosSetSignal(SIGABRT, dmLogCrash); (void)taosSetSignal(SIGABRT, dmLogCrash);
taosSetSignal(SIGFPE, dmLogCrash); (void)taosSetSignal(SIGFPE, dmLogCrash);
taosSetSignal(SIGSEGV, dmLogCrash); (void)taosSetSignal(SIGSEGV, dmLogCrash);
} }
static int32_t dmParseArgs(int32_t argc, char const *argv[]) { static int32_t dmParseArgs(int32_t argc, char const *argv[]) {

View File

@ -21,32 +21,45 @@
extern SConfig *tsCfg; extern SConfig *tsCfg;
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
int32_t code = 0;
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); dInfo("set local info, dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosThreadRwlockWrlock(&pMgmt->pData->lock); (void)taosThreadRwlockWrlock(&pMgmt->pData->lock);
pMgmt->pData->dnodeId = pCfg->dnodeId; pMgmt->pData->dnodeId = pCfg->dnodeId;
pMgmt->pData->clusterId = pCfg->clusterId; pMgmt->pData->clusterId = pCfg->clusterId;
dmWriteEps(pMgmt->pData); code = dmWriteEps(pMgmt->pData);
taosThreadRwlockUnlock(&pMgmt->pData->lock); if (code != 0) {
dInfo("failed to set local info, dnodeId:%d clusterId:%" PRId64 " reason:%s", pCfg->dnodeId, pCfg->clusterId,
tstrerror(code));
}
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
} }
} }
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
int32_t code = 0;
dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver); dDebug("ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
if (pMgmt->pData->ipWhiteVer == ver) { if (pMgmt->pData->ipWhiteVer == ver) {
if (ver == 0) { if (ver == 0) {
dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver); dDebug("disable ip-white-list on dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->pData->ipWhiteVer, ver);
rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL); rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL);
// pMgmt->ipWhiteVer = ver;
} }
return; return;
} }
int64_t oldVer = pMgmt->pData->ipWhiteVer; int64_t oldVer = pMgmt->pData->ipWhiteVer;
// pMgmt->ipWhiteVer = ver;
SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer}; SRetrieveIpWhiteReq req = {.ipWhiteVer = oldVer};
int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req); int32_t contLen = tSerializeRetrieveIpWhite(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); if (contLen < 0) {
tSerializeRetrieveIpWhite(pHead, contLen, &req); dError("failed to serialize ip white list request since: %s", tstrerror(contLen));
return;
}
void *pHead = rpcMallocCont(contLen);
contLen = tSerializeRetrieveIpWhite(pHead, contLen, &req);
if (contLen < 0) {
rpcFreeCont(pHead);
dError("failed to serialize ip white list request since:%s", tstrerror(contLen));
return;
}
SRpcMsg rpcMsg = {.pCont = pHead, SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen, .contLen = contLen,
@ -57,9 +70,12 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
.info.handle = 0}; .info.handle = 0};
SEpSet epset = {0}; SEpSet epset = {0};
dmGetMnodeEpSet(pMgmt->pData, &epset); (void)dmGetMnodeEpSet(pMgmt->pData, &epset);
rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL); code = rpcSendRequest(pMgmt->msgCb.clientRpc, &epset, &rpcMsg, NULL);
if (code != 0) {
dError("failed to send retrieve ip white list request since:%s", tstrerror(code));
}
} }
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
const STraceId *trace = &pRsp->info.traceId; const STraceId *trace = &pRsp->info.traceId;
@ -72,7 +88,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
pMgmt->pData->dropped = 1; pMgmt->pData->dropped = 1;
dmWriteEps(pMgmt->pData); dmWriteEps(pMgmt->pData);
dInfo("dnode will exit since it is in the dropped state"); dInfo("dnode will exit since it is in the dropped state");
raise(SIGINT); (void)raise(SIGINT);
} }
} else { } else {
SStatusRsp statusRsp = {0}; SStatusRsp statusRsp = {0};
@ -93,9 +109,10 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
} }
void dmSendStatusReq(SDnodeMgmt *pMgmt) { void dmSendStatusReq(SDnodeMgmt *pMgmt) {
int32_t code = 0;
SStatusReq req = {0}; SStatusReq req = {0};
taosThreadRwlockRdlock(&pMgmt->pData->lock); (void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
req.sver = tsVersion; req.sver = tsVersion;
req.dnodeVer = pMgmt->pData->dnodeVer; req.dnodeVer = pMgmt->pData->dnodeVer;
req.dnodeId = pMgmt->pData->dnodeId; req.dnodeId = pMgmt->pData->dnodeId;
@ -129,7 +146,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosThreadRwlockUnlock(&pMgmt->pData->lock); (void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
SMonVloadInfo vinfo = {0}; SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo); (*pMgmt->getVnodeLoadsFp)(&vinfo);
@ -146,8 +163,17 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.ipWhiteVer = pMgmt->pData->ipWhiteVer; req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen); if (contLen < 0) {
dError("failed to serialize status req since %s", tstrerror(contLen));
return;
}
void *pHead = rpcMallocCont(contLen);
tSerializeSStatusReq(pHead, contLen, &req); tSerializeSStatusReq(pHead, contLen, &req);
if (contLen < 0) {
rpcFreeCont(pHead);
dError("failed to serialize status req since %s", tstrerror(contLen));
return;
}
tFreeSStatusReq(&req); tFreeSStatusReq(&req);
SRpcMsg rpcMsg = {.pCont = pHead, SRpcMsg rpcMsg = {.pCont = pHead,
@ -163,8 +189,15 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SEpSet epSet = {0}; SEpSet epSet = {0};
int8_t epUpdated = 0; int8_t epUpdated = 0;
dmGetMnodeEpSet(pMgmt->pData, &epSet); (void)dmGetMnodeEpSet(pMgmt->pData, &epSet);
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
code =
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
if (code != 0) {
dError("failed to send status req since %s", tstrerror(code));
return;
}
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dmRotateMnodeEpSet(pMgmt->pData); dmRotateMnodeEpSet(pMgmt->pData);
char tbuf[512]; char tbuf[512];

View File

@ -266,22 +266,22 @@ static void *dmCrashReportThreadFp(void *param) {
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) { int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) { if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
dError("failed to create status thread since %s", tstrerror(code)); dError("failed to create status thread since %s", tstrerror(code));
return code; return code;
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-status", "initialized"); tmsgReportStartup("dnode-status", "initialized");
return 0; return 0;
} }
void dmStopStatusThread(SDnodeMgmt *pMgmt) { void dmStopStatusThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->statusThread)) { if (taosCheckPthreadValid(pMgmt->statusThread)) {
taosThreadJoin(pMgmt->statusThread, NULL); (void)taosThreadJoin(pMgmt->statusThread, NULL);
taosThreadClear(&pMgmt->statusThread); taosThreadClear(&pMgmt->statusThread);
} }
} }
@ -289,40 +289,40 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) {
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) { int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) { if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
dError("failed to create notify thread since %s", strerror(code)); dError("failed to create notify thread since %s", strerror(code));
return code; return code;
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-notify", "initialized"); tmsgReportStartup("dnode-notify", "initialized");
return 0; return 0;
} }
void dmStopNotifyThread(SDnodeMgmt *pMgmt) { void dmStopNotifyThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->notifyThread)) { if (taosCheckPthreadValid(pMgmt->notifyThread)) {
tsem_post(&dmNotifyHdl.sem); (void)tsem_post(&dmNotifyHdl.sem);
taosThreadJoin(pMgmt->notifyThread, NULL); (void)taosThreadJoin(pMgmt->notifyThread, NULL);
taosThreadClear(&pMgmt->notifyThread); taosThreadClear(&pMgmt->notifyThread);
} }
tsem_destroy(&dmNotifyHdl.sem); (void)tsem_destroy(&dmNotifyHdl.sem);
} }
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) { if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
dError("failed to create monitor thread since %s", tstrerror(code)); dError("failed to create monitor thread since %s", tstrerror(code));
return code; return code;
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-monitor", "initialized"); tmsgReportStartup("dnode-monitor", "initialized");
return 0; return 0;
} }
@ -330,30 +330,30 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) { int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) { if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
dError("failed to create audit thread since %s", tstrerror(code)); dError("failed to create audit thread since %s", tstrerror(code));
return code; return code;
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-audit", "initialized"); tmsgReportStartup("dnode-audit", "initialized");
return 0; return 0;
} }
void dmStopMonitorThread(SDnodeMgmt *pMgmt) { void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->monitorThread)) { if (taosCheckPthreadValid(pMgmt->monitorThread)) {
taosThreadJoin(pMgmt->monitorThread, NULL); (void)taosThreadJoin(pMgmt->monitorThread, NULL);
taosThreadClear(&pMgmt->monitorThread); (void)taosThreadClear(&pMgmt->monitorThread);
} }
} }
void dmStopAuditThread(SDnodeMgmt *pMgmt) { void dmStopAuditThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->auditThread)) { if (taosCheckPthreadValid(pMgmt->auditThread)) {
taosThreadJoin(pMgmt->auditThread, NULL); (void)taosThreadJoin(pMgmt->auditThread, NULL);
taosThreadClear(&pMgmt->auditThread); (void)taosThreadClear(&pMgmt->auditThread);
} }
} }
@ -364,15 +364,15 @@ int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
} }
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) { if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
dError("failed to create crashReport thread since %s", tstrerror(code)); dError("failed to create crashReport thread since %s", tstrerror(code));
return code; return code;
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-crashReport", "initialized"); tmsgReportStartup("dnode-crashReport", "initialized");
return 0; return 0;
} }
@ -383,8 +383,8 @@ void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
} }
if (taosCheckPthreadValid(pMgmt->crashReportThread)) { if (taosCheckPthreadValid(pMgmt->crashReportThread)) {
taosThreadJoin(pMgmt->crashReportThread, NULL); (void)taosThreadJoin(pMgmt->crashReportThread, NULL);
taosThreadClear(&pMgmt->crashReportThread); (void)taosThreadClear(&pMgmt->crashReportThread);
} }
} }
@ -454,7 +454,11 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info, .info = pMsg->info,
}; };
rpcSendResponse(&rsp);
code = rpcSendResponse(&rsp);
if (code != 0) {
dError("failed to send response since %s", tstrerror(code));
}
} }
dTrace("msg:%p, is freed, code:0x%x", pMsg, code); dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
@ -463,6 +467,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
} }
int32_t dmStartWorker(SDnodeMgmt *pMgmt) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
int32_t code = 0;
SSingleWorkerCfg cfg = { SSingleWorkerCfg cfg = {
.min = 1, .min = 1,
.max = 1, .max = 1,
@ -470,9 +475,9 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
.fp = (FItem)dmProcessMgmtQueue, .fp = (FItem)dmProcessMgmtQueue,
.param = pMgmt, .param = pMgmt,
}; };
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { if ((code = tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg)) != 0) {
dError("failed to start dnode-mgmt worker since %s", terrstr()); dError("failed to start dnode-mgmt worker since %s", tstrerror(code));
return -1; return code;
} }
dDebug("dnode workers are initialized"); dDebug("dnode workers are initialized");
@ -487,6 +492,5 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->mgmtWorker; SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg); return taosWriteQitem(pWorker->queue, pMsg);
return 0;
} }

View File

@ -17,12 +17,12 @@
#include "mmInt.h" #include "mmInt.h"
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) { void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant); (void)mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->stb, &pInfo->grant);
} }
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
pInfo->isMnode = 1; pInfo->isMnode = 1;
mndGetLoad(pMgmt->pMnode, &pInfo->load); (void)mndGetLoad(pMgmt->pMnode, &pInfo->load);
} }
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {

View File

@ -68,7 +68,7 @@ static void mmClose(SMnodeMgmt *pMgmt) {
if (pMgmt->pMnode != NULL) { if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt); mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
taosThreadRwlockDestroy(&pMgmt->lock); (void)taosThreadRwlockDestroy(&pMgmt->lock);
pMgmt->pMnode = NULL; pMgmt->pMnode = NULL;
} }
@ -107,7 +107,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutMsgToQueue; pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutMsgToQueue;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL); (void)taosThreadRwlockInit(&pMgmt->lock, NULL);
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if ((code = mmReadFile(pMgmt->path, &option)) != 0) { if ((code = mmReadFile(pMgmt->path, &option)) != 0) {
@ -163,9 +163,9 @@ static int32_t mmStart(SMnodeMgmt *pMgmt) {
static void mmStop(SMnodeMgmt *pMgmt) { static void mmStop(SMnodeMgmt *pMgmt) {
dDebug("mnode-mgmt start to stop"); dDebug("mnode-mgmt start to stop");
mndPreClose(pMgmt->pMnode); mndPreClose(pMgmt->pMnode);
taosThreadRwlockWrlock(&pMgmt->lock); (void)taosThreadRwlockWrlock(&pMgmt->lock);
pMgmt->stopped = 1; pMgmt->stopped = 1;
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
mndStop(pMgmt->pMnode); mndStop(pMgmt->pMnode);
} }

View File

@ -20,20 +20,20 @@
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) { static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) { if (pMgmt->stopped) {
code = -1; code = -1;
} else { } else {
atomic_add_fetch_32(&pMgmt->refCount, 1); (void)atomic_add_fetch_32(&pMgmt->refCount, 1);
} }
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
return code; return code;
} }
static inline void mmRelease(SMnodeMgmt *pMgmt) { static inline void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1); (void)atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
} }
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
@ -100,16 +100,16 @@ static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) { static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
int32_t code = 0;
if (mmAcquire(pMgmt) == 0) { if ((code = mmAcquire(pMgmt)) == 0) {
dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); code = taosWriteQitem(pWorker->queue, pMsg);
mmRelease(pMgmt); mmRelease(pMgmt);
return 0; return code;
} else { } else {
dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(), dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, tstrerror(code),
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
return -1; return code;
} }
} }

View File

@ -18,13 +18,13 @@
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) { void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {
SQnodeLoad qload = {0}; SQnodeLoad qload = {0};
qndGetLoad(pMgmt->pQnode, &qload); (void)qndGetLoad(pMgmt->pQnode, &qload);
qload.dnodeId = pMgmt->pData->dnodeId; qload.dnodeId = pMgmt->pData->dnodeId;
} }
void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) { void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) {
qndGetLoad(pMgmt->pQnode, pInfo); (void)qndGetLoad(pMgmt->pQnode, pInfo);
pInfo->dnodeId = pMgmt->pData->dnodeId; pInfo->dnodeId = pMgmt->pData->dnodeId;
} }

View File

@ -23,7 +23,7 @@ static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) {
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info, .info = pMsg->info,
}; };
tmsgSendRsp(&rsp); (void)tmsgSendRsp(&rsp);
} }
static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
@ -43,8 +43,7 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); return taosWriteQitem(pWorker->queue, pMsg);
return 0;
} }
int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
@ -69,18 +68,18 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
dTrace("msg:%p, is created and will put into qnode-query queue, len:%d", pMsg, pRpc->contLen); dTrace("msg:%p, is created and will put into qnode-query queue, len:%d", pMsg, pRpc->contLen);
taosWriteQitem(pMgmt->queryWorker.queue, pMsg); code = taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0; return code;
case READ_QUEUE: case READ_QUEUE:
case FETCH_QUEUE: case FETCH_QUEUE:
dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen); dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen);
taosWriteQitem(pMgmt->fetchWorker.queue, pMsg); code = taosWriteQitem(pMgmt->fetchWorker.queue, pMsg);
return 0; return code;
default: default:
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
return -1; return terrno;
} }
} }

View File

@ -23,7 +23,7 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
.info = pMsg->info, .info = pMsg->info,
}; };
tmsgSendRsp(&rsp); (void)tmsgSendRsp(&rsp);
} }
static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
@ -31,7 +31,7 @@ static void smProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
for (int32_t i = 0; i < numOfMsgs; i++) { for (int32_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); (void)taosGetQitem(qall, (void **)&pMsg);
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
dTrace("msg:%p, get from snode-write queue", pMsg); dTrace("msg:%p, get from snode-write queue", pMsg);

View File

@ -20,13 +20,13 @@
#define MAX_CONTENT_LEN 2 * 1024 * 1024 #define MAX_CONTENT_LEN 2 * 1024 * 1024
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
int32_t num = 0; int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->hash); int32_t size = taosHashGetSize(pMgmt->hash);
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
if (pVnodes == NULL) { if (pVnodes == NULL) {
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -44,7 +44,7 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb
} }
} }
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
*numOfVnodes = num; *numOfVnodes = num;
*ppVnodes = pVnodes; *ppVnodes = pVnodes;

View File

@ -22,7 +22,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
tfsUpdateSize(pMgmt->pTfs); tfsUpdateSize(pMgmt->pTfs);
taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
void *pIter = taosHashIterate(pMgmt->hash, NULL); void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) { while (pIter) {
@ -32,21 +32,21 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
SVnodeLoad vload = {.vgId = pVnode->vgId}; SVnodeLoad vload = {.vgId = pVnode->vgId};
if (!pVnode->failed) { if (!pVnode->failed) {
vnodeGetLoad(pVnode->pImpl, &vload); (void)vnodeGetLoad(pVnode->pImpl, &vload);
if (isReset) vnodeResetLoad(pVnode->pImpl, &vload); if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
} }
taosArrayPush(pInfo->pVloads, &vload); (void)taosArrayPush(pInfo->pVloads, &vload);
pIter = taosHashIterate(pMgmt->hash, pIter); pIter = taosHashIterate(pMgmt->hash, pIter);
} }
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
} }
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite)); pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
if (!pInfo->pVloads) return; if (!pInfo->pVloads) return;
taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
void *pIter = taosHashIterate(pMgmt->hash, NULL); void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) { while (pIter) {
@ -57,13 +57,13 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
if (!pVnode->failed) { if (!pVnode->failed) {
SVnodeLoadLite vload = {0}; SVnodeLoadLite vload = {0};
if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) { if (vnodeGetLoadLite(pVnode->pImpl, &vload) == 0) {
taosArrayPush(pInfo->pVloads, &vload); (void)taosArrayPush(pInfo->pVloads, &vload);
} }
} }
pIter = taosHashIterate(pMgmt->hash, pIter); pIter = taosHashIterate(pMgmt->hash, pIter);
} }
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
} }
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
@ -109,7 +109,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs; pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs); (void)tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
taosArrayDestroy(pVloads); taosArrayDestroy(pVloads);
} }
@ -200,7 +200,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port; pNode->nodePort = pCreate->replicas[pCfg->syncCfg.replicaNum].port;
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER; pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN); tstrncpy(pNode->nodeFqdn, pCreate->replicas[pCfg->syncCfg.replicaNum].fqdn, TSDB_FQDN_LEN);
tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
pCfg->syncCfg.replicaNum++; pCfg->syncCfg.replicaNum++;
} }
if (pCreate->selfIndex != -1) { if (pCreate->selfIndex != -1) {
@ -212,7 +212,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port; pNode->nodePort = pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].port;
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER; pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN); tstrncpy(pNode->nodeFqdn, pCreate->learnerReplicas[pCfg->syncCfg.totalReplicaNum].fqdn, TSDB_FQDN_LEN);
tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
pCfg->syncCfg.totalReplicaNum++; pCfg->syncCfg.totalReplicaNum++;
} }
pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum; pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
@ -323,7 +323,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false); SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, req.vgId, false);
if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) { if (pVnode != NULL && (req.replica == 1 || !pVnode->failed)) {
dError("vgId:%d, already exist", req.vgId); dError("vgId:%d, already exist", req.vgId);
tFreeSCreateVnodeReq(&req); (void)tFreeSCreateVnodeReq(&req);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
code = TSDB_CODE_VND_ALREADY_EXIST; code = TSDB_CODE_VND_ALREADY_EXIST;
return 0; return 0;
@ -374,14 +374,14 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _OVER; goto _OVER;
} }
//taosThreadMutexLock(&pMgmt->createLock); // taosThreadMutexLock(&pMgmt->createLock);
code = vmWriteVnodeListToFile(pMgmt); code = vmWriteVnodeListToFile(pMgmt);
if (code != 0) { if (code != 0) {
code = terrno != 0 ? terrno : code; code = terrno != 0 ? terrno : code;
//taosThreadMutexUnlock(&pMgmt->createLock); // taosThreadMutexUnlock(&pMgmt->createLock);
goto _OVER; goto _OVER;
} }
//taosThreadMutexUnlock(&pMgmt->createLock); // taosThreadMutexUnlock(&pMgmt->createLock);
_OVER: _OVER:
if (code != 0) { if (code != 0) {
@ -392,7 +392,7 @@ _OVER:
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
} }
tFreeSCreateVnodeReq(&req); (void)tFreeSCreateVnodeReq(&req);
terrno = code; terrno = code;
return code; return code;
} }
@ -779,10 +779,11 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
SDropVnodeReq dropReq = {0}; SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return terrno;
} }
int32_t vgId = dropReq.vgId; int32_t vgId = dropReq.vgId;
@ -791,25 +792,25 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (dropReq.dnodeId != pMgmt->pData->dnodeId) { if (dropReq.dnodeId != pMgmt->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, dnodeId:%d not matched with local dnode", dropReq.vgId, dropReq.dnodeId); dError("vgId:%d, dnodeId:%d not matched with local dnode", dropReq.vgId, dropReq.dnodeId);
return -1; return terrno;
} }
SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false); SVnodeObj *pVnode = vmAcquireVnodeImpl(pMgmt, vgId, false);
if (pVnode == NULL) { if (pVnode == NULL) {
dInfo("vgId:%d, failed to drop since %s", vgId, terrstr()); dInfo("vgId:%d, failed to drop since %s", vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST; terrno = TSDB_CODE_VND_NOT_EXIST;
return -1; return terrno;
} }
pVnode->dropped = 1; pVnode->dropped = 1;
if (vmWriteVnodeListToFile(pMgmt) != 0) { if ((code = vmWriteVnodeListToFile(pMgmt)) != 0) {
pVnode->dropped = 0; pVnode->dropped = 0;
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
return -1; return code;
} }
vmCloseVnode(pMgmt, pVnode, false); vmCloseVnode(pMgmt, pVnode, false);
vmWriteVnodeListToFile(pMgmt); (void)vmWriteVnodeListToFile(pMgmt);
dInfo("vgId:%d, is dropped", vgId); dInfo("vgId:%d, is dropped", vgId);
return 0; return 0;
@ -864,7 +865,7 @@ int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
continue; continue;
} }
taosArrayPush(arbHbRsp.hbMembers, &rspMember); (void)taosArrayPush(arbHbRsp.hbMembers, &rspMember);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
} }
@ -895,7 +896,7 @@ int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
_OVER: _OVER:
tFreeSVArbHeartBeatReq(&arbHbReq); tFreeSVArbHeartBeatReq(&arbHbReq);
tFreeSVArbHeartBeatRsp(&arbHbRsp); tFreeSVArbHeartBeatRsp(&arbHbRsp);
return terrno == TSDB_CODE_SUCCESS ? 0 : -1; return terrno;
} }
SArray *vmGetMsgHandles() { SArray *vmGetMsgHandles() {

View File

@ -23,12 +23,12 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
int32_t diskId = -1; int32_t diskId = -1;
SVnodeObj *pVnode = NULL; SVnodeObj *pVnode = NULL;
taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); (void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode != NULL) { if (pVnode != NULL) {
diskId = pVnode->diskPrimary; diskId = pVnode->diskPrimary;
} }
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
return diskId; return diskId;
} }
@ -96,8 +96,8 @@ int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) { SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
SVnodeObj *pVnode = NULL; SVnodeObj *pVnode = NULL;
taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); (void)taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) { if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
pVnode = NULL; pVnode = NULL;
@ -105,7 +105,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount); // dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount);
} }
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
return pVnode; return pVnode;
} }
@ -115,10 +115,10 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { return vmAcquireVno
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode == NULL) return; if (pVnode == NULL) return;
taosThreadRwlockRdlock(&pMgmt->lock); (void)taosThreadRwlockRdlock(&pMgmt->lock);
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, release vnode, ref:%d", pVnode->vgId, refCount); // dTrace("vgId:%d, release vnode, ref:%d", pVnode->vgId, refCount);
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
} }
static void vmFreeVnodeObj(SVnodeObj **ppVnode) { static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
@ -163,15 +163,15 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->failed = 1; pVnode->failed = 1;
} }
taosThreadRwlockWrlock(&pMgmt->lock); (void)taosThreadRwlockWrlock(&pMgmt->lock);
SVnodeObj *pOld = NULL; SVnodeObj *pOld = NULL;
taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld); (void)taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
if (pOld) { if (pOld) {
ASSERT(pOld->failed); ASSERT(pOld->failed);
vmFreeVnodeObj(&pOld); vmFreeVnodeObj(&pOld);
} }
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
return code; return code;
} }
@ -184,9 +184,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
vnodeProposeCommitOnNeed(pVnode->pImpl, atExit); vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
} }
taosThreadRwlockWrlock(&pMgmt->lock); (void)taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); (void)taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosThreadRwlockUnlock(&pMgmt->lock); (void)taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
if (pVnode->failed) { if (pVnode->failed) {
@ -235,8 +235,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
if (commitAndRemoveWal) { if (commitAndRemoveWal) {
dInfo("vgId:%d, commit data for vnode split", pVnode->vgId); dInfo("vgId:%d, commit data for vnode split", pVnode->vgId);
vnodeSyncCommit(pVnode->pImpl); (void)vnodeSyncCommit(pVnode->pImpl);
vnodeBegin(pVnode->pImpl); (void)vnodeBegin(pVnode->pImpl);
dInfo("vgId:%d, commit data finished", pVnode->vgId); dInfo("vgId:%d, commit data finished", pVnode->vgId);
} }
@ -250,8 +250,8 @@ _closed:
if (commitAndRemoveWal) { if (commitAndRemoveWal) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path); dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
tfsRmdir(pMgmt->pTfs, path); (void)tfsRmdir(pMgmt->pTfs, path);
tfsMkdir(pMgmt->pTfs, path); (void)tfsMkdir(pMgmt->pTfs, path);
} }
if (pVnode->dropped) { if (pVnode->dropped) {
@ -332,7 +332,7 @@ static void *vmOpenVnodeInThread(void *param) {
dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++; pThread->opened++;
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
} }
dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
@ -381,13 +381,13 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
if (pThread->vnodeNum == 0) continue; if (pThread->vnodeNum == 0) continue;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) { if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
} }
bool updateVnodesList = false; bool updateVnodesList = false;
@ -395,7 +395,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
for (int32_t t = 0; t < threadNum; ++t) { for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t]; SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
taosThreadJoin(pThread->thread, NULL); (void)taosThreadJoin(pThread->thread, NULL);
taosThreadClear(&pThread->thread); taosThreadClear(&pThread->thread);
} }
taosMemoryFree(pThread->pCfgs); taosMemoryFree(pThread->pCfgs);
@ -484,19 +484,19 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
if (pThread->vnodeNum == 0) continue; if (pThread->vnodeNum == 0) continue;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) { if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno)); dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
} }
for (int32_t t = 0; t < threadNum; ++t) { for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t]; SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
taosThreadJoin(pThread->thread, NULL); (void)taosThreadJoin(pThread->thread, NULL);
taosThreadClear(&pThread->thread); taosThreadClear(&pThread->thread);
} }
taosMemoryFree(pThread->ppVnodes); taosMemoryFree(pThread->ppVnodes);
@ -519,8 +519,8 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
vmCloseVnodes(pMgmt); vmCloseVnodes(pMgmt);
vmStopWorker(pMgmt); vmStopWorker(pMgmt);
vnodeCleanup(); vnodeCleanup();
taosThreadRwlockDestroy(&pMgmt->lock); (void)taosThreadRwlockDestroy(&pMgmt->lock);
taosThreadMutexDestroy(&pMgmt->createLock); (void)taosThreadMutexDestroy(&pMgmt->createLock);
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
} }
@ -569,22 +569,22 @@ static void *vmThreadFp(void *param) {
static int32_t vmInitTimer(SVnodeMgmt *pMgmt) { static int32_t vmInitTimer(SVnodeMgmt *pMgmt) {
int32_t code = 0; int32_t code = 0;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) { if (taosThreadCreate(&pMgmt->thread, &thAttr, vmThreadFp, pMgmt) != 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
dError("failed to create vnode timer thread since %s", tstrerror(code)); dError("failed to create vnode timer thread since %s", tstrerror(code));
return code; return code;
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
return 0; return 0;
} }
static void vmCleanupTimer(SVnodeMgmt *pMgmt) { static void vmCleanupTimer(SVnodeMgmt *pMgmt) {
pMgmt->stop = true; pMgmt->stop = true;
if (taosCheckPthreadValid(pMgmt->thread)) { if (taosCheckPthreadValid(pMgmt->thread)) {
taosThreadJoin(pMgmt->thread, NULL); (void)taosThreadJoin(pMgmt->thread, NULL);
taosThreadClear(&pMgmt->thread); taosThreadClear(&pMgmt->thread);
} }
} }
@ -707,7 +707,7 @@ static void *vmRestoreVnodeInThread(void *param) {
} else { } else {
dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex); dInfo("vgId:%d, is restored by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->opened++; pThread->opened++;
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); (void)atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
} }
} }
@ -761,20 +761,20 @@ static int32_t vmStartVnodes(SVnodeMgmt *pMgmt) {
if (pThread->vnodeNum == 0) continue; if (pThread->vnodeNum == 0) continue;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) { if (taosThreadCreate(&pThread->thread, &thAttr, vmRestoreVnodeInThread, pThread) != 0) {
dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno)); dError("thread:%d, failed to create thread to restore vnode since %s", pThread->threadIndex, strerror(errno));
ASSERT(errno == 0); ASSERT(errno == 0);
} }
taosThreadAttrDestroy(&thAttr); (void)taosThreadAttrDestroy(&thAttr);
} }
for (int32_t t = 0; t < threadNum; ++t) { for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t]; SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
taosThreadJoin(pThread->thread, NULL); (void)taosThreadJoin(pThread->thread, NULL);
taosThreadClear(&pThread->thread); taosThreadClear(&pThread->thread);
} }
taosMemoryFree(pThread->ppVnodes); taosMemoryFree(pThread->ppVnodes);

View File

@ -187,7 +187,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmSendResponse(SRpcMsg *pMsg) { static void vmSendResponse(SRpcMsg *pMsg) {
if (pMsg->info.handle) { if (pMsg->info.handle) {
SRpcMsg rsp = {.info = pMsg->info, .code = terrno}; SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
rpcSendResponse(&rsp); (void)rpcSendResponse(&rsp);
} }
} }
@ -236,7 +236,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
dError("vgId:%d, msg:%p preprocess query msg failed since %s", pVnode->vgId, pMsg, tstrerror(code)); dError("vgId:%d, msg:%p preprocess query msg failed since %s", pVnode->vgId, pMsg, tstrerror(code));
} else { } else {
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg); code = taosWriteQitem(pVnode->pQueryQ, pMsg);
} }
break; break;
case STREAM_QUEUE: case STREAM_QUEUE:

View File

@ -47,8 +47,8 @@ static int32_t dmCheckRepeatInit(SDnode *pDnode) {
} }
static int32_t dmInitSystem() { static int32_t dmInitSystem() {
taosIgnSIGPIPE(); (void)taosIgnSIGPIPE();
taosBlockSIGPIPE(); (void)taosBlockSIGPIPE();
taosResolveCRC(); taosResolveCRC();
return 0; return 0;
} }
@ -200,10 +200,10 @@ void dmCleanup() {
auditCleanup(); auditCleanup();
syncCleanUp(); syncCleanUp();
walCleanUp(); walCleanUp();
udfcClose(); (void)udfcClose();
udfStopUdfd(); udfStopUdfd();
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dmDiskClose(); (void)dmDiskClose();
DestroyRegexCache(); DestroyRegexCache();
#if defined(USE_S3) #if defined(USE_S3)

View File

@ -47,7 +47,8 @@ int32_t dmInitDnode(SDnode *pDnode) {
} }
// compress module init // compress module init
tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor); (void)tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse,
tsCompressor);
pDnode->wrappers[DNODE].func = dmGetMgmtFunc(); pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
pDnode->wrappers[MNODE].func = mmGetMgmtFunc(); pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
@ -60,7 +61,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
pWrapper->pDnode = pDnode; pWrapper->pDnode = pDnode;
pWrapper->name = dmNodeName(ntype); pWrapper->name = dmNodeName(ntype);
pWrapper->ntype = ntype; pWrapper->ntype = ntype;
taosThreadRwlockInit(&pWrapper->lock, NULL); (void)taosThreadRwlockInit(&pWrapper->lock, NULL);
snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name); snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = taosStrdup(path); pWrapper->path = taosStrdup(path);
@ -214,8 +215,8 @@ int32_t dmInitVars(SDnode *pDnode) {
return -1; return -1;
} }
taosThreadRwlockInit(&pData->lock, NULL); (void)taosThreadRwlockInit(&pData->lock, NULL);
taosThreadMutexInit(&pDnode->mutex, NULL); (void)taosThreadMutexInit(&pDnode->mutex, NULL);
return 0; return 0;
} }
@ -223,16 +224,16 @@ void dmClearVars(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
taosMemoryFreeClear(pWrapper->path); taosMemoryFreeClear(pWrapper->path);
taosThreadRwlockDestroy(&pWrapper->lock); (void)taosThreadRwlockDestroy(&pWrapper->lock);
} }
if (pDnode->lockfile != NULL) { if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile); (void)taosUnLockFile(pDnode->lockfile);
taosCloseFile(&pDnode->lockfile); (void)taosCloseFile(&pDnode->lockfile);
pDnode->lockfile = NULL; pDnode->lockfile = NULL;
} }
SDnodeData *pData = &pDnode->data; SDnodeData *pData = &pDnode->data;
taosThreadRwlockWrlock(&pData->lock); (void)taosThreadRwlockWrlock(&pData->lock);
if (pData->oldDnodeEps != NULL) { if (pData->oldDnodeEps != NULL) {
if (dmWriteEps(pData) == 0) { if (dmWriteEps(pData) == 0) {
dmRemoveDnodePairs(pData); dmRemoveDnodePairs(pData);
@ -248,10 +249,10 @@ void dmClearVars(SDnode *pDnode) {
taosHashCleanup(pData->dnodeHash); taosHashCleanup(pData->dnodeHash);
pData->dnodeHash = NULL; pData->dnodeHash = NULL;
} }
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
taosThreadRwlockDestroy(&pData->lock); (void)taosThreadRwlockDestroy(&pData->lock);
taosThreadMutexDestroy(&pDnode->mutex); (void)taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
} }
@ -266,14 +267,14 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper; SMgmtWrapper *pRetWrapper = pWrapper;
taosThreadRwlockRdlock(&pWrapper->lock); (void)taosThreadRwlockRdlock(&pWrapper->lock);
if (pWrapper->deployed) { if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
// dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount); // dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount);
} else { } else {
pRetWrapper = NULL; pRetWrapper = NULL;
} }
taosThreadRwlockUnlock(&pWrapper->lock); (void)taosThreadRwlockUnlock(&pWrapper->lock);
return pRetWrapper; return pRetWrapper;
} }
@ -281,7 +282,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0; int32_t code = 0;
taosThreadRwlockRdlock(&pWrapper->lock); (void)taosThreadRwlockRdlock(&pWrapper->lock);
if (pWrapper->deployed) { if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
// dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount); // dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
@ -304,7 +305,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
break; break;
} }
} }
taosThreadRwlockUnlock(&pWrapper->lock); (void)taosThreadRwlockUnlock(&pWrapper->lock);
return code; return code;
} }
@ -312,9 +313,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
void dmReleaseWrapper(SMgmtWrapper *pWrapper) { void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return; if (pWrapper == NULL) return;
taosThreadRwlockRdlock(&pWrapper->lock); (void)taosThreadRwlockRdlock(&pWrapper->lock);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosThreadRwlockUnlock(&pWrapper->lock); (void)taosThreadRwlockUnlock(&pWrapper->lock);
// dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount); // dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount);
} }
@ -343,7 +344,7 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
rsp.contLen = pMsg->contLen; rsp.contLen = pMsg->contLen;
} }
rpcSendResponse(&rsp); (void)rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
@ -365,6 +366,6 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
} }
} }
rpcSendResponse(&rsp); (void)rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }

View File

@ -77,12 +77,12 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
taosMsleep(10); taosMsleep(10);
} }
taosThreadRwlockWrlock(&pWrapper->lock); (void)taosThreadRwlockWrlock(&pWrapper->lock);
if (pWrapper->pMgmt != NULL) { if (pWrapper->pMgmt != NULL) {
(*pWrapper->func.closeFp)(pWrapper->pMgmt); (*pWrapper->func.closeFp)(pWrapper->pMgmt);
pWrapper->pMgmt = NULL; pWrapper->pMgmt = NULL;
} }
taosThreadRwlockUnlock(&pWrapper->lock); (void)taosThreadRwlockUnlock(&pWrapper->lock);
dInfo("node:%s, has been closed", pWrapper->name); dInfo("node:%s, has been closed", pWrapper->name);
} }

View File

@ -18,7 +18,7 @@
#include "qworker.h" #include "qworker.h"
#include "tversion.h" #include "tversion.h"
static inline void dmSendRsp(SRpcMsg *pMsg) { rpcSendResponse(pMsg); } static inline void dmSendRsp(SRpcMsg *pMsg) { (void)rpcSendResponse(pMsg); }
static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0}; SEpSet epSet = {0};
@ -29,7 +29,11 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
if (pMsg->pCont == NULL) { if (pMsg->pCont == NULL) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY; pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else { } else {
tSerializeSEpSet(pMsg->pCont, contLen, &epSet); contLen = tSerializeSEpSet(pMsg->pCont, contLen, &epSet);
if (contLen < 0) {
pMsg->code = contLen;
return;
}
pMsg->contLen = contLen; pMsg->contLen = contLen;
} }
} }
@ -65,14 +69,17 @@ static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) {
return code; return code;
} }
static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) { static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
int32_t code = 0; int32_t code = 0;
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite)); SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite); code = tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
if (code < 0) {
dError("failed to update rpc ip-white since: %s", tstrerror(code));
return;
}
code = rpcSetIpWhite(pTrans, &ipWhite); code = rpcSetIpWhite(pTrans, &ipWhite);
pData->ipWhiteVer = ipWhite.ver; pData->ipWhiteVer = ipWhite.ver;
tFreeSUpdateIpWhiteReq(&ipWhite); (void)tFreeSUpdateIpWhiteReq(&ipWhite);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
@ -81,7 +88,7 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
SIpV4Range range = {.ip = clientIp, .mask = 32}; SIpV4Range range = {.ip = clientIp, .mask = 32};
char buf[36] = {0}; char buf[36] = {0};
rpcUtilSIpRangeToStr(&range, buf); (void)rpcUtilSIpRangeToStr(&range, buf);
dError("User:%s host:%s not in ip white list", user, buf); dError("User:%s host:%s not in ip white list", user, buf);
return true; return true;
} else { } else {
@ -100,7 +107,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
int32_t svrVer = 0; int32_t svrVer = 0;
taosVersionStrToInt(version, &svrVer); (void)taosVersionStrToInt(version, &svrVer);
if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) { if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer); dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer);
goto _OVER; goto _OVER;
@ -296,7 +303,7 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} else { } else {
pMsg->info.handle = 0; pMsg->info.handle = 0;
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL); (void)rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
return 0; return 0;
} }
} }
@ -315,14 +322,13 @@ static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
pMsg->info.handle); pMsg->info.handle);
return code; return code;
} else { } else {
rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL); return rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
return 0;
} }
} }
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLinkArg(pMsg); } static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { (void)rpcRegisterBrokenLinkArg(pMsg); }
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcReleaseHandle(pHandle, type); } static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { (void)rpcReleaseHandle(pHandle, type); }
static bool rpcRfp(int32_t code, tmsg_t msgType) { static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND || if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
@ -382,7 +388,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.batchSize = 8 * 1024; rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {
@ -426,7 +432,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024; rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->statusRpc = rpcOpen(&rpcInit); pTrans->statusRpc = rpcOpen(&rpcInit);
if (pTrans->statusRpc == NULL) { if (pTrans->statusRpc == NULL) {
@ -471,7 +477,7 @@ int32_t dmInitSyncClient(SDnode *pDnode) {
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024; rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->syncRpc = rpcOpen(&rpcInit); pTrans->syncRpc = rpcOpen(&rpcInit);
if (pTrans->syncRpc == NULL) { if (pTrans->syncRpc == NULL) {
@ -522,7 +528,7 @@ int32_t dmInitServer(SDnode *pDnode) {
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
rpcInit.compressSize = tsCompressMsgSize; rpcInit.compressSize = tsCompressMsgSize;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->serverRpc = rpcOpen(&rpcInit); pTrans->serverRpc = rpcOpen(&rpcInit);
if (pTrans->serverRpc == NULL) { if (pTrans->serverRpc == NULL) {
dError("failed to init dnode rpc server"); dError("failed to init dnode rpc server");

View File

@ -33,7 +33,7 @@ static int32_t dmReadDnodePairs(SDnodeData *pData);
void dmGetDnodeEp(void *data, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { void dmGetDnodeEp(void *data, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
SDnodeData *pData = data; SDnodeData *pData = data;
taosThreadRwlockRdlock(&pData->lock); (void)taosThreadRwlockRdlock(&pData->lock);
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
@ -48,7 +48,7 @@ void dmGetDnodeEp(void *data, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t
} }
} }
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
} }
static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) { static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) {
@ -255,8 +255,8 @@ _OVER:
if (taosArrayGetSize(pData->dnodeEps) == 0) { if (taosArrayGetSize(pData->dnodeEps) == 0) {
SDnodeEp dnodeEp = {0}; SDnodeEp dnodeEp = {0};
dnodeEp.isMnode = 1; dnodeEp.isMnode = 1;
taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep); (void)taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep);
taosArrayPush(pData->dnodeEps, &dnodeEp); (void)taosArrayPush(pData->dnodeEps, &dnodeEp);
} }
if ((code = dmReadDnodePairs(pData)) != 0) { if ((code = dmReadDnodePairs(pData)) != 0) {
@ -337,7 +337,7 @@ int32_t dmWriteEps(SDnodeData *pData) {
if (taosWriteFile(pFile, buffer, len) <= 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _OVER); if (taosWriteFile(pFile, buffer, len) <= 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _OVER);
if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _OVER); if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _OVER);
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _OVER); if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _OVER);
code = 0; code = 0;
@ -348,7 +348,7 @@ int32_t dmWriteEps(SDnodeData *pData) {
_OVER: _OVER:
if (pJson != NULL) tjsonDelete(pJson); if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer); if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) (void)taosCloseFile(&pFile);
if (code != 0) { if (code != 0) {
dError("failed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, tstrerror(code), pData->dnodeVer); dError("failed to write dnode file:%s since %s, dnodeVer:%" PRId64, realfile, tstrerror(code), pData->dnodeVer);
@ -358,18 +358,18 @@ _OVER:
int32_t dmGetDnodeSize(SDnodeData *pData) { int32_t dmGetDnodeSize(SDnodeData *pData) {
int32_t size = 0; int32_t size = 0;
taosThreadRwlockRdlock(&pData->lock); (void)taosThreadRwlockRdlock(&pData->lock);
size = taosArrayGetSize(pData->dnodeEps); size = taosArrayGetSize(pData->dnodeEps);
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
return size; return size;
} }
void dmUpdateEps(SDnodeData *pData, SArray *eps) { void dmUpdateEps(SDnodeData *pData, SArray *eps) {
taosThreadRwlockWrlock(&pData->lock); (void)taosThreadRwlockWrlock(&pData->lock);
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer); dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
dmResetEps(pData, eps); dmResetEps(pData, eps);
dmWriteEps(pData); (void)dmWriteEps(pData);
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
} }
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) { static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
@ -398,7 +398,7 @@ static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
for (int32_t i = 0; i < numOfEps; i++) { for (int32_t i = 0; i < numOfEps; i++) {
SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i); SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); (void)taosHashPut(pData->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
} }
pData->validMnodeEps = true; pData->validMnodeEps = true;
@ -418,7 +418,7 @@ static void dmPrintEps(SDnodeData *pData) {
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
bool changed = false; bool changed = false;
if (dnodeId == 0) return changed; if (dnodeId == 0) return changed;
taosThreadRwlockRdlock(&pData->lock); (void)taosThreadRwlockRdlock(&pData->lock);
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) { if (pDnodeEp != NULL) {
@ -430,14 +430,14 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
} }
} }
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
return changed; return changed;
} }
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
taosThreadRwlockRdlock(&pData->lock); (void)taosThreadRwlockRdlock(&pData->lock);
*pEpSet = pData->mnodeEps; *pEpSet = pData->mnodeEps;
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
} }
void dmEpSetToStr(char *buf, int32_t len, SEpSet *epSet) { void dmEpSetToStr(char *buf, int32_t len, SEpSet *epSet) {
@ -464,12 +464,12 @@ static FORCE_INLINE void dmSwapEps(SEp *epLhs, SEp *epRhs) {
} }
void dmRotateMnodeEpSet(SDnodeData *pData) { void dmRotateMnodeEpSet(SDnodeData *pData) {
taosThreadRwlockRdlock(&pData->lock); (void)taosThreadRwlockRdlock(&pData->lock);
SEpSet *pEpSet = &pData->mnodeEps; SEpSet *pEpSet = &pData->mnodeEps;
for (int i = 1; i < pEpSet->numOfEps; i++) { for (int i = 1; i < pEpSet->numOfEps; i++) {
dmSwapEps(&pEpSet->eps[i - 1], &pEpSet->eps[i]); dmSwapEps(&pEpSet->eps[i - 1], &pEpSet->eps[i]);
} }
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
} }
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) { void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
@ -486,9 +486,9 @@ void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
if (memcmp(pEpSet, &pData->mnodeEps, sizeof(SEpSet)) == 0) return; if (memcmp(pEpSet, &pData->mnodeEps, sizeof(SEpSet)) == 0) return;
taosThreadRwlockWrlock(&pData->lock); (void)taosThreadRwlockWrlock(&pData->lock);
pData->mnodeEps = *pEpSet; pData->mnodeEps = *pEpSet;
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
@ -502,7 +502,7 @@ bool dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn,
int32_t dnodeId = -1; int32_t dnodeId = -1;
if (did != NULL) dnodeId = *did; if (did != NULL) dnodeId = *did;
taosThreadRwlockRdlock(&pData->lock); (void)taosThreadRwlockRdlock(&pData->lock);
if (pData->oldDnodeEps != NULL) { if (pData->oldDnodeEps != NULL) {
int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps); int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps);
@ -542,7 +542,7 @@ bool dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn,
} }
} }
taosThreadRwlockUnlock(&pData->lock); (void)taosThreadRwlockUnlock(&pData->lock);
return updated; return updated;
} }
@ -550,26 +550,26 @@ static int32_t dmDecodeEpPairs(SJson *pJson, SDnodeData *pData) {
int32_t code = 0; int32_t code = 0;
SJson *dnodes = tjsonGetObjectItem(pJson, "dnodes"); SJson *dnodes = tjsonGetObjectItem(pJson, "dnodes");
if (dnodes == NULL) return -1; if (dnodes == NULL) return TSDB_CODE_INVALID_CFG_VALUE;
int32_t numOfDnodes = tjsonGetArraySize(dnodes); int32_t numOfDnodes = tjsonGetArraySize(dnodes);
for (int32_t i = 0; i < numOfDnodes; ++i) { for (int32_t i = 0; i < numOfDnodes; ++i) {
SJson *dnode = tjsonGetArrayItem(dnodes, i); SJson *dnode = tjsonGetArrayItem(dnodes, i);
if (dnode == NULL) return -1; if (dnode == NULL) return TSDB_CODE_INVALID_CFG_VALUE;
SDnodeEpPair pair = {0}; SDnodeEpPair pair = {0};
tjsonGetInt32ValueFromDouble(dnode, "id", pair.id, code); tjsonGetInt32ValueFromDouble(dnode, "id", pair.id, code);
if (code < 0) return -1; if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
code = tjsonGetStringValue(dnode, "fqdn", pair.oldFqdn); code = tjsonGetStringValue(dnode, "fqdn", pair.oldFqdn);
if (code < 0) return -1; if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
tjsonGetUInt16ValueFromDouble(dnode, "port", pair.oldPort, code); tjsonGetUInt16ValueFromDouble(dnode, "port", pair.oldPort, code);
if (code < 0) return -1; if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
code = tjsonGetStringValue(dnode, "new_fqdn", pair.newFqdn); code = tjsonGetStringValue(dnode, "new_fqdn", pair.newFqdn);
if (code < 0) return -1; if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
tjsonGetUInt16ValueFromDouble(dnode, "new_port", pair.newPort, code); tjsonGetUInt16ValueFromDouble(dnode, "new_port", pair.newPort, code);
if (code < 0) return -1; if (code < 0) return TSDB_CODE_INVALID_CFG_VALUE;
if (taosArrayPush(pData->oldDnodeEps, &pair) == NULL) return -1; if (taosArrayPush(pData->oldDnodeEps, &pair) == NULL) return TSDB_CODE_OUT_OF_MEMORY;
} }
return code; return code;