From 8c5fa3526a8631583bcac428dd05ad9147f63cde Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 7 Sep 2023 16:10:07 +0800 Subject: [PATCH] add rpc update interface --- include/common/tmsg.h | 9 +- source/common/src/tmsg.c | 25 +++- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 10 +- source/dnode/mnode/impl/src/mndUser.c | 4 +- source/libs/transport/src/transSvr.c | 121 ++++++++++-------- 5 files changed, 107 insertions(+), 62 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7142c933c5..96d6edac8c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -901,18 +901,21 @@ int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pR void tFreeSCreateUserReq(SCreateUserReq* pReq); typedef struct { + int64_t ver; char user[TSDB_USER_LEN]; int32_t numOfRange; SIpV4Range* pIpRanges; } SUpdateUserIpWhite; typedef struct { + int64_t ver; int numOfUser; SUpdateUserIpWhite* pUserIpWhite; } SUpdateIpWhite; -int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); -int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); -void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq); typedef struct { int64_t ipWhiteVer; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8ab9b5624d..0d7ccc35e6 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1430,10 +1430,12 @@ int32_t tSerializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - + if (tEncodeI64(&encoder, pReq->ver) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfUser) < 0) return -1; for (int i = 0; i < pReq->numOfUser; i++) { SUpdateUserIpWhite *pUser = &(pReq->pUserIpWhite[i]); + + if (tEncodeI64(&encoder, pUser->ver) < 0) return -1; if (tEncodeCStr(&encoder, pUser->user) < 0) return -1; if (tEncodeI32(&encoder, pUser->numOfRange) < 0) return -1; for (int j = 0; j < pUser->numOfRange; j++) { @@ -1455,11 +1457,13 @@ int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pR if (tStartDecode(&decoder) < 0) return -1; // impl later + if (tDecodeI64(&decoder, &pReq->ver) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfUser) < 0) return -1; pReq->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser); for (int i = 0; i < pReq->numOfUser; i++) { SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; + if (tDecodeI64(&decoder, &pUserWhite->ver) < 0) return -1; if (tDecodeCStrTo(&decoder, pUserWhite->user) < 0) return -1; if (tDecodeI32(&decoder, &pUserWhite->numOfRange) < 0) return -1; @@ -1484,6 +1488,25 @@ void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { // impl later return; } +SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { + SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite)); + + pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser); + pClone->ver = pReq->ver; + for (int i = 0; i < pReq->numOfUser; i++) { + SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i]; + SUpdateUserIpWhite *pOld = &pReq->pUserIpWhite[i]; + + pNew->ver = pOld->ver; + memcpy(pNew->user, pOld->user, strlen(pOld->user)); + pNew->numOfRange = pOld->numOfRange; + + int32_t sz = pOld->numOfRange * sizeof(SIpV4Range); + pNew->pIpRanges = taosMemoryCalloc(1, sz); + memcpy(pNew->pIpRanges, pOld->pIpRanges, sz); + } + return pClone; +} int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index abccc23f58..d252371e57 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -67,8 +67,12 @@ static void dmUpdateRpcIpWhite(void *pTrans, SRpcMsg *pRpc) { SUpdateIpWhite *pIpWhite = taosMemoryCalloc(1, sizeof(SUpdateIpWhite)); tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, pIpWhite); - // rpcSetIpWhite(pTrans, pIpWhite); - // tFreeSUpdateIpWhiteReq(&ipWhite); + rpcSetIpWhite(pTrans, pIpWhite); + + tFreeSUpdateIpWhiteReq(pIpWhite); + taosMemoryFree(pIpWhite); + + rpcFreeCont(pRpc->pCont); } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SDnodeTrans *pTrans = &pDnode->trans; @@ -103,8 +107,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pEpSet != NULL) { dmSetMnodeEpSet(&pDnode->data, pEpSet); } + break; case TDMT_MND_RETRIEVE_IP_WHITE_RSP: { dmUpdateRpcIpWhite(pTrans->serverRpc, pRpc); + return; } break; default: break; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 128bb9b84b..1bb08733ea 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -126,7 +126,9 @@ int64_t mndGetIpWhiteVer(SMnode *pMnode) { return ver; } int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) { + int64_t ver = 0; taosThreadRwlockWrlock(&ipWhiteMgt.rw); + ver = ipWhiteMgt.ver; int32_t num = taosHashGetSize(ipWhiteMgt.pIpWhiteList); pUpdate->pUserIpWhite = taosMemoryCalloc(1, num * sizeof(SUpdateUserIpWhite)); void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, NULL); @@ -138,8 +140,8 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) { size_t klen; char *key = taosHashGetKey(pIter, &klen); if (list->num != 0) { + pUser->ver = ver; memcpy(pUser->user, key, klen); - pUser->numOfRange = list->num; pUser->pIpRanges = taosMemoryCalloc(1, list->num * sizeof(SIpV4Range)); memcpy(pUser->pIpRanges, list->pIpRange, list->num * sizeof(SIpV4Range)); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 923b61d876..31b7af5fa2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -120,7 +120,7 @@ typedef struct SServerObj { SWhiteList* uvWhiteListCreate(); void uvWhiteListDestroy(SWhiteList* pWhite); -void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip); +void uvWhiteListAdd(SWhiteList* pWhite, char* user, SIpWhiteList* pList, int64_t ver); void uvWhiteListUpdate(SWhiteList* pWhite, SHashObj* pTable); bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn); bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver); @@ -234,29 +234,31 @@ typedef struct { int8_t type; } SubnetUtils; -int32_t subnetInit(SubnetUtils* pUtils, char* range) { - char buf[32] = {0}; - strncpy(pUtils->info, range, strlen(range)); - strncpy(buf, range, strlen(range)); +int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange) { + // char buf[32] = {0}; + // strncpy(pUtils->info, range, strlen(range)); + // strncpy(buf, range, strlen(range)); - int16_t ip[5] = {0}; - int8_t k = cvtIp2Int(buf, ip); - if (k < 4) { - return -1; - } + // int16_t ip[5] = {0}; + // int8_t k = cvtIp2Int(buf, ip); + // if (k < 4) { + // return -1; + // } - for (int i = 0; i < 4; i++) { - pUtils->address |= (ip[i] << (8 * (4 - i - 1))); - } - if (k == 5) { - for (int i = 0; i < ip[4]; i++) { - pUtils->netmask |= (1 << (31 - i)); - } + // for (int i = 0; i < 4; i++) { + // pUtils->address |= (ip[i] << (8 * (4 - i - 1))); + // } + pUtils->address = pRange->ip; + + int32_t mask = 0; + for (int i = 0; i < pRange->mask; i++) { + mask |= (1 << (31 - i)); } + pUtils->netmask = mask; pUtils->network = pUtils->address & pUtils->netmask; pUtils->broadcast = (pUtils->network) | (pUtils->netmask ^ 0xFFFFFFFF); - pUtils->type = (k == 4 ? 0 : 1); + pUtils->type = (pRange->mask == 0 ? 0 : 1); return 0; } @@ -275,10 +277,10 @@ int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip) { } } -static bool uvCheckIp(char* range, int32_t ip) { +static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) { // impl later SubnetUtils subnet = {0}; - if (subnetInit(&subnet, range) != 0) { + if (subnetInit(&subnet, pRange) != 0) { return false; } return subnetCheckIp(&subnet, ip); @@ -295,33 +297,33 @@ void uvWhiteListDestroy(SWhiteList* pWhite) { SHashObj* pWhiteList = pWhite->pList; void* pIter = taosHashIterate(pWhiteList, NULL); while (pIter) { - SWhiteUserList* pList = *(SWhiteUserList**)pIter; - // for (int i = 0; i < taosArrayGetSize(pList->list); i++) { - // char* range = taosArrayGetP(pList->list, i); - // taosMemoryFree(range); - // } - // taosArrayDestroy(pList->list); - taosMemoryFree(pList); + SWhiteUserList* pUserList = *(SWhiteUserList**)pIter; + taosMemoryFree(pUserList->pList); + taosMemoryFree(pUserList); + pIter = taosHashIterate(pWhiteList, pIter); } taosHashCleanup(pWhiteList); taosMemoryFree(pWhite); } -void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip) { +void uvWhiteListAdd(SWhiteList* pWhite, char* user, SIpWhiteList* plist, int64_t ver) { SHashObj* pWhiteList = pWhite->pList; - SWhiteUserList** ppList = taosHashGet(pWhiteList, user, strlen(user)); - if (ppList == NULL || *ppList == NULL) { - // SWhiteUserList* pList = taosMemoryCalloc(1, sizeof(SWhiteUserList)); - // pList->list = taosArrayInit(8, sizeof(void*)); - // taosArrayPush(pList->list, &ip); - // pList->ver += 1; - // taosHashPut(pWhiteList, user, strlen(user), &pList, sizeof(void*)); + SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user)); + if (ppUserList == NULL || *ppUserList == NULL) { + SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList)); + pUserList->ver = ver; + + pUserList->pList = plist; + + taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*)); } else { - // SWhiteUserList* pList = *ppList; - // pList->ver += 1; - // taosArrayPush(pList->list, &ip); + SWhiteUserList* pUserList = *ppUserList; + + taosMemoryFreeClear(pUserList->pList); + pUserList->ver = ver; + pUserList->pList = plist; } } @@ -341,13 +343,14 @@ bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver) SWhiteUserList* pList = *ppList; if (pList->ver == ver) return true; - // for (int i = 0; i < taosArrayGetSize(pList->list); i++) { - // char* range = taosArrayGetP(pList->list, i); - // if (uvCheckIp(range, ip)) { - // valid = true; - // break; - // } - // } + SIpWhiteList* pIpWhiteList = pList->pList; + for (int i = 0; i < pIpWhiteList->num; i++) { + SIpV4Range* range = &pIpWhiteList->pIpRange[i]; + if (uvCheckIp(range, ip)) { + valid = true; + break; + } + } return valid; } bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn) { @@ -1381,17 +1384,22 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { } } void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { - // update white ip - // bool ret = (msg->func)(msg->arg); - SUpdateIpWhite* updateReq = msg->arg; - for (int i = 0; i < updateReq->numOfUser; i++) { - SUpdateUserIpWhite* pUser = &updateReq->pUserIpWhite[i]; + SUpdateIpWhite* req = msg->arg; + for (int i = 0; i < req->numOfUser; i++) { + SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i]; + + int32_t sz = sizeof(SIpWhiteList) + pUser->numOfRange * sizeof(SIpV4Range); + SIpWhiteList* pList = taosMemoryCalloc(1, sz); + pList->num = pUser->numOfRange; + + memcpy(pList->pIpRange, pUser->pIpRanges, sz); + uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver); } - // uvWhiteListUpdate(thrd->pWhiteList, SHashObj *pTable); - tFreeSUpdateIpWhiteReq(updateReq); + thrd->pWhiteList->ver = req->ver; - taosMemoryFree(updateReq); + tFreeSUpdateIpWhiteReq(req); + taosMemoryFree(req); taosMemoryFree(msg); return; } @@ -1581,9 +1589,12 @@ void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { for (int i = 0; i < svrObj->numOfThreads; i++) { SWorkThrd* pThrd = svrObj->pThreadObj[i]; - SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + SUpdateIpWhite* pReq = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg); + msg->type = Update; - msg->arg = arg; + msg->arg = pReq; + transAsyncSend(pThrd->asyncPool, &msg->q); } transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);