add rpc update interface

This commit is contained in:
yihaoDeng 2023-09-01 17:21:58 +08:00
parent 5f78c27535
commit 44dcee0ca4
1 changed files with 40 additions and 4 deletions

View File

@ -18,6 +18,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static char* notify = "a"; static char* notify = "a";
typedef bool (*FilteFunc)(void* arg);
typedef struct { typedef struct {
int notifyCount; // int notifyCount; //
int init; // init or not int init; // init or not
@ -62,6 +63,10 @@ typedef struct SSvrMsg {
STransMsg msg; STransMsg msg;
queue q; queue q;
STransMsgType type; STransMsgType type;
void* arg;
FilteFunc func;
} SSvrMsg; } SSvrMsg;
typedef struct SWorkThrd { typedef struct SWorkThrd {
@ -77,6 +82,8 @@ typedef struct SWorkThrd {
queue conn; queue conn;
void* pTransInst; void* pTransInst;
bool quit; bool quit;
SHashObj* pWhiteList;
} SWorkThrd; } SWorkThrd;
typedef struct SServerObj { typedef struct SServerObj {
@ -141,8 +148,9 @@ static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleUpdate(SSvrMsg* pMsg, SWorkThrd* thrd);
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister, NULL}; uvHandleRegister, uvHandleUpdate};
static void uvDestroyConn(uv_handle_t* handle); static void uvDestroyConn(uv_handle_t* handle);
@ -542,7 +550,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
} }
// release handle to rpc init // release handle to rpc init
if (msg->type == Quit) { if (msg->type == Quit || msg->type == Update) {
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} else { } else {
STransMsg transMsg = msg->msg; STransMsg transMsg = msg->msg;
@ -638,7 +646,7 @@ static void uvPrepareCb(uv_prepare_t* handle) {
continue; continue;
} }
// release handle to rpc init // release handle to rpc init
if (msg->type == Quit) { if (msg->type == Quit || msg->type == Update) {
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
continue; continue;
} else { } else {
@ -1066,9 +1074,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
thrd->quit = false; thrd->quit = false;
srv->pThreadObj[i] = thrd;
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
thrd->pWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
srv->pThreadObj[i] = thrd;
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
thrd->pipe = &(srv->pipe[i][1]); // init read thrd->pipe = &(srv->pipe[i][1]); // init read
@ -1093,6 +1102,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
thrd->quit = false; thrd->quit = false;
thrd->pTransInst = shandle; thrd->pTransInst = shandle;
thrd->pWhiteList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK);
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
srv->pThreadObj[i] = thrd; srv->pThreadObj[i] = thrd;
@ -1192,6 +1202,12 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
taosMemoryFree(msg); taosMemoryFree(msg);
} }
} }
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
// update white ip
bool ret = (msg->func)(msg->arg);
taosMemoryFree(msg);
return;
}
void destroyWorkThrd(SWorkThrd* pThrd) { void destroyWorkThrd(SWorkThrd* pThrd) {
if (pThrd == NULL) { if (pThrd == NULL) {
return; return;
@ -1200,6 +1216,13 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
SRV_RELEASE_UV(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
void* pIter = taosHashIterate(pThrd->pWhiteList, NULL);
while (pIter) {
pIter = taosHashIterate(pThrd->pWhiteList, pIter);
}
taosHashCleanup(pThrd->pWhiteList);
taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
@ -1367,5 +1390,18 @@ _return2:
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
return -1; return -1;
} }
void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
tInfo("update ip white list");
SServerObj* svrObj = thandle;
for (int i = 0; i < svrObj->numOfThreads; i++) {
SWorkThrd* pThrd = svrObj->pThreadObj[i];
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
msg->type = Update;
msg->arg = arg;
msg->func = *func;
transAsyncSend(pThrd->asyncPool, &msg->q);
}
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }