add client
This commit is contained in:
parent
df62f45256
commit
a50c3f3d5e
|
@ -68,6 +68,25 @@ typedef void* queue[2];
|
||||||
QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
|
QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
|
||||||
QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
|
QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
|
||||||
}
|
}
|
||||||
|
#define QUEUE_SPLIT(h, q, n) \
|
||||||
|
do { \
|
||||||
|
QUEUE_PREV(n) = QUEUE_PREV(h); \
|
||||||
|
QUEUE_PREV_NEXT(n) = (n); \
|
||||||
|
QUEUE_NEXT(n) = (q); \
|
||||||
|
QUEUE_PREV(h) = QUEUE_PREV(q); \
|
||||||
|
QUEUE_PREV_NEXT(h) = (h); \
|
||||||
|
QUEUE_PREV(q) = (n); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define QUEUE_MOVE(h, n) \
|
||||||
|
do { \
|
||||||
|
if (QUEUE_IS_EMPTY(h)) { \
|
||||||
|
QUEUE_INIT(n); \
|
||||||
|
} else { \
|
||||||
|
queue* q = QUEUE_HEAD(h); \
|
||||||
|
QUEUE_SPLIT(h, q, n); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
/* Return the element at the front of the queue. */
|
/* Return the element at the front of the queue. */
|
||||||
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
|
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
|
||||||
|
|
|
@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
if (pInit->label) {
|
if (pInit->label) {
|
||||||
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
|
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
|
||||||
}
|
}
|
||||||
|
pRpc->cfp = pInit->cfp;
|
||||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
pRpc->connType = pInit->connType;
|
pRpc->connType = pInit->connType;
|
||||||
pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||||
|
|
|
@ -20,12 +20,14 @@
|
||||||
typedef struct SCliConn {
|
typedef struct SCliConn {
|
||||||
uv_connect_t connReq;
|
uv_connect_t connReq;
|
||||||
uv_stream_t* stream;
|
uv_stream_t* stream;
|
||||||
|
uv_write_t* writeReq;
|
||||||
void* data;
|
void* data;
|
||||||
queue conn;
|
queue conn;
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
typedef struct SCliMsg {
|
typedef struct SCliMsg {
|
||||||
SRpcReqContext* context;
|
SRpcReqContext* context;
|
||||||
queue q;
|
queue q;
|
||||||
|
uint64_t st;
|
||||||
} SCliMsg;
|
} SCliMsg;
|
||||||
|
|
||||||
typedef struct SCliThrdObj {
|
typedef struct SCliThrdObj {
|
||||||
|
@ -45,86 +47,153 @@ typedef struct SClientObj {
|
||||||
SCliThrdObj** pThreadObj;
|
SCliThrdObj** pThreadObj;
|
||||||
} SClientObj;
|
} SClientObj;
|
||||||
|
|
||||||
static void clientWriteCb(uv_write_t* req, int status);
|
static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
static void clientConnCb(struct uv_connect_s* req, int status);
|
static void clientWriteCb(uv_write_t* req, int status);
|
||||||
|
static void clientConnCb(uv_connect_t* req, int status);
|
||||||
static void clientAsyncCb(uv_async_t* handle);
|
static void clientAsyncCb(uv_async_t* handle);
|
||||||
|
static void clientDestroy(uv_handle_t* handle);
|
||||||
|
static void clientConnDestroy(SCliConn* pConn);
|
||||||
|
|
||||||
static void* clientThread(void* arg);
|
static void* clientThread(void* arg);
|
||||||
|
|
||||||
|
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
|
|
||||||
|
static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
|
// impl later
|
||||||
|
}
|
||||||
|
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
// impl later
|
||||||
|
SCliConn* conn = handle->data;
|
||||||
|
if (nread > 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//
|
||||||
|
uv_close((uv_handle_t*)handle, clientDestroy);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void clientConnDestroy(SCliConn* conn) {
|
||||||
|
// impl later
|
||||||
|
//
|
||||||
|
}
|
||||||
|
static void clientDestroy(uv_handle_t* handle) {
|
||||||
|
SCliConn* conn = handle->data;
|
||||||
|
clientConnDestroy(conn);
|
||||||
|
}
|
||||||
|
|
||||||
static void clientWriteCb(uv_write_t* req, int status) {
|
static void clientWriteCb(uv_write_t* req, int status) {
|
||||||
// impl later
|
|
||||||
}
|
|
||||||
static void clientFailedCb(uv_handle_t* handle) {
|
|
||||||
// impl later
|
|
||||||
tDebug("close handle");
|
|
||||||
}
|
|
||||||
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|
||||||
// impl later
|
|
||||||
}
|
|
||||||
static void clientConnCb(struct uv_connect_s* req, int status) {
|
|
||||||
SCliConn* pConn = req->data;
|
SCliConn* pConn = req->data;
|
||||||
SCliMsg* pMsg = pConn->data;
|
if (status == 0) {
|
||||||
SEpSet* pEpSet = &pMsg->context->epSet;
|
tDebug("data already was written on stream");
|
||||||
|
} else {
|
||||||
|
uv_close((uv_handle_t*)pConn->stream, clientDestroy);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb);
|
||||||
|
// impl later
|
||||||
|
}
|
||||||
|
static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
|
// impl later
|
||||||
|
SCliConn* pConn = req->data;
|
||||||
|
if (status != 0) {
|
||||||
|
tError("failed to connect %s", uv_err_name(status));
|
||||||
|
clientConnDestroy(pConn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCliMsg* pMsg = pConn->data;
|
||||||
|
SEpSet* pEpSet = &pMsg->context->epSet;
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
rpcMsg.ahandle = pMsg->context->ahandle;
|
||||||
|
rpcMsg.pCont = NULL;
|
||||||
|
|
||||||
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
|
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
|
||||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
uint32_t port = pEpSet->port[pEpSet->inUse];
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
// call user fp later
|
// call user fp later
|
||||||
tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status));
|
tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status));
|
||||||
uv_close((uv_handle_t*)req->handle, clientFailedCb);
|
SRpcInfo* pRpc = pMsg->context->pRpc;
|
||||||
|
(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
|
||||||
|
uv_close((uv_handle_t*)req->handle, clientDestroy);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
assert(pConn->stream == req->handle);
|
assert(pConn->stream == req->handle);
|
||||||
|
|
||||||
// impl later
|
uv_buf_t wb;
|
||||||
|
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
|
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
|
||||||
// impl later
|
// impl later
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
static void clientAsyncCb(uv_async_t* handle) {
|
|
||||||
SCliThrdObj* pThrd = handle->data;
|
|
||||||
SCliMsg* pMsg = NULL;
|
|
||||||
pthread_mutex_lock(&pThrd->msgMtx);
|
|
||||||
if (!QUEUE_IS_EMPTY(&pThrd->msg)) {
|
|
||||||
queue* head = QUEUE_HEAD(&pThrd->msg);
|
|
||||||
pMsg = QUEUE_DATA(head, SCliMsg, q);
|
|
||||||
QUEUE_REMOVE(head);
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&pThrd->msgMtx);
|
|
||||||
|
|
||||||
SEpSet* pEpSet = &pMsg->context->epSet;
|
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
SEpSet* pEpSet = &pMsg->context->epSet;
|
||||||
|
|
||||||
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
|
char* fqdn = pEpSet->fqdn[pEpSet->inUse];
|
||||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
uint32_t port = pEpSet->port[pEpSet->inUse];
|
||||||
|
|
||||||
|
uint64_t el = taosGetTimestampUs() - pMsg->st;
|
||||||
|
tDebug("msg tran time cost: %" PRIu64 "", el);
|
||||||
|
|
||||||
SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port);
|
SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
|
conn->data = pMsg;
|
||||||
|
conn->writeReq->data = conn;
|
||||||
|
uv_buf_t wb;
|
||||||
|
uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb);
|
||||||
// impl later
|
// impl later
|
||||||
} else {
|
} else {
|
||||||
SCliConn* conn = malloc(sizeof(SCliConn));
|
SCliConn* conn = malloc(sizeof(SCliConn));
|
||||||
|
|
||||||
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
||||||
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
||||||
|
conn->writeReq = malloc(sizeof(uv_write_t));
|
||||||
|
|
||||||
conn->connReq.data = conn;
|
conn->connReq.data = conn;
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
uv_ip4_addr(fqdn, port, &addr);
|
uv_ip4_addr(fqdn, port, &addr);
|
||||||
// handle error in callback if connect error
|
// handle error in callback if fail to connect
|
||||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
||||||
|
|
||||||
|
// SRpcMsg rpcMsg;
|
||||||
|
// SEpSet* pEpSet = &pMsg->context->epSet;
|
||||||
|
// SRpcInfo* pRpc = pMsg->context->pRpc;
|
||||||
|
//// rpcMsg.ahandle = pMsg->context->ahandle;
|
||||||
|
// rpcMsg.pCont = NULL;
|
||||||
|
// rpcMsg.ahandle = pMsg->context->ahandle;
|
||||||
|
// uint64_t el1 = taosGetTimestampUs() - et;
|
||||||
|
// tError("msg tran back first: time cost: %" PRIu64 "", el1);
|
||||||
|
// et = taosGetTimestampUs();
|
||||||
|
//(pRpc->cfp)(NULL, &rpcMsg, pEpSet);
|
||||||
|
// uint64_t el2 = taosGetTimestampUs() - et;
|
||||||
|
// tError("msg tran back second: time cost: %" PRIu64 "", el2);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
static void clientAsyncCb(uv_async_t* handle) {
|
||||||
|
SCliThrdObj* pThrd = handle->data;
|
||||||
|
SCliMsg* pMsg = NULL;
|
||||||
|
queue wq;
|
||||||
|
|
||||||
// SRpcReqContext* pCxt = pMsg->context;
|
// batch process to avoid to lock/unlock frequently
|
||||||
|
pthread_mutex_lock(&pThrd->msgMtx);
|
||||||
|
QUEUE_MOVE(&pThrd->msg, &wq);
|
||||||
|
pthread_mutex_unlock(&pThrd->msgMtx);
|
||||||
|
|
||||||
// SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont);
|
int count = 0;
|
||||||
// char* msg = (char*)pHead;
|
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||||
// int len = rpcMsgLenFromCont(pCtx->contLen);
|
queue* h = QUEUE_HEAD(&wq);
|
||||||
// tmsg_t msgType = pCtx->msgType;
|
QUEUE_REMOVE(h);
|
||||||
|
pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
// impl later
|
clientHandleReq(pMsg, pThrd);
|
||||||
|
count++;
|
||||||
|
if (count >= 2) {
|
||||||
|
tError("send batch size: %d", count);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* clientThread(void* arg) {
|
static void* clientThread(void* arg) {
|
||||||
|
@ -142,9 +211,6 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
|
SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
|
||||||
QUEUE_INIT(&pThrd->msg);
|
QUEUE_INIT(&pThrd->msg);
|
||||||
pthread_mutex_init(&pThrd->msgMtx, NULL);
|
pthread_mutex_init(&pThrd->msgMtx, NULL);
|
||||||
|
|
||||||
// QUEUE_INIT(&pThrd->clientCache);
|
|
||||||
|
|
||||||
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
||||||
uv_loop_init(pThrd->loop);
|
uv_loop_init(pThrd->loop);
|
||||||
|
|
||||||
|
@ -186,6 +252,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
||||||
}
|
}
|
||||||
SCliMsg* msg = malloc(sizeof(SCliMsg));
|
SCliMsg* msg = malloc(sizeof(SCliMsg));
|
||||||
msg->context = pContext;
|
msg->context = pContext;
|
||||||
|
msg->st = taosGetTimestampUs();
|
||||||
|
|
||||||
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
|
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
|
||||||
|
|
||||||
|
|
|
@ -277,10 +277,6 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (terrno != 0) {
|
|
||||||
// handle err code
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nread != UV_EOF) {
|
if (nread != UV_EOF) {
|
||||||
tDebug("Read error %s\n", uv_err_name(nread));
|
tDebug("Read error %s\n", uv_err_name(nread));
|
||||||
}
|
}
|
||||||
|
@ -309,21 +305,23 @@ void uvOnWriteCb(uv_write_t* req, int status) {
|
||||||
void uvWorkerAsyncCb(uv_async_t* handle) {
|
void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync);
|
SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync);
|
||||||
SConn* conn = NULL;
|
SConn* conn = NULL;
|
||||||
|
queue wq;
|
||||||
// opt later
|
// batch process to avoid to lock/unlock frequently
|
||||||
pthread_mutex_lock(&pThrd->connMtx);
|
pthread_mutex_lock(&pThrd->connMtx);
|
||||||
if (!QUEUE_IS_EMPTY(&pThrd->conn)) {
|
QUEUE_MOVE(&pThrd->conn, &wq);
|
||||||
queue* head = QUEUE_HEAD(&pThrd->conn);
|
|
||||||
conn = QUEUE_DATA(head, SConn, queue);
|
|
||||||
QUEUE_REMOVE(head);
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&pThrd->connMtx);
|
pthread_mutex_unlock(&pThrd->connMtx);
|
||||||
if (conn == NULL) {
|
|
||||||
tError("except occurred, do nothing");
|
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||||
return;
|
queue* head = QUEUE_HEAD(&wq);
|
||||||
|
QUEUE_REMOVE(head);
|
||||||
|
SConn* conn = QUEUE_DATA(head, SConn, queue);
|
||||||
|
if (conn == NULL) {
|
||||||
|
tError("except occurred, do nothing");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
|
||||||
|
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
|
||||||
}
|
}
|
||||||
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
|
|
||||||
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
|
|
|
@ -34,8 +34,8 @@ typedef struct {
|
||||||
|
|
||||||
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
// tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||||
pMsg->code);
|
// pMsg->code);
|
||||||
|
|
||||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ static void *sendRequest(void *param) {
|
||||||
rpcMsg.contLen = pInfo->msgSize;
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
rpcMsg.ahandle = pInfo;
|
rpcMsg.ahandle = pInfo;
|
||||||
rpcMsg.msgType = 1;
|
rpcMsg.msgType = 1;
|
||||||
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||||
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
tsem_wait(&pInfo->rspSem);
|
tsem_wait(&pInfo->rspSem);
|
||||||
|
|
Loading…
Reference in New Issue