From 3b5f921575cd386b32ad467222ae61e1ada37c3d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Jan 2022 20:29:30 +0800 Subject: [PATCH] refactor rpc --- source/libs/transport/inc/transComm.h | 7 + source/libs/transport/src/trans.c | 8 +- source/libs/transport/src/transCli.c | 78 ++++----- source/libs/transport/src/transComm.c | 47 ++++++ source/libs/transport/src/transSrv.c | 219 ++++++++++++++------------ source/libs/transport/test/rclient.c | 1 + 6 files changed, 216 insertions(+), 144 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c7a23a2482..3f2aa1170e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -202,6 +202,8 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); void transConnCtxDestroy(STransConnCtx* ctx); void transFreeMsg(void* msg); + +// typedef struct SConnBuffer { char* buf; int len; @@ -209,4 +211,9 @@ typedef struct SConnBuffer { int left; } SConnBuffer; +int transInitBuffer(SConnBuffer* buf); +int transClearBuffer(SConnBuffer* buf); +int transDestroyBuffer(SConnBuffer* buf); +int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); + #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index bc9a9de318..1a3e70e6e0 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -55,7 +55,13 @@ void* rpcMallocCont(int contLen) { } return start + sizeof(STransMsgHead); } -void rpcFreeCont(void* cont) { return; } +void rpcFreeCont(void* cont) { + // impl + if (cont == NULL) { + return; + } + free((char*)cont - TRANS_MSG_OVERHEAD); +} void* rpcReallocCont(void* ptr, int contLen) { return NULL; } void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f265acf8c1..95a82cec68 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -112,16 +112,20 @@ static void clientHandleResp(SCliConn* conn) { SRpcMsg rpcMsg; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); - rpcMsg.pCont = transContFromHead(pHead); + rpcMsg.pCont = transContFromHead((char*)pHead); rpcMsg.code = pHead->code; rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; + tDebug("conn %p handle resp", conn); (pRpc->cfp)(NULL, &rpcMsg, NULL); conn->notifyCount += 1; SCliThrdObj* pThrd = conn->hostThrd; tfree(conn->data); + // buf alread translated to rpcMsg.pCont + transClearBuffer(&conn->readBuf); + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); // start thread's timer of conn pool if not active @@ -131,6 +135,11 @@ static void clientHandleResp(SCliConn* conn) { destroyTransConnCtx(pCtx); } static void clientHandleExcept(SCliConn* pConn) { + if (pConn->data == NULL) { + clientConnDestroy(pConn, true); + return; + } + tDebug("conn %p destroy", pConn); SCliMsg* pMsg = pConn->data; STransConnCtx* pCtx = pMsg->ctx; @@ -213,12 +222,17 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { } queue* h = QUEUE_HEAD(&plist->conn); QUEUE_REMOVE(h); - return QUEUE_DATA(h, SCliConn, conn); + + SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); + QUEUE_INIT(&conn->conn); + return conn; } static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { char key[128] = {0}; + tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); + tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); @@ -245,32 +259,9 @@ static bool clientReadComplete(SConnBuffer* data) { } } static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - // impl later - static const int CAPACITY = 512; - SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - if (pBuf->cap == 0) { - pBuf->buf = (char*)calloc(1, CAPACITY * sizeof(char)); - pBuf->len = 0; - pBuf->cap = CAPACITY; - pBuf->left = -1; - - buf->base = pBuf->buf; - buf->len = CAPACITY; - } else { - if (pBuf->len >= pBuf->cap) { - if (pBuf->left == -1) { - pBuf->cap *= 2; - pBuf->buf = realloc(pBuf->buf, pBuf->cap); - } else if (pBuf->len + pBuf->left > pBuf->cap) { - pBuf->cap = pBuf->len + pBuf->left; - pBuf->buf = realloc(pBuf->buf, pBuf->cap); - } - } - buf->base = pBuf->buf + pBuf->len; - buf->len = pBuf->cap - pBuf->len; - } + transAllocBuffer(pBuf, buf); } static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later @@ -288,10 +279,9 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf } assert(nread <= 0); if (nread == 0) { - tError("conn %p closed", conn); return; } - if (nread < 0) { + if (nread < 0 || nread == UV_EOF) { tError("conn %p read error: %s", conn, uv_err_name(nread)); clientHandleExcept(conn); } @@ -300,28 +290,28 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf } static void clientConnDestroy(SCliConn* conn, bool clear) { - tDebug("conn %p destroy", conn); + // + tDebug("conn %p remove from conn pool", conn); + QUEUE_REMOVE(&conn->conn); + tDebug("conn %p remove from conn pool successfully", conn); if (clear) { - uv_close((uv_handle_t*)conn->stream, NULL); + uv_close((uv_handle_t*)conn->stream, clientDestroy); } - free(conn->stream); - free(conn->readBuf.buf); - free(conn->writeReq); - free(conn); } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; - // QUEUE_REMOVE(&conn->conn); - clientConnDestroy(conn, false); + transDestroyBuffer(&conn->readBuf); + + free(conn->stream); + free(conn->writeReq); + tDebug("conn %p destroy successfully", conn); + free(conn); + + // clientConnDestroy(conn, false); } static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; - - SCliMsg* pMsg = pConn->data; - transFreeMsg((pMsg->msg.pCont)); - pMsg->msg.pCont = NULL; - if (status == 0) { tDebug("conn %p data already was written out", pConn); } else { @@ -381,10 +371,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tDebug("conn %p get from conn pool", conn); conn->data = pMsg; conn->writeReq->data = conn; - - conn->readBuf.len = 0; - memset(conn->readBuf.buf, 0, conn->readBuf.cap); - conn->readBuf.left = -1; + transDestroyBuffer(&conn->readBuf); clientWrite(conn); } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); @@ -397,6 +384,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { // write req handle conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq->data = conn; + QUEUE_INIT(&conn->conn); conn->connReq.data = conn; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5bece11bec..ca39f85eb3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -198,4 +198,51 @@ void transFreeMsg(void* msg) { } free((char*)msg - sizeof(STransMsgHead)); } + +int transInitBuffer(SConnBuffer* buf) { + transClearBuffer(buf); + return 0; +} +int transClearBuffer(SConnBuffer* buf) { + memset(buf, 0, sizeof(*buf)); + return 0; +} +int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { + /* + * formate of data buffer: + * |<--------------------------data from socket------------------------------->| + * |<------STransMsgHead------->|<-------------------other data--------------->| + */ + static const int CAPACITY = 1024; + + SConnBuffer* p = connBuf; + if (p->cap == 0) { + p->buf = (char*)calloc(CAPACITY, sizeof(char)); + p->len = 0; + p->cap = CAPACITY; + p->left = -1; + + uvBuf->base = p->buf; + uvBuf->len = CAPACITY; + } else { + if (p->len >= p->cap) { + if (p->left == -1) { + p->cap *= 2; + p->buf = realloc(p->buf, p->cap); + } else if (p->len + p->left > p->cap) { + p->cap = p->len + p->left; + p->buf = realloc(p->buf, p->len + p->left); + } + } + uvBuf->base = p->buf + p->len; + uvBuf->len = p->cap - p->len; + } + return 0; +} +int transDestroyBuffer(SConnBuffer* buf) { + if (buf->cap > 0) { + tfree(buf->buf); + } + transClearBuffer(buf); +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c70b1a5b28..8aa3995651 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -17,7 +17,7 @@ #include "transComm.h" -typedef struct SConn { +typedef struct SSrvConn { uv_tcp_t* pTcp; uv_write_t* pWriter; uv_timer_t* pTimer; @@ -26,13 +26,14 @@ typedef struct SConn { queue queue; int ref; int persist; // persist connection or not - SConnBuffer connBuf; // read buf, + SConnBuffer readBuf; // read buf, int inType; void* pTransInst; // rpc init void* ahandle; // void* hostThrd; + void* pSrvMsg; - SRpcMsg sendMsg; + // SRpcMsg sendMsg; // del later char secured; int spi; @@ -40,7 +41,13 @@ typedef struct SConn { char user[TSDB_UNI_LEN]; // user ID for the link char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; // ciphering key -} SConn; +} SSrvConn; + +typedef struct SSrvMsg { + SSrvConn* pConn; + SRpcMsg msg; + queue q; +} SSrvMsg; typedef struct SWorkThrdObj { pthread_t thread; @@ -48,8 +55,8 @@ typedef struct SWorkThrdObj { int fd; uv_loop_t* loop; uv_async_t* workerAsync; // - queue conn; - pthread_mutex_t connMtx; + queue msg; + pthread_mutex_t msgMtx; void* pTransInst; } SWorkThrdObj; @@ -68,9 +75,9 @@ typedef struct SServerObj { static const char* notify = "a"; // refactor later -static int transAddAuthPart(SConn* pConn, char* msg, int msgLen); +static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); -static int uvAuthMsg(SConn* pConn, char* msg, int msgLen); +static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); @@ -82,12 +89,13 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); -static void uvPrepareSendData(SConn* conn, uv_buf_t* wb); - +static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); +static void uvStartSendResp(SSrvMsg* msg); +static void destroySrvMsg(SSrvConn* conn); // check whether already read complete packet -static bool readComplete(SConnBuffer* buf); -static SConn* createConn(); -static void destroyConn(SConn* conn, bool clear /*clear handle or not*/); +static bool readComplete(SConnBuffer* buf); +static SSrvConn* createConn(); +static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); static void uvDestroyConn(uv_handle_t* handle); @@ -105,31 +113,9 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b * |<--------------------------data from socket------------------------------->| * |<------STransMsgHead------->|<-------------------other data--------------->| */ - static const int CAPACITY = 1024; - - SConn* conn = handle->data; - SConnBuffer* pBuf = &conn->connBuf; - if (pBuf->cap == 0) { - pBuf->buf = (char*)calloc(CAPACITY, sizeof(char)); - pBuf->len = 0; - pBuf->cap = CAPACITY; - pBuf->left = -1; - - buf->base = pBuf->buf; - buf->len = CAPACITY; - } else { - if (pBuf->len >= pBuf->cap) { - if (pBuf->left == -1) { - pBuf->cap *= 2; - pBuf->buf = realloc(pBuf->buf, pBuf->cap); - } else if (pBuf->len + pBuf->left > pBuf->cap) { - pBuf->cap = pBuf->len + pBuf->left; - pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left); - } - } - buf->base = pBuf->buf + pBuf->len; - buf->len = pBuf->cap - pBuf->len; - } + SSrvConn* conn = handle->data; + SConnBuffer* pBuf = &conn->readBuf; + transAllocBuffer(pBuf, buf); } // check data read from socket completely or not @@ -159,7 +145,7 @@ static bool readComplete(SConnBuffer* data) { // // impl later // STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; // SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; -// SConn* pConn = pRecv->thandle; +// SSrvConn* pConn = pRecv->thandle; // tDump(pRecv->msg, pRecv->msgLen); // terrno = 0; // // SRpcReqContext* pContest; @@ -167,7 +153,7 @@ static bool readComplete(SConnBuffer* data) { // // do auth and check //} -static int uvAuthMsg(SConn* pConn, char* msg, int len) { +static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) { STransMsgHead* pHead = (STransMsgHead*)msg; int code = 0; @@ -222,14 +208,14 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) { // refers specifically to query or insert timeout static void uvHandleActivityTimeout(uv_timer_t* handle) { - SConn* conn = handle->data; + SSrvConn* conn = handle->data; tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SConn* pConn) { +static void uvHandleReq(SSrvConn* pConn) { SRecvInfo info; SRecvInfo* p = &info; - SConnBuffer* pBuf = &pConn->connBuf; + SConnBuffer* pBuf = &pConn->readBuf; p->msg = pBuf->buf; p->msgLen = pBuf->len; p->ip = 0; @@ -255,7 +241,6 @@ static void uvHandleReq(SConn* pConn) { pHead->code = htonl(pHead->code); int32_t dlen = 0; - SRpcMsg rpcMsg; if (transDecompressMsg(NULL, 0, NULL)) { // add compress later // pHead = rpcDecompressRpcMsg(pHead); @@ -264,6 +249,8 @@ static void uvHandleReq(SConn* pConn) { // impl later // } + + SRpcMsg rpcMsg; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; @@ -271,6 +258,7 @@ static void uvHandleReq(SConn* pConn) { rpcMsg.ahandle = NULL; rpcMsg.handle = pConn; + transClearBuffer(&pConn->readBuf); pConn->ref++; (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); @@ -280,8 +268,8 @@ static void uvHandleReq(SConn* pConn) { void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt - SConn* conn = cli->data; - SConnBuffer* pBuf = &conn->connBuf; + SSrvConn* conn = cli->data; + SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread); @@ -294,11 +282,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } if (nread == 0) { - tDebug("conn %p except read", conn); - // destroyConn(conn, true); return; } - if (nread != UV_EOF) { + if (nread < 0 || nread != UV_EOF) { + if (conn->ref > 1) { + conn->ref++; // ref > 1 signed that write is in progress + } tDebug("conn %p read error: %s", conn, uv_err_name(nread)); destroyConn(conn, true); } @@ -310,25 +299,20 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b void uvOnTimeoutCb(uv_timer_t* handle) { // opt - SConn* pConn = handle->data; + SSrvConn* pConn = handle->data; tDebug("conn %p time out", pConn); } void uvOnWriteCb(uv_write_t* req, int status) { - SConn* conn = req->data; - - SConnBuffer* buf = &conn->connBuf; - buf->len = 0; - memset(buf->buf, 0, buf->cap); - buf->left = -1; - - SRpcMsg* pMsg = &conn->sendMsg; - transFreeMsg(pMsg->pCont); + SSrvConn* conn = req->data; + SSrvMsg* smsg = conn->pSrvMsg; + transClearBuffer(&conn->readBuf); if (status == 0) { tDebug("conn %p data already was written on stream", conn); } else { tDebug("conn %p failed to write data, %s", conn, uv_err_name(status)); + // destroyConn(conn, true); } // opt @@ -341,16 +325,16 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } } -static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { +static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { // impl later; - tDebug("conn %p prepare to send resp", conn); - SRpcMsg* pMsg = &conn->sendMsg; + tDebug("conn %p prepare to send resp", smsg->pConn); + SRpcMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - pHead->msgType = conn->inType + 1; + pHead->msgType = smsg->pConn->inType + 1; // add more info char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); @@ -361,28 +345,55 @@ static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { wb->base = msg; wb->len = len; } +static void uvStartSendResp(SSrvMsg* smsg) { + // impl + uv_buf_t wb; + uvPrepareSendData(smsg, &wb); + + SSrvConn* pConn = smsg->pConn; + uv_timer_stop(pConn->pTimer); + + pConn->pSrvMsg = smsg; + // conn->pWriter->data = smsg; + uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); + + // SRpcMsg* rpcMsg = smsg->msg; + + return; +} +static void destroySrvMsg(SSrvConn* conn) { + SSrvMsg* smsg = conn->pSrvMsg; + if (smsg == NULL) { + return; + } + transFreeMsg(smsg->msg.pCont); + free(conn->pSrvMsg); + conn->pSrvMsg = NULL; +} void uvWorkerAsyncCb(uv_async_t* handle) { SWorkThrdObj* pThrd = handle->data; - SConn* conn = NULL; + SSrvConn* conn = NULL; queue wq; // batch process to avoid to lock/unlock frequently - pthread_mutex_lock(&pThrd->connMtx); - QUEUE_MOVE(&pThrd->conn, &wq); - pthread_mutex_unlock(&pThrd->connMtx); + pthread_mutex_lock(&pThrd->msgMtx); + QUEUE_MOVE(&pThrd->msg, &wq); + pthread_mutex_unlock(&pThrd->msgMtx); while (!QUEUE_IS_EMPTY(&wq)) { queue* head = QUEUE_HEAD(&wq); QUEUE_REMOVE(head); - SConn* conn = QUEUE_DATA(head, SConn, queue); - if (conn == NULL) { - tError("except occurred, do nothing"); - return; - } - uv_buf_t wb; - uvPrepareSendData(conn, &wb); - uv_timer_stop(conn->pTimer); - uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); + SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q); + if (msg == NULL) { + tError("except occurred, continue"); + continue; + } + uvStartSendResp(msg); + // uv_buf_t wb; + // uvPrepareSendData(msg, &wb); + // uv_timer_stop(conn->pTimer); + + // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } } @@ -435,7 +446,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SConn* pConn = createConn(); + SSrvConn* pConn = createConn(); pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ @@ -484,8 +495,8 @@ static bool addHandleToWorkloop(void* arg) { pThrd->pipe->data = pThrd; - QUEUE_INIT(&pThrd->conn); - pthread_mutex_init(&pThrd->connMtx, NULL); + QUEUE_INIT(&pThrd->msg); + pthread_mutex_init(&pThrd->msgMtx, NULL); pThrd->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); @@ -523,34 +534,42 @@ void* workerThread(void* arg) { uv_run(pThrd->loop, UV_RUN_DEFAULT); } -static SConn* createConn() { - SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); +static SSrvConn* createConn() { + SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn)); + tDebug("conn %p created", pConn); ++pConn->ref; return pConn; } -static void destroyConn(SConn* conn, bool clear) { +static void destroyConn(SSrvConn* conn, bool clear) { if (conn == NULL) { return; } - if (--conn->ref == 0) { + // SRpcMsg* pMsg = &conn->sendMsg; + // transFreeMsg(pMsg->pCont); + // pMsg->pCont = NULL; + + tDebug("conn %p try to destroy", conn); + if (--conn->ref > 0) { return; } + transDestroyBuffer(&conn->readBuf); + destroySrvMsg(conn); + if (clear) { - uv_close((uv_handle_t*)conn->pTcp, NULL); + uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } +} +static void uvDestroyConn(uv_handle_t* handle) { + SSrvConn* conn = handle->data; + tDebug("conn %p destroy", conn); uv_timer_stop(conn->pTimer); free(conn->pTimer); - free(conn->pTcp); - free(conn->connBuf.buf); + // free(conn->pTcp); free(conn->pWriter); free(conn); } -static void uvDestroyConn(uv_handle_t* handle) { - SConn* conn = handle->data; - destroyConn(conn, false); -} -static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) { +static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) { STransMsgHead* pHead = (STransMsgHead*)msg; if (pConn->spi && pConn->secured == 0) { @@ -632,6 +651,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { pthread_join(pThrd->thread, NULL); // free(srv->pipe[i]); free(pThrd->loop); + pthread_mutex_destroy(&pThrd->msgMtx); free(pThrd); } void taosCloseServer(void* arg) { @@ -648,17 +668,20 @@ void taosCloseServer(void* arg) { } void rpcSendResponse(const SRpcMsg* pMsg) { - SConn* pConn = pMsg->handle; + SSrvConn* pConn = pMsg->handle; SWorkThrdObj* pThrd = pConn->hostThrd; - // opt later - pConn->sendMsg = *pMsg; - pthread_mutex_lock(&pThrd->connMtx); - QUEUE_PUSH(&pThrd->conn, &pConn->queue); - pthread_mutex_unlock(&pThrd->connMtx); + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + srvMsg->pConn = pConn; + srvMsg->msg = *pMsg; + + pthread_mutex_lock(&pThrd->msgMtx); + QUEUE_PUSH(&pThrd->msg, &srvMsg->q); + pthread_mutex_unlock(&pThrd->msgMtx); + tDebug("conn %p start to send resp", pConn); - uv_async_send(pConn->pWorkerAsync); + uv_async_send(pThrd->workerAsync); } #endif diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 84814f39fc..f9ad20c065 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -64,6 +64,7 @@ static void *sendRequest(void *param) { // tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem); tDebug("recv response succefully"); + // usleep(100000000); }