add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-07 16:10:07 +08:00
parent f4f97a67f5
commit 8c5fa3526a
5 changed files with 107 additions and 62 deletions

View File

@ -901,18 +901,21 @@ int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pR
void tFreeSCreateUserReq(SCreateUserReq* pReq); void tFreeSCreateUserReq(SCreateUserReq* pReq);
typedef struct { typedef struct {
int64_t ver;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
int32_t numOfRange; int32_t numOfRange;
SIpV4Range* pIpRanges; SIpV4Range* pIpRanges;
} SUpdateUserIpWhite; } SUpdateUserIpWhite;
typedef struct { typedef struct {
int64_t ver;
int numOfUser; int numOfUser;
SUpdateUserIpWhite* pUserIpWhite; SUpdateUserIpWhite* pUserIpWhite;
} SUpdateIpWhite; } SUpdateIpWhite;
int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq);
void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq);
typedef struct { typedef struct {
int64_t ipWhiteVer; int64_t ipWhiteVer;

View File

@ -1430,10 +1430,12 @@ int32_t tSerializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pReq
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->ver) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfUser) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfUser) < 0) return -1;
for (int i = 0; i < pReq->numOfUser; i++) { for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUser = &(pReq->pUserIpWhite[i]); SUpdateUserIpWhite *pUser = &(pReq->pUserIpWhite[i]);
if (tEncodeI64(&encoder, pUser->ver) < 0) return -1;
if (tEncodeCStr(&encoder, pUser->user) < 0) return -1; if (tEncodeCStr(&encoder, pUser->user) < 0) return -1;
if (tEncodeI32(&encoder, pUser->numOfRange) < 0) return -1; if (tEncodeI32(&encoder, pUser->numOfRange) < 0) return -1;
for (int j = 0; j < pUser->numOfRange; j++) { for (int j = 0; j < pUser->numOfRange; j++) {
@ -1455,11 +1457,13 @@ int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pR
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
// impl later // impl later
if (tDecodeI64(&decoder, &pReq->ver) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfUser) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfUser) < 0) return -1;
pReq->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser); pReq->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
for (int i = 0; i < pReq->numOfUser; i++) { for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i];
if (tDecodeI64(&decoder, &pUserWhite->ver) < 0) return -1;
if (tDecodeCStrTo(&decoder, pUserWhite->user) < 0) return -1; if (tDecodeCStrTo(&decoder, pUserWhite->user) < 0) return -1;
if (tDecodeI32(&decoder, &pUserWhite->numOfRange) < 0) return -1; if (tDecodeI32(&decoder, &pUserWhite->numOfRange) < 0) return -1;
@ -1484,6 +1488,25 @@ void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
// impl later // impl later
return; return;
} }
SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser);
pClone->ver = pReq->ver;
for (int i = 0; i < pReq->numOfUser; i++) {
SUpdateUserIpWhite *pNew = &pClone->pUserIpWhite[i];
SUpdateUserIpWhite *pOld = &pReq->pUserIpWhite[i];
pNew->ver = pOld->ver;
memcpy(pNew->user, pOld->user, strlen(pOld->user));
pNew->numOfRange = pOld->numOfRange;
int32_t sz = pOld->numOfRange * sizeof(SIpV4Range);
pNew->pIpRanges = taosMemoryCalloc(1, sz);
memcpy(pNew->pIpRanges, pOld->pIpRanges, sz);
}
return pClone;
}
int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) { int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);

View File

@ -67,8 +67,12 @@ static void dmUpdateRpcIpWhite(void *pTrans, SRpcMsg *pRpc) {
SUpdateIpWhite *pIpWhite = taosMemoryCalloc(1, sizeof(SUpdateIpWhite)); SUpdateIpWhite *pIpWhite = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, pIpWhite); tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, pIpWhite);
// rpcSetIpWhite(pTrans, pIpWhite); rpcSetIpWhite(pTrans, pIpWhite);
// tFreeSUpdateIpWhiteReq(&ipWhite);
tFreeSUpdateIpWhiteReq(pIpWhite);
taosMemoryFree(pIpWhite);
rpcFreeCont(pRpc->pCont);
} }
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
@ -103,8 +107,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet != NULL) { if (pEpSet != NULL) {
dmSetMnodeEpSet(&pDnode->data, pEpSet); dmSetMnodeEpSet(&pDnode->data, pEpSet);
} }
break;
case TDMT_MND_RETRIEVE_IP_WHITE_RSP: { case TDMT_MND_RETRIEVE_IP_WHITE_RSP: {
dmUpdateRpcIpWhite(pTrans->serverRpc, pRpc); dmUpdateRpcIpWhite(pTrans->serverRpc, pRpc);
return;
} break; } break;
default: default:
break; break;

View File

@ -126,7 +126,9 @@ int64_t mndGetIpWhiteVer(SMnode *pMnode) {
return ver; return ver;
} }
int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) { int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
int64_t ver = 0;
taosThreadRwlockWrlock(&ipWhiteMgt.rw); taosThreadRwlockWrlock(&ipWhiteMgt.rw);
ver = ipWhiteMgt.ver;
int32_t num = taosHashGetSize(ipWhiteMgt.pIpWhiteList); int32_t num = taosHashGetSize(ipWhiteMgt.pIpWhiteList);
pUpdate->pUserIpWhite = taosMemoryCalloc(1, num * sizeof(SUpdateUserIpWhite)); pUpdate->pUserIpWhite = taosMemoryCalloc(1, num * sizeof(SUpdateUserIpWhite));
void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, NULL); void *pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, NULL);
@ -138,8 +140,8 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
size_t klen; size_t klen;
char *key = taosHashGetKey(pIter, &klen); char *key = taosHashGetKey(pIter, &klen);
if (list->num != 0) { if (list->num != 0) {
pUser->ver = ver;
memcpy(pUser->user, key, klen); memcpy(pUser->user, key, klen);
pUser->numOfRange = list->num; pUser->numOfRange = list->num;
pUser->pIpRanges = taosMemoryCalloc(1, list->num * sizeof(SIpV4Range)); pUser->pIpRanges = taosMemoryCalloc(1, list->num * sizeof(SIpV4Range));
memcpy(pUser->pIpRanges, list->pIpRange, list->num * sizeof(SIpV4Range)); memcpy(pUser->pIpRanges, list->pIpRange, list->num * sizeof(SIpV4Range));

View File

@ -120,7 +120,7 @@ typedef struct SServerObj {
SWhiteList* uvWhiteListCreate(); SWhiteList* uvWhiteListCreate();
void uvWhiteListDestroy(SWhiteList* pWhite); void uvWhiteListDestroy(SWhiteList* pWhite);
void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip); void uvWhiteListAdd(SWhiteList* pWhite, char* user, SIpWhiteList* pList, int64_t ver);
void uvWhiteListUpdate(SWhiteList* pWhite, SHashObj* pTable); void uvWhiteListUpdate(SWhiteList* pWhite, SHashObj* pTable);
bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn); bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn);
bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver); bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver);
@ -234,29 +234,31 @@ typedef struct {
int8_t type; int8_t type;
} SubnetUtils; } SubnetUtils;
int32_t subnetInit(SubnetUtils* pUtils, char* range) { int32_t subnetInit(SubnetUtils* pUtils, SIpV4Range* pRange) {
char buf[32] = {0}; // char buf[32] = {0};
strncpy(pUtils->info, range, strlen(range)); // strncpy(pUtils->info, range, strlen(range));
strncpy(buf, range, strlen(range)); // strncpy(buf, range, strlen(range));
int16_t ip[5] = {0}; // int16_t ip[5] = {0};
int8_t k = cvtIp2Int(buf, ip); // int8_t k = cvtIp2Int(buf, ip);
if (k < 4) { // if (k < 4) {
return -1; // return -1;
} // }
for (int i = 0; i < 4; i++) { // for (int i = 0; i < 4; i++) {
pUtils->address |= (ip[i] << (8 * (4 - i - 1))); // pUtils->address |= (ip[i] << (8 * (4 - i - 1)));
} // }
if (k == 5) { pUtils->address = pRange->ip;
for (int i = 0; i < ip[4]; i++) {
pUtils->netmask |= (1 << (31 - i)); int32_t mask = 0;
} for (int i = 0; i < pRange->mask; i++) {
mask |= (1 << (31 - i));
} }
pUtils->netmask = mask;
pUtils->network = pUtils->address & pUtils->netmask; pUtils->network = pUtils->address & pUtils->netmask;
pUtils->broadcast = (pUtils->network) | (pUtils->netmask ^ 0xFFFFFFFF); pUtils->broadcast = (pUtils->network) | (pUtils->netmask ^ 0xFFFFFFFF);
pUtils->type = (k == 4 ? 0 : 1); pUtils->type = (pRange->mask == 0 ? 0 : 1);
return 0; return 0;
} }
@ -275,10 +277,10 @@ int32_t subnetCheckIp(SubnetUtils* pUtils, uint32_t ip) {
} }
} }
static bool uvCheckIp(char* range, int32_t ip) { static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) {
// impl later // impl later
SubnetUtils subnet = {0}; SubnetUtils subnet = {0};
if (subnetInit(&subnet, range) != 0) { if (subnetInit(&subnet, pRange) != 0) {
return false; return false;
} }
return subnetCheckIp(&subnet, ip); return subnetCheckIp(&subnet, ip);
@ -295,33 +297,33 @@ void uvWhiteListDestroy(SWhiteList* pWhite) {
SHashObj* pWhiteList = pWhite->pList; SHashObj* pWhiteList = pWhite->pList;
void* pIter = taosHashIterate(pWhiteList, NULL); void* pIter = taosHashIterate(pWhiteList, NULL);
while (pIter) { while (pIter) {
SWhiteUserList* pList = *(SWhiteUserList**)pIter; SWhiteUserList* pUserList = *(SWhiteUserList**)pIter;
// for (int i = 0; i < taosArrayGetSize(pList->list); i++) { taosMemoryFree(pUserList->pList);
// char* range = taosArrayGetP(pList->list, i); taosMemoryFree(pUserList);
// taosMemoryFree(range);
// }
// taosArrayDestroy(pList->list);
taosMemoryFree(pList);
pIter = taosHashIterate(pWhiteList, pIter); pIter = taosHashIterate(pWhiteList, pIter);
} }
taosHashCleanup(pWhiteList); taosHashCleanup(pWhiteList);
taosMemoryFree(pWhite); taosMemoryFree(pWhite);
} }
void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip) { void uvWhiteListAdd(SWhiteList* pWhite, char* user, SIpWhiteList* plist, int64_t ver) {
SHashObj* pWhiteList = pWhite->pList; SHashObj* pWhiteList = pWhite->pList;
SWhiteUserList** ppList = taosHashGet(pWhiteList, user, strlen(user)); SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user));
if (ppList == NULL || *ppList == NULL) { if (ppUserList == NULL || *ppUserList == NULL) {
// SWhiteUserList* pList = taosMemoryCalloc(1, sizeof(SWhiteUserList)); SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
// pList->list = taosArrayInit(8, sizeof(void*)); pUserList->ver = ver;
// taosArrayPush(pList->list, &ip);
// pList->ver += 1; pUserList->pList = plist;
// taosHashPut(pWhiteList, user, strlen(user), &pList, sizeof(void*));
taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*));
} else { } else {
// SWhiteUserList* pList = *ppList; SWhiteUserList* pUserList = *ppUserList;
// pList->ver += 1;
// taosArrayPush(pList->list, &ip); taosMemoryFreeClear(pUserList->pList);
pUserList->ver = ver;
pUserList->pList = plist;
} }
} }
@ -341,13 +343,14 @@ bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver)
SWhiteUserList* pList = *ppList; SWhiteUserList* pList = *ppList;
if (pList->ver == ver) return true; if (pList->ver == ver) return true;
// for (int i = 0; i < taosArrayGetSize(pList->list); i++) { SIpWhiteList* pIpWhiteList = pList->pList;
// char* range = taosArrayGetP(pList->list, i); for (int i = 0; i < pIpWhiteList->num; i++) {
// if (uvCheckIp(range, ip)) { SIpV4Range* range = &pIpWhiteList->pIpRange[i];
// valid = true; if (uvCheckIp(range, ip)) {
// break; valid = true;
// } break;
// } }
}
return valid; return valid;
} }
bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn) { bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn) {
@ -1381,17 +1384,22 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
} }
} }
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
// update white ip SUpdateIpWhite* req = msg->arg;
// bool ret = (msg->func)(msg->arg); for (int i = 0; i < req->numOfUser; i++) {
SUpdateIpWhite* updateReq = msg->arg; SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i];
for (int i = 0; i < updateReq->numOfUser; i++) {
SUpdateUserIpWhite* pUser = &updateReq->pUserIpWhite[i]; int32_t sz = sizeof(SIpWhiteList) + pUser->numOfRange * sizeof(SIpV4Range);
SIpWhiteList* pList = taosMemoryCalloc(1, sz);
pList->num = pUser->numOfRange;
memcpy(pList->pIpRange, pUser->pIpRanges, sz);
uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver);
} }
// uvWhiteListUpdate(thrd->pWhiteList, SHashObj *pTable);
tFreeSUpdateIpWhiteReq(updateReq); thrd->pWhiteList->ver = req->ver;
taosMemoryFree(updateReq); tFreeSUpdateIpWhiteReq(req);
taosMemoryFree(req);
taosMemoryFree(msg); taosMemoryFree(msg);
return; return;
} }
@ -1581,9 +1589,12 @@ void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
for (int i = 0; i < svrObj->numOfThreads; i++) { for (int i = 0; i < svrObj->numOfThreads; i++) {
SWorkThrd* pThrd = svrObj->pThreadObj[i]; SWorkThrd* pThrd = svrObj->pThreadObj[i];
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
SUpdateIpWhite* pReq = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg);
msg->type = Update; msg->type = Update;
msg->arg = arg; msg->arg = pReq;
transAsyncSend(pThrd->asyncPool, &msg->q); transAsyncSend(pThrd->asyncPool, &msg->q);
} }
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle); transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);