add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-07 20:47:10 +08:00
parent 8c5fa3526a
commit 60d6a622e1
8 changed files with 109 additions and 30 deletions

View File

@ -885,6 +885,7 @@ typedef struct {
SIpV4Range pIpRange[];
} SIpWhiteList;
SIpWhiteList* cloneIpWhiteList(SIpWhiteList* pIpWhiteList);
typedef struct {
int8_t createType;
int8_t superUser; // denote if it is a super user or not

View File

@ -1376,6 +1376,13 @@ int32_t tDeserializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq)
return 0;
}
SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) {
int32_t sz = sizeof(SIpWhiteList) + pIpWhiteList->num * sizeof(SIpV4Range);
SIpWhiteList *pNew = taosMemoryCalloc(1, sz);
memcpy(pNew, pIpWhiteList, sz);
return pNew;
}
int32_t tSerializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1491,8 +1498,10 @@ void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
pClone->numOfUser = pReq->numOfUser;
pClone->ver = pReq->ver;
pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i];
SUpdateUserIpWhite *pOld = &pReq->pUserIpWhite[i];

View File

@ -31,6 +31,7 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
}
}
static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) {
dInfo("ip-white-dnode ver: %" PRId64 ", status ver: %" PRId64 "", pMgmt->ipWhiteVer, ver);
if (pMgmt->ipWhiteVer == ver) {
return;
}

View File

@ -64,13 +64,12 @@ static void dmConvertErrCode(tmsg_t msgType) {
}
}
static void dmUpdateRpcIpWhite(void *pTrans, SRpcMsg *pRpc) {
SUpdateIpWhite *pIpWhite = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, pIpWhite);
SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite));
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
rpcSetIpWhite(pTrans, pIpWhite);
rpcSetIpWhite(pTrans, &ipWhite);
tFreeSUpdateIpWhiteReq(pIpWhite);
taosMemoryFree(pIpWhite);
tFreeSUpdateIpWhiteReq(&ipWhite);
rpcFreeCont(pRpc->pCont);
}

View File

@ -130,6 +130,7 @@ typedef struct SMnode {
SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb;
int64_t ipWhiteVer;
} SMnode;
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);

View File

@ -517,6 +517,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
}
}
int64_t ver = mndGetIpWhiteVer(pMnode);
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
@ -524,7 +525,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool supportVnodesChanged = pDnode->numOfSupportVnodes != statusReq.numOfSupportVnodes;
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged;
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || pMnode->ipWhiteVer != ver;
pMnode->ipWhiteVer = ver;
const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
@ -645,7 +648,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
mndGetDnodeEps(pMnode, statusRsp.pDnodeEps);
statusRsp.ipWhiteVer = mndGetIpWhiteVer(pMnode);
statusRsp.ipWhiteVer = ver;
int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp);
void *pHead = rpcMallocCont(contLen);

View File

@ -31,7 +31,6 @@
#define USER_VER_NUMBER 5
#define USER_RESERVE_SIZE 64
static SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList);
static SIpWhiteList *createDefaultIpWhiteList();
SIpWhiteList *createIpWhiteList(void *buf, int32_t len);
static bool updateIpWhiteList(SIpWhiteList *pOld, SIpWhiteList *pNew);
@ -58,7 +57,7 @@ SHashObj *mndFetchAllIpWhite(SMnode *pMnode);
static int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq);
typedef struct {
SHashObj *pIpWhiteList;
SHashObj *pIpWhiteTab;
int64_t ver;
TdThreadRwlock rw;
} SIpWhiteMgt;
@ -66,22 +65,23 @@ typedef struct {
static SIpWhiteMgt ipWhiteMgt;
void ipWhiteMgtInit() {
ipWhiteMgt.pIpWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK);
ipWhiteMgt.pIpWhiteTab = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK);
ipWhiteMgt.ver = 0;
taosThreadRwlockInit(&ipWhiteMgt.rw, NULL);
}
void ipWhiteMgtCleanup() {
destroyIpWhiteTab(ipWhiteMgt.pIpWhiteList);
destroyIpWhiteTab(ipWhiteMgt.pIpWhiteTab);
taosThreadRwlockDestroy(&ipWhiteMgt.rw);
}
int32_t ipWhiteMgtUpdate(char *user, SIpWhiteList *pNew) {
bool update = true;
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
SIpWhiteList **ppList = taosHashGet(ipWhiteMgt.pIpWhiteList, user, strlen(user));
SIpWhiteList **ppList = taosHashGet(ipWhiteMgt.pIpWhiteTab, user, strlen(user));
if (ppList == NULL || *ppList == NULL) {
SIpWhiteList *p = cloneIpWhiteList(pNew);
taosHashPut(ipWhiteMgt.pIpWhiteList, user, strlen(user), &p, sizeof(void *));
taosHashPut(ipWhiteMgt.pIpWhiteTab, user, strlen(user), &p, sizeof(void *));
} else {
SIpWhiteList *pOld = *ppList;
if (isIpWhiteListEqual(pOld, pNew)) {
@ -89,7 +89,7 @@ int32_t ipWhiteMgtUpdate(char *user, SIpWhiteList *pNew) {
} else {
taosMemoryFree(pOld);
SIpWhiteList *p = cloneIpWhiteList(pNew);
taosHashPut(ipWhiteMgt.pIpWhiteList, user, strlen(user), &p, sizeof(void *));
taosHashPut(ipWhiteMgt.pIpWhiteTab, user, strlen(user), &p, sizeof(void *));
}
}
if (update) ipWhiteMgt.ver++;
@ -97,13 +97,81 @@ int32_t ipWhiteMgtUpdate(char *user, SIpWhiteList *pNew) {
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
return 0;
}
int32_t ipWhiteMgtRemove(char *user) {
bool update = true;
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
SIpWhiteList **ppList = taosHashGet(ipWhiteMgt.pIpWhiteTab, user, strlen(user));
if (ppList == NULL || *ppList == NULL) {
update = false;
} else {
taosMemoryFree(*ppList);
taosHashRemove(ipWhiteMgt.pIpWhiteTab, user, strlen(user));
}
if (update) ipWhiteMgt.ver++;
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
return 0;
}
bool isRangeInWhiteList(SIpWhiteList *pList, SIpV4Range *range) {
for (int i = 0; i < pList->num; i++) {
if (isIpRangeEqual(&pList->pIpRange[i], range)) {
return true;
}
}
return false;
}
int32_t ipWhiteUpdateForAllUser(SIpWhiteList *pList) {
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
SHashObj *pIpWhiteTab = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 1, HASH_ENTRY_LOCK);
void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteTab, NULL);
while (pIter) {
SIpWhiteList *p = *(SIpWhiteList **)pIter;
SIpWhiteList *clone = cloneIpWhiteList(pList);
int32_t idx = 0;
for (int i = 0; i < pList->num; i++) {
SIpV4Range *e = &pList->pIpRange[i];
if (!isRangeInWhiteList(p, e)) {
clone->pIpRange[idx] = *e;
idx++;
}
}
clone->num = idx;
SIpWhiteList *val = NULL;
if (clone->num != 0) {
int32_t sz = clone->num + p->num;
val = taosMemoryCalloc(1, sizeof(SIpWhiteList) + sz * sizeof(SIpV4Range));
memcpy(val->pIpRange, p->pIpRange, sizeof(SIpV4Range) * p->num);
memcpy(((char *)val->pIpRange) + sizeof(SIpV4Range) * p->num, (char *)clone->pIpRange,
sizeof(SIpV4Range) * clone->num);
} else {
val = cloneIpWhiteList(p);
}
taosMemoryFree(clone);
size_t klen;
void *key = taosHashGetKey(pIter, &klen);
taosHashPut(pIpWhiteTab, key, klen, val, sizeof(void *));
}
destroyIpWhiteTab(ipWhiteMgt.pIpWhiteTab);
ipWhiteMgt.pIpWhiteTab = pIpWhiteTab;
ipWhiteMgt.ver++;
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
return 0;
}
void ipWhiteMgtUpdateAll(SMnode *pMnode) {
ipWhiteMgt.ver++;
SHashObj *pNew = mndFetchAllIpWhite(pMnode);
SHashObj *pOld = ipWhiteMgt.pIpWhiteList;
SHashObj *pOld = ipWhiteMgt.pIpWhiteTab;
ipWhiteMgt.pIpWhiteList = pNew;
ipWhiteMgt.pIpWhiteTab = pNew;
destroyIpWhiteTab(pOld);
}
void ipWhiteMgtUpdate2(SMnode *pMnode) {
@ -123,15 +191,16 @@ int64_t mndGetIpWhiteVer(SMnode *pMnode) {
ver = ipWhiteMgt.ver;
taosThreadRwlockUnlock(&ipWhiteMgt.rw);
mInfo("ip-white-mnode ver, %" PRId64 "", ver);
return ver;
}
int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
int64_t ver = 0;
taosThreadRwlockWrlock(&ipWhiteMgt.rw);
ver = ipWhiteMgt.ver;
int32_t num = taosHashGetSize(ipWhiteMgt.pIpWhiteList);
int32_t num = taosHashGetSize(ipWhiteMgt.pIpWhiteTab);
pUpdate->pUserIpWhite = taosMemoryCalloc(1, num * sizeof(SUpdateUserIpWhite));
void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, NULL);
void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteTab, NULL);
int32_t i = 0;
while (pIter) {
SUpdateUserIpWhite *pUser = &pUpdate->pUserIpWhite[i];
@ -147,7 +216,7 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
memcpy(pUser->pIpRanges, list->pIpRange, list->num * sizeof(SIpV4Range));
i++;
}
pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter);
pIter = taosHashIterate(ipWhiteMgt.pIpWhiteTab, pIter);
}
pUpdate->numOfUser = i;
@ -327,12 +396,6 @@ static SIpWhiteList *createDefaultIpWhiteList() {
}
return pWhiteList;
}
static SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) {
int32_t sz = sizeof(SIpWhiteList) + pIpWhiteList->num * sizeof(SIpV4Range);
SIpWhiteList *pNew = taosMemoryCalloc(1, sz);
memcpy(pNew, pIpWhiteList, sz);
return pNew;
}
static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char *pass) {
SUserObj userObj = {0};
@ -897,6 +960,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
p->pIpRange[i].ip = pCreate->pIpRanges[i].ip;
p->pIpRange[i].mask = pCreate->pIpRanges[i].mask;
}
p->num = pCreate->numIpRanges;
userObj.pIpWhiteList = p;
}
@ -922,7 +986,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
mndTransDrop(pTrans);
goto _OVER;
}
ipWhiteMgtUpdate(userObj.user, userObj.pIpWhiteList);
mndTransDrop(pTrans);
return 0;
_OVER:
@ -1050,7 +1114,7 @@ static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpc
mndTransDrop(pTrans);
return -1;
}
ipWhiteMgtUpdate(pNew->user, pNew->pIpWhiteList);
mndTransDrop(pTrans);
return 0;
}
@ -1515,6 +1579,7 @@ static int32_t mndDropUser(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser) {
mndTransDrop(pTrans);
return -1;
}
ipWhiteMgtRemove(pUser->user);
mndTransDrop(pTrans);
return 0;

View File

@ -1388,8 +1388,8 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
for (int i = 0; i < req->numOfUser; i++) {
SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
int32_t sz = sizeof(SIpWhiteList) + pUser->numOfRange * sizeof(SIpV4Range);
SIpWhiteList* pList = taosMemoryCalloc(1, sz);
int32_t sz = pUser->numOfRange * sizeof(SIpV4Range);
SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList));
pList->num = pUser->numOfRange;
memcpy(pList->pIpRange, pUser->pIpRanges, sz);