diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a53830723c..687a21d657 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -18,6 +18,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static char* notify = "a"; +typedef bool (*FilteFunc)(void* arg); typedef struct { int notifyCount; // int init; // init or not @@ -62,6 +63,10 @@ typedef struct SSvrMsg { STransMsg msg; queue q; STransMsgType type; + + void* arg; + FilteFunc func; + } SSvrMsg; typedef struct SWorkThrd { @@ -77,6 +82,8 @@ typedef struct SWorkThrd { queue conn; void* pTransInst; bool quit; + + SHashObj* pWhiteList; } SWorkThrd; typedef struct SServerObj { @@ -141,8 +148,9 @@ static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd); +static void uvHandleUpdate(SSvrMsg* pMsg, SWorkThrd* thrd); static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, - uvHandleRegister, NULL}; + uvHandleRegister, uvHandleUpdate}; static void uvDestroyConn(uv_handle_t* handle); @@ -542,7 +550,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } // release handle to rpc init - if (msg->type == Quit) { + if (msg->type == Quit || msg->type == Update) { (*transAsyncHandle[msg->type])(msg, pThrd); } else { STransMsg transMsg = msg->msg; @@ -638,7 +646,7 @@ static void uvPrepareCb(uv_prepare_t* handle) { continue; } // release handle to rpc init - if (msg->type == Quit) { + if (msg->type == Quit || msg->type == Update) { (*transAsyncHandle[msg->type])(msg, pThrd); continue; } else { @@ -1066,9 +1074,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); thrd->pTransInst = shandle; thrd->quit = false; - srv->pThreadObj[i] = thrd; thrd->pTransInst = shandle; + thrd->pWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + srv->pThreadObj[i] = thrd; srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); thrd->pipe = &(srv->pipe[i][1]); // init read @@ -1093,6 +1102,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->pTransInst = shandle; thrd->quit = false; thrd->pTransInst = shandle; + thrd->pWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK); srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); srv->pThreadObj[i] = thrd; @@ -1192,6 +1202,12 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { taosMemoryFree(msg); } } +void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { + // update white ip + bool ret = (msg->func)(msg->arg); + taosMemoryFree(msg); + return; +} void destroyWorkThrd(SWorkThrd* pThrd) { if (pThrd == NULL) { return; @@ -1200,6 +1216,13 @@ void destroyWorkThrd(SWorkThrd* pThrd) { SRV_RELEASE_UV(pThrd->loop); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); transAsyncPoolDestroy(pThrd->asyncPool); + + void* pIter = taosHashIterate(pThrd->pWhiteList, NULL); + while (pIter) { + pIter = taosHashIterate(pThrd->pWhiteList, pIter); + } + taosHashCleanup(pThrd->pWhiteList); + taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); @@ -1367,5 +1390,18 @@ _return2: rpcFreeCont(msg->pCont); return -1; } +void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { + tInfo("update ip white list"); + SServerObj* svrObj = thandle; + for (int i = 0; i < svrObj->numOfThreads; i++) { + SWorkThrd* pThrd = svrObj->pThreadObj[i]; + + SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + msg->type = Update; + msg->arg = arg; + msg->func = *func; + transAsyncSend(pThrd->asyncPool, &msg->q); + } +} int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }