enh: rpc set default epset
This commit is contained in:
parent
e2d7df7a0f
commit
cca2bcdb28
|
@ -125,6 +125,7 @@ void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
|
||||||
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||||
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||||
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
|
void rpcSetDefaultEpSet(void *thandle, const SEpSet *dst);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,8 +95,8 @@ typedef void* queue[2];
|
||||||
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
||||||
|
|
||||||
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
|
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
|
||||||
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
|
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
|
||||||
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
||||||
|
|
||||||
typedef SRpcMsg STransMsg;
|
typedef SRpcMsg STransMsg;
|
||||||
typedef SRpcCtx STransCtx;
|
typedef SRpcCtx STransCtx;
|
||||||
|
@ -155,7 +155,7 @@ typedef struct {
|
||||||
|
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
typedef enum { Normal, Quit, Release, Register } STransMsgType;
|
typedef enum { Normal, Quit, Release, Register, Update } STransMsgType;
|
||||||
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
|
||||||
|
|
||||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||||
|
@ -231,6 +231,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransM
|
||||||
void transSendResponse(const STransMsg* msg);
|
void transSendResponse(const STransMsg* msg);
|
||||||
void transRegisterMsg(const STransMsg* msg);
|
void transRegisterMsg(const STransMsg* msg);
|
||||||
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
|
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo);
|
||||||
|
void transSetDefaultEpSet(void* shandle, const SEpSet* dst);
|
||||||
|
|
||||||
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
|
|
|
@ -148,6 +148,10 @@ void rpcReleaseHandle(void* handle, int8_t type) {
|
||||||
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
||||||
(*transReleaseHandle[type])(handle);
|
(*transReleaseHandle[type])(handle);
|
||||||
}
|
}
|
||||||
|
void rpcSetDefaultEpSet(void* thandle, const SEpSet* dst) {
|
||||||
|
// later
|
||||||
|
transSetDefaultEpSet(thandle, dst);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t rpcInit() {
|
int32_t rpcInit() {
|
||||||
// impl later
|
// impl later
|
||||||
|
|
|
@ -63,7 +63,11 @@ typedef struct SCliThrdObj {
|
||||||
SDelayQueue* delayQueue;
|
SDelayQueue* delayQueue;
|
||||||
uint64_t nextTimeout; // next timeout
|
uint64_t nextTimeout; // next timeout
|
||||||
void* pTransInst; //
|
void* pTransInst; //
|
||||||
bool quit;
|
|
||||||
|
bool useDefaultEpSet;
|
||||||
|
SEpSet defaultEpSet;
|
||||||
|
|
||||||
|
bool quit;
|
||||||
} SCliThrdObj;
|
} SCliThrdObj;
|
||||||
|
|
||||||
typedef struct SCliObj {
|
typedef struct SCliObj {
|
||||||
|
@ -116,7 +120,9 @@ static void cliHandleExcept(SCliConn* conn);
|
||||||
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease};
|
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
|
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
||||||
|
cliHandleUpdate};
|
||||||
|
|
||||||
static void cliSendQuit(SCliThrdObj* thrd);
|
static void cliSendQuit(SCliThrdObj* thrd);
|
||||||
static void destroyUserdata(STransMsg* userdata);
|
static void destroyUserdata(STransMsg* userdata);
|
||||||
|
@ -683,6 +689,15 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
transUnrefCliHandle(conn);
|
transUnrefCliHandle(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
|
pThrd->useDefaultEpSet = true;
|
||||||
|
pThrd->defaultEpSet = pCtx->epSet;
|
||||||
|
|
||||||
|
tsem_post(pCtx->pSem);
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
SCliConn* conn = NULL;
|
SCliConn* conn = NULL;
|
||||||
|
@ -712,6 +727,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
|
if (pThrd->useDefaultEpSet) {
|
||||||
|
pCtx->epSet = pThrd->defaultEpSet;
|
||||||
|
}
|
||||||
|
|
||||||
SCliConn* conn = cliGetConn(pMsg, pThrd);
|
SCliConn* conn = cliGetConn(pMsg, pThrd);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
|
@ -1067,4 +1086,28 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
|
||||||
taosMemoryFree(pSem);
|
taosMemoryFree(pSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void transSetDefaultEpSet(void* ahandle, const SEpSet* dst) {
|
||||||
|
STrans* pTransInst = ahandle;
|
||||||
|
for (int i = 0; i < pTransInst->numOfThreads; i++) {
|
||||||
|
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||||
|
pCtx->hThrdIdx = i;
|
||||||
|
pCtx->epSet = *dst;
|
||||||
|
pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t));
|
||||||
|
tsem_init(pCtx->pSem, 0, 0);
|
||||||
|
|
||||||
|
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
|
cliMsg->ctx = pCtx;
|
||||||
|
cliMsg->type = Update;
|
||||||
|
|
||||||
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
|
||||||
|
tDebug("send update epset at thread:%d, threadID:%" PRId64 "", i, thrd->thread);
|
||||||
|
|
||||||
|
tsem_t* pSem = pCtx->pSem;
|
||||||
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
|
|
||||||
|
tsem_wait(pSem);
|
||||||
|
tsem_destroy(pSem);
|
||||||
|
taosMemoryFree(pSem);
|
||||||
|
}
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue