add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-08 11:49:12 +08:00
parent 27e3ee8f1d
commit 56706193eb
5 changed files with 206 additions and 34 deletions

View File

@ -31,8 +31,6 @@ int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs); bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs);
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo); void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo);
SIpWhiteList *mndCreateIpWhiteFromDnode(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -130,7 +130,7 @@ typedef struct SMnode {
SGrantInfo grant; SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb; SMsgCb msgCb;
int64_t ipWhiteVer; int64_t ipWhiteVer;
} SMnode; } SMnode;
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
@ -141,6 +141,8 @@ bool mndGetRestored(SMnode *pMnode);
void mndSetStop(SMnode *pMnode); void mndSetStop(SMnode *pMnode);
bool mndGetStop(SMnode *pMnode); bool mndGetStop(SMnode *pMnode);
SArray *mndGetAllDnodeFqdns(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -23,6 +23,10 @@
extern "C" { extern "C" {
#endif #endif
enum {
IP_WHITE_ADD,
IP_WHITE_DROP,
};
int32_t mndInitUser(SMnode *pMnode); int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode); void mndCleanupUser(SMnode *pMnode);
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName); SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName);
@ -43,6 +47,7 @@ void mndUserFreeObj(SUserObj *pUser);
int64_t mndGetIpWhiteVer(SMnode *pMnode); int64_t mndGetIpWhiteVer(SMnode *pMnode);
void mndUpdateIpWhite(char *user, char *fqdn, int8_t type, int8_t lock);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -51,6 +51,12 @@ enum {
DND_CONN_ACTIVE_CODE, DND_CONN_ACTIVE_CODE,
}; };
enum {
DND_CREATE,
DND_ADD,
DND_DROP,
};
static int32_t mndCreateDefaultDnode(SMnode *pMnode); static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
@ -103,7 +109,10 @@ int32_t mndInitDnode(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
void mndCleanupDnode(SMnode *pMnode) {} SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode);
SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn);
void mndCleanupDnode(SMnode *pMnode) {}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) { static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
int32_t code = -1; int32_t code = -1;
@ -130,6 +139,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
mndUpdateIpWhite("_dnd", dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -695,6 +705,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
mndUpdateIpWhite("_dnd", dnodeObj.fqdn, IP_WHITE_ADD, 1);
_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
@ -820,37 +831,94 @@ _OVER:
return code; return code;
} }
SIpWhiteList *mndCreateIpWhiteFromDnode(SMnode *pMnode) { // void mndUpdateIpWhiteOfDnode(SMnode *pMnode, char *fqdn, int8_t type) {
SDnodeObj *pObj = NULL; // if (type == DND_CREATE) {
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SArray *fqdns = taosArrayInit(4, sizeof(void *));
while (1) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
char *fqdn = taosStrdup(pObj->fqdn); // } else if (type == DND_ADD) {
taosArrayPush(fqdns, &fqdn);
sdbRelease(pSdb, pObj);
}
int32_t sz = taosArrayGetSize(fqdns);
SIpWhiteList *list = NULL;
if (sz != 0) {
list = taosMemoryCalloc(1, sizeof(SIpWhiteList) + sz * sizeof(SIpV4Range));
for (int i = 0; i < sz; i++) {
char *e = taosArrayGetP(fqdns, i);
taosMemoryFree(e);
int32_t ip = taosGetFqdn(e);
SIpV4Range *pRange = &list->pIpRange[0]; // } else if (type == DND_DROP) {
pRange->ip = ip;
pRange->mask = 0;
}
}
taosArrayDestroy(fqdns); // }
return list; // }
} // SIpWhiteList *mndCreateIpWhiteOfDnode(SMnode *pMnode) {
// SDnodeObj *pObj = NULL;
// void *pIter = NULL;
// SSdb *pSdb = pMnode->pSdb;
// SArray *fqdns = taosArrayInit(4, sizeof(void *));
// while (1) {
// pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
// if (pIter == NULL) break;
// char *fqdn = taosStrdup(pObj->fqdn);
// taosArrayPush(fqdns, &fqdn);
// sdbRelease(pSdb, pObj);
// }
// int32_t sz = taosArrayGetSize(fqdns);
// SIpWhiteList *list = NULL;
// if (sz != 0) {
// list = taosMemoryCalloc(1, sizeof(SIpWhiteList) + sz * sizeof(SIpV4Range));
// for (int i = 0; i < sz; i++) {
// char *e = taosArrayGetP(fqdns, i);
// taosMemoryFree(e);
// int32_t ip = taosGetFqdn(e);
// SIpV4Range *pRange = &list->pIpRange[0];
// pRange->ip = ip;
// pRange->mask = 0;
// }
// }
// taosArrayDestroy(fqdns);
// return list;
// }
// SIpWhiteList *mndAddIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn) {
// SIpV4Range dst = {.ip = taosGetFqdn(fqdn), .mask = 0};
// bool exist = false;
// for (int i = 0; i < pIpWhiteList->num; i++) {
// SIpV4Range *pRange = &pIpWhiteList->pIpRange[i];
// if (pRange->ip == dst.ip && pRange->mask == dst.mask) {
// exist = true;
// break;
// }
// }
// if (exist) {
// return cloneIpWhiteList(pIpWhiteList);
// } else {
// SIpWhiteList *pRet = taosMemoryCalloc(1, sizeof(SIpWhiteList) + (pIpWhiteList->num + 1) * sizeof(SIpV4Range));
// pRet->num = pIpWhiteList->num + 1;
// memcpy(pRet->pIpRange, pIpWhiteList->pIpRange, sizeof(SIpV4Range) * pIpWhiteList->num);
// SIpV4Range *pLast = &pRet->pIpRange[pIpWhiteList->num];
// pLast->ip = dst.ip;
// pLast->mask = dst.mask;
// return pRet;
// }
// }
// SIpWhiteList *mndRmIpWhiteOfDnode(SIpWhiteList *pIpWhiteList, char *fqdn) {
// SIpV4Range tgt = {.ip = taosGetFqdn(fqdn), .mask = 0};
// SIpWhiteList *pRet = taosMemoryCalloc(1, sizeof(SIpWhiteList) + (pIpWhiteList->num) * sizeof(SIpV4Range));
// int32_t idx = 0;
// for (int i = 0; i < pIpWhiteList->num; i++) {
// SIpV4Range *pSrc = &pIpWhiteList->pIpRange[i];
// SIpV4Range *pDst = &pIpWhiteList->pIpRange[idx];
// if (pSrc->ip != tgt.ip || pSrc->mask != tgt.mask) {
// pDst[idx].ip = pSrc[i].ip;
// pDst[idx].mask = pSrc[i].mask;
// idx++;
// }
// }
// pRet->num = idx;
// if (pRet->num == 0) {
// taosMemoryFree(pRet);
// return NULL;
// }
// return pRet;
// }
static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) { static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
SShowVariablesRsp rsp = {0}; SShowVariablesRsp rsp = {0};
@ -1019,6 +1087,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mndUpdateIpWhite("_dnd", pDnode->fqdn, IP_WHITE_DROP, 1);
code = 0; code = 0;
_OVER: _OVER:
@ -1491,3 +1560,19 @@ _err:
terrno = TSDB_CODE_INVALID_CFG; terrno = TSDB_CODE_INVALID_CFG;
return -1; return -1;
} }
SArray *mndGetAllDnodeFqdns(SMnode *pMnode) {
SDnodeObj *pObj = NULL;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SArray *fqdns = taosArrayInit(4, sizeof(void *));
while (1) {
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
char *fqdn = taosStrdup(pObj->fqdn);
taosArrayPush(fqdns, &fqdn);
sdbRelease(pSdb, pObj);
}
return fqdns;
}

View File

@ -172,6 +172,7 @@ void ipWhiteMgtUpdateAll(SMnode *pMnode) {
SHashObj *pOld = ipWhiteMgt.pIpWhiteTab; SHashObj *pOld = ipWhiteMgt.pIpWhiteTab;
ipWhiteMgt.pIpWhiteTab = pNew; ipWhiteMgt.pIpWhiteTab = pNew;
destroyIpWhiteTab(pOld); destroyIpWhiteTab(pOld);
} }
void ipWhiteMgtUpdate2(SMnode *pMnode) { void ipWhiteMgtUpdate2(SMnode *pMnode) {
@ -194,6 +195,77 @@ int64_t mndGetIpWhiteVer(SMnode *pMnode) {
mInfo("ip-white-mnode ver, %" PRId64 "", ver); mInfo("ip-white-mnode ver, %" PRId64 "", ver);
return ver; return ver;
} }
bool mndUpdateIpWhiteImpl(SHashObj *pIpWhiteTab, char *user, char *fqdn, int8_t type) {
bool update = false;
SIpV4Range range = {.ip = taosGetIpv4FromFqdn(fqdn), .mask = 0};
SIpWhiteList **ppList = taosHashGet(pIpWhiteTab, user, strlen(user));
SIpWhiteList *pList = NULL;
if (ppList != NULL && *ppList != NULL) {
pList = *ppList;
}
if (type == IP_WHITE_ADD) {
if (pList == NULL) {
SIpWhiteList *pNewList = taosMemoryCalloc(1, sizeof(SIpWhiteList) + sizeof(SIpV4Range));
memcpy(pNewList->pIpRange, &range, sizeof(SIpV4Range));
pNewList->num = 1;
taosHashPut(pIpWhiteTab, user, strlen(user), &pNewList, sizeof(void *));
update = true;
} else {
if (!isRangeInWhiteList(pList, &range)) {
int32_t sz = sizeof(SIpWhiteList) + sizeof(SIpV4Range) * (pList->num + 1);
SIpWhiteList *pNewList = taosMemoryCalloc(1, sz);
memcpy(pNewList->pIpRange, pList->pIpRange, sizeof(SIpV4Range) * (pList->num));
pNewList->pIpRange[pList->num].ip = range.ip;
pNewList->pIpRange[pList->num].mask = range.mask;
pNewList->num = pList->num + 1;
taosHashPut(pIpWhiteTab, user, strlen(user), &pNewList, sizeof(void *));
taosMemoryFree(pList);
update = true;
}
}
} else if (type == IP_WHITE_DROP) {
if (pList != NULL) {
if (isRangeInWhiteList(pList, &range)) {
if (pList->num == 1) {
taosHashRemove(pIpWhiteTab, user, strlen(user));
taosMemoryFree(pList);
} else {
int32_t idx = 0;
int32_t sz = sizeof(SIpWhiteList) + sizeof(SIpV4Range) * (pList->num - 1);
SIpWhiteList *pNewList = taosMemoryCalloc(1, sz);
for (int i = 0; i < pList->num; i++) {
SIpV4Range *e = &pList->pIpRange[i];
if (!isIpRangeEqual(e, &range)) {
pNewList->pIpRange[idx].ip = e->ip;
pNewList->pIpRange[idx].mask = e->mask;
idx++;
}
}
pNewList->num = idx;
taosHashPut(pIpWhiteTab, user, strlen(user), &pNewList, sizeof(void *));
taosMemoryFree(pList);
}
update = true;
}
}
}
return update;
}
void mndUpdateIpWhite(char *user, char *fqdn, int8_t type, int8_t lock) {
if (lock) taosThreadRwlockWrlock(&ipWhiteMgt.rw);
bool update = mndUpdateIpWhiteImpl(ipWhiteMgt.pIpWhiteTab, user, fqdn, type);
if (update) ipWhiteMgt.ver++;
if (lock) taosThreadRwlockUnlock(&ipWhiteMgt.rw);
}
int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) { int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
int64_t ver = 0; int64_t ver = 0;
taosThreadRwlockWrlock(&ipWhiteMgt.rw); taosThreadRwlockWrlock(&ipWhiteMgt.rw);
@ -251,6 +323,17 @@ SHashObj *mndFetchAllIpWhite(SMnode *pMnode) {
sdbRelease(pSdb, pUser); sdbRelease(pSdb, pUser);
} }
SArray *fqdns = mndGetAllDnodeFqdns(pMnode);
for (int i = 0; i < taosArrayGetSize(fqdns); i++) {
char *fqdn = taosArrayGetP(fqdns, i);
mndUpdateIpWhiteImpl(pIpWhiteTab, "_dnd", fqdn, IP_WHITE_ADD);
taosMemoryFree(fqdn);
}
taosArrayDestroy(fqdns);
return pIpWhiteTab; return pIpWhiteTab;
} }
@ -913,8 +996,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
TSWAP(pOld->writeTbs, pNew->writeTbs); TSWAP(pOld->writeTbs, pNew->writeTbs);
TSWAP(pOld->useDbs, pNew->useDbs); TSWAP(pOld->useDbs, pNew->useDbs);
int32_t sz = pNew->pIpWhiteList->num * sizeof(SIpV4Range) + sizeof(SIpWhiteList); int32_t sz = sizeof(SIpWhiteList) + pNew->pIpWhiteList->num * sizeof(SIpV4Range);
char *pWhiteList = taosMemoryCalloc(1, sz);
pOld->pIpWhiteList = taosMemoryRealloc(pOld->pIpWhiteList, sz); pOld->pIpWhiteList = taosMemoryRealloc(pOld->pIpWhiteList, sz);
memcpy(pOld->pIpWhiteList, pNew->pIpWhiteList, sz); memcpy(pOld->pIpWhiteList, pNew->pIpWhiteList, sz);