diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d47e9d8b74..638067e7ef 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,7 +27,6 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue cliMsgs; queue q; - uint64_t expireTime; STransCtx ctx; bool broken; // link broken or not @@ -97,7 +96,7 @@ static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); // register timer in each thread to clear expire conn -static void cliTimeoutCb(uv_timer_t* handle); +// static void cliTimeoutCb(uv_timer_t* handle); // alloc buf for recv static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // callback after read nbytes from socket @@ -187,7 +186,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pThrd = (SCliThrd*)(exh)->pThrd; \ } \ } while (0) -#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : 10 * (para)) +#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ @@ -387,10 +386,6 @@ void cliHandleResp(SCliConn* conn) { } uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); - // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { - // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); - } } void cliHandleExcept(SCliConn* pConn) { @@ -444,30 +439,30 @@ void cliHandleExcept(SCliConn* pConn) { transUnrefCliHandle(pConn); } -void cliTimeoutCb(uv_timer_t* handle) { - SCliThrd* pThrd = handle->data; - STrans* pTransInst = pThrd->pTransInst; - int64_t currentTime = pThrd->nextTimeout; - tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); - - SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); - while (p != NULL) { - while (!QUEUE_IS_EMPTY(&p->conn)) { - queue* h = QUEUE_HEAD(&p->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, q); - if (c->expireTime < currentTime) { - QUEUE_REMOVE(h); - transUnrefCliHandle(c); - } else { - break; - } - } - p = taosHashIterate((SHashObj*)pThrd->pool, p); - } - - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); - uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); -} +// void cliTimeoutCb(uv_timer_t* handle) { +// SCliThrd* pThrd = handle->data; +// STrans* pTransInst = pThrd->pTransInst; +// int64_t currentTime = pThrd->nextTimeout; +// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); +// +// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); +// while (p != NULL) { +// while (!QUEUE_IS_EMPTY(&p->conn)) { +// queue* h = QUEUE_HEAD(&p->conn); +// SCliConn* c = QUEUE_DATA(h, SCliConn, q); +// if (c->expireTime < currentTime) { +// QUEUE_REMOVE(h); +// transUnrefCliHandle(c); +// } else { +// break; +// } +// } +// p = taosHashIterate((SHashObj*)pThrd->pool, p); +// } +// +// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); +// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); +// } void* createConnPool(int size) { // thread local, no lock @@ -555,7 +550,6 @@ static void addConnToPool(void* pool, SCliConn* conn) { allocConnRef(conn, true); STrans* pTransInst = thrd->pTransInst; - conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); cliReleaseUnfinishedMsg(conn); transQueueClear(&conn->cliMsgs); transCtxCleanup(&conn->ctx); @@ -576,7 +570,6 @@ static void addConnToPool(void* pool, SCliConn* conn) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; arg->param2 = thrd; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); } static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {