update UT test

This commit is contained in:
ubuntu 2022-03-12 13:59:21 +08:00
parent baf0ac2d31
commit 30f602fae0
1 changed files with 99 additions and 98 deletions

View File

@ -63,12 +63,12 @@ typedef struct SCliThrdObj {
bool quit; bool quit;
} SCliThrdObj; } SCliThrdObj;
typedef struct SClientObj { typedef struct SCliObj {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
int32_t index; int32_t index;
int numOfThreads; int numOfThreads;
SCliThrdObj** pThreadObj; SCliThrdObj** pThreadObj;
} SClientObj; } SCliObj;
typedef struct SConnList { typedef struct SConnList {
queue conn; queue conn;
@ -82,32 +82,32 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
// register timer in each thread to clear expire conn // register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle); static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for read // alloc buf for recv
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket // callback after read nbytes from socket
static void clientRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after write data to socket // callback after write data to socket
static void clientSendDataCb(uv_write_t* req, int status); static void cliSendCb(uv_write_t* req, int status);
// callback after conn to server // callback after conn to server
static void clientConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void clientAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
static SCliConn* clientConnCreate(SCliThrdObj* thrd); static SCliConn* cliCreateConn(SCliThrdObj* thrd);
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void clientDestroy(uv_handle_t* handle); static void cliDestroy(uv_handle_t* handle);
// process data read from server, add decompress etc later // process data read from server, add decompress etc later
static void clientHandleResp(SCliConn* conn); static void cliHandleResp(SCliConn* conn);
// handle except about conn // handle except about conn
static void clientHandleExcept(SCliConn* conn); static void cliHandleExcept(SCliConn* conn);
// handle req from app // handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd); static void cliSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(SRpcMsg* userdata); static void destroyUserdata(SRpcMsg* userdata);
static int clientRBChoseIdx(SRpcInfo* pTransInst); static int cliRBChoseIdx(SRpcInfo* pTransInst);
static void destroyCmsg(SCliMsg* cmsg); static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx); static void transDestroyConnCtx(STransConnCtx* ctx);
@ -122,7 +122,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
clientHandleExcept(conn); \ cliHandleExcept(conn); \
goto _RETURE; \ goto _RETURE; \
} \ } \
} while (0) } while (0)
@ -130,15 +130,15 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HANDLE_BROKEN(conn) \ #define CONN_HANDLE_BROKEN(conn) \
do { \ do { \
if (conn->broken) { \ if (conn->broken) { \
clientHandleExcept(conn); \ cliHandleExcept(conn); \
goto _RETURE; \ goto _RETURE; \
} \ } \
} while (0); } while (0);
static void* clientThread(void* arg); static void* cliWorkThread(void* arg);
static void* clientNotifyApp() {} static void* cliNotifyApp() {}
static void clientHandleResp(SCliConn* conn) { static void cliHandleResp(SCliConn* conn) {
SCliMsg* pMsg = conn->data; SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
@ -164,25 +164,25 @@ static void clientHandleResp(SCliConn* conn) {
transRefCliHandle(conn); transRefCliHandle(conn);
conn->persist = 1; conn->persist = 1;
tDebug("client conn %p persist by app", conn); tDebug("cli conn %p persist by app", conn);
} }
tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen);
conn->secured = pHead->secured; conn->secured = pHead->secured;
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tTrace("%s client conn %p handle resp", pTransInst->label, conn); tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn);
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientRecvCb); uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb);
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->persist == 0) { if (conn->persist == 0) {
@ -193,10 +193,10 @@ static void clientHandleResp(SCliConn* conn) {
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
} }
static void clientHandleExcept(SCliConn* pConn) { static void cliHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) { if (pConn->data == NULL) {
// handle conn except in conn pool // handle conn except in conn pool
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
@ -214,25 +214,25 @@ static void clientHandleExcept(SCliConn* pConn) {
rpcMsg.msgType = pMsg->msg.msgType + 1; rpcMsg.msgType = pMsg->msg.msgType + 1;
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tTrace("%s client conn %p handle resp", pTransInst->label, pConn); tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
tTrace("%s client conn(sync) %p handle resp", pTransInst->label, pConn); tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
destroyCmsg(pConn->data); destroyCmsg(pConn->data);
pConn->data = NULL; pConn->data = NULL;
tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
static void clientTimeoutCb(uv_timer_t* handle) { static void cliTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->pTransInst; SRpcInfo* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
tTrace("%s, client conn timeout, try to remove expire conn from conn pool", pRpc->label); tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label);
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) { while (p != NULL) {
@ -250,7 +250,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
} }
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
static void* createConnPool(int size) { static void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
@ -263,7 +263,7 @@ static void* destroyConnPool(void* pool) {
queue* h = QUEUE_HEAD(&connList->conn); queue* h = QUEUE_HEAD(&connList->conn);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn); SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
clientConnDestroy(c, true); cliDestroyConn(c, true);
} }
connList = taosHashIterate((SHashObj*)pool, connList); connList = taosHashIterate((SHashObj*)pool, connList);
} }
@ -299,7 +299,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tstrncpy(key, ip, strlen(ip)); tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
@ -309,12 +309,12 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
assert(plist != NULL); assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
} }
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
} }
static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later // impl later
if (handle->data == NULL) { if (handle->data == NULL) {
return; return;
@ -324,10 +324,10 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (transReadComplete(pBuf)) { if (transReadComplete(pBuf)) {
tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
clientHandleResp(conn); cliHandleResp(conn);
} else { } else {
tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
} }
return; return;
} }
@ -340,13 +340,13 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
return; return;
} }
if (nread < 0) { if (nread < 0) {
tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); tError("%s cli conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
conn->broken = true; conn->broken = true;
clientHandleExcept(conn); cliHandleExcept(conn);
} }
} }
static SCliConn* clientConnCreate(SCliThrdObj* pThrd) { static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
SCliConn* conn = calloc(1, sizeof(SCliConn)); SCliConn* conn = calloc(1, sizeof(SCliConn));
// read/write stream handle // read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
@ -362,40 +362,40 @@ static SCliConn* clientConnCreate(SCliThrdObj* pThrd) {
transRefCliHandle(conn); transRefCliHandle(conn);
return conn; return conn;
} }
static void clientConnDestroy(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
} }
} }
static void clientDestroy(uv_handle_t* handle) { static void cliDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
free(conn->stream); free(conn->stream);
tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn); free(conn);
} }
static void clientSendDataCb(uv_write_t* req, int status) { static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status == 0) { if (status == 0) {
tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) { if (pMsg == NULL) {
return; return;
} }
destroyUserdata(&pMsg->msg); destroyUserdata(&pMsg->msg);
} else { } else {
tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
clientHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb);
} }
static void clientSendData(SCliConn* pConn) { static void cliSend(SCliConn* pConn) {
CONN_HANDLE_BROKEN(pConn); CONN_HANDLE_BROKEN(pConn);
SCliMsg* pCliMsg = pConn->data; SCliMsg* pCliMsg = pConn->data;
@ -432,22 +432,22 @@ static void clientSendData(SCliConn* pConn) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientSendDataCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return; return;
_RETURE: _RETURE:
return; return;
} }
static void clientConnCb(uv_connect_t* req, int status) { static void cliConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status != 0) { if (status != 0) {
tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); tError("%s cli conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
clientHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
int addrlen = sizeof(pConn->addr); int addrlen = sizeof(pConn->addr);
@ -456,14 +456,14 @@ static void clientConnCb(uv_connect_t* req, int status) {
addrlen = sizeof(pConn->locaddr); addrlen = sizeof(pConn->locaddr);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
clientSendData(pConn); cliSend(pConn);
} }
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("client work thread %p start to quit", pThrd); tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg); destroyCmsg(pMsg);
destroyConnPool(pThrd->pool); destroyConnPool(pThrd->pool);
@ -472,57 +472,57 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
pThrd->quit = true; pThrd->quit = true;
uv_stop(pThrd->loop); uv_stop(pThrd->loop);
} }
static SCliConn* clientGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
if (pMsg->msg.handle != NULL) { if (pMsg->msg.handle != NULL) {
conn = (SCliConn*)(pMsg->msg.handle); conn = (SCliConn*)(pMsg->msg.handle);
transUnrefCliHandle(conn); transUnrefCliHandle(conn);
if (conn != NULL) { if (conn != NULL) {
tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
} }
} else { } else {
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
} }
return conn; return conn;
} }
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el);
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pTransInst = pThrd->pTransInst; SRpcInfo* pTransInst = pThrd->pTransInst;
SCliConn* conn = clientGetConn(pMsg, pThrd); SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) { if (conn != NULL) {
conn->data = pMsg; conn->data = pMsg;
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
clientSendData(conn); cliSend(conn);
} else { } else {
conn = clientConnCreate(pThrd); conn = cliCreateConn(pThrd);
conn->data = pMsg; conn->data = pMsg;
int ret = transSetConnOption((uv_tcp_t*)conn->stream); int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) { if (ret) {
tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
} }
struct sockaddr_in addr; struct sockaddr_in addr;
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
// handle error in callback if fail to connect // handle error in callback if fail to connect
tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
} }
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
} }
static void clientAsyncCb(uv_async_t* handle) { static void cliAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data; SAsyncItem* item = handle->data;
SCliThrdObj* pThrd = item->pThrd; SCliThrdObj* pThrd = item->pThrd;
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
queue wq;
// batch process to avoid to lock/unlock frequently // batch process to avoid to lock/unlock frequently
queue wq;
pthread_mutex_lock(&item->mtx); pthread_mutex_lock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq); QUEUE_MOVE(&item->qmsg, &wq);
pthread_mutex_unlock(&item->mtx); pthread_mutex_unlock(&item->mtx);
@ -534,25 +534,25 @@ static void clientAsyncCb(uv_async_t* handle) {
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg->ctx == NULL) { if (pMsg->ctx == NULL) {
clientHandleQuit(pMsg, pThrd); cliHandleQuit(pMsg, pThrd);
} else { } else {
clientHandleReq(pMsg, pThrd); cliHandleReq(pMsg, pThrd);
} }
count++; count++;
} }
if (count >= 2) { if (count >= 2) {
tTrace("client process batch size: %d", count); tTrace("cli process batch size: %d", count);
} }
} }
static void* clientThread(void* arg) { static void* cliWorkThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg; SCliThrdObj* pThrd = (SCliThrdObj*)arg;
setThreadName("trans-client-work"); setThreadName("trans-cli-work");
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SClientObj* cli = calloc(1, sizeof(SClientObj)); SCliObj* cli = calloc(1, sizeof(SCliObj));
SRpcInfo* pRpc = shandle; SRpcInfo* pRpc = shandle;
memcpy(cli->label, label, strlen(label)); memcpy(cli->label, label, strlen(label));
@ -564,9 +564,9 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
pThrd->pTransInst = shandle; pThrd->pTransInst = shandle;
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); int err = pthread_create(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
if (err == 0) { if (err == 0) {
tDebug("success to create tranport-client thread %d", i); tDebug("success to create tranport-cli thread %d", i);
} }
cli->pThreadObj[i] = pThrd; cli->pThreadObj[i] = pThrd;
} }
@ -591,13 +591,14 @@ static void destroyCmsg(SCliMsg* pMsg) {
static SCliThrdObj* createThrdObj() { static SCliThrdObj* createThrdObj() {
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL); pthread_mutex_init(&pThrd->msgMtx, NULL);
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop); uv_loop_init(pThrd->loop);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer); uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd; pThrd->timer.data = pThrd;
@ -628,21 +629,21 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
free(ctx); free(ctx);
} }
// //
static void clientSendQuit(SCliThrdObj* thrd) { static void cliSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully // cli can stop gracefully
SCliMsg* msg = calloc(1, sizeof(SCliMsg)); SCliMsg* msg = calloc(1, sizeof(SCliMsg));
transSendAsync(thrd->asyncPool, &msg->q); transSendAsync(thrd->asyncPool, &msg->q);
} }
void taosCloseClient(void* arg) { void taosCloseClient(void* arg) {
SClientObj* cli = arg; SCliObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) { for (int i = 0; i < cli->numOfThreads; i++) {
clientSendQuit(cli->pThreadObj[i]); cliSendQuit(cli->pThreadObj[i]);
destroyThrdObj(cli->pThreadObj[i]); destroyThrdObj(cli->pThreadObj[i]);
} }
free(cli->pThreadObj); free(cli->pThreadObj);
free(cli); free(cli);
} }
static int clientRBChoseIdx(SRpcInfo* pTransInst) { static int cliRBChoseIdx(SRpcInfo* pTransInst) {
int64_t index = pTransInst->index; int64_t index = pTransInst->index;
if (pTransInst->index++ >= pTransInst->numOfThreads) { if (pTransInst->index++ >= pTransInst->numOfThreads) {
pTransInst->index = 0; pTransInst->index = 0;
@ -662,7 +663,7 @@ void transUnrefCliHandle(void* handle) {
} }
int ref = T_REF_DEC((SCliConn*)handle); int ref = T_REF_DEC((SCliConn*)handle);
if (ref == 0) { if (ref == 0) {
clientConnDestroy((SCliConn*)handle, true); cliDestroyConn((SCliConn*)handle, true);
} }
// unref cli handle // unref cli handle
@ -676,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
int index = CONN_HOST_THREAD_INDEX(pMsg->handle); int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
if (index == -1) { if (index == -1) {
index = clientRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
int32_t flen = 0; int32_t flen = 0;
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
@ -697,7 +698,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
cliMsg->msg = *pMsg; cliMsg->msg = *pMsg;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
} }
@ -709,7 +710,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
int index = CONN_HOST_THREAD_INDEX(pReq->handle); int index = CONN_HOST_THREAD_INDEX(pReq->handle);
if (index == -1) { if (index == -1) {
index = clientRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
@ -727,7 +728,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
cliMsg->msg = *pReq; cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem; tsem_t* pSem = pCtx->pSem;
tsem_wait(pSem); tsem_wait(pSem);