add debug

This commit is contained in:
yihaoDeng 2022-07-22 21:15:13 +08:00
parent 87a15d4523
commit 5bb762e59c
1 changed files with 26 additions and 33 deletions

View File

@ -27,7 +27,6 @@ typedef struct SCliConn {
SConnBuffer readBuf; SConnBuffer readBuf;
STransQueue cliMsgs; STransQueue cliMsgs;
queue q; queue q;
uint64_t expireTime;
STransCtx ctx; STransCtx ctx;
bool broken; // link broken or not bool broken; // link broken or not
@ -97,7 +96,7 @@ static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
static void cliTimeoutCb(uv_timer_t* handle); // static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv // alloc buf for recv
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket // callback after read nbytes from socket
@ -187,7 +186,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
pThrd = (SCliThrd*)(exh)->pThrd; \ pThrd = (SCliThrd*)(exh)->pThrd; \
} \ } \
} while (0) } while (0)
#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : 10 * (para)) #define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
@ -387,10 +386,6 @@ void cliHandleResp(SCliConn* conn) {
} }
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
// start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
} }
void cliHandleExcept(SCliConn* pConn) { void cliHandleExcept(SCliConn* pConn) {
@ -444,30 +439,30 @@ void cliHandleExcept(SCliConn* pConn) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
void cliTimeoutCb(uv_timer_t* handle) { // void cliTimeoutCb(uv_timer_t* handle) {
SCliThrd* pThrd = handle->data; // SCliThrd* pThrd = handle->data;
STrans* pTransInst = pThrd->pTransInst; // STrans* pTransInst = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; // int64_t currentTime = pThrd->nextTimeout;
tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); // tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
//
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); // SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) { // while (p != NULL) {
while (!QUEUE_IS_EMPTY(&p->conn)) { // while (!QUEUE_IS_EMPTY(&p->conn)) {
queue* h = QUEUE_HEAD(&p->conn); // queue* h = QUEUE_HEAD(&p->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, q); // SCliConn* c = QUEUE_DATA(h, SCliConn, q);
if (c->expireTime < currentTime) { // if (c->expireTime < currentTime) {
QUEUE_REMOVE(h); // QUEUE_REMOVE(h);
transUnrefCliHandle(c); // transUnrefCliHandle(c);
} else { // } else {
break; // break;
} // }
} // }
p = taosHashIterate((SHashObj*)pThrd->pool, p); // p = taosHashIterate((SHashObj*)pThrd->pool, p);
} // }
//
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); // pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); // uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
} // }
void* createConnPool(int size) { void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
@ -555,7 +550,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
allocConnRef(conn, true); allocConnRef(conn, true);
STrans* pTransInst = thrd->pTransInst; STrans* pTransInst = thrd->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
cliReleaseUnfinishedMsg(conn); cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs); transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
@ -576,7 +570,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn; arg->param1 = conn;
arg->param2 = thrd; arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
} }
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {