add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-02 18:01:25 +08:00
parent b8e9b00564
commit b485a61767
1 changed files with 74 additions and 42 deletions

View File

@ -56,6 +56,8 @@ typedef struct SSvrConn {
char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
int64_t whiteListVer;
} SSvrConn;
typedef struct SSvrMsg {
@ -69,6 +71,10 @@ typedef struct SSvrMsg {
} SSvrMsg;
typedef struct {
SHashObj* pList;
int64_t ver;
} SWhiteList;
typedef struct SWorkThrd {
TdThread thread;
uv_connect_t connect_req;
@ -83,7 +89,8 @@ typedef struct SWorkThrd {
void* pTransInst;
bool quit;
SHashObj* pWhiteList;
SWhiteList* pWhiteList;
int64_t whiteListVer;
} SWorkThrd;
typedef struct SServerObj {
@ -106,6 +113,14 @@ typedef struct SServerObj {
bool inited;
} SServerObj;
SWhiteList* uvWhiteListCreate();
void uvWhiteListDestroy(SWhiteList* pWhite);
void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip);
void uvWhiteListUpdate(SWhiteList* pWhite, SHashObj* pTable);
bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn);
bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip);
void uvWhiteListSetConnVer(SWhiteList* pWhite, SSvrConn* pConn);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
@ -263,44 +278,57 @@ static bool uvCheckIp(char* range, int32_t ip) {
return subnetCheckIp(&subnet, ip);
}
static void uvWhiteListDestroy(SHashObj* pWhiteList) {
void* pIter = taosHashIterate(pWhiteList, NULL);
SWhiteList* uvWhiteListCreate() {
SWhiteList* pWhiteList = taosMemoryCalloc(1, sizeof(SWhiteList));
pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
pWhiteList->ver = 0;
return pWhiteList;
}
void uvWhiteListDestroy(SWhiteList* pWhite) {
SHashObj* pWhiteList = pWhite->pList;
void* pIter = taosHashIterate(pWhiteList, NULL);
while (pIter) {
SArray* list = *(SArray**)pIter;
for (int i = 0; i < taosArrayGetSize(list); i++) {
char* range = taosArrayGetP(list, i);
SArray* pArr = *(SArray**)pIter;
for (int i = 0; i < taosArrayGetSize(pArr); i++) {
char* range = taosArrayGetP(pArr, i);
taosMemoryFree(range);
}
taosArrayDestroy(list);
taosArrayDestroy(pArr);
pIter = taosHashIterate(pWhiteList, pIter);
}
taosHashCleanup(pWhiteList);
}
static bool uvWhiteListAdd(SHashObj* pWhiteList, char* user, char* ip) {
SArray** pWhite = taosHashGet(pWhiteList, user, strlen(user));
if (pWhite == NULL || *pWhite == NULL) {
SArray* list = taosArrayInit(8, sizeof(void*));
taosArrayPush(list, &ip);
taosHashPut(pWhiteList, user, strlen(user), &list, sizeof(void*));
} else {
taosArrayPush(*pWhite, &ip);
}
return true;
taosMemoryFree(pWhite);
}
static bool uvWhiteListFilte(SHashObj* pWhiteList, char* user, uint32_t ip) {
void uvWhiteListAdd(SWhiteList* pWhite, char* user, char* ip) {
SHashObj* pWhiteList = pWhite->pList;
SArray** ppArr = taosHashGet(pWhiteList, user, strlen(user));
if (ppArr == NULL || *ppArr == NULL) {
SArray* pArr = taosArrayInit(8, sizeof(void*));
taosArrayPush(pArr, &ip);
taosHashPut(pWhiteList, user, strlen(user), &pArr, sizeof(void*));
} else {
taosArrayPush(*ppArr, &ip);
}
}
void uvWhiteListUpdate(SWhiteList* pWhite, SHashObj* pTable) {
// impl later
}
bool uvWhiteListFilte(SWhiteList* pWhite, char* user, uint32_t ip) {
// impl check
bool valid = false;
SArray** pWhite = taosHashGet(pWhiteList, user, strlen(user));
if (pWhite == NULL || *pWhite == NULL) {
SHashObj* pWhiteList = pWhite->pList;
bool valid = false;
SArray** ppArr = taosHashGet(pWhiteList, user, strlen(user));
if (ppArr == NULL || *ppArr == NULL) {
return true;
}
// char userIp[64] = {0};
// tinet_ntoa(userIp, ip);
for (int i = 0; i < taosArrayGetSize(*pWhite); i++) {
char* range = taosArrayGetP(*pWhite, i);
for (int i = 0; i < taosArrayGetSize(*ppArr); i++) {
char* range = taosArrayGetP(*ppArr, i);
if (uvCheckIp(range, ip)) {
valid = true;
break;
@ -308,6 +336,16 @@ static bool uvWhiteListFilte(SHashObj* pWhiteList, char* user, uint32_t ip) {
}
return valid;
}
bool uvWhiteListCheckConn(SWhiteList* pWhite, SSvrConn* pConn) {
if (pWhite->ver == pConn->whiteListVer) {
return true;
}
return uvWhiteListFilte(pWhite, pConn->user, pConn->clientIp);
}
void uvWhiteListSetConnVer(SWhiteList* pWhite, SSvrConn* pConn) {
// if conn already check by current whiteLis
pConn->whiteListVer = pWhite->ver;
}
static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst;
@ -320,9 +358,6 @@ static bool uvHandleReq(SSvrConn* pConn) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
return false;
}
if (uvWhiteListFilte(pThrd->pWhiteList, pHead->user, pConn->clientIp) == false) {
return false;
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
@ -333,6 +368,12 @@ static bool uvHandleReq(SSvrConn* pConn) {
memcpy(pConn->user, pHead->user, strlen(pHead->user));
if (uvWhiteListCheckConn(pThrd->pWhiteList, pConn) == false) {
return false;
} else {
uvWhiteListSetConnVer(pThrd->pWhiteList, pConn);
}
if (uvRecvReleaseReq(pConn, pHead)) {
return true;
}
@ -1199,7 +1240,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
thrd->pTransInst = shandle;
thrd->quit = false;
thrd->pTransInst = shandle;
thrd->pWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
thrd->pWhiteList = uvWhiteListCreate();
srv->pThreadObj[i] = thrd;
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
@ -1226,7 +1267,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
thrd->pTransInst = shandle;
thrd->quit = false;
thrd->pTransInst = shandle;
thrd->pWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
thrd->pWhiteList = uvWhiteListCreate();
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
srv->pThreadObj[i] = thrd;
@ -1342,16 +1383,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transAsyncPoolDestroy(pThrd->asyncPool);
void* pIter = taosHashIterate(pThrd->pWhiteList, NULL);
while (pIter) {
SArray* arr = *(SArray**)pIter;
for (int i = 0; i < taosArrayGetSize(arr); i++) {
char* p = taosArrayGetP(arr, i);
taosMemoryFree(p);
}
pIter = taosHashIterate(pThrd->pWhiteList, pIter);
}
taosHashCleanup(pThrd->pWhiteList);
uvWhiteListDestroy(pThrd->pWhiteList);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);