Merge remote-tracking branch 'origin/3.0' into feature/mnode
This commit is contained in:
commit
0d7042254a
|
@ -202,6 +202,8 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
|
|||
void transConnCtxDestroy(STransConnCtx* ctx);
|
||||
|
||||
void transFreeMsg(void* msg);
|
||||
|
||||
//
|
||||
typedef struct SConnBuffer {
|
||||
char* buf;
|
||||
int len;
|
||||
|
@ -209,4 +211,9 @@ typedef struct SConnBuffer {
|
|||
int left;
|
||||
} SConnBuffer;
|
||||
|
||||
int transInitBuffer(SConnBuffer* buf);
|
||||
int transClearBuffer(SConnBuffer* buf);
|
||||
int transDestroyBuffer(SConnBuffer* buf);
|
||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -30,7 +30,8 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
|
||||
}
|
||||
pRpc->cfp = pInit->cfp;
|
||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
// pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
pRpc->numOfThreads = pInit->numOfThreads;
|
||||
pRpc->connType = pInit->connType;
|
||||
pRpc->idleTime = pInit->idleTime;
|
||||
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
|
@ -55,7 +56,13 @@ void* rpcMallocCont(int contLen) {
|
|||
}
|
||||
return start + sizeof(STransMsgHead);
|
||||
}
|
||||
void rpcFreeCont(void* cont) { return; }
|
||||
void rpcFreeCont(void* cont) {
|
||||
// impl
|
||||
if (cont == NULL) {
|
||||
return;
|
||||
}
|
||||
free((char*)cont - TRANS_MSG_OVERHEAD);
|
||||
}
|
||||
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
|
||||
|
||||
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
|
||||
|
|
|
@ -31,6 +31,7 @@ typedef struct SCliConn {
|
|||
char secured;
|
||||
uint64_t expireTime;
|
||||
int8_t notifyCount; // timers already notify to client
|
||||
int32_t ref;
|
||||
} SCliConn;
|
||||
|
||||
typedef struct SCliMsg {
|
||||
|
@ -112,16 +113,21 @@ static void clientHandleResp(SCliConn* conn) {
|
|||
|
||||
SRpcMsg rpcMsg;
|
||||
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||
rpcMsg.pCont = transContFromHead(pHead);
|
||||
rpcMsg.pCont = transContFromHead((char*)pHead);
|
||||
rpcMsg.code = pHead->code;
|
||||
rpcMsg.msgType = pHead->msgType;
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
|
||||
tDebug("conn %p handle resp", conn);
|
||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||
conn->notifyCount += 1;
|
||||
|
||||
SCliThrdObj* pThrd = conn->hostThrd;
|
||||
tfree(conn->data);
|
||||
// buf alread translated to rpcMsg.pCont
|
||||
transClearBuffer(&conn->readBuf);
|
||||
|
||||
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
|
||||
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
||||
|
||||
// start thread's timer of conn pool if not active
|
||||
|
@ -131,14 +137,18 @@ static void clientHandleResp(SCliConn* conn) {
|
|||
destroyTransConnCtx(pCtx);
|
||||
}
|
||||
static void clientHandleExcept(SCliConn* pConn) {
|
||||
if (pConn->data == NULL) {
|
||||
clientConnDestroy(pConn, true);
|
||||
return;
|
||||
}
|
||||
tDebug("conn %p destroy", pConn);
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
transFreeMsg((pMsg->msg.pCont));
|
||||
pMsg->msg.pCont = NULL;
|
||||
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
SRpcInfo* pRpc = pCtx->pTransInst;
|
||||
|
||||
transFreeMsg((pMsg->msg.pCont));
|
||||
pMsg->msg.pCont = NULL;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
rpcMsg.code = -1;
|
||||
|
@ -213,12 +223,17 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
|||
}
|
||||
queue* h = QUEUE_HEAD(&plist->conn);
|
||||
QUEUE_REMOVE(h);
|
||||
return QUEUE_DATA(h, SCliConn, conn);
|
||||
|
||||
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
|
||||
QUEUE_INIT(&conn->conn);
|
||||
return conn;
|
||||
}
|
||||
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
||||
char key[128] = {0};
|
||||
|
||||
tstrncpy(key, ip, strlen(ip));
|
||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||
|
||||
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
||||
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime);
|
||||
|
@ -237,40 +252,18 @@ static bool clientReadComplete(SConnBuffer* data) {
|
|||
if (msgLen > data->len) {
|
||||
data->left = msgLen - data->len;
|
||||
return false;
|
||||
} else {
|
||||
} else if (msgLen == data->len) {
|
||||
data->left = 0;
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
// impl later
|
||||
static const int CAPACITY = 512;
|
||||
|
||||
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
SCliConn* conn = handle->data;
|
||||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
if (pBuf->cap == 0) {
|
||||
pBuf->buf = (char*)calloc(1, CAPACITY * sizeof(char));
|
||||
pBuf->len = 0;
|
||||
pBuf->cap = CAPACITY;
|
||||
pBuf->left = -1;
|
||||
|
||||
buf->base = pBuf->buf;
|
||||
buf->len = CAPACITY;
|
||||
} else {
|
||||
if (pBuf->len >= pBuf->cap) {
|
||||
if (pBuf->left == -1) {
|
||||
pBuf->cap *= 2;
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
|
||||
} else if (pBuf->len + pBuf->left > pBuf->cap) {
|
||||
pBuf->cap = pBuf->len + pBuf->left;
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
|
||||
}
|
||||
}
|
||||
buf->base = pBuf->buf + pBuf->len;
|
||||
buf->len = pBuf->cap - pBuf->len;
|
||||
}
|
||||
transAllocBuffer(pBuf, buf);
|
||||
}
|
||||
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||
// impl later
|
||||
|
@ -279,6 +272,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
|||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
if (clientReadComplete(pBuf)) {
|
||||
uv_read_stop((uv_stream_t*)conn->stream);
|
||||
tDebug("conn %p read complete", conn);
|
||||
clientHandleResp(conn);
|
||||
} else {
|
||||
|
@ -288,10 +282,9 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
|||
}
|
||||
assert(nread <= 0);
|
||||
if (nread == 0) {
|
||||
tError("conn %p closed", conn);
|
||||
return;
|
||||
}
|
||||
if (nread < 0) {
|
||||
if (nread < 0 || nread == UV_EOF) {
|
||||
tError("conn %p read error: %s", conn, uv_err_name(nread));
|
||||
clientHandleExcept(conn);
|
||||
}
|
||||
|
@ -300,43 +293,46 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
|
|||
}
|
||||
|
||||
static void clientConnDestroy(SCliConn* conn, bool clear) {
|
||||
tDebug("conn %p destroy", conn);
|
||||
if (clear) {
|
||||
uv_close((uv_handle_t*)conn->stream, NULL);
|
||||
//
|
||||
conn->ref--;
|
||||
if (conn->ref == 0) {
|
||||
tDebug("conn %p remove from conn pool", conn);
|
||||
QUEUE_REMOVE(&conn->conn);
|
||||
tDebug("conn %p remove from conn pool successfully", conn);
|
||||
if (clear) {
|
||||
uv_close((uv_handle_t*)conn->stream, clientDestroy);
|
||||
}
|
||||
}
|
||||
free(conn->stream);
|
||||
free(conn->readBuf.buf);
|
||||
free(conn->writeReq);
|
||||
free(conn);
|
||||
}
|
||||
static void clientDestroy(uv_handle_t* handle) {
|
||||
SCliConn* conn = handle->data;
|
||||
// QUEUE_REMOVE(&conn->conn);
|
||||
clientConnDestroy(conn, false);
|
||||
// transDestroyBuffer(&conn->readBuf);
|
||||
|
||||
free(conn->stream);
|
||||
free(conn->writeReq);
|
||||
tDebug("conn %p destroy successfully", conn);
|
||||
free(conn);
|
||||
|
||||
// clientConnDestroy(conn, false);
|
||||
}
|
||||
|
||||
static void clientWriteCb(uv_write_t* req, int status) {
|
||||
SCliConn* pConn = req->data;
|
||||
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
transFreeMsg((pMsg->msg.pCont));
|
||||
pMsg->msg.pCont = NULL;
|
||||
|
||||
if (status == 0) {
|
||||
tDebug("conn %p data already was written out", pConn);
|
||||
SCliMsg* pMsg = pConn->data;
|
||||
if (pMsg != NULL) {
|
||||
transFreeMsg((pMsg->msg.pCont));
|
||||
pMsg->msg.pCont = NULL;
|
||||
}
|
||||
|
||||
} else {
|
||||
tError("conn %p failed to write: %s", pConn, uv_err_name(status));
|
||||
clientHandleExcept(pConn);
|
||||
return;
|
||||
}
|
||||
SCliThrdObj* pThrd = pConn->hostThrd;
|
||||
// if (pConn->stream == NULL) {
|
||||
// pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
||||
// uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
|
||||
// pConn->stream->data = pConn;
|
||||
//}
|
||||
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb);
|
||||
// impl later
|
||||
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
|
||||
}
|
||||
|
||||
static void clientWrite(SCliConn* pConn) {
|
||||
|
@ -381,14 +377,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
tDebug("conn %p get from conn pool", conn);
|
||||
conn->data = pMsg;
|
||||
conn->writeReq->data = conn;
|
||||
|
||||
conn->readBuf.len = 0;
|
||||
memset(conn->readBuf.buf, 0, conn->readBuf.cap);
|
||||
conn->readBuf.left = -1;
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
clientWrite(conn);
|
||||
} else {
|
||||
SCliConn* conn = calloc(1, sizeof(SCliConn));
|
||||
|
||||
conn->ref++;
|
||||
// read/write stream handle
|
||||
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
|
||||
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
||||
|
@ -397,6 +390,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
// write req handle
|
||||
conn->writeReq = malloc(sizeof(uv_write_t));
|
||||
conn->writeReq->data = conn;
|
||||
|
||||
QUEUE_INIT(&conn->conn);
|
||||
|
||||
conn->connReq.data = conn;
|
||||
|
|
|
@ -198,4 +198,51 @@ void transFreeMsg(void* msg) {
|
|||
}
|
||||
free((char*)msg - sizeof(STransMsgHead));
|
||||
}
|
||||
|
||||
int transInitBuffer(SConnBuffer* buf) {
|
||||
transClearBuffer(buf);
|
||||
return 0;
|
||||
}
|
||||
int transClearBuffer(SConnBuffer* buf) {
|
||||
memset(buf, 0, sizeof(*buf));
|
||||
return 0;
|
||||
}
|
||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
|
||||
/*
|
||||
* formate of data buffer:
|
||||
* |<--------------------------data from socket------------------------------->|
|
||||
* |<------STransMsgHead------->|<-------------------other data--------------->|
|
||||
*/
|
||||
static const int CAPACITY = 1024;
|
||||
|
||||
SConnBuffer* p = connBuf;
|
||||
if (p->cap == 0) {
|
||||
p->buf = (char*)calloc(CAPACITY, sizeof(char));
|
||||
p->len = 0;
|
||||
p->cap = CAPACITY;
|
||||
p->left = -1;
|
||||
|
||||
uvBuf->base = p->buf;
|
||||
uvBuf->len = CAPACITY;
|
||||
} else {
|
||||
if (p->len >= p->cap) {
|
||||
if (p->left == -1) {
|
||||
p->cap *= 2;
|
||||
p->buf = realloc(p->buf, p->cap);
|
||||
} else if (p->len + p->left > p->cap) {
|
||||
p->cap = p->len + p->left;
|
||||
p->buf = realloc(p->buf, p->len + p->left);
|
||||
}
|
||||
}
|
||||
uvBuf->base = p->buf + p->len;
|
||||
uvBuf->len = p->cap - p->len;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int transDestroyBuffer(SConnBuffer* buf) {
|
||||
if (buf->cap > 0) {
|
||||
tfree(buf->buf);
|
||||
}
|
||||
transClearBuffer(buf);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
#include "transComm.h"
|
||||
|
||||
typedef struct SConn {
|
||||
typedef struct SSrvConn {
|
||||
uv_tcp_t* pTcp;
|
||||
uv_write_t* pWriter;
|
||||
uv_timer_t* pTimer;
|
||||
|
@ -26,13 +26,14 @@ typedef struct SConn {
|
|||
queue queue;
|
||||
int ref;
|
||||
int persist; // persist connection or not
|
||||
SConnBuffer connBuf; // read buf,
|
||||
SConnBuffer readBuf; // read buf,
|
||||
int inType;
|
||||
void* pTransInst; // rpc init
|
||||
void* ahandle; //
|
||||
void* hostThrd;
|
||||
void* pSrvMsg;
|
||||
|
||||
SRpcMsg sendMsg;
|
||||
// SRpcMsg sendMsg;
|
||||
// del later
|
||||
char secured;
|
||||
int spi;
|
||||
|
@ -40,7 +41,13 @@ typedef struct SConn {
|
|||
char user[TSDB_UNI_LEN]; // user ID for the link
|
||||
char secret[TSDB_PASSWORD_LEN];
|
||||
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
|
||||
} SConn;
|
||||
} SSrvConn;
|
||||
|
||||
typedef struct SSrvMsg {
|
||||
SSrvConn* pConn;
|
||||
SRpcMsg msg;
|
||||
queue q;
|
||||
} SSrvMsg;
|
||||
|
||||
typedef struct SWorkThrdObj {
|
||||
pthread_t thread;
|
||||
|
@ -48,8 +55,8 @@ typedef struct SWorkThrdObj {
|
|||
int fd;
|
||||
uv_loop_t* loop;
|
||||
uv_async_t* workerAsync; //
|
||||
queue conn;
|
||||
pthread_mutex_t connMtx;
|
||||
queue msg;
|
||||
pthread_mutex_t msgMtx;
|
||||
void* pTransInst;
|
||||
} SWorkThrdObj;
|
||||
|
||||
|
@ -68,9 +75,9 @@ typedef struct SServerObj {
|
|||
static const char* notify = "a";
|
||||
|
||||
// refactor later
|
||||
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen);
|
||||
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
|
||||
|
||||
static int uvAuthMsg(SConn* pConn, char* msg, int msgLen);
|
||||
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
|
||||
|
||||
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||
|
@ -82,12 +89,13 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status);
|
|||
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
|
||||
static void uvWorkerAsyncCb(uv_async_t* handle);
|
||||
|
||||
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb);
|
||||
|
||||
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
||||
static void uvStartSendResp(SSrvMsg* msg);
|
||||
static void destroySrvMsg(SSrvConn* conn);
|
||||
// check whether already read complete packet
|
||||
static bool readComplete(SConnBuffer* buf);
|
||||
static SConn* createConn();
|
||||
static void destroyConn(SConn* conn, bool clear /*clear handle or not*/);
|
||||
static bool readComplete(SConnBuffer* buf);
|
||||
static SSrvConn* createConn();
|
||||
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
|
||||
|
||||
static void uvDestroyConn(uv_handle_t* handle);
|
||||
|
||||
|
@ -105,31 +113,9 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
|
|||
* |<--------------------------data from socket------------------------------->|
|
||||
* |<------STransMsgHead------->|<-------------------other data--------------->|
|
||||
*/
|
||||
static const int CAPACITY = 1024;
|
||||
|
||||
SConn* conn = handle->data;
|
||||
SConnBuffer* pBuf = &conn->connBuf;
|
||||
if (pBuf->cap == 0) {
|
||||
pBuf->buf = (char*)calloc(CAPACITY, sizeof(char));
|
||||
pBuf->len = 0;
|
||||
pBuf->cap = CAPACITY;
|
||||
pBuf->left = -1;
|
||||
|
||||
buf->base = pBuf->buf;
|
||||
buf->len = CAPACITY;
|
||||
} else {
|
||||
if (pBuf->len >= pBuf->cap) {
|
||||
if (pBuf->left == -1) {
|
||||
pBuf->cap *= 2;
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->cap);
|
||||
} else if (pBuf->len + pBuf->left > pBuf->cap) {
|
||||
pBuf->cap = pBuf->len + pBuf->left;
|
||||
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left);
|
||||
}
|
||||
}
|
||||
buf->base = pBuf->buf + pBuf->len;
|
||||
buf->len = pBuf->cap - pBuf->len;
|
||||
}
|
||||
SSrvConn* conn = handle->data;
|
||||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
transAllocBuffer(pBuf, buf);
|
||||
}
|
||||
|
||||
// check data read from socket completely or not
|
||||
|
@ -159,7 +145,7 @@ static bool readComplete(SConnBuffer* data) {
|
|||
// // impl later
|
||||
// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg;
|
||||
// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
|
||||
// SConn* pConn = pRecv->thandle;
|
||||
// SSrvConn* pConn = pRecv->thandle;
|
||||
// tDump(pRecv->msg, pRecv->msgLen);
|
||||
// terrno = 0;
|
||||
// // SRpcReqContext* pContest;
|
||||
|
@ -167,7 +153,7 @@ static bool readComplete(SConnBuffer* data) {
|
|||
// // do auth and check
|
||||
//}
|
||||
|
||||
static int uvAuthMsg(SConn* pConn, char* msg, int len) {
|
||||
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
|
||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||
|
||||
int code = 0;
|
||||
|
@ -222,14 +208,14 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
|
|||
|
||||
// refers specifically to query or insert timeout
|
||||
static void uvHandleActivityTimeout(uv_timer_t* handle) {
|
||||
SConn* conn = handle->data;
|
||||
SSrvConn* conn = handle->data;
|
||||
tDebug("%p timeout since no activity", conn);
|
||||
}
|
||||
|
||||
static void uvHandleReq(SConn* pConn) {
|
||||
static void uvHandleReq(SSrvConn* pConn) {
|
||||
SRecvInfo info;
|
||||
SRecvInfo* p = &info;
|
||||
SConnBuffer* pBuf = &pConn->connBuf;
|
||||
SConnBuffer* pBuf = &pConn->readBuf;
|
||||
p->msg = pBuf->buf;
|
||||
p->msgLen = pBuf->len;
|
||||
p->ip = 0;
|
||||
|
@ -255,7 +241,6 @@ static void uvHandleReq(SConn* pConn) {
|
|||
pHead->code = htonl(pHead->code);
|
||||
|
||||
int32_t dlen = 0;
|
||||
SRpcMsg rpcMsg;
|
||||
if (transDecompressMsg(NULL, 0, NULL)) {
|
||||
// add compress later
|
||||
// pHead = rpcDecompressRpcMsg(pHead);
|
||||
|
@ -264,6 +249,8 @@ static void uvHandleReq(SConn* pConn) {
|
|||
// impl later
|
||||
//
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||
rpcMsg.pCont = pHead->content;
|
||||
rpcMsg.msgType = pHead->msgType;
|
||||
|
@ -271,6 +258,7 @@ static void uvHandleReq(SConn* pConn) {
|
|||
rpcMsg.ahandle = NULL;
|
||||
rpcMsg.handle = pConn;
|
||||
|
||||
transClearBuffer(&pConn->readBuf);
|
||||
pConn->ref++;
|
||||
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
|
||||
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
||||
|
@ -280,8 +268,8 @@ static void uvHandleReq(SConn* pConn) {
|
|||
|
||||
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||
// opt
|
||||
SConn* conn = cli->data;
|
||||
SConnBuffer* pBuf = &conn->connBuf;
|
||||
SSrvConn* conn = cli->data;
|
||||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
|
||||
|
@ -294,11 +282,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
return;
|
||||
}
|
||||
if (nread == 0) {
|
||||
tDebug("conn %p except read", conn);
|
||||
// destroyConn(conn, true);
|
||||
return;
|
||||
}
|
||||
if (nread != UV_EOF) {
|
||||
if (nread < 0 || nread != UV_EOF) {
|
||||
if (conn->ref > 1) {
|
||||
conn->ref++; // ref > 1 signed that write is in progress
|
||||
}
|
||||
tDebug("conn %p read error: %s", conn, uv_err_name(nread));
|
||||
destroyConn(conn, true);
|
||||
}
|
||||
|
@ -310,25 +299,21 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
|
|||
|
||||
void uvOnTimeoutCb(uv_timer_t* handle) {
|
||||
// opt
|
||||
SConn* pConn = handle->data;
|
||||
SSrvConn* pConn = handle->data;
|
||||
tDebug("conn %p time out", pConn);
|
||||
}
|
||||
|
||||
void uvOnWriteCb(uv_write_t* req, int status) {
|
||||
SConn* conn = req->data;
|
||||
|
||||
SConnBuffer* buf = &conn->connBuf;
|
||||
buf->len = 0;
|
||||
memset(buf->buf, 0, buf->cap);
|
||||
buf->left = -1;
|
||||
|
||||
SRpcMsg* pMsg = &conn->sendMsg;
|
||||
transFreeMsg(pMsg->pCont);
|
||||
SSrvConn* conn = req->data;
|
||||
SSrvMsg* smsg = conn->pSrvMsg;
|
||||
destroySrvMsg(conn);
|
||||
|
||||
transClearBuffer(&conn->readBuf);
|
||||
if (status == 0) {
|
||||
tDebug("conn %p data already was written on stream", conn);
|
||||
} else {
|
||||
tDebug("conn %p failed to write data, %s", conn, uv_err_name(status));
|
||||
//
|
||||
destroyConn(conn, true);
|
||||
}
|
||||
// opt
|
||||
|
@ -341,16 +326,16 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
|||
}
|
||||
}
|
||||
|
||||
static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
|
||||
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
||||
// impl later;
|
||||
tDebug("conn %p prepare to send resp", conn);
|
||||
SRpcMsg* pMsg = &conn->sendMsg;
|
||||
tDebug("conn %p prepare to send resp", smsg->pConn);
|
||||
SRpcMsg* pMsg = &smsg->msg;
|
||||
if (pMsg->pCont == 0) {
|
||||
pMsg->pCont = (void*)rpcMallocCont(0);
|
||||
pMsg->contLen = 0;
|
||||
}
|
||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||
pHead->msgType = conn->inType + 1;
|
||||
pHead->msgType = smsg->pConn->inType + 1;
|
||||
// add more info
|
||||
char* msg = (char*)pHead;
|
||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||
|
@ -361,28 +346,55 @@ static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) {
|
|||
wb->base = msg;
|
||||
wb->len = len;
|
||||
}
|
||||
static void uvStartSendResp(SSrvMsg* smsg) {
|
||||
// impl
|
||||
uv_buf_t wb;
|
||||
uvPrepareSendData(smsg, &wb);
|
||||
|
||||
SSrvConn* pConn = smsg->pConn;
|
||||
uv_timer_stop(pConn->pTimer);
|
||||
|
||||
pConn->pSrvMsg = smsg;
|
||||
// conn->pWriter->data = smsg;
|
||||
uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb);
|
||||
|
||||
// SRpcMsg* rpcMsg = smsg->msg;
|
||||
|
||||
return;
|
||||
}
|
||||
static void destroySrvMsg(SSrvConn* conn) {
|
||||
SSrvMsg* smsg = conn->pSrvMsg;
|
||||
if (smsg == NULL) {
|
||||
return;
|
||||
}
|
||||
transFreeMsg(smsg->msg.pCont);
|
||||
free(conn->pSrvMsg);
|
||||
conn->pSrvMsg = NULL;
|
||||
}
|
||||
void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||
SWorkThrdObj* pThrd = handle->data;
|
||||
SConn* conn = NULL;
|
||||
SSrvConn* conn = NULL;
|
||||
queue wq;
|
||||
// batch process to avoid to lock/unlock frequently
|
||||
pthread_mutex_lock(&pThrd->connMtx);
|
||||
QUEUE_MOVE(&pThrd->conn, &wq);
|
||||
pthread_mutex_unlock(&pThrd->connMtx);
|
||||
pthread_mutex_lock(&pThrd->msgMtx);
|
||||
QUEUE_MOVE(&pThrd->msg, &wq);
|
||||
pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
|
||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||
queue* head = QUEUE_HEAD(&wq);
|
||||
QUEUE_REMOVE(head);
|
||||
SConn* conn = QUEUE_DATA(head, SConn, queue);
|
||||
if (conn == NULL) {
|
||||
tError("except occurred, do nothing");
|
||||
return;
|
||||
}
|
||||
uv_buf_t wb;
|
||||
uvPrepareSendData(conn, &wb);
|
||||
uv_timer_stop(conn->pTimer);
|
||||
|
||||
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
|
||||
SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
|
||||
if (msg == NULL) {
|
||||
tError("except occurred, continue");
|
||||
continue;
|
||||
}
|
||||
uvStartSendResp(msg);
|
||||
// uv_buf_t wb;
|
||||
// uvPrepareSendData(msg, &wb);
|
||||
// uv_timer_stop(conn->pTimer);
|
||||
|
||||
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -435,7 +447,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
uv_handle_type pending = uv_pipe_pending_type(pipe);
|
||||
assert(pending == UV_TCP);
|
||||
|
||||
SConn* pConn = createConn();
|
||||
SSrvConn* pConn = createConn();
|
||||
|
||||
pConn->pTransInst = pThrd->pTransInst;
|
||||
/* init conn timer*/
|
||||
|
@ -484,8 +496,8 @@ static bool addHandleToWorkloop(void* arg) {
|
|||
|
||||
pThrd->pipe->data = pThrd;
|
||||
|
||||
QUEUE_INIT(&pThrd->conn);
|
||||
pthread_mutex_init(&pThrd->connMtx, NULL);
|
||||
QUEUE_INIT(&pThrd->msg);
|
||||
pthread_mutex_init(&pThrd->msgMtx, NULL);
|
||||
|
||||
pThrd->workerAsync = malloc(sizeof(uv_async_t));
|
||||
uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
|
||||
|
@ -523,34 +535,42 @@ void* workerThread(void* arg) {
|
|||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
|
||||
static SConn* createConn() {
|
||||
SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
|
||||
static SSrvConn* createConn() {
|
||||
SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
|
||||
tDebug("conn %p created", pConn);
|
||||
++pConn->ref;
|
||||
return pConn;
|
||||
}
|
||||
|
||||
static void destroyConn(SConn* conn, bool clear) {
|
||||
static void destroyConn(SSrvConn* conn, bool clear) {
|
||||
if (conn == NULL) {
|
||||
return;
|
||||
}
|
||||
if (--conn->ref == 0) {
|
||||
// SRpcMsg* pMsg = &conn->sendMsg;
|
||||
// transFreeMsg(pMsg->pCont);
|
||||
// pMsg->pCont = NULL;
|
||||
|
||||
tDebug("conn %p try to destroy", conn);
|
||||
if (--conn->ref > 0) {
|
||||
return;
|
||||
}
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
destroySrvMsg(conn);
|
||||
|
||||
if (clear) {
|
||||
uv_close((uv_handle_t*)conn->pTcp, NULL);
|
||||
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
|
||||
}
|
||||
}
|
||||
static void uvDestroyConn(uv_handle_t* handle) {
|
||||
SSrvConn* conn = handle->data;
|
||||
tDebug("conn %p destroy", conn);
|
||||
uv_timer_stop(conn->pTimer);
|
||||
free(conn->pTimer);
|
||||
free(conn->pTcp);
|
||||
free(conn->connBuf.buf);
|
||||
// free(conn->pTcp);
|
||||
free(conn->pWriter);
|
||||
free(conn);
|
||||
}
|
||||
static void uvDestroyConn(uv_handle_t* handle) {
|
||||
SConn* conn = handle->data;
|
||||
destroyConn(conn, false);
|
||||
}
|
||||
static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) {
|
||||
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
|
||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||
|
||||
if (pConn->spi && pConn->secured == 0) {
|
||||
|
@ -632,6 +652,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
|||
pthread_join(pThrd->thread, NULL);
|
||||
// free(srv->pipe[i]);
|
||||
free(pThrd->loop);
|
||||
pthread_mutex_destroy(&pThrd->msgMtx);
|
||||
free(pThrd);
|
||||
}
|
||||
void taosCloseServer(void* arg) {
|
||||
|
@ -648,17 +669,20 @@ void taosCloseServer(void* arg) {
|
|||
}
|
||||
|
||||
void rpcSendResponse(const SRpcMsg* pMsg) {
|
||||
SConn* pConn = pMsg->handle;
|
||||
SSrvConn* pConn = pMsg->handle;
|
||||
SWorkThrdObj* pThrd = pConn->hostThrd;
|
||||
|
||||
// opt later
|
||||
pConn->sendMsg = *pMsg;
|
||||
pthread_mutex_lock(&pThrd->connMtx);
|
||||
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
|
||||
pthread_mutex_unlock(&pThrd->connMtx);
|
||||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||
srvMsg->pConn = pConn;
|
||||
srvMsg->msg = *pMsg;
|
||||
|
||||
pthread_mutex_lock(&pThrd->msgMtx);
|
||||
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
|
||||
pthread_mutex_unlock(&pThrd->msgMtx);
|
||||
|
||||
tDebug("conn %p start to send resp", pConn);
|
||||
|
||||
uv_async_send(pConn->pWorkerAsync);
|
||||
uv_async_send(pThrd->workerAsync);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -64,6 +64,7 @@ static void *sendRequest(void *param) {
|
|||
// tsem_wait(&pInfo->rspSem);
|
||||
tsem_wait(&pInfo->rspSem);
|
||||
tDebug("recv response succefully");
|
||||
|
||||
// usleep(100000000);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue