commit
9fe7540539
|
@ -29,7 +29,6 @@ extern "C" {
|
||||||
|
|
||||||
extern int tsRpcHeadSize;
|
extern int tsRpcHeadSize;
|
||||||
|
|
||||||
typedef struct SRpcPush SRpcPush;
|
|
||||||
|
|
||||||
typedef struct SRpcConnInfo {
|
typedef struct SRpcConnInfo {
|
||||||
uint32_t clientIp;
|
uint32_t clientIp;
|
||||||
|
@ -45,14 +44,8 @@ typedef struct SRpcMsg {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
void * handle; // rpc handle returned to app
|
void * handle; // rpc handle returned to app
|
||||||
void * ahandle; // app handle set by client
|
void * ahandle; // app handle set by client
|
||||||
int persist; // keep handle or not, default 0
|
|
||||||
|
|
||||||
} SRpcMsg;
|
} SRpcMsg;
|
||||||
|
|
||||||
typedef struct SRpcPush {
|
|
||||||
void *arg;
|
|
||||||
int (*callback)(void *arg, SRpcMsg *rpcMsg);
|
|
||||||
} SRpcPush;
|
|
||||||
|
|
||||||
typedef struct SRpcInit {
|
typedef struct SRpcInit {
|
||||||
uint16_t localPort; // local port
|
uint16_t localPort; // local port
|
||||||
|
|
|
@ -63,12 +63,12 @@ typedef struct SCliThrdObj {
|
||||||
bool quit;
|
bool quit;
|
||||||
} SCliThrdObj;
|
} SCliThrdObj;
|
||||||
|
|
||||||
typedef struct SClientObj {
|
typedef struct SCliObj {
|
||||||
char label[TSDB_LABEL_LEN];
|
char label[TSDB_LABEL_LEN];
|
||||||
int32_t index;
|
int32_t index;
|
||||||
int numOfThreads;
|
int numOfThreads;
|
||||||
SCliThrdObj** pThreadObj;
|
SCliThrdObj** pThreadObj;
|
||||||
} SClientObj;
|
} SCliObj;
|
||||||
|
|
||||||
typedef struct SConnList {
|
typedef struct SConnList {
|
||||||
queue conn;
|
queue conn;
|
||||||
|
@ -82,32 +82,32 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
||||||
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
|
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
|
||||||
|
|
||||||
// register timer in each thread to clear expire conn
|
// register timer in each thread to clear expire conn
|
||||||
static void clientTimeoutCb(uv_timer_t* handle);
|
static void cliTimeoutCb(uv_timer_t* handle);
|
||||||
// alloc buf for read
|
// alloc buf for recv
|
||||||
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
// callback after read nbytes from socket
|
// callback after read nbytes from socket
|
||||||
static void clientRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
// callback after write data to socket
|
// callback after write data to socket
|
||||||
static void clientSendDataCb(uv_write_t* req, int status);
|
static void cliSendCb(uv_write_t* req, int status);
|
||||||
// callback after conn to server
|
// callback after conn to server
|
||||||
static void clientConnCb(uv_connect_t* req, int status);
|
static void cliConnCb(uv_connect_t* req, int status);
|
||||||
static void clientAsyncCb(uv_async_t* handle);
|
static void cliAsyncCb(uv_async_t* handle);
|
||||||
|
|
||||||
static SCliConn* clientConnCreate(SCliThrdObj* thrd);
|
static SCliConn* cliCreateConn(SCliThrdObj* thrd);
|
||||||
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
|
||||||
static void clientDestroy(uv_handle_t* handle);
|
static void cliDestroy(uv_handle_t* handle);
|
||||||
|
|
||||||
// process data read from server, add decompress etc later
|
// process data read from server, add decompress etc later
|
||||||
static void clientHandleResp(SCliConn* conn);
|
static void cliHandleResp(SCliConn* conn);
|
||||||
// handle except about conn
|
// handle except about conn
|
||||||
static void clientHandleExcept(SCliConn* conn);
|
static void cliHandleExcept(SCliConn* conn);
|
||||||
// handle req from app
|
// handle req from app
|
||||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
static void clientSendQuit(SCliThrdObj* thrd);
|
static void cliSendQuit(SCliThrdObj* thrd);
|
||||||
static void destroyUserdata(SRpcMsg* userdata);
|
static void destroyUserdata(SRpcMsg* userdata);
|
||||||
|
|
||||||
static int clientRBChoseIdx(SRpcInfo* pTransInst);
|
static int cliRBChoseIdx(SRpcInfo* pTransInst);
|
||||||
|
|
||||||
static void destroyCmsg(SCliMsg* cmsg);
|
static void destroyCmsg(SCliMsg* cmsg);
|
||||||
static void transDestroyConnCtx(STransConnCtx* ctx);
|
static void transDestroyConnCtx(STransConnCtx* ctx);
|
||||||
|
@ -122,7 +122,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
|
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
|
||||||
do { \
|
do { \
|
||||||
if (thrd->quit) { \
|
if (thrd->quit) { \
|
||||||
clientHandleExcept(conn); \
|
cliHandleExcept(conn); \
|
||||||
goto _RETURE; \
|
goto _RETURE; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
@ -130,15 +130,15 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
#define CONN_HANDLE_BROKEN(conn) \
|
#define CONN_HANDLE_BROKEN(conn) \
|
||||||
do { \
|
do { \
|
||||||
if (conn->broken) { \
|
if (conn->broken) { \
|
||||||
clientHandleExcept(conn); \
|
cliHandleExcept(conn); \
|
||||||
goto _RETURE; \
|
goto _RETURE; \
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
static void* clientThread(void* arg);
|
static void* cliWorkThread(void* arg);
|
||||||
|
|
||||||
static void* clientNotifyApp() {}
|
static void* cliNotifyApp() {}
|
||||||
static void clientHandleResp(SCliConn* conn) {
|
static void cliHandleResp(SCliConn* conn) {
|
||||||
SCliMsg* pMsg = conn->data;
|
SCliMsg* pMsg = conn->data;
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
|
@ -164,25 +164,25 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
transRefCliHandle(conn);
|
transRefCliHandle(conn);
|
||||||
|
|
||||||
conn->persist = 1;
|
conn->persist = 1;
|
||||||
tDebug("client conn %p persist by app", conn);
|
tDebug("cli conn %p persist by app", conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
||||||
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||||
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen);
|
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen);
|
||||||
|
|
||||||
conn->secured = pHead->secured;
|
conn->secured = pHead->secured;
|
||||||
|
|
||||||
if (pCtx->pSem == NULL) {
|
if (pCtx->pSem == NULL) {
|
||||||
tTrace("%s client conn %p handle resp", pTransInst->label, conn);
|
tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
|
||||||
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
|
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn);
|
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn);
|
||||||
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
||||||
tsem_post(pCtx->pSem);
|
tsem_post(pCtx->pSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientRecvCb);
|
uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb);
|
||||||
|
|
||||||
// user owns conn->persist = 1
|
// user owns conn->persist = 1
|
||||||
if (conn->persist == 0) {
|
if (conn->persist == 0) {
|
||||||
|
@ -193,10 +193,10 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
// start thread's timer of conn pool if not active
|
// start thread's timer of conn pool if not active
|
||||||
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
|
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
|
||||||
// uv_timer_start((uv_timer_t*)&pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void clientHandleExcept(SCliConn* pConn) {
|
static void cliHandleExcept(SCliConn* pConn) {
|
||||||
if (pConn->data == NULL) {
|
if (pConn->data == NULL) {
|
||||||
// handle conn except in conn pool
|
// handle conn except in conn pool
|
||||||
transUnrefCliHandle(pConn);
|
transUnrefCliHandle(pConn);
|
||||||
|
@ -214,25 +214,25 @@ static void clientHandleExcept(SCliConn* pConn) {
|
||||||
rpcMsg.msgType = pMsg->msg.msgType + 1;
|
rpcMsg.msgType = pMsg->msg.msgType + 1;
|
||||||
|
|
||||||
if (pCtx->pSem == NULL) {
|
if (pCtx->pSem == NULL) {
|
||||||
tTrace("%s client conn %p handle resp", pTransInst->label, pConn);
|
tTrace("%s cli conn %p handle resp", pTransInst->label, pConn);
|
||||||
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
|
(pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s client conn(sync) %p handle resp", pTransInst->label, pConn);
|
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn);
|
||||||
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
||||||
tsem_post(pCtx->pSem);
|
tsem_post(pCtx->pSem);
|
||||||
}
|
}
|
||||||
destroyCmsg(pConn->data);
|
destroyCmsg(pConn->data);
|
||||||
pConn->data = NULL;
|
pConn->data = NULL;
|
||||||
|
|
||||||
tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
transUnrefCliHandle(pConn);
|
transUnrefCliHandle(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientTimeoutCb(uv_timer_t* handle) {
|
static void cliTimeoutCb(uv_timer_t* handle) {
|
||||||
SCliThrdObj* pThrd = handle->data;
|
SCliThrdObj* pThrd = handle->data;
|
||||||
SRpcInfo* pRpc = pThrd->pTransInst;
|
SRpcInfo* pRpc = pThrd->pTransInst;
|
||||||
int64_t currentTime = pThrd->nextTimeout;
|
int64_t currentTime = pThrd->nextTimeout;
|
||||||
tTrace("%s, client conn timeout, try to remove expire conn from conn pool", pRpc->label);
|
tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label);
|
||||||
|
|
||||||
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
|
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
|
||||||
while (p != NULL) {
|
while (p != NULL) {
|
||||||
|
@ -250,7 +250,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||||
uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||||
}
|
}
|
||||||
static void* createConnPool(int size) {
|
static void* createConnPool(int size) {
|
||||||
// thread local, no lock
|
// thread local, no lock
|
||||||
|
@ -263,7 +263,7 @@ static void* destroyConnPool(void* pool) {
|
||||||
queue* h = QUEUE_HEAD(&connList->conn);
|
queue* h = QUEUE_HEAD(&connList->conn);
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
|
||||||
clientConnDestroy(c, true);
|
cliDestroyConn(c, true);
|
||||||
}
|
}
|
||||||
connList = taosHashIterate((SHashObj*)pool, connList);
|
connList = taosHashIterate((SHashObj*)pool, connList);
|
||||||
}
|
}
|
||||||
|
@ -299,7 +299,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
||||||
|
|
||||||
tstrncpy(key, ip, strlen(ip));
|
tstrncpy(key, ip, strlen(ip));
|
||||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||||
tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||||
|
|
||||||
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
||||||
|
|
||||||
|
@ -309,12 +309,12 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
||||||
assert(plist != NULL);
|
assert(plist != NULL);
|
||||||
QUEUE_PUSH(&plist->conn, &conn->conn);
|
QUEUE_PUSH(&plist->conn, &conn->conn);
|
||||||
}
|
}
|
||||||
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
transAllocBuffer(pBuf, buf);
|
transAllocBuffer(pBuf, buf);
|
||||||
}
|
}
|
||||||
static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
// impl later
|
// impl later
|
||||||
if (handle->data == NULL) {
|
if (handle->data == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -324,10 +324,10 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
pBuf->len += nread;
|
pBuf->len += nread;
|
||||||
if (transReadComplete(pBuf)) {
|
if (transReadComplete(pBuf)) {
|
||||||
tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||||
clientHandleResp(conn);
|
cliHandleResp(conn);
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -340,13 +340,13 @@ static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
|
tError("%s cli conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
|
||||||
conn->broken = true;
|
conn->broken = true;
|
||||||
clientHandleExcept(conn);
|
cliHandleExcept(conn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCliConn* clientConnCreate(SCliThrdObj* pThrd) {
|
static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
|
||||||
SCliConn* conn = calloc(1, sizeof(SCliConn));
|
SCliConn* conn = calloc(1, sizeof(SCliConn));
|
||||||
// read/write stream handle
|
// read/write stream handle
|
||||||
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
||||||
|
@ -362,40 +362,40 @@ static SCliConn* clientConnCreate(SCliThrdObj* pThrd) {
|
||||||
transRefCliHandle(conn);
|
transRefCliHandle(conn);
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
static void clientConnDestroy(SCliConn* conn, bool clear) {
|
static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||||
QUEUE_REMOVE(&conn->conn);
|
QUEUE_REMOVE(&conn->conn);
|
||||||
if (clear) {
|
if (clear) {
|
||||||
uv_close((uv_handle_t*)conn->stream, clientDestroy);
|
uv_close((uv_handle_t*)conn->stream, cliDestroy);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void clientDestroy(uv_handle_t* handle) {
|
static void cliDestroy(uv_handle_t* handle) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
|
|
||||||
free(conn->stream);
|
free(conn->stream);
|
||||||
tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
free(conn);
|
free(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientSendDataCb(uv_write_t* req, int status) {
|
static void cliSendCb(uv_write_t* req, int status) {
|
||||||
SCliConn* pConn = req->data;
|
SCliConn* pConn = req->data;
|
||||||
|
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
|
tTrace("%s cli conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
SCliMsg* pMsg = pConn->data;
|
SCliMsg* pMsg = pConn->data;
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
destroyUserdata(&pMsg->msg);
|
destroyUserdata(&pMsg->msg);
|
||||||
} else {
|
} else {
|
||||||
tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
|
tError("%s cli conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
|
||||||
clientHandleExcept(pConn);
|
cliHandleExcept(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientRecvCb);
|
uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientSendData(SCliConn* pConn) {
|
static void cliSend(SCliConn* pConn) {
|
||||||
CONN_HANDLE_BROKEN(pConn);
|
CONN_HANDLE_BROKEN(pConn);
|
||||||
|
|
||||||
SCliMsg* pCliMsg = pConn->data;
|
SCliMsg* pCliMsg = pConn->data;
|
||||||
|
@ -432,22 +432,22 @@ static void clientSendData(SCliConn* pConn) {
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||||
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
|
|
||||||
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientSendDataCb);
|
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
_RETURE:
|
_RETURE:
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
static void clientConnCb(uv_connect_t* req, int status) {
|
static void cliConnCb(uv_connect_t* req, int status) {
|
||||||
// impl later
|
// impl later
|
||||||
SCliConn* pConn = req->data;
|
SCliConn* pConn = req->data;
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
|
tError("%s cli conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
|
||||||
clientHandleExcept(pConn);
|
cliHandleExcept(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int addrlen = sizeof(pConn->addr);
|
int addrlen = sizeof(pConn->addr);
|
||||||
|
@ -456,14 +456,14 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
addrlen = sizeof(pConn->locaddr);
|
addrlen = sizeof(pConn->locaddr);
|
||||||
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
|
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
|
||||||
|
|
||||||
tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
tTrace("%s cli conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
|
|
||||||
assert(pConn->stream == req->handle);
|
assert(pConn->stream == req->handle);
|
||||||
clientSendData(pConn);
|
cliSend(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
tDebug("client work thread %p start to quit", pThrd);
|
tDebug("cli work thread %p start to quit", pThrd);
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
destroyConnPool(pThrd->pool);
|
destroyConnPool(pThrd->pool);
|
||||||
|
|
||||||
|
@ -472,57 +472,57 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
pThrd->quit = true;
|
pThrd->quit = true;
|
||||||
uv_stop(pThrd->loop);
|
uv_stop(pThrd->loop);
|
||||||
}
|
}
|
||||||
static SCliConn* clientGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
SCliConn* conn = NULL;
|
SCliConn* conn = NULL;
|
||||||
if (pMsg->msg.handle != NULL) {
|
if (pMsg->msg.handle != NULL) {
|
||||||
conn = (SCliConn*)(pMsg->msg.handle);
|
conn = (SCliConn*)(pMsg->msg.handle);
|
||||||
transUnrefCliHandle(conn);
|
transUnrefCliHandle(conn);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
||||||
if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
uint64_t et = taosGetTimestampUs();
|
uint64_t et = taosGetTimestampUs();
|
||||||
uint64_t el = et - pMsg->st;
|
uint64_t el = et - pMsg->st;
|
||||||
tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el);
|
tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el);
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
SRpcInfo* pTransInst = pThrd->pTransInst;
|
SRpcInfo* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
SCliConn* conn = clientGetConn(pMsg, pThrd);
|
SCliConn* conn = cliGetConn(pMsg, pThrd);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
clientSendData(conn);
|
cliSend(conn);
|
||||||
} else {
|
} else {
|
||||||
conn = clientConnCreate(pThrd);
|
conn = cliCreateConn(pThrd);
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
|
|
||||||
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
||||||
}
|
}
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||||
// handle error in callback if fail to connect
|
// handle error in callback if fail to connect
|
||||||
tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
||||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
}
|
}
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
}
|
}
|
||||||
static void clientAsyncCb(uv_async_t* handle) {
|
static void cliAsyncCb(uv_async_t* handle) {
|
||||||
SAsyncItem* item = handle->data;
|
SAsyncItem* item = handle->data;
|
||||||
SCliThrdObj* pThrd = item->pThrd;
|
SCliThrdObj* pThrd = item->pThrd;
|
||||||
SCliMsg* pMsg = NULL;
|
SCliMsg* pMsg = NULL;
|
||||||
queue wq;
|
|
||||||
|
|
||||||
// batch process to avoid to lock/unlock frequently
|
// batch process to avoid to lock/unlock frequently
|
||||||
|
queue wq;
|
||||||
pthread_mutex_lock(&item->mtx);
|
pthread_mutex_lock(&item->mtx);
|
||||||
QUEUE_MOVE(&item->qmsg, &wq);
|
QUEUE_MOVE(&item->qmsg, &wq);
|
||||||
pthread_mutex_unlock(&item->mtx);
|
pthread_mutex_unlock(&item->mtx);
|
||||||
|
@ -534,25 +534,25 @@ static void clientAsyncCb(uv_async_t* handle) {
|
||||||
|
|
||||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
if (pMsg->ctx == NULL) {
|
if (pMsg->ctx == NULL) {
|
||||||
clientHandleQuit(pMsg, pThrd);
|
cliHandleQuit(pMsg, pThrd);
|
||||||
} else {
|
} else {
|
||||||
clientHandleReq(pMsg, pThrd);
|
cliHandleReq(pMsg, pThrd);
|
||||||
}
|
}
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
if (count >= 2) {
|
if (count >= 2) {
|
||||||
tTrace("client process batch size: %d", count);
|
tTrace("cli process batch size: %d", count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* clientThread(void* arg) {
|
static void* cliWorkThread(void* arg) {
|
||||||
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
||||||
setThreadName("trans-client-work");
|
setThreadName("trans-cli-work");
|
||||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
||||||
SClientObj* cli = calloc(1, sizeof(SClientObj));
|
SCliObj* cli = calloc(1, sizeof(SCliObj));
|
||||||
|
|
||||||
SRpcInfo* pRpc = shandle;
|
SRpcInfo* pRpc = shandle;
|
||||||
memcpy(cli->label, label, strlen(label));
|
memcpy(cli->label, label, strlen(label));
|
||||||
|
@ -564,9 +564,9 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||||
pThrd->pTransInst = shandle;
|
pThrd->pTransInst = shandle;
|
||||||
|
|
||||||
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
|
int err = pthread_create(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd));
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
tDebug("success to create tranport-client thread %d", i);
|
tDebug("success to create tranport-cli thread %d", i);
|
||||||
}
|
}
|
||||||
cli->pThreadObj[i] = pThrd;
|
cli->pThreadObj[i] = pThrd;
|
||||||
}
|
}
|
||||||
|
@ -591,13 +591,14 @@ static void destroyCmsg(SCliMsg* pMsg) {
|
||||||
|
|
||||||
static SCliThrdObj* createThrdObj() {
|
static SCliThrdObj* createThrdObj() {
|
||||||
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
|
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
|
||||||
|
|
||||||
QUEUE_INIT(&pThrd->msg);
|
QUEUE_INIT(&pThrd->msg);
|
||||||
pthread_mutex_init(&pThrd->msgMtx, NULL);
|
pthread_mutex_init(&pThrd->msgMtx, NULL);
|
||||||
|
|
||||||
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
||||||
uv_loop_init(pThrd->loop);
|
uv_loop_init(pThrd->loop);
|
||||||
|
|
||||||
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb);
|
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb);
|
||||||
|
|
||||||
uv_timer_init(pThrd->loop, &pThrd->timer);
|
uv_timer_init(pThrd->loop, &pThrd->timer);
|
||||||
pThrd->timer.data = pThrd;
|
pThrd->timer.data = pThrd;
|
||||||
|
@ -628,21 +629,21 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
|
||||||
free(ctx);
|
free(ctx);
|
||||||
}
|
}
|
||||||
//
|
//
|
||||||
static void clientSendQuit(SCliThrdObj* thrd) {
|
static void cliSendQuit(SCliThrdObj* thrd) {
|
||||||
// cli can stop gracefully
|
// cli can stop gracefully
|
||||||
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
||||||
transSendAsync(thrd->asyncPool, &msg->q);
|
transSendAsync(thrd->asyncPool, &msg->q);
|
||||||
}
|
}
|
||||||
void taosCloseClient(void* arg) {
|
void taosCloseClient(void* arg) {
|
||||||
SClientObj* cli = arg;
|
SCliObj* cli = arg;
|
||||||
for (int i = 0; i < cli->numOfThreads; i++) {
|
for (int i = 0; i < cli->numOfThreads; i++) {
|
||||||
clientSendQuit(cli->pThreadObj[i]);
|
cliSendQuit(cli->pThreadObj[i]);
|
||||||
destroyThrdObj(cli->pThreadObj[i]);
|
destroyThrdObj(cli->pThreadObj[i]);
|
||||||
}
|
}
|
||||||
free(cli->pThreadObj);
|
free(cli->pThreadObj);
|
||||||
free(cli);
|
free(cli);
|
||||||
}
|
}
|
||||||
static int clientRBChoseIdx(SRpcInfo* pTransInst) {
|
static int cliRBChoseIdx(SRpcInfo* pTransInst) {
|
||||||
int64_t index = pTransInst->index;
|
int64_t index = pTransInst->index;
|
||||||
if (pTransInst->index++ >= pTransInst->numOfThreads) {
|
if (pTransInst->index++ >= pTransInst->numOfThreads) {
|
||||||
pTransInst->index = 0;
|
pTransInst->index = 0;
|
||||||
|
@ -662,7 +663,7 @@ void transUnrefCliHandle(void* handle) {
|
||||||
}
|
}
|
||||||
int ref = T_REF_DEC((SCliConn*)handle);
|
int ref = T_REF_DEC((SCliConn*)handle);
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
clientConnDestroy((SCliConn*)handle, true);
|
cliDestroyConn((SCliConn*)handle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// unref cli handle
|
// unref cli handle
|
||||||
|
@ -676,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
||||||
|
|
||||||
int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
|
int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = clientRBChoseIdx(pTransInst);
|
index = cliRBChoseIdx(pTransInst);
|
||||||
}
|
}
|
||||||
int32_t flen = 0;
|
int32_t flen = 0;
|
||||||
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
|
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
|
||||||
|
@ -697,7 +698,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
||||||
cliMsg->msg = *pMsg;
|
cliMsg->msg = *pMsg;
|
||||||
cliMsg->st = taosGetTimestampUs();
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -709,7 +710,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
|
||||||
|
|
||||||
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
int index = CONN_HOST_THREAD_INDEX(pReq->handle);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = clientRBChoseIdx(pTransInst);
|
index = cliRBChoseIdx(pTransInst);
|
||||||
}
|
}
|
||||||
|
|
||||||
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
||||||
|
@ -727,7 +728,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
|
||||||
cliMsg->msg = *pReq;
|
cliMsg->msg = *pReq;
|
||||||
cliMsg->st = taosGetTimestampUs();
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index];
|
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
tsem_t* pSem = pCtx->pSem;
|
tsem_t* pSem = pCtx->pSem;
|
||||||
tsem_wait(pSem);
|
tsem_wait(pSem);
|
||||||
|
|
|
@ -3,7 +3,6 @@ add_executable(client "")
|
||||||
add_executable(server "")
|
add_executable(server "")
|
||||||
add_executable(transUT "")
|
add_executable(transUT "")
|
||||||
add_executable(syncClient "")
|
add_executable(syncClient "")
|
||||||
add_executable(pushClient "")
|
|
||||||
add_executable(pushServer "")
|
add_executable(pushServer "")
|
||||||
|
|
||||||
target_sources(transUT
|
target_sources(transUT
|
||||||
|
@ -27,10 +26,6 @@ target_sources (syncClient
|
||||||
"syncClient.c"
|
"syncClient.c"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_sources(pushClient
|
|
||||||
PRIVATE
|
|
||||||
"pushClient.c"
|
|
||||||
)
|
|
||||||
target_sources(pushServer
|
target_sources(pushServer
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"pushServer.c"
|
"pushServer.c"
|
||||||
|
@ -102,19 +97,6 @@ target_link_libraries (syncClient
|
||||||
transport
|
transport
|
||||||
)
|
)
|
||||||
|
|
||||||
target_include_directories(pushClient
|
|
||||||
PUBLIC
|
|
||||||
"${CMAKE_SOURCE_DIR}/include/libs/transport"
|
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
|
||||||
)
|
|
||||||
target_link_libraries (pushClient
|
|
||||||
os
|
|
||||||
util
|
|
||||||
common
|
|
||||||
gtest_main
|
|
||||||
transport
|
|
||||||
)
|
|
||||||
|
|
||||||
target_include_directories(pushServer
|
target_include_directories(pushServer
|
||||||
PUBLIC
|
PUBLIC
|
||||||
"${CMAKE_SOURCE_DIR}/include/libs/transport"
|
"${CMAKE_SOURCE_DIR}/include/libs/transport"
|
||||||
|
|
|
@ -1,242 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#include <sys/time.h>
|
|
||||||
|
|
||||||
#include <tdatablock.h>
|
|
||||||
#include "os.h"
|
|
||||||
#include "rpcLog.h"
|
|
||||||
#include "taoserror.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "trpc.h"
|
|
||||||
#include "tutil.h"
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int index;
|
|
||||||
SEpSet epSet;
|
|
||||||
int num;
|
|
||||||
int numOfReqs;
|
|
||||||
int msgSize;
|
|
||||||
tsem_t rspSem;
|
|
||||||
tsem_t * pOverSem;
|
|
||||||
pthread_t thread;
|
|
||||||
void * pRpc;
|
|
||||||
} SInfo;
|
|
||||||
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|
||||||
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
|
||||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
|
||||||
pMsg->code);
|
|
||||||
|
|
||||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
// tsem_post(&pInfo->rspSem);
|
|
||||||
tsem_post(&pInfo->rspSem);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int tcount = 0;
|
|
||||||
|
|
||||||
typedef struct SPushArg {
|
|
||||||
tsem_t sem;
|
|
||||||
} SPushArg;
|
|
||||||
// ping
|
|
||||||
int pushCallback(void *arg, SRpcMsg *msg) {
|
|
||||||
SPushArg *push = arg;
|
|
||||||
tsem_post(&push->sem);
|
|
||||||
}
|
|
||||||
SRpcPush *createPushArg() {
|
|
||||||
SRpcPush *push = calloc(1, sizeof(SRpcPush));
|
|
||||||
push->arg = calloc(1, sizeof(SPushArg));
|
|
||||||
|
|
||||||
tsem_init(&(((SPushArg *)push->arg)->sem), 0, 0);
|
|
||||||
push->callback = pushCallback;
|
|
||||||
return push;
|
|
||||||
}
|
|
||||||
static void *sendRequest(void *param) {
|
|
||||||
SInfo * pInfo = (SInfo *)param;
|
|
||||||
SRpcMsg rpcMsg = {0};
|
|
||||||
|
|
||||||
tDebug("thread:%d, start to send request", pInfo->index);
|
|
||||||
|
|
||||||
tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
|
|
||||||
int u100 = 0;
|
|
||||||
int u500 = 0;
|
|
||||||
int u1000 = 0;
|
|
||||||
int u10000 = 0;
|
|
||||||
|
|
||||||
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
|
||||||
SRpcPush *push = createPushArg();
|
|
||||||
pInfo->num++;
|
|
||||||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
|
||||||
rpcMsg.contLen = pInfo->msgSize;
|
|
||||||
rpcMsg.ahandle = pInfo;
|
|
||||||
rpcMsg.msgType = 1;
|
|
||||||
// rpcMsg.push = push;
|
|
||||||
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
|
||||||
int64_t start = taosGetTimestampUs();
|
|
||||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
|
||||||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
|
||||||
tsem_wait(&pInfo->rspSem); // ping->pong
|
|
||||||
// tsem_wait(&pInfo->rspSem);
|
|
||||||
SPushArg *arg = push->arg;
|
|
||||||
/// e
|
|
||||||
tsem_wait(&arg->sem); // push callback
|
|
||||||
|
|
||||||
// query_fetch(client->h)
|
|
||||||
int64_t end = taosGetTimestampUs() - start;
|
|
||||||
if (end <= 100) {
|
|
||||||
u100++;
|
|
||||||
} else if (end > 100 && end <= 500) {
|
|
||||||
u500++;
|
|
||||||
} else if (end > 500 && end < 1000) {
|
|
||||||
u1000++;
|
|
||||||
} else {
|
|
||||||
u10000++;
|
|
||||||
}
|
|
||||||
|
|
||||||
tDebug("recv response succefully");
|
|
||||||
|
|
||||||
// taosSsleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
|
|
||||||
tDebug("thread:%d, it is over", pInfo->index);
|
|
||||||
tcount++;
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
|
||||||
SRpcInit rpcInit;
|
|
||||||
SEpSet epSet;
|
|
||||||
int msgSize = 128;
|
|
||||||
int numOfReqs = 0;
|
|
||||||
int appThreads = 1;
|
|
||||||
char serverIp[40] = "127.0.0.1";
|
|
||||||
char secret[20] = "mypassword";
|
|
||||||
struct timeval systemTime;
|
|
||||||
int64_t startTime, endTime;
|
|
||||||
pthread_attr_t thattr;
|
|
||||||
|
|
||||||
// server info
|
|
||||||
epSet.inUse = 0;
|
|
||||||
addEpIntoEpSet(&epSet, serverIp, 7000);
|
|
||||||
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
|
|
||||||
|
|
||||||
// client info
|
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
|
||||||
rpcInit.localPort = 0;
|
|
||||||
rpcInit.label = "APP";
|
|
||||||
rpcInit.numOfThreads = 1;
|
|
||||||
rpcInit.cfp = processResponse;
|
|
||||||
rpcInit.sessions = 100;
|
|
||||||
rpcInit.idleTime = 100;
|
|
||||||
rpcInit.user = "michael";
|
|
||||||
rpcInit.secret = secret;
|
|
||||||
rpcInit.ckey = "key";
|
|
||||||
rpcInit.spi = 1;
|
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
|
||||||
|
|
||||||
for (int i = 1; i < argc; ++i) {
|
|
||||||
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
|
||||||
epSet.eps[0].port = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
|
||||||
tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
|
|
||||||
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
|
||||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
|
||||||
msgSize = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
|
||||||
rpcInit.sessions = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
|
|
||||||
numOfReqs = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
|
|
||||||
appThreads = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
|
||||||
tsCompressMsgSize = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
|
||||||
rpcInit.user = argv[++i];
|
|
||||||
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
|
||||||
rpcInit.secret = argv[++i];
|
|
||||||
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
|
||||||
rpcInit.spi = atoi(argv[++i]);
|
|
||||||
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
|
||||||
rpcDebugFlag = atoi(argv[++i]);
|
|
||||||
} else {
|
|
||||||
printf("\nusage: %s [options] \n", argv[0]);
|
|
||||||
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
|
||||||
printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
|
|
||||||
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
|
||||||
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
|
|
||||||
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
|
||||||
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
|
|
||||||
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
|
||||||
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
|
||||||
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
|
||||||
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
|
||||||
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
|
||||||
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
|
||||||
printf(" [-h help]: print out this help\n\n");
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosInitLog("client.log", 10);
|
|
||||||
|
|
||||||
void *pRpc = rpcOpen(&rpcInit);
|
|
||||||
if (pRpc == NULL) {
|
|
||||||
tError("failed to initialize RPC");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tInfo("client is initialized");
|
|
||||||
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
|
|
||||||
|
|
||||||
taosGetTimeOfDay(&systemTime);
|
|
||||||
startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
|
||||||
|
|
||||||
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
|
|
||||||
|
|
||||||
pthread_attr_init(&thattr);
|
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
|
|
||||||
for (int i = 0; i < appThreads; ++i) {
|
|
||||||
pInfo->index = i;
|
|
||||||
pInfo->epSet = epSet;
|
|
||||||
pInfo->numOfReqs = numOfReqs;
|
|
||||||
pInfo->msgSize = msgSize;
|
|
||||||
tsem_init(&pInfo->rspSem, 0, 0);
|
|
||||||
pInfo->pRpc = pRpc;
|
|
||||||
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
|
||||||
pInfo++;
|
|
||||||
}
|
|
||||||
|
|
||||||
do {
|
|
||||||
taosUsleep(1);
|
|
||||||
} while (tcount < appThreads);
|
|
||||||
|
|
||||||
taosGetTimeOfDay(&systemTime);
|
|
||||||
endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
|
||||||
float usedTime = (endTime - startTime) / 1000.0f; // mseconds
|
|
||||||
|
|
||||||
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
|
|
||||||
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
|
||||||
|
|
||||||
int ch = getchar();
|
|
||||||
UNUSED(ch);
|
|
||||||
|
|
||||||
taosCloseLog();
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
Loading…
Reference in New Issue