Merge pull request #10246 from taosdata/feature/trans_impl
refactor code
This commit is contained in:
commit
11e25c9c66
|
@ -233,7 +233,7 @@ typedef struct {
|
||||||
uv_async_t* asyncs;
|
uv_async_t* asyncs;
|
||||||
} SAsyncPool;
|
} SAsyncPool;
|
||||||
|
|
||||||
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb);
|
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
|
||||||
void transDestroyAsyncPool(SAsyncPool* pool);
|
void transDestroyAsyncPool(SAsyncPool* pool);
|
||||||
int transSendAsync(SAsyncPool* pool, queue* mq);
|
int transSendAsync(SAsyncPool* pool, queue* mq);
|
||||||
|
|
||||||
|
|
|
@ -30,8 +30,11 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
|
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
|
||||||
}
|
}
|
||||||
pRpc->cfp = pInit->cfp;
|
pRpc->cfp = pInit->cfp;
|
||||||
// pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||||
pRpc->numOfThreads = pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
|
} else {
|
||||||
|
pRpc->numOfThreads = pInit->numOfThreads;
|
||||||
|
}
|
||||||
pRpc->connType = pInit->connType;
|
pRpc->connType = pInit->connType;
|
||||||
pRpc->idleTime = pInit->idleTime;
|
pRpc->idleTime = pInit->idleTime;
|
||||||
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||||
|
|
|
@ -38,6 +38,7 @@ typedef struct SCliConn {
|
||||||
int32_t ref;
|
int32_t ref;
|
||||||
// debug and log info
|
// debug and log info
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
|
struct sockaddr_in locaddr;
|
||||||
} SCliConn;
|
} SCliConn;
|
||||||
|
|
||||||
typedef struct SCliMsg {
|
typedef struct SCliMsg {
|
||||||
|
@ -130,8 +131,9 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
rpcMsg.msgType = pHead->msgType;
|
rpcMsg.msgType = pHead->msgType;
|
||||||
rpcMsg.ahandle = pCtx->ahandle;
|
rpcMsg.ahandle = pCtx->ahandle;
|
||||||
|
|
||||||
tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr),
|
tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType),
|
||||||
ntohs(conn->addr.sin_port));
|
inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr),
|
||||||
|
ntohs(conn->locaddr.sin_port));
|
||||||
|
|
||||||
if (conn->push != NULL && conn->notifyCount != 0) {
|
if (conn->push != NULL && conn->notifyCount != 0) {
|
||||||
(*conn->push->callback)(conn->push->arg, &rpcMsg);
|
(*conn->push->callback)(conn->push->arg, &rpcMsg);
|
||||||
|
@ -417,8 +419,9 @@ static void clientWrite(SCliConn* pConn) {
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
|
tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||||
ntohs(pConn->addr.sin_port));
|
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
||||||
|
ntohs(pConn->locaddr.sin_port));
|
||||||
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
|
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
|
||||||
}
|
}
|
||||||
static void clientConnCb(uv_connect_t* req, int status) {
|
static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
|
@ -433,6 +436,9 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
int addrlen = sizeof(pConn->addr);
|
int addrlen = sizeof(pConn->addr);
|
||||||
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);
|
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);
|
||||||
|
|
||||||
|
addrlen = sizeof(pConn->locaddr);
|
||||||
|
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
|
||||||
|
|
||||||
tTrace("client conn %p create", pConn);
|
tTrace("client conn %p create", pConn);
|
||||||
|
|
||||||
assert(pConn->stream == req->handle);
|
assert(pConn->stream == req->handle);
|
||||||
|
@ -579,7 +585,7 @@ static SCliThrdObj* createThrdObj() {
|
||||||
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
||||||
uv_loop_init(pThrd->loop);
|
uv_loop_init(pThrd->loop);
|
||||||
|
|
||||||
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb);
|
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb);
|
||||||
|
|
||||||
pThrd->timer = malloc(sizeof(uv_timer_t));
|
pThrd->timer = malloc(sizeof(uv_timer_t));
|
||||||
uv_timer_init(pThrd->loop, pThrd->timer);
|
uv_timer_init(pThrd->loop, pThrd->timer);
|
||||||
|
|
|
@ -250,9 +250,7 @@ int transDestroyBuffer(SConnBuffer* buf) {
|
||||||
transClearBuffer(buf);
|
transClearBuffer(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) {
|
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
|
||||||
static int sz = 10;
|
|
||||||
|
|
||||||
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
|
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
|
||||||
pool->index = 0;
|
pool->index = 0;
|
||||||
pool->nAsync = sz;
|
pool->nAsync = sz;
|
||||||
|
|
|
@ -31,9 +31,11 @@ typedef struct SSrvConn {
|
||||||
void* pTransInst; // rpc init
|
void* pTransInst; // rpc init
|
||||||
void* ahandle; //
|
void* ahandle; //
|
||||||
void* hostThrd;
|
void* hostThrd;
|
||||||
void* pSrvMsg;
|
SArray* srvMsgs;
|
||||||
|
// void* pSrvMsg;
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
|
struct sockaddr_in locaddr;
|
||||||
|
|
||||||
// SRpcMsg sendMsg;
|
// SRpcMsg sendMsg;
|
||||||
// del later
|
// del later
|
||||||
|
@ -94,6 +96,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
|
||||||
static void uvWorkerAsyncCb(uv_async_t* handle);
|
static void uvWorkerAsyncCb(uv_async_t* handle);
|
||||||
static void uvAcceptAsyncCb(uv_async_t* handle);
|
static void uvAcceptAsyncCb(uv_async_t* handle);
|
||||||
|
|
||||||
|
static void uvStartSendRespInternal(SSrvMsg* smsg);
|
||||||
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSrvMsg* msg);
|
static void uvStartSendResp(SSrvMsg* msg);
|
||||||
|
|
||||||
|
@ -263,8 +266,9 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
|
|
||||||
transClearBuffer(&pConn->readBuf);
|
transClearBuffer(&pConn->readBuf);
|
||||||
pConn->ref++;
|
pConn->ref++;
|
||||||
tDebug("server conn %p %s received from %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr),
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d", pConn, TMSG_INFO(rpcMsg.msgType),
|
||||||
ntohs(pConn->addr.sin_port));
|
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
||||||
|
ntohs(pConn->locaddr.sin_port));
|
||||||
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
||||||
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
||||||
// auth
|
// auth
|
||||||
|
@ -310,14 +314,19 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
|
||||||
|
|
||||||
void uvOnWriteCb(uv_write_t* req, int status) {
|
void uvOnWriteCb(uv_write_t* req, int status) {
|
||||||
SSrvConn* conn = req->data;
|
SSrvConn* conn = req->data;
|
||||||
|
|
||||||
SSrvMsg* smsg = conn->pSrvMsg;
|
|
||||||
destroySmsg(smsg);
|
|
||||||
conn->pSrvMsg = NULL;
|
|
||||||
|
|
||||||
transClearBuffer(&conn->readBuf);
|
transClearBuffer(&conn->readBuf);
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
tTrace("server conn %p data already was written on stream", conn);
|
tTrace("server conn %p data already was written on stream", conn);
|
||||||
|
assert(taosArrayGetSize(conn->srvMsgs) >= 1);
|
||||||
|
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
|
||||||
|
taosArrayRemove(conn->srvMsgs, 0);
|
||||||
|
destroySmsg(msg);
|
||||||
|
|
||||||
|
// send second data, just use for push
|
||||||
|
if (taosArrayGetSize(conn->srvMsgs) > 0) {
|
||||||
|
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
|
||||||
|
uvStartSendRespInternal(msg);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
|
tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
|
||||||
//
|
//
|
||||||
|
@ -354,27 +363,37 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
||||||
if (transCompressMsg(msg, len, NULL)) {
|
if (transCompressMsg(msg, len, NULL)) {
|
||||||
// impl later
|
// impl later
|
||||||
}
|
}
|
||||||
tDebug("server conn %p %s is sent to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr),
|
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
||||||
ntohs(pConn->addr.sin_port));
|
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
||||||
|
ntohs(pConn->locaddr.sin_port));
|
||||||
|
|
||||||
pHead->msgLen = htonl(len);
|
pHead->msgLen = htonl(len);
|
||||||
wb->base = msg;
|
wb->base = msg;
|
||||||
wb->len = len;
|
wb->len = len;
|
||||||
}
|
}
|
||||||
static void uvStartSendResp(SSrvMsg* smsg) {
|
|
||||||
// impl
|
static void uvStartSendRespInternal(SSrvMsg* smsg) {
|
||||||
uv_buf_t wb;
|
uv_buf_t wb;
|
||||||
uvPrepareSendData(smsg, &wb);
|
uvPrepareSendData(smsg, &wb);
|
||||||
|
|
||||||
SSrvConn* pConn = smsg->pConn;
|
SSrvConn* pConn = smsg->pConn;
|
||||||
uv_timer_stop(pConn->pTimer);
|
uv_timer_stop(pConn->pTimer);
|
||||||
|
|
||||||
pConn->pSrvMsg = smsg;
|
// pConn->pSrvMsg = smsg;
|
||||||
// conn->pWriter->data = smsg;
|
// conn->pWriter->data = smsg;
|
||||||
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
|
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
|
||||||
|
}
|
||||||
// SRpcMsg* rpcMsg = smsg->msg;
|
static void uvStartSendResp(SSrvMsg* smsg) {
|
||||||
|
// impl
|
||||||
|
SSrvConn* pConn = smsg->pConn;
|
||||||
|
if (taosArrayGetSize(pConn->srvMsgs) > 0) {
|
||||||
|
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
|
||||||
|
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
|
taosArrayPush(pConn->srvMsgs, &smsg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taosArrayPush(pConn->srvMsgs, &smsg);
|
||||||
|
uvStartSendRespInternal(smsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
static void destroySmsg(SSrvMsg* smsg) {
|
static void destroySmsg(SSrvMsg* smsg) {
|
||||||
|
@ -496,13 +515,23 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
uv_os_fd_t fd;
|
uv_os_fd_t fd;
|
||||||
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
||||||
tTrace("server conn %p created, fd: %d", pConn, fd);
|
tTrace("server conn %p created, fd: %d", pConn, fd);
|
||||||
|
|
||||||
int addrlen = sizeof(pConn->addr);
|
int addrlen = sizeof(pConn->addr);
|
||||||
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
|
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
|
||||||
tError("server conn %p failed to get peer info", pConn);
|
tError("server conn %p failed to get peer info", pConn);
|
||||||
destroyConn(pConn, true);
|
destroyConn(pConn, true);
|
||||||
} else {
|
return;
|
||||||
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addrlen = sizeof(pConn->locaddr);
|
||||||
|
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) {
|
||||||
|
tError("server conn %p failed to get local info", pConn);
|
||||||
|
destroyConn(pConn, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
tDebug("failed to create new connection");
|
tDebug("failed to create new connection");
|
||||||
destroyConn(pConn, true);
|
destroyConn(pConn, true);
|
||||||
|
@ -531,7 +560,7 @@ static bool addHandleToWorkloop(void* arg) {
|
||||||
QUEUE_INIT(&pThrd->msg);
|
QUEUE_INIT(&pThrd->msg);
|
||||||
pthread_mutex_init(&pThrd->msgMtx, NULL);
|
pthread_mutex_init(&pThrd->msgMtx, NULL);
|
||||||
|
|
||||||
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb);
|
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
|
||||||
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -571,6 +600,7 @@ void* workerThread(void* arg) {
|
||||||
|
|
||||||
static SSrvConn* createConn() {
|
static SSrvConn* createConn() {
|
||||||
SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
|
SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
|
||||||
|
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
|
||||||
tTrace("conn %p created", pConn);
|
tTrace("conn %p created", pConn);
|
||||||
++pConn->ref;
|
++pConn->ref;
|
||||||
return pConn;
|
return pConn;
|
||||||
|
@ -585,8 +615,15 @@ static void destroyConn(SSrvConn* conn, bool clear) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
destroySmsg(conn->pSrvMsg);
|
|
||||||
conn->pSrvMsg = NULL;
|
for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) {
|
||||||
|
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
|
||||||
|
destroySmsg(msg);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(conn->srvMsgs);
|
||||||
|
|
||||||
|
// destroySmsg(conn->pSrvMsg);
|
||||||
|
// conn->pSrvMsg = NULL;
|
||||||
|
|
||||||
if (clear) {
|
if (clear) {
|
||||||
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
|
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
|
||||||
|
|
|
@ -77,7 +77,7 @@ void processShellMsg() {
|
||||||
taosFreeQitem(pRpcMsg);
|
taosFreeQitem(pRpcMsg);
|
||||||
|
|
||||||
{
|
{
|
||||||
sleep(1);
|
// sleep(1);
|
||||||
SRpcMsg nRpcMsg = {0};
|
SRpcMsg nRpcMsg = {0};
|
||||||
nRpcMsg.pCont = rpcMallocCont(msgSize);
|
nRpcMsg.pCont = rpcMallocCont(msgSize);
|
||||||
nRpcMsg.contLen = msgSize;
|
nRpcMsg.contLen = msgSize;
|
||||||
|
|
Loading…
Reference in New Issue