add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-11 20:48:24 +08:00
parent d549a47cf5
commit 39d04ed15b
8 changed files with 56 additions and 27 deletions

View File

@ -52,6 +52,7 @@ typedef struct {
void* data; void* data;
void* mgmt; void* mgmt;
void* clientRpc; void* clientRpc;
void* serverRpc;
PutToQueueFp putToQueueFp; PutToQueueFp putToQueueFp;
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;

View File

@ -33,6 +33,10 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
dInfo("ip-white-dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->ipWhiteVer, ver); dInfo("ip-white-dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->ipWhiteVer, ver);
if (pMgmt->ipWhiteVer == ver) { if (pMgmt->ipWhiteVer == ver) {
if (ver == 0) {
dInfo("disable ip-white-dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->ipWhiteVer, ver);
rpcSetIpWhite(pMgmt->msgCb.serverRpc, NULL);
}
return; return;
} }
int64_t oldVer = pMgmt->ipWhiteVer; int64_t oldVer = pMgmt->ipWhiteVer;

View File

@ -55,7 +55,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp; pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
pMgmt->ipWhiteVer = -1; pMgmt->ipWhiteVer = 0;
if (dmStartWorker(pMgmt) != 0) { if (dmStartWorker(pMgmt) != 0) {
return -1; return -1;
} }

View File

@ -73,6 +73,18 @@ static void dmUpdateRpcIpWhite(void *pTrans, SRpcMsg *pRpc) {
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
if (forbidden) {
SIpV4Range range = {.ip = clientIp, .mask = 32};
char buf[36] = {0};
rpcUtilSIpRangeToStr(&range, buf);
dError("User %s host:%s not in ip white list", user, buf);
return true;
} else {
return false;
}
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1; int32_t code = -1;
@ -91,13 +103,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER; goto _OVER;
} }
if (pRpc->info.forbiddenIp == 1) { bool isForbidden = dmIsForbiddenIp(pRpc->info.forbiddenIp, pRpc->info.conn.user, pRpc->info.conn.clientIp);
char tbuf[36] = {0}; if (isForbidden) {
SIpV4Range range = {.ip = pRpc->info.conn.clientIp, .mask = 32};
rpcUtilSIpRangeToStr(&range, tbuf);
dError("User %s host:%s not in ip white list", pRpc->info.conn.user, tbuf);
terrno = TSDB_CODE_IP_NOT_IN_WHITE_LIST; terrno = TSDB_CODE_IP_NOT_IN_WHITE_LIST;
goto _OVER; goto _OVER;
} }
@ -397,6 +404,7 @@ void dmCleanupServer(SDnode *pDnode) {
SMsgCb dmGetMsgcb(SDnode *pDnode) { SMsgCb dmGetMsgcb(SDnode *pDnode) {
SMsgCb msgCb = { SMsgCb msgCb = {
.clientRpc = pDnode->trans.clientRpc, .clientRpc = pDnode->trans.clientRpc,
.serverRpc = pDnode->trans.serverRpc,
.sendReqFp = dmSendReq, .sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp, .sendRspFp = dmSendRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,

View File

@ -33,6 +33,7 @@ int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname); int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter); int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp); int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
int32_t mndCheckIpWhiteList(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -28,6 +28,7 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) { int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) {
return 0; return 0;
} }
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; } int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) { int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) {
return 0; return 0;
@ -41,4 +42,6 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp
pRsp->passVer = pUser->passVersion; pRsp->passVer = pUser->passVersion;
return 0; return 0;
} }
int32_t mndCheckIpWhiteList(SMnode *pMnode) { return 0; }
#endif #endif

View File

@ -66,7 +66,7 @@ static SIpWhiteMgt ipWhiteMgt;
void ipWhiteMgtInit() { void ipWhiteMgtInit() {
ipWhiteMgt.pIpWhiteTab = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK); ipWhiteMgt.pIpWhiteTab = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK);
ipWhiteMgt.ver = taosGetTimestampMs(); ipWhiteMgt.ver = 0;
taosThreadRwlockInit(&ipWhiteMgt.rw, NULL); taosThreadRwlockInit(&ipWhiteMgt.rw, NULL);
} }
void ipWhiteMgtCleanup() { void ipWhiteMgtCleanup() {
@ -188,11 +188,14 @@ int64_t mndGetIpWhiteVer(SMnode *pMnode) {
int64_t ver = ipWhiteMgt.ver; int64_t ver = ipWhiteMgt.ver;
if (ver == 0) { if (ver == 0) {
ipWhiteMgtUpdateAll(pMnode); ipWhiteMgtUpdateAll(pMnode);
ipWhiteMgt.ver = taosGetTimestampMs();
} }
ver = ipWhiteMgt.ver;
taosThreadRwlockUnlock(&ipWhiteMgt.rw); taosThreadRwlockUnlock(&ipWhiteMgt.rw);
mInfo("ip-white-mnode ver, %" PRId64 "", ver); mInfo("ip-white-mnode ver, %" PRId64 "", ver);
if (mndCheckIpWhiteList(pMnode) == 0 || tsEnableWhiteList == false) {
return 0;
}
return ver; return ver;
} }

View File

@ -96,6 +96,7 @@ typedef struct SWorkThrd {
SWhiteList* pWhiteList; SWhiteList* pWhiteList;
int64_t whiteListVer; int64_t whiteListVer;
int8_t enableIpWhiteList;
} SWorkThrd; } SWorkThrd;
typedef struct SServerObj { typedef struct SServerObj {
@ -355,10 +356,13 @@ static bool uvHandleReq(SSvrConn* pConn) {
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
memcpy(pConn->user, pHead->user, strlen(pHead->user)); memcpy(pConn->user, pHead->user, strlen(pHead->user));
int8_t forbiddenIp = uvWhiteListCheckConn(pThrd->pWhiteList, pConn) == false ? 1 : 0; int8_t forbiddenIp = 0;
if (pThrd->enableIpWhiteList) {
forbiddenIp = uvWhiteListCheckConn(pThrd->pWhiteList, pConn) == false ? 1 : 0;
if (forbiddenIp == 0) { if (forbiddenIp == 0) {
uvWhiteListSetConnVer(pThrd->pWhiteList, pConn); uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
} }
}
if (uvRecvReleaseReq(pConn, pHead)) { if (uvRecvReleaseReq(pConn, pHead)) {
return true; return true;
@ -1355,6 +1359,7 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
} }
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
SUpdateIpWhite* req = msg->arg; SUpdateIpWhite* req = msg->arg;
if (req != NULL) {
for (int i = 0; i < req->numOfUser; i++) { for (int i = 0; i < req->numOfUser; i++) {
SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i]; SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
@ -1367,9 +1372,13 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
} }
thrd->pWhiteList->ver = req->ver; thrd->pWhiteList->ver = req->ver;
thrd->enableIpWhiteList = 1;
tFreeSUpdateIpWhiteReq(req); tFreeSUpdateIpWhiteReq(req);
taosMemoryFree(req); taosMemoryFree(req);
} else {
thrd->enableIpWhiteList = 0;
}
taosMemoryFree(msg); taosMemoryFree(msg);
return; return;
} }
@ -1560,7 +1569,7 @@ void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
SWorkThrd* pThrd = svrObj->pThreadObj[i]; SWorkThrd* pThrd = svrObj->pThreadObj[i];
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
SUpdateIpWhite* pReq = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg); SUpdateIpWhite* pReq = (arg != NULL ? cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg) : NULL);
msg->type = Update; msg->type = Update;
msg->arg = pReq; msg->arg = pReq;