From 30f602fae0f406f76950ecea5ec2ec5ad7b90bc8 Mon Sep 17 00:00:00 2001 From: ubuntu Date: Sat, 12 Mar 2022 13:59:21 +0800 Subject: [PATCH] update UT test --- source/libs/transport/src/transCli.c | 197 ++++++++++++++------------- 1 file changed, 99 insertions(+), 98 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 834337d0ff..727845b7a9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -63,12 +63,12 @@ typedef struct SCliThrdObj { bool quit; } SCliThrdObj; -typedef struct SClientObj { +typedef struct SCliObj { char label[TSDB_LABEL_LEN]; int32_t index; int numOfThreads; SCliThrdObj** pThreadObj; -} SClientObj; +} SCliObj; typedef struct SConnList { queue conn; @@ -82,32 +82,32 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); // register timer in each thread to clear expire conn -static void clientTimeoutCb(uv_timer_t* handle); -// alloc buf for read -static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -// callback after read nbytes from socket -static void clientRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void cliTimeoutCb(uv_timer_t* handle); +// alloc buf for recv +static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +// callback after read nbytes from socket +static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after write data to socket -static void clientSendDataCb(uv_write_t* req, int status); +static void cliSendCb(uv_write_t* req, int status); // callback after conn to server -static void clientConnCb(uv_connect_t* req, int status); -static void clientAsyncCb(uv_async_t* handle); +static void cliConnCb(uv_connect_t* req, int status); +static void cliAsyncCb(uv_async_t* handle); -static SCliConn* clientConnCreate(SCliThrdObj* thrd); -static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void clientDestroy(uv_handle_t* handle); +static SCliConn* cliCreateConn(SCliThrdObj* thrd); +static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void cliDestroy(uv_handle_t* handle); // process data read from server, add decompress etc later -static void clientHandleResp(SCliConn* conn); +static void cliHandleResp(SCliConn* conn); // handle except about conn -static void clientHandleExcept(SCliConn* conn); +static void cliHandleExcept(SCliConn* conn); // handle req from app -static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void clientSendQuit(SCliThrdObj* thrd); +static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(SRpcMsg* userdata); -static int clientRBChoseIdx(SRpcInfo* pTransInst); +static int cliRBChoseIdx(SRpcInfo* pTransInst); static void destroyCmsg(SCliMsg* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); @@ -122,7 +122,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ - clientHandleExcept(conn); \ + cliHandleExcept(conn); \ goto _RETURE; \ } \ } while (0) @@ -130,15 +130,15 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HANDLE_BROKEN(conn) \ do { \ if (conn->broken) { \ - clientHandleExcept(conn); \ + cliHandleExcept(conn); \ goto _RETURE; \ } \ } while (0); -static void* clientThread(void* arg); +static void* cliWorkThread(void* arg); -static void* clientNotifyApp() {} -static void clientHandleResp(SCliConn* conn) { +static void* cliNotifyApp() {} +static void cliHandleResp(SCliConn* conn) { SCliMsg* pMsg = conn->data; STransConnCtx* pCtx = pMsg->ctx; @@ -164,25 +164,25 @@ static void clientHandleResp(SCliConn* conn) { transRefCliHandle(conn); conn->persist = 1; - tDebug("client conn %p persist by app", conn); + tDebug("cli conn %p persist by app", conn); } - tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, + tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); conn->secured = pHead->secured; if (pCtx->pSem == NULL) { - tTrace("%s client conn %p handle resp", pTransInst->label, conn); + tTrace("%s cli conn %p handle resp", pTransInst->label, conn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); + tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } - uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientRecvCb); + uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb); // user owns conn->persist = 1 if (conn->persist == 0) { @@ -193,10 +193,10 @@ static void clientHandleResp(SCliConn* conn) { // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { - // uv_timer_start((uv_timer_t*)&pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } -static void clientHandleExcept(SCliConn* pConn) { +static void cliHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { // handle conn except in conn pool transUnrefCliHandle(pConn); @@ -214,25 +214,25 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.msgType = pMsg->msg.msgType + 1; if (pCtx->pSem == NULL) { - tTrace("%s client conn %p handle resp", pTransInst->label, pConn); + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - tTrace("%s client conn(sync) %p handle resp", pTransInst->label, pConn); + tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn); memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); tsem_post(pCtx->pSem); } destroyCmsg(pConn->data); pConn->data = NULL; - tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); + tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); transUnrefCliHandle(pConn); } -static void clientTimeoutCb(uv_timer_t* handle) { +static void cliTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tTrace("%s, client conn timeout, try to remove expire conn from conn pool", pRpc->label); + tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { @@ -250,7 +250,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { } pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); - uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } static void* createConnPool(int size) { // thread local, no lock @@ -263,7 +263,7 @@ static void* destroyConnPool(void* pool) { queue* h = QUEUE_HEAD(&connList->conn); QUEUE_REMOVE(h); SCliConn* c = QUEUE_DATA(h, SCliConn, conn); - clientConnDestroy(c, true); + cliDestroyConn(c, true); } connList = taosHashIterate((SHashObj*)pool, connList); } @@ -299,7 +299,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); + tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; @@ -309,12 +309,12 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); } -static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { +static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); } -static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { +static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later if (handle->data == NULL) { return; @@ -324,10 +324,10 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (transReadComplete(pBuf)) { - tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn); - clientHandleResp(conn); + tTrace("%s cli conn %p read complete", CONN_GET_INST_LABEL(conn), conn); + cliHandleResp(conn); } else { - tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); + tTrace("%s cli conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); } return; } @@ -340,13 +340,13 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf return; } if (nread < 0) { - tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); + tError("%s cli conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); conn->broken = true; - clientHandleExcept(conn); + cliHandleExcept(conn); } } -static SCliConn* clientConnCreate(SCliThrdObj* pThrd) { +static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { SCliConn* conn = calloc(1, sizeof(SCliConn)); // read/write stream handle conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); @@ -362,40 +362,40 @@ static SCliConn* clientConnCreate(SCliThrdObj* pThrd) { transRefCliHandle(conn); return conn; } -static void clientConnDestroy(SCliConn* conn, bool clear) { - tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); +static void cliDestroyConn(SCliConn* conn, bool clear) { + tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); if (clear) { - uv_close((uv_handle_t*)conn->stream, clientDestroy); + uv_close((uv_handle_t*)conn->stream, cliDestroy); } } -static void clientDestroy(uv_handle_t* handle) { +static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->stream); - tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); + tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); } -static void clientSendDataCb(uv_write_t* req, int status) { +static void cliSendCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; if (status == 0) { - tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); + tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { return; } destroyUserdata(&pMsg->msg); } else { - tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); - clientHandleExcept(pConn); + tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); + cliHandleExcept(pConn); return; } - uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientRecvCb); + uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb); } -static void clientSendData(SCliConn* pConn) { +static void cliSend(SCliConn* pConn) { CONN_HANDLE_BROKEN(pConn); SCliMsg* pCliMsg = pConn->data; @@ -432,22 +432,22 @@ static void clientSendData(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); - uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientSendDataCb); + uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); return; _RETURE: return; } -static void clientConnCb(uv_connect_t* req, int status) { +static void cliConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; if (status != 0) { - tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); - clientHandleExcept(pConn); + tError("%s cli conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); + cliHandleExcept(pConn); return; } int addrlen = sizeof(pConn->addr); @@ -456,14 +456,14 @@ static void clientConnCb(uv_connect_t* req, int status) { addrlen = sizeof(pConn->locaddr); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); - tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); + tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); assert(pConn->stream == req->handle); - clientSendData(pConn); + cliSend(pConn); } -static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { - tDebug("client work thread %p start to quit", pThrd); +static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { + tDebug("cli work thread %p start to quit", pThrd); destroyCmsg(pMsg); destroyConnPool(pThrd->pool); @@ -472,57 +472,57 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { pThrd->quit = true; uv_stop(pThrd->loop); } -static SCliConn* clientGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { +static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; if (pMsg->msg.handle != NULL) { conn = (SCliConn*)(pMsg->msg.handle); transUnrefCliHandle(conn); if (conn != NULL) { - tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); + tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); } } else { STransConnCtx* pCtx = pMsg->ctx; conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); - if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); + if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } return conn; } -static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { +static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); + tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); STransConnCtx* pCtx = pMsg->ctx; SRpcInfo* pTransInst = pThrd->pTransInst; - SCliConn* conn = clientGetConn(pMsg, pThrd); + SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { conn->data = pMsg; transDestroyBuffer(&conn->readBuf); - clientSendData(conn); + cliSend(conn); } else { - conn = clientConnCreate(pThrd); + conn = cliCreateConn(pThrd); conn->data = pMsg; int ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret) { - tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); + tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); } struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect - tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); - uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); + tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); + uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); } conn->hThrdIdx = pCtx->hThrdIdx; } -static void clientAsyncCb(uv_async_t* handle) { +static void cliAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SCliThrdObj* pThrd = item->pThrd; SCliMsg* pMsg = NULL; - queue wq; // batch process to avoid to lock/unlock frequently + queue wq; pthread_mutex_lock(&item->mtx); QUEUE_MOVE(&item->qmsg, &wq); pthread_mutex_unlock(&item->mtx); @@ -534,25 +534,25 @@ static void clientAsyncCb(uv_async_t* handle) { SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); if (pMsg->ctx == NULL) { - clientHandleQuit(pMsg, pThrd); + cliHandleQuit(pMsg, pThrd); } else { - clientHandleReq(pMsg, pThrd); + cliHandleReq(pMsg, pThrd); } count++; } if (count >= 2) { - tTrace("client process batch size: %d", count); + tTrace("cli process batch size: %d", count); } } -static void* clientThread(void* arg) { +static void* cliWorkThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; - setThreadName("trans-client-work"); + setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); } void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { - SClientObj* cli = calloc(1, sizeof(SClientObj)); + SCliObj* cli = calloc(1, sizeof(SCliObj)); SRpcInfo* pRpc = shandle; memcpy(cli->label, label, strlen(label)); @@ -564,9 +564,9 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->pTransInst = shandle; - int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); + int err = pthread_create(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); if (err == 0) { - tDebug("success to create tranport-client thread %d", i); + tDebug("success to create tranport-cli thread %d", i); } cli->pThreadObj[i] = pThrd; } @@ -591,13 +591,14 @@ static void destroyCmsg(SCliMsg* pMsg) { static SCliThrdObj* createThrdObj() { SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); uv_timer_init(pThrd->loop, &pThrd->timer); pThrd->timer.data = pThrd; @@ -628,21 +629,21 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { free(ctx); } // -static void clientSendQuit(SCliThrdObj* thrd) { +static void cliSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); transSendAsync(thrd->asyncPool, &msg->q); } void taosCloseClient(void* arg) { - SClientObj* cli = arg; + SCliObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { - clientSendQuit(cli->pThreadObj[i]); + cliSendQuit(cli->pThreadObj[i]); destroyThrdObj(cli->pThreadObj[i]); } free(cli->pThreadObj); free(cli); } -static int clientRBChoseIdx(SRpcInfo* pTransInst) { +static int cliRBChoseIdx(SRpcInfo* pTransInst) { int64_t index = pTransInst->index; if (pTransInst->index++ >= pTransInst->numOfThreads) { pTransInst->index = 0; @@ -662,7 +663,7 @@ void transUnrefCliHandle(void* handle) { } int ref = T_REF_DEC((SCliConn*)handle); if (ref == 0) { - clientConnDestroy((SCliConn*)handle, true); + cliDestroyConn((SCliConn*)handle, true); } // unref cli handle @@ -676,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* int index = CONN_HOST_THREAD_INDEX(pMsg->handle); if (index == -1) { - index = clientRBChoseIdx(pTransInst); + index = cliRBChoseIdx(pTransInst); } int32_t flen = 0; if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { @@ -697,7 +698,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } @@ -709,7 +710,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { - index = clientRBChoseIdx(pTransInst); + index = cliRBChoseIdx(pTransInst); } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); @@ -727,7 +728,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; tsem_wait(pSem);