From 147de26eec871433467bc75bb7001d5509760495 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 7 Sep 2023 11:54:34 +0800 Subject: [PATCH] add rpc update interface --- include/common/tmsg.h | 5 ++ source/dnode/mgmt/node_mgmt/src/dmTransport.c | 13 ++-- source/dnode/mnode/impl/inc/mndDef.h | 4 -- source/dnode/mnode/impl/src/mndUser.c | 8 +-- source/libs/transport/inc/transComm.h | 11 ++-- source/libs/transport/src/trans.c | 5 +- source/libs/transport/src/transSvr.c | 62 +++++++++++-------- 7 files changed, 62 insertions(+), 46 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 406e5fcda7..7142c933c5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -880,6 +880,11 @@ typedef struct SIpV4Range { uint32_t mask; } SIpV4Range; +typedef struct { + int32_t num; + SIpV4Range pIpRange[]; +} SIpWhiteList; + typedef struct { int8_t createType; int8_t superUser; // denote if it is a super user or not diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c09e9e4fdf..abccc23f58 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -63,11 +63,12 @@ static void dmConvertErrCode(tmsg_t msgType) { terrno = TSDB_CODE_VND_STOPPED; } } -static void dmUpdateRpcIpWhite(SRpcMsg *pRpc) { - SUpdateIpWhite ipWhite; - tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite); - - tFreeSUpdateIpWhiteReq(&ipWhite); +static void dmUpdateRpcIpWhite(void *pTrans, SRpcMsg *pRpc) { + SUpdateIpWhite *pIpWhite = taosMemoryCalloc(1, sizeof(SUpdateIpWhite)); + tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, pIpWhite); + + // rpcSetIpWhite(pTrans, pIpWhite); + // tFreeSUpdateIpWhiteReq(&ipWhite); } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SDnodeTrans *pTrans = &pDnode->trans; @@ -103,7 +104,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dmSetMnodeEpSet(&pDnode->data, pEpSet); } case TDMT_MND_RETRIEVE_IP_WHITE_RSP: { - dmUpdateRpcIpWhite(pRpc); + dmUpdateRpcIpWhite(pTrans->serverRpc, pRpc); } break; default: break; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 90557de7dd..6b42fceaba 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -275,10 +275,6 @@ typedef struct { SAcctCfg cfg; SAcctInfo info; } SAcctObj; -typedef struct { - int32_t num; - SIpV4Range pIpRange[]; -} SIpWhiteList; typedef struct { char user[TSDB_USER_LEN]; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 0770f4a570..27d792282a 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -138,7 +138,6 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) { size_t klen; char *key = taosHashGetKey(pIter, &klen); if (list->num != 0) { - taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter); memcpy(pUser->user, key, klen); pUser->numOfRange = list->num; @@ -146,7 +145,7 @@ int64_t ipWhiteMgtFillMsg(SUpdateIpWhite *pUpdate) { memcpy(pUser->pIpRanges, list->pIpRange, list->num * sizeof(SIpV4Range)); i++; } - taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter); + pIter = taosHashIterate(ipWhiteMgt.pIpWhiteList, pIter); } pUpdate->numOfUser = i; @@ -258,7 +257,7 @@ int32_t convertIpWhiteListToStr(SIpWhiteList *pList, char **buf) { *buf = NULL; return 0; } - *buf = taosMemoryCalloc(1, pList->num * (sizeof(SIpWhiteList) + 4) + 4); + *buf = taosMemoryCalloc(1, pList->num * 36); ipRangeListToStr(pList->pIpRange, pList->num, *buf); return strlen(*buf); } @@ -1006,7 +1005,8 @@ int32_t mndProcesSRetrieveIpWhiteReq(SRpcMsg *pReq) { goto _OVER; } - SUpdateIpWhite ipWhite; + SUpdateIpWhite ipWhite = {0}; + ipWhiteMgtFillMsg(&ipWhite); int32_t len = tSerializeSUpdateIpWhite(NULL, 0, &ipWhite); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 2d5a18fcc0..012884abc3 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -305,11 +305,12 @@ void transUnrefCliHandle(void* handle); int transReleaseCliHandle(void* handle); int transReleaseSrvHandle(void* handle); -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); -int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -int transSendResponse(const STransMsg* msg); -int transRegisterMsg(const STransMsg* msg); -int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int transSendResponse(const STransMsg* msg); +int transRegisterMsg(const STransMsg* msg); +int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +void transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func); int transSockInfo2Str(struct sockaddr* sockname, char* dst); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 4875bce1b1..ca255ec69b 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -178,12 +178,13 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } int rpcReleaseHandle(void* handle, int8_t type) { return (*transReleaseHandle[type])(handle); } +// client only int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { // later return transSetDefaultAddr(thandle, ip, fqdn); } - -void rpcSetIpWhite(void* thandl, void* arg) { return ; } +// server only +void rpcSetIpWhite(void* thandle, void* arg) { transSetIpWhiteList(thandle, arg, NULL); } void* rpcAllocHandle() { return (void*)transAllocHandle(); } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 1d2c5ef13c..923b61d876 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -71,8 +71,10 @@ typedef struct SSvrMsg { } SSvrMsg; typedef struct { - int64_t ver; - SArray* list; + int64_t ver; + SIpWhiteList* pList; + // SArray* list; + } SWhiteUserList; typedef struct { SHashObj* pList; @@ -294,11 +296,11 @@ void uvWhiteListDestroy(SWhiteList* pWhite) { void* pIter = taosHashIterate(pWhiteList, NULL); while (pIter) { SWhiteUserList* pList = *(SWhiteUserList**)pIter; - for (int i = 0; i < taosArrayGetSize(pList->list); i++) { - char* range = taosArrayGetP(pList->list, i); - taosMemoryFree(range); - } - taosArrayDestroy(pList->list); + // for (int i = 0; i < taosArrayGetSize(pList->list); i++) { + // char* range = taosArrayGetP(pList->list, i); + // taosMemoryFree(range); + // } + // taosArrayDestroy(pList->list); taosMemoryFree(pList); pIter = taosHashIterate(pWhiteList, pIter); } @@ -311,15 +313,15 @@ void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip) { SWhiteUserList** ppList = taosHashGet(pWhiteList, user, strlen(user)); if (ppList == NULL || *ppList == NULL) { - SWhiteUserList* pList = taosMemoryCalloc(1, sizeof(SWhiteUserList)); - pList->list = taosArrayInit(8, sizeof(void*)); - taosArrayPush(pList->list, &ip); - pList->ver += 1; - taosHashPut(pWhiteList, user, strlen(user), &pList, sizeof(void*)); + // SWhiteUserList* pList = taosMemoryCalloc(1, sizeof(SWhiteUserList)); + // pList->list = taosArrayInit(8, sizeof(void*)); + // taosArrayPush(pList->list, &ip); + // pList->ver += 1; + // taosHashPut(pWhiteList, user, strlen(user), &pList, sizeof(void*)); } else { - SWhiteUserList* pList = *ppList; - pList->ver += 1; - taosArrayPush(pList->list, &ip); + // SWhiteUserList* pList = *ppList; + // pList->ver += 1; + // taosArrayPush(pList->list, &ip); } } @@ -339,13 +341,13 @@ bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip, int64_t ver) SWhiteUserList* pList = *ppList; if (pList->ver == ver) return true; - for (int i = 0; i < taosArrayGetSize(pList->list); i++) { - char* range = taosArrayGetP(pList->list, i); - if (uvCheckIp(range, ip)) { - valid = true; - break; - } - } + // for (int i = 0; i < taosArrayGetSize(pList->list); i++) { + // char* range = taosArrayGetP(pList->list, i); + // if (uvCheckIp(range, ip)) { + // valid = true; + // break; + // } + // } return valid; } bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn) { @@ -1380,8 +1382,16 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { } void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { // update white ip - bool ret = (msg->func)(msg->arg); + // bool ret = (msg->func)(msg->arg); + SUpdateIpWhite* updateReq = msg->arg; + for (int i = 0; i < updateReq->numOfUser; i++) { + SUpdateUserIpWhite* pUser = &updateReq->pUserIpWhite[i]; + } + // uvWhiteListUpdate(thrd->pWhiteList, SHashObj *pTable); + tFreeSUpdateIpWhiteReq(updateReq); + + taosMemoryFree(updateReq); taosMemoryFree(msg); return; } @@ -1564,17 +1574,19 @@ _return2: return -1; } void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle); + tInfo("update ip white list"); - SServerObj* svrObj = thandle; + SServerObj* svrObj = pTransInst->tcphandle; for (int i = 0; i < svrObj->numOfThreads; i++) { SWorkThrd* pThrd = svrObj->pThreadObj[i]; SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); msg->type = Update; msg->arg = arg; - msg->func = *func; transAsyncSend(pThrd->asyncPool, &msg->q); } + transReleaseExHandle(transGetInstMgt(), (int64_t)thandle); } int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }