Merge pull request #10712 from taosdata/feature/supportQuery
handle persist conn
This commit is contained in:
commit
a5d8b20e19
|
@ -47,8 +47,6 @@ typedef struct SRpcMsg {
|
||||||
void * ahandle; // app handle set by client
|
void * ahandle; // app handle set by client
|
||||||
int persist; // keep handle or not, default 0
|
int persist; // keep handle or not, default 0
|
||||||
|
|
||||||
SRpcPush *push;
|
|
||||||
|
|
||||||
} SRpcMsg;
|
} SRpcMsg;
|
||||||
|
|
||||||
typedef struct SRpcPush {
|
typedef struct SRpcPush {
|
||||||
|
|
|
@ -17,11 +17,6 @@
|
||||||
|
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
|
||||||
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
|
||||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
|
||||||
|
|
||||||
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
|
|
||||||
|
|
||||||
typedef struct SCliConn {
|
typedef struct SCliConn {
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
uv_connect_t connReq;
|
uv_connect_t connReq;
|
||||||
|
@ -32,7 +27,6 @@ typedef struct SCliConn {
|
||||||
void* data;
|
void* data;
|
||||||
queue conn;
|
queue conn;
|
||||||
uint64_t expireTime;
|
uint64_t expireTime;
|
||||||
int8_t ctnRdCnt; // continue read count
|
|
||||||
int hThrdIdx;
|
int hThrdIdx;
|
||||||
bool broken; // link broken or not
|
bool broken; // link broken or not
|
||||||
|
|
||||||
|
@ -92,9 +86,9 @@ static void clientTimeoutCb(uv_timer_t* handle);
|
||||||
// alloc buf for read
|
// alloc buf for read
|
||||||
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
// callback after read nbytes from socket
|
// callback after read nbytes from socket
|
||||||
static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
static void clientRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
// callback after write data to socket
|
// callback after write data to socket
|
||||||
static void clientWriteCb(uv_write_t* req, int status);
|
static void clientSendDataCb(uv_write_t* req, int status);
|
||||||
// callback after conn to server
|
// callback after conn to server
|
||||||
static void clientConnCb(uv_connect_t* req, int status);
|
static void clientConnCb(uv_connect_t* req, int status);
|
||||||
static void clientAsyncCb(uv_async_t* handle);
|
static void clientAsyncCb(uv_async_t* handle);
|
||||||
|
@ -120,7 +114,27 @@ static void transDestroyConnCtx(STransConnCtx* ctx);
|
||||||
// thread obj
|
// thread obj
|
||||||
static SCliThrdObj* createThrdObj();
|
static SCliThrdObj* createThrdObj();
|
||||||
static void destroyThrdObj(SCliThrdObj* pThrd);
|
static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
// thread
|
|
||||||
|
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||||
|
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||||
|
|
||||||
|
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
|
||||||
|
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
|
||||||
|
do { \
|
||||||
|
if (thrd->quit) { \
|
||||||
|
clientHandleExcept(conn); \
|
||||||
|
goto _RETURE; \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define CONN_HANDLE_BROKEN(conn) \
|
||||||
|
do { \
|
||||||
|
if (conn->broken) { \
|
||||||
|
clientHandleExcept(conn); \
|
||||||
|
goto _RETURE; \
|
||||||
|
} \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
static void* clientThread(void* arg);
|
static void* clientThread(void* arg);
|
||||||
|
|
||||||
static void* clientNotifyApp() {}
|
static void* clientNotifyApp() {}
|
||||||
|
@ -147,6 +161,8 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
|
if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
|
||||||
rpcMsg.handle = conn;
|
rpcMsg.handle = conn;
|
||||||
|
transRefCliHandle(conn);
|
||||||
|
|
||||||
conn->persist = 1;
|
conn->persist = 1;
|
||||||
tDebug("client conn %p persist by app", conn);
|
tDebug("client conn %p persist by app", conn);
|
||||||
}
|
}
|
||||||
|
@ -165,9 +181,8 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
||||||
tsem_post(pCtx->pSem);
|
tsem_post(pCtx->pSem);
|
||||||
}
|
}
|
||||||
conn->ctnRdCnt += 1;
|
|
||||||
|
|
||||||
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
|
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientRecvCb);
|
||||||
|
|
||||||
// user owns conn->persist = 1
|
// user owns conn->persist = 1
|
||||||
if (conn->persist == 0) {
|
if (conn->persist == 0) {
|
||||||
|
@ -290,7 +305,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
||||||
|
|
||||||
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||||
conn->ctnRdCnt = 0;
|
|
||||||
// list already create before
|
// list already create before
|
||||||
assert(plist != NULL);
|
assert(plist != NULL);
|
||||||
QUEUE_PUSH(&plist->conn, &conn->conn);
|
QUEUE_PUSH(&plist->conn, &conn->conn);
|
||||||
|
@ -300,7 +314,7 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
transAllocBuffer(pBuf, buf);
|
transAllocBuffer(pBuf, buf);
|
||||||
}
|
}
|
||||||
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
// impl later
|
// impl later
|
||||||
if (handle->data == NULL) {
|
if (handle->data == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -363,7 +377,7 @@ static void clientDestroy(uv_handle_t* handle) {
|
||||||
free(conn);
|
free(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientWriteCb(uv_write_t* req, int status) {
|
static void clientSendDataCb(uv_write_t* req, int status) {
|
||||||
SCliConn* pConn = req->data;
|
SCliConn* pConn = req->data;
|
||||||
|
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
|
@ -378,10 +392,12 @@ static void clientWriteCb(uv_write_t* req, int status) {
|
||||||
clientHandleExcept(pConn);
|
clientHandleExcept(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
|
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientRecvCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientWrite(SCliConn* pConn) {
|
static void clientSendData(SCliConn* pConn) {
|
||||||
|
CONN_HANDLE_BROKEN(pConn);
|
||||||
|
|
||||||
SCliMsg* pCliMsg = pConn->data;
|
SCliMsg* pCliMsg = pConn->data;
|
||||||
STransConnCtx* pCtx = pCliMsg->ctx;
|
STransConnCtx* pCtx = pCliMsg->ctx;
|
||||||
|
|
||||||
|
@ -420,7 +436,11 @@ static void clientWrite(SCliConn* pConn) {
|
||||||
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
|
|
||||||
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
|
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientSendDataCb);
|
||||||
|
|
||||||
|
return;
|
||||||
|
_RETURE:
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
static void clientConnCb(uv_connect_t* req, int status) {
|
static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
// impl later
|
// impl later
|
||||||
|
@ -439,7 +459,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
|
||||||
tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
|
|
||||||
assert(pConn->stream == req->handle);
|
assert(pConn->stream == req->handle);
|
||||||
clientWrite(pConn);
|
clientSendData(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
@ -452,6 +472,21 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
pThrd->quit = true;
|
pThrd->quit = true;
|
||||||
uv_stop(pThrd->loop);
|
uv_stop(pThrd->loop);
|
||||||
}
|
}
|
||||||
|
static SCliConn* clientGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
SCliConn* conn = NULL;
|
||||||
|
if (pMsg->msg.handle != NULL) {
|
||||||
|
conn = (SCliConn*)(pMsg->msg.handle);
|
||||||
|
transUnrefCliHandle(conn);
|
||||||
|
if (conn != NULL) {
|
||||||
|
tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
||||||
|
if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
}
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
uint64_t et = taosGetTimestampUs();
|
uint64_t et = taosGetTimestampUs();
|
||||||
uint64_t el = et - pMsg->st;
|
uint64_t el = et - pMsg->st;
|
||||||
|
@ -459,28 +494,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
SRpcInfo* pTransInst = pThrd->pTransInst;
|
SRpcInfo* pTransInst = pThrd->pTransInst;
|
||||||
SCliConn* conn = NULL;
|
|
||||||
|
|
||||||
if (pMsg->msg.handle != NULL) {
|
|
||||||
conn = (SCliConn*)(pMsg->msg.handle);
|
|
||||||
if (conn != NULL) {
|
|
||||||
tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
|
||||||
if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
SCliConn* conn = clientGetConn(pMsg, pThrd);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
conn->writeReq.data = conn;
|
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
|
clientSendData(conn);
|
||||||
if (pThrd->quit) {
|
|
||||||
clientHandleExcept(conn);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
clientWrite(conn);
|
|
||||||
} else {
|
} else {
|
||||||
conn = clientConnCreate(pThrd);
|
conn = clientConnCreate(pThrd);
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
|
@ -495,8 +514,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
||||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
conn->ctnRdCnt = 0;
|
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
}
|
}
|
||||||
static void clientAsyncCb(uv_async_t* handle) {
|
static void clientAsyncCb(uv_async_t* handle) {
|
||||||
|
|
|
@ -92,9 +92,9 @@ static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
|
||||||
|
|
||||||
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
static void uvOnTimeoutCb(uv_timer_t* handle);
|
static void uvOnTimeoutCb(uv_timer_t* handle);
|
||||||
static void uvOnWriteCb(uv_write_t* req, int status);
|
static void uvOnSendCb(uv_write_t* req, int status);
|
||||||
static void uvOnPipeWriteCb(uv_write_t* req, int status);
|
static void uvOnPipeWriteCb(uv_write_t* req, int status);
|
||||||
static void uvOnAcceptCb(uv_stream_t* stream, int status);
|
static void uvOnAcceptCb(uv_stream_t* stream, int status);
|
||||||
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
|
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
|
||||||
|
@ -240,7 +240,7 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
// validate msg type
|
// validate msg type
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
// opt
|
// opt
|
||||||
SSrvConn* conn = cli->data;
|
SSrvConn* conn = cli->data;
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
|
@ -282,7 +282,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
|
||||||
tError("server conn %p time out", pConn);
|
tError("server conn %p time out", pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvOnWriteCb(uv_write_t* req, int status) {
|
void uvOnSendCb(uv_write_t* req, int status) {
|
||||||
SSrvConn* conn = req->data;
|
SSrvConn* conn = req->data;
|
||||||
transClearBuffer(&conn->readBuf);
|
transClearBuffer(&conn->readBuf);
|
||||||
if (status == 0) {
|
if (status == 0) {
|
||||||
|
@ -350,7 +350,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
|
||||||
|
|
||||||
SSrvConn* pConn = smsg->pConn;
|
SSrvConn* pConn = smsg->pConn;
|
||||||
uv_timer_stop(&pConn->pTimer);
|
uv_timer_stop(&pConn->pTimer);
|
||||||
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
|
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
|
||||||
}
|
}
|
||||||
static void uvStartSendResp(SSrvMsg* smsg) {
|
static void uvStartSendResp(SSrvMsg* smsg) {
|
||||||
// impl
|
// impl
|
||||||
|
@ -526,7 +526,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
|
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnRecvCb);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
tDebug("failed to create new connection");
|
tDebug("failed to create new connection");
|
||||||
|
@ -641,7 +641,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
uv_timer_stop(&conn->pTimer);
|
uv_timer_stop(&conn->pTimer);
|
||||||
QUEUE_REMOVE(&conn->queue);
|
QUEUE_REMOVE(&conn->queue);
|
||||||
free(conn->pTcp);
|
free(conn->pTcp);
|
||||||
// free(conn);
|
free(conn);
|
||||||
|
|
||||||
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
uv_loop_close(thrd->loop);
|
uv_loop_close(thrd->loop);
|
||||||
|
|
|
@ -82,7 +82,7 @@ static void *sendRequest(void *param) {
|
||||||
rpcMsg.contLen = pInfo->msgSize;
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
rpcMsg.ahandle = pInfo;
|
rpcMsg.ahandle = pInfo;
|
||||||
rpcMsg.msgType = 1;
|
rpcMsg.msgType = 1;
|
||||||
rpcMsg.push = push;
|
// rpcMsg.push = push;
|
||||||
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
int64_t start = taosGetTimestampUs();
|
int64_t start = taosGetTimestampUs();
|
||||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||||
|
|
Loading…
Reference in New Issue