handle except

This commit is contained in:
yihaoDeng 2022-03-15 23:47:37 +08:00
parent 607a7ac025
commit 4f330fab1f
6 changed files with 126 additions and 15 deletions

View File

@ -94,7 +94,7 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
// just release client conn to rpc instance, no close sock // just release client conn to rpc instance, no close sock
void rpcReleaseHandle(void *handle); void rpcReleaseHandle(void *handle, int8_t type);
void rpcRefHandle(void *handle, int8_t type); void rpcRefHandle(void *handle, int8_t type);
void rpcUnrefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type);

View File

@ -252,6 +252,9 @@ void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle); void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle); void transUnrefCliHandle(void* handle);
void transReleaseCliHandle(void* handle);
void transReleaseSrvHandle(void* handle);
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg);
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
void transSendResponse(const STransMsg* pMsg); void transSendResponse(const STransMsg* pMsg);

View File

@ -22,6 +22,11 @@ void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThre
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
void* rpcOpen(const SRpcInit* pInit) { void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) { if (pRpc == NULL) {
@ -127,9 +132,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
void rpcRefHandle(void* handle, int8_t type) { void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosRefHandle[type])(handle); (*taosRefHandle[type])(handle);
@ -140,6 +142,11 @@ void rpcUnrefHandle(void* handle, int8_t type) {
(*taosUnRefHandle[type])(handle); (*taosUnRefHandle[type])(handle);
} }
void rpcReleaseHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*transReleaseHandle[type])(handle);
}
int32_t rpcInit() { int32_t rpcInit() {
// impl later // impl later
return 0; return 0;

View File

@ -17,6 +17,11 @@
#include "transComm.h" #include "transComm.h"
// Normal(default): send/recv msg
// Quit: quit rpc inst
// Release: release handle to rpc inst
typedef enum { Normal, Quit, Release } SCliMsgType;
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
@ -49,6 +54,7 @@ typedef struct SCliMsg {
STransMsg msg; STransMsg msg;
queue q; queue q;
uint64_t st; uint64_t st;
SCliMsgType type;
} SCliMsg; } SCliMsg;
typedef struct SCliThrdObj { typedef struct SCliThrdObj {
@ -108,6 +114,8 @@ static void cliHandleExcept(SCliConn* conn);
// handle req from app // handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void cliSendQuit(SCliThrdObj* thrd); static void cliSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(STransMsg* userdata); static void destroyUserdata(STransMsg* userdata);
@ -121,8 +129,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
@ -344,6 +352,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
// avoid conn
QUEUE_REMOVE(&conn->conn);
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
} }
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
@ -506,6 +516,21 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
pThrd->quit = true; pThrd->quit = true;
uv_stop(pThrd->loop); uv_stop(pThrd->loop);
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle;
tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn);
destroyCmsg(pMsg);
conn->data = NULL;
transDestroyBuffer(&conn->readBuf);
if (conn->persist && T_REF_VAL_GET(conn) >= 2) {
transUnrefCliHandle(conn);
addConnToPool(pThrd->pool, conn);
} else {
transUnrefCliHandle(conn);
}
}
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
@ -517,7 +542,9 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
} else { } else {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); if (conn != NULL) {
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
}
} }
return conn; return conn;
} }
@ -572,10 +599,13 @@ static void cliAsyncCb(uv_async_t* handle) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg->ctx == NULL) {
cliHandleQuit(pMsg, pThrd); if (pMsg->type == Normal) {
} else {
cliHandleReq(pMsg, pThrd); cliHandleReq(pMsg, pThrd);
} else if (pMsg->type == Quit) {
cliHandleQuit(pMsg, pThrd);
} else if (pMsg->type == Release) {
cliHandleRelease(pMsg, pThrd);
} }
count++; count++;
} }
@ -671,8 +701,10 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
void cliSendQuit(SCliThrdObj* thrd) { void cliSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully // cli can stop gracefully
SCliMsg* msg = calloc(1, sizeof(SCliMsg)); SCliMsg* msg = calloc(1, sizeof(SCliMsg));
msg->type = Quit;
transSendAsync(thrd->asyncPool, &msg->q); transSendAsync(thrd->asyncPool, &msg->q);
} }
int cliRBChoseIdx(STrans* pTransInst) { int cliRBChoseIdx(STrans* pTransInst) {
int64_t index = pTransInst->index; int64_t index = pTransInst->index;
if (pTransInst->index++ >= pTransInst->numOfThreads) { if (pTransInst->index++ >= pTransInst->numOfThreads) {
@ -702,10 +734,25 @@ void transUnrefCliHandle(void* handle) {
return; return;
} }
int ref = T_REF_DEC((SCliConn*)handle); int ref = T_REF_DEC((SCliConn*)handle);
tDebug("%s cli conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
if (ref == 0) { if (ref == 0) {
cliDestroyConn((SCliConn*)handle, true); cliDestroyConn((SCliConn*)handle, true);
} }
} }
void transReleaseCliHandle(void* handle) {
SCliThrdObj* thrd = CONN_GET_HOST_THREAD(handle);
if (thrd == NULL) {
return;
}
STransMsg tmsg = {.handle = handle};
SCliMsg* cmsg = calloc(1, sizeof(SCliMsg));
cmsg->type = Release;
cmsg->msg = tmsg;
transSendAsync(thrd->asyncPool, &cmsg->q);
}
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)shandle;
@ -728,7 +775,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
assert(pTransInst->connType == TAOS_CONN_CLIENT); assert(pTransInst->connType == TAOS_CONN_CLIENT);
// atomic or not // atomic or not
SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
cliMsg->msg = *pMsg; cliMsg->msg = *pMsg;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
@ -753,7 +800,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq
pCtx->pRsp = pRsp; pCtx->pRsp = pRsp;
tsem_init(pCtx->pSem, 0, 0); tsem_init(pCtx->pSem, 0, 0);
SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
cliMsg->msg = *pReq; cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();

View File

@ -641,7 +641,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
uv_timer_stop(&conn->pTimer); uv_timer_stop(&conn->pTimer);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
free(conn->pTcp); free(conn->pTcp);
free(conn); // free(conn);
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
uv_loop_close(thrd->loop); uv_loop_close(thrd->loop);
@ -786,6 +786,11 @@ void transUnrefSrvHandle(void* handle) {
} }
// unref srv handle // unref srv handle
} }
void transReleaseSrvHandle(void* handle) {
// do nothing currently
//
}
void transSendResponse(const STransMsg* pMsg) { void transSendResponse(const STransMsg* pMsg) {
if (pMsg->handle == NULL) { if (pMsg->handle == NULL) {
return; return;

View File

@ -110,6 +110,14 @@ class Client {
SemWait(); SemWait();
*resp = this->resp; *resp = this->resp;
} }
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
if (req->handle != NULL) {
rpcReleaseHandle(req->handle, TAOS_CONN_CLIENT);
req->handle = NULL;
}
SendAndRecv(req, resp);
}
void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {} void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {}
void SemWait() { tsem_wait(&this->sem); } void SemWait() { tsem_wait(&this->sem); }
void SemPost() { tsem_post(&this->sem); } void SemPost() { tsem_post(&this->sem); }
@ -268,6 +276,7 @@ class TransObj {
} }
void RestartSrv() { srv->Restart(); } void RestartSrv() { srv->Restart(); }
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
~TransObj() { ~TransObj() {
delete cli; delete cli;
@ -352,7 +361,47 @@ TEST_F(TransEnv, cliPersistHandle) {
EXPECT_TRUE(resp.code != 0); EXPECT_TRUE(resp.code != 0);
} }
} }
//////////////////
}
TEST_F(TransEnv, cliReleaseHandle) {
tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecvNoHandle(&req, &resp);
// if (i == 5) {
// std::cout << "stop server" << std::endl;
// tr->StopSrv();
//}
// if (i >= 6) {
EXPECT_TRUE(resp.code == 0);
//}
}
//////////////////
}
TEST_F(TransEnv, cliReleaseHandleExcept) {
tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecvNoHandle(&req, &resp);
if (i == 5) {
std::cout << "stop server" << std::endl;
tr->StopSrv();
}
if (i >= 6) {
EXPECT_TRUE(resp.code != 0);
}
}
////////////////// //////////////////
} }
TEST_F(TransEnv, srvContinueSend) { TEST_F(TransEnv, srvContinueSend) {
@ -367,11 +416,11 @@ TEST_F(TransEnv, srvContinueSend) {
taosMsleep(2000); taosMsleep(2000);
} }
TEST_F(TransEnv, srvPersisHandleExcept) { TEST_F(TransEnv, srvPersistHandleExcept) {
// conn breken // conn breken
// //
} }
TEST_F(TransEnv, cliPersisHandleExcept) { TEST_F(TransEnv, cliPersistHandleExcept) {
// conn breken // conn breken
} }