add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-07 11:54:34 +08:00
parent bc27d245b1
commit 147de26eec
7 changed files with 62 additions and 46 deletions

View File

@ -880,6 +880,11 @@ typedef struct SIpV4Range {
uint32_t mask;
} SIpV4Range;
typedef struct {
int32_t num;
SIpV4Range pIpRange[];
} SIpWhiteList;
typedef struct {
int8_t createType;
int8_t superUser; // denote if it is a super user or not

View File

@ -63,11 +63,12 @@ static void dmConvertErrCode(tmsg_t msgType) {
terrno = TSDB_CODE_VND_STOPPED;
}
}
static void dmUpdateRpcIpWhite(SRpcMsg *pRpc) {
SUpdateIpWhite ipWhite;
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite);
tFreeSUpdateIpWhiteReq(&ipWhite);
static void dmUpdateRpcIpWhite(void *pTrans, SRpcMsg *pRpc) {
SUpdateIpWhite *pIpWhite = taosMemoryCalloc(1, sizeof(SUpdateIpWhite));
tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, pIpWhite);
// rpcSetIpWhite(pTrans, pIpWhite);
// tFreeSUpdateIpWhiteReq(&ipWhite);
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
@ -103,7 +104,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dmSetMnodeEpSet(&pDnode->data, pEpSet);
}
case TDMT_MND_RETRIEVE_IP_WHITE_RSP: {
dmUpdateRpcIpWhite(pRpc);
dmUpdateRpcIpWhite(pTrans->serverRpc, pRpc);
} break;
default:
break;

View File

@ -275,10 +275,6 @@ typedef struct {
SAcctCfg cfg;
SAcctInfo info;
} SAcctObj;
typedef struct {
int32_t num;
SIpV4Range pIpRange[];
} SIpWhiteList;
typedef struct {
char user[TSDB_USER_LEN];

View File

@ -138,7 +138,6 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
size_t klen;
char *key = taosHashGetKey(pIter, &klen);
if (list->num != 0) {
taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter);
memcpy(pUser->user, key, klen);
pUser->numOfRange = list->num;
@ -146,7 +145,7 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) {
memcpy(pUser->pIpRanges, list->pIpRange, list->num * sizeof(SIpV4Range));
i++;
}
taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter);
pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter);
}
pUpdate->numOfUser = i;
@ -258,7 +257,7 @@ int32_t convertIpWhiteListToStr(SIpWhiteList *pList, char **buf) {
*buf = NULL;
return 0;
}
*buf = taosMemoryCalloc(1, pList->num * (sizeof(SIpWhiteList) + 4) + 4);
*buf = taosMemoryCalloc(1, pList->num * 36);
ipRangeListToStr(pList->pIpRange, pList->num, *buf);
return strlen(*buf);
}
@ -1006,7 +1005,8 @@ int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq) {
goto _OVER;
}
SUpdateIpWhite ipWhite;
SUpdateIpWhite ipWhite = {0};
ipWhiteMgtFillMsg(&ipWhite);
int32_t len = tSerializeSUpdateIpWhite(NULL, 0, &ipWhite);

View File

@ -305,11 +305,12 @@ void transUnrefCliHandle(void* handle);
int transReleaseCliHandle(void* handle);
int transReleaseSrvHandle(void* handle);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
void transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func);
int transSockInfo2Str(struct sockaddr* sockname, char* dst);

View File

@ -178,12 +178,13 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle
int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); }
// client only
int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
// later
return transSetDefaultAddr(thandle, ip, fqdn);
}
void rpcSetIpWhite(void* thandl, void* arg) { return ; }
// server only
void rpcSetIpWhite(void* thandle, void* arg) { transSetIpWhiteList(thandle, arg, NULL); }
void* rpcAllocHandle() { return (void*)transAllocHandle(); }

View File

@ -71,8 +71,10 @@ typedef struct SSvrMsg {
} SSvrMsg;
typedef struct {
int64_t ver;
SArray* list;
int64_t ver;
SIpWhiteList* pList;
// SArray* list;
} SWhiteUserList;
typedef struct {
SHashObj* pList;
@ -294,11 +296,11 @@ void uvWhiteListDestroy(SWhiteList* pWhite) {
void* pIter = taosHashIterate(pWhiteList, NULL);
while (pIter) {
SWhiteUserList* pList = *(SWhiteUserList**)pIter;
for (int i = 0; i < taosArrayGetSize(pList->list); i++) {
char* range = taosArrayGetP(pList->list, i);
taosMemoryFree(range);
}
taosArrayDestroy(pList->list);
// for (int i = 0; i < taosArrayGetSize(pList->list); i++) {
// char* range = taosArrayGetP(pList->list, i);
// taosMemoryFree(range);
// }
// taosArrayDestroy(pList->list);
taosMemoryFree(pList);
pIter = taosHashIterate(pWhiteList, pIter);
}
@ -311,15 +313,15 @@ void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip) {
SWhiteUserList** ppList = taosHashGet(pWhiteList, user, strlen(user));
if (ppList == NULL || *ppList == NULL) {
SWhiteUserList* pList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
pList->list = taosArrayInit(8, sizeof(void*));
taosArrayPush(pList->list, &ip);
pList->ver += 1;
taosHashPut(pWhiteList, user, strlen(user), &pList, sizeof(void*));
// SWhiteUserList* pList = taosMemoryCalloc(1, sizeof(SWhiteUserList));
// pList->list = taosArrayInit(8, sizeof(void*));
// taosArrayPush(pList->list, &ip);
// pList->ver += 1;
// taosHashPut(pWhiteList, user, strlen(user), &pList, sizeof(void*));
} else {
SWhiteUserList* pList = *ppList;
pList->ver += 1;
taosArrayPush(pList->list, &ip);
// SWhiteUserList* pList = *ppList;
// pList->ver += 1;
// taosArrayPush(pList->list, &ip);
}
}
@ -339,13 +341,13 @@ bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver)
SWhiteUserList* pList = *ppList;
if (pList->ver == ver) return true;
for (int i = 0; i < taosArrayGetSize(pList->list); i++) {
char* range = taosArrayGetP(pList->list, i);
if (uvCheckIp(range, ip)) {
valid = true;
break;
}
}
// for (int i = 0; i < taosArrayGetSize(pList->list); i++) {
// char* range = taosArrayGetP(pList->list, i);
// if (uvCheckIp(range, ip)) {
// valid = true;
// break;
// }
// }
return valid;
}
bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn) {
@ -1380,8 +1382,16 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
}
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
// update white ip
bool ret = (msg->func)(msg->arg);
// bool ret = (msg->func)(msg->arg);
SUpdateIpWhite* updateReq = msg->arg;
for (int i = 0; i < updateReq->numOfUser; i++) {
SUpdateUserIpWhite* pUser = &updateReq->pUserIpWhite[i];
}
// uvWhiteListUpdate(thrd->pWhiteList, SHashObj *pTable);
tFreeSUpdateIpWhiteReq(updateReq);
taosMemoryFree(updateReq);
taosMemoryFree(msg);
return;
}
@ -1564,17 +1574,19 @@ _return2:
return -1;
}
void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle);
tInfo("update ip white list");
SServerObj* svrObj = thandle;
SServerObj* svrObj = pTransInst->tcphandle;
for (int i = 0; i < svrObj->numOfThreads; i++) {
SWorkThrd* pThrd = svrObj->pThreadObj[i];
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
msg->type = Update;
msg->arg = arg;
msg->func = *func;
transAsyncSend(pThrd->asyncPool, &msg->q);
}
transReleaseExHandle(transGetInstMgt(), (int64_t)thandle);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }