From 916dbe798b233f792e5ae74ce6dc124ab1c1916e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 18 Mar 2022 23:38:47 +0800 Subject: [PATCH] handle except --- include/libs/transport/trpc.h | 3 +- source/libs/transport/inc/transComm.h | 5 +- source/libs/transport/src/trans.c | 5 +- source/libs/transport/src/transSrv.c | 81 +++++++++++++++++++++------ 4 files changed, 69 insertions(+), 25 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index aae0c6bd22..af5afb51c5 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -38,13 +38,12 @@ typedef struct SRpcConnInfo { typedef struct SRpcMsg { tmsg_t msgType; - tmsg_t expectMsgType; void * pCont; int contLen; int32_t code; void * handle; // rpc handle returned to app void * ahandle; // app handle set by client - int noResp; // has response or not(default 0 indicate resp); + int noResp; // has response or not(default 0, 0: resp, 1: no resp); int persistHandle; // persist handle or not } SRpcMsg; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a939bbd644..c861ed350e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -181,7 +181,7 @@ typedef struct { #pragma pack(pop) -typedef enum { Normal, Quit, Release } STransMsgType; +typedef enum { Normal, Quit, Release, Register } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) @@ -262,7 +262,8 @@ void transReleaseSrvHandle(void* handle); void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); -void transSendResponse(const STransMsg* pMsg); +void transSendResponse(const STransMsg* msg); +void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index ded53ab4ea..317f80c48d 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -144,10 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } -void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { - // - rpcSendResponse(msg); -} +void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { rpcSendResponse(msg); } void rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*transReleaseHandle[type])(handle); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 6be664233b..126973f27c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -17,6 +17,12 @@ #include "transComm.h" +typedef struct { + int notifyCount; // + int init; // init or not + STransMsg msg; +} SSrvRegArg; + typedef struct SSrvConn { T_REF_DECLARE() uv_tcp_t* pTcp; @@ -33,7 +39,8 @@ typedef struct SSrvConn { void* hostThrd; SArray* srvMsgs; - bool broken; // conn broken; + SSrvRegArg regArg; + bool broken; // conn broken; ConnStatus status; struct sockaddr_in addr; @@ -117,7 +124,9 @@ static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/) static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease}; +static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease, + uvHandleRegister}; static void uvDestroyConn(uv_handle_t* handle); @@ -285,11 +294,13 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tError("server conn %p read error: %s", conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; - // uvNotifyLinkBrokenToApp(conn); - - // STrans* pTransInst = conn->pTransInst; - // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) { - //} + if (conn->status == ConnAcquire) { + if (conn->regArg.init) { + STrans* pTransInst = conn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + } transUnrefSrvHandle(conn); } } @@ -317,6 +328,17 @@ void uvOnSendCb(uv_write_t* req, int status) { if (msg->type == Release && conn->status != ConnNormal) { conn->status = ConnNormal; transUnrefSrvHandle(conn); + } else if (msg->type == Register && conn->status == ConnAcquire) { + conn->regArg.notifyCount = 0; + conn->regArg.init = 1; + conn->regArg.msg = msg->msg; + if (conn->broken) { + STrans* pTransInst = conn->pTransInst; + (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + free(msg); + return; } destroySmsg(msg); // send second data, just use for push @@ -403,16 +425,6 @@ static void uvStartSendResp(SSrvMsg* smsg) { return; } -// static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { -// STrans* pTransInst = conn->pTransInst; -// if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { -// STransMsg transMsg = {0}; -// transMsg.msgType = conn->inType; -// transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; -// // transRefSrvHandle(conn); -// (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); -// } -//} static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; @@ -641,6 +653,7 @@ static SSrvConn* createConn(void* hThrd) { pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); + memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; @@ -774,6 +787,7 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { if (conn->status == ConnAcquire) { if (taosArrayGetSize(conn->srvMsgs) > 0) { taosArrayPush(conn->srvMsgs, &msg); + return; } taosArrayPush(conn->srvMsgs, &msg); uvStartSendRespInternal(msg); @@ -790,6 +804,25 @@ void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) { // send msg to client uvStartSendResp(msg); } +void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { + SSrvConn* conn = msg->pConn; + if (conn->status == ConnAcquire) { + if (taosArrayGetSize(conn->srvMsgs) > 0) { + taosArrayPush(conn->srvMsgs, &msg); + return; + } + conn->regArg.notifyCount = 0; + conn->regArg.init = 1; + conn->regArg.msg = msg->msg; + + if (conn->broken) { + STrans* pTransInst = conn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); + memset(&conn->regArg, 0, sizeof(conn->regArg)); + } + free(msg); + } +} void destroyWorkThrd(SWorkThrdObj* pThrd) { if (pThrd == NULL) { return; @@ -884,6 +917,20 @@ void transSendResponse(const STransMsg* pMsg) { tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } +void transRegisterMsg(const STransMsg* msg) { + if (msg->handle == NULL) { + return; + } + SSrvConn* pConn = msg->handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + srvMsg->pConn = pConn; + srvMsg->msg = *msg; + srvMsg->type = Register; + tTrace("server conn %p start to send resp", pConn); + transSendAsync(pThrd->asyncPool, &srvMsg->q); +} int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { SSrvConn* pConn = thandle; struct sockaddr_in addr = pConn->addr;