From b485a61767a02e1977d44a70dcf2e6f97ab8c950 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 2 Sep 2023 18:01:25 +0800 Subject: [PATCH] add rpc update interface --- source/libs/transport/src/transSvr.c | 116 +++++++++++++++++---------- 1 file changed, 74 insertions(+), 42 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 5509152984..4ee7ceb426 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -56,6 +56,8 @@ typedef struct SSvrConn { char user[TSDB_UNI_LEN]; // user ID for the link char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; // ciphering key + + int64_t whiteListVer; } SSvrConn; typedef struct SSvrMsg { @@ -69,6 +71,10 @@ typedef struct SSvrMsg { } SSvrMsg; +typedef struct { + SHashObj* pList; + int64_t ver; +} SWhiteList; typedef struct SWorkThrd { TdThread thread; uv_connect_t connect_req; @@ -83,7 +89,8 @@ typedef struct SWorkThrd { void* pTransInst; bool quit; - SHashObj* pWhiteList; + SWhiteList* pWhiteList; + int64_t whiteListVer; } SWorkThrd; typedef struct SServerObj { @@ -106,6 +113,14 @@ typedef struct SServerObj { bool inited; } SServerObj; +SWhiteList* uvWhiteListCreate(); +void uvWhiteListDestroy(SWhiteList* pWhite); +void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip); +void uvWhiteListUpdate(SWhiteList* pWhite, SHashObj* pTable); +bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn); +bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip); +void uvWhiteListSetConnVer(SWhiteList* pWhite, SSvrConn* pConn); + static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); @@ -263,44 +278,57 @@ static bool uvCheckIp(char* range, int32_t ip) { return subnetCheckIp(&subnet, ip); } -static void uvWhiteListDestroy(SHashObj* pWhiteList) { - void* pIter = taosHashIterate(pWhiteList, NULL); +SWhiteList* uvWhiteListCreate() { + SWhiteList* pWhiteList = taosMemoryCalloc(1, sizeof(SWhiteList)); + + pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK); + pWhiteList->ver = 0; + return pWhiteList; +} +void uvWhiteListDestroy(SWhiteList* pWhite) { + SHashObj* pWhiteList = pWhite->pList; + void* pIter = taosHashIterate(pWhiteList, NULL); while (pIter) { - SArray* list = *(SArray**)pIter; - for (int i = 0; i < taosArrayGetSize(list); i++) { - char* range = taosArrayGetP(list, i); + SArray* pArr = *(SArray**)pIter; + for (int i = 0; i < taosArrayGetSize(pArr); i++) { + char* range = taosArrayGetP(pArr, i); taosMemoryFree(range); } - taosArrayDestroy(list); + taosArrayDestroy(pArr); pIter = taosHashIterate(pWhiteList, pIter); } taosHashCleanup(pWhiteList); -} -static bool uvWhiteListAdd(SHashObj* pWhiteList, char* user, char* ip) { - SArray** pWhite = taosHashGet(pWhiteList, user, strlen(user)); - if (pWhite == NULL || *pWhite == NULL) { - SArray* list = taosArrayInit(8, sizeof(void*)); - taosArrayPush(list, &ip); - taosHashPut(pWhiteList, user, strlen(user), &list, sizeof(void*)); - } else { - taosArrayPush(*pWhite, &ip); - } - return true; + taosMemoryFree(pWhite); } -static bool uvWhiteListFilte(SHashObj* pWhiteList, char* user, uint32_t ip) { +void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip) { + SHashObj* pWhiteList = pWhite->pList; + + SArray** ppArr = taosHashGet(pWhiteList, user, strlen(user)); + if (ppArr == NULL || *ppArr == NULL) { + SArray* pArr = taosArrayInit(8, sizeof(void*)); + taosArrayPush(pArr, &ip); + taosHashPut(pWhiteList, user, strlen(user), &pArr, sizeof(void*)); + } else { + taosArrayPush(*ppArr, &ip); + } +} + +void uvWhiteListUpdate(SWhiteList* pWhite, SHashObj* pTable) { + // impl later +} + +bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip) { // impl check - bool valid = false; - SArray** pWhite = taosHashGet(pWhiteList, user, strlen(user)); - if (pWhite == NULL || *pWhite == NULL) { + SHashObj* pWhiteList = pWhite->pList; + bool valid = false; + SArray** ppArr = taosHashGet(pWhiteList, user, strlen(user)); + if (ppArr == NULL || *ppArr == NULL) { return true; } - // char userIp[64] = {0}; - // tinet_ntoa(userIp, ip); - - for (int i = 0; i < taosArrayGetSize(*pWhite); i++) { - char* range = taosArrayGetP(*pWhite, i); + for (int i = 0; i < taosArrayGetSize(*ppArr); i++) { + char* range = taosArrayGetP(*ppArr, i); if (uvCheckIp(range, ip)) { valid = true; break; @@ -308,6 +336,16 @@ static bool uvWhiteListFilte(SHashObj* pWhiteList, char* user, uint32_t ip) { } return valid; } +bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn) { + if (pWhite->ver == pConn->whiteListVer) { + return true; + } + return uvWhiteListFilte(pWhite, pConn->user, pConn->clientIp); +} +void uvWhiteListSetConnVer(SWhiteList* pWhite, SSvrConn* pConn) { + // if conn already check by current whiteLis + pConn->whiteListVer = pWhite->ver; +} static bool uvHandleReq(SSvrConn* pConn) { STrans* pTransInst = pConn->pTransInst; @@ -320,9 +358,6 @@ static bool uvHandleReq(SSvrConn* pConn) { tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); return false; } - if (uvWhiteListFilte(pThrd->pWhiteList, pHead->user, pConn->clientIp) == false) { - return false; - } if (transDecompressMsg((char**)&pHead, msgLen) < 0) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); @@ -333,6 +368,12 @@ static bool uvHandleReq(SSvrConn* pConn) { memcpy(pConn->user, pHead->user, strlen(pHead->user)); + if (uvWhiteListCheckConn(pThrd->pWhiteList, pConn) == false) { + return false; + } else { + uvWhiteListSetConnVer(pThrd->pWhiteList, pConn); + } + if (uvRecvReleaseReq(pConn, pHead)) { return true; } @@ -1199,7 +1240,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->pTransInst = shandle; thrd->quit = false; thrd->pTransInst = shandle; - thrd->pWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + thrd->pWhiteList = uvWhiteListCreate(); srv->pThreadObj[i] = thrd; srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); @@ -1226,7 +1267,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->pTransInst = shandle; thrd->quit = false; thrd->pTransInst = shandle; - thrd->pWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK); + thrd->pWhiteList = uvWhiteListCreate(); srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); srv->pThreadObj[i] = thrd; @@ -1342,16 +1383,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); transAsyncPoolDestroy(pThrd->asyncPool); - void* pIter = taosHashIterate(pThrd->pWhiteList, NULL); - while (pIter) { - SArray* arr = *(SArray**)pIter; - for (int i = 0; i < taosArrayGetSize(arr); i++) { - char* p = taosArrayGetP(arr, i); - taosMemoryFree(p); - } - pIter = taosHashIterate(pThrd->pWhiteList, pIter); - } - taosHashCleanup(pThrd->pWhiteList); + uvWhiteListDestroy(pThrd->pWhiteList); taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop);