handle except
This commit is contained in:
parent
3843b689bb
commit
3cc9979a99
|
@ -42,9 +42,10 @@ typedef struct SRpcMsg {
|
|||
void * pCont;
|
||||
int contLen;
|
||||
int32_t code;
|
||||
void * handle; // rpc handle returned to app
|
||||
void * ahandle; // app handle set by client
|
||||
int noResp; // has response or not(default 0 indicate resp);
|
||||
void * handle; // rpc handle returned to app
|
||||
void * ahandle; // app handle set by client
|
||||
int noResp; // has response or not(default 0 indicate resp);
|
||||
int persistHandle; // persist handle or not
|
||||
|
||||
} SRpcMsg;
|
||||
|
||||
|
@ -69,15 +70,9 @@ typedef struct SRpcInit {
|
|||
// call back to retrieve the client auth info, for server app only
|
||||
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||
|
||||
// call back to keep conn or not
|
||||
bool (*pfp)(void *parent, tmsg_t msgType);
|
||||
|
||||
// to support Send messages multiple times on a link
|
||||
void *(*mfp)(void *parent, tmsg_t msgType);
|
||||
|
||||
// call back to handle except when query/fetch in progress
|
||||
bool (*efp)(void *parent, tmsg_t msgType);
|
||||
|
||||
void *parent;
|
||||
} SRpcInit;
|
||||
|
||||
|
|
|
@ -90,7 +90,6 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
|||
rpcInit.label = "TSC";
|
||||
rpcInit.numOfThreads = numOfThread;
|
||||
rpcInit.cfp = processMsgFromServer;
|
||||
rpcInit.pfp = persistConnForSpecificMsg;
|
||||
rpcInit.sessions = tsMaxConnections;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.user = (char *)user;
|
||||
|
|
|
@ -155,6 +155,10 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
|
|||
.ahandle = (void*)pInfo,
|
||||
.handle = pInfo->msgInfo.handle,
|
||||
.code = 0};
|
||||
if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH ||
|
||||
pInfo->msgType == TDMT_VND_QUERY_CONTINUE) {
|
||||
rpcMsg.persistHandle = 1;
|
||||
}
|
||||
|
||||
assert(pInfo->fp != NULL);
|
||||
|
||||
|
|
|
@ -150,11 +150,12 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char version : 4; // RPC version
|
||||
char comp : 4; // compression algorithm, 0:no compression 1:lz4
|
||||
char resflag : 2; // reserved bits
|
||||
char spi : 1; // security parameter index
|
||||
char comp : 2; // compression algorithm, 0:no compression 1:lz4
|
||||
char noResp : 2; // noResp bits, 0: resp, 1: resp
|
||||
char persist : 2; // persist handle,0: no persit, 1: persist handle
|
||||
char release : 2;
|
||||
char secured : 2;
|
||||
char encrypt : 3; // encrypt algorithm, 0: no encryption
|
||||
char spi : 2;
|
||||
|
||||
uint32_t code; // del later
|
||||
uint32_t msgType;
|
||||
|
@ -179,6 +180,9 @@ typedef struct {
|
|||
|
||||
#pragma pack(pop)
|
||||
|
||||
typedef enum { Normal, Quit, Release } STransMsgType;
|
||||
typedef enum { ConnNormal, ConnAcquire, ConnRelease } ConnStatus;
|
||||
|
||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
|
||||
|
||||
|
|
|
@ -39,9 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
// register callback handle
|
||||
pRpc->cfp = pInit->cfp;
|
||||
pRpc->afp = pInit->afp;
|
||||
pRpc->pfp = pInit->pfp;
|
||||
pRpc->mfp = pInit->mfp;
|
||||
pRpc->efp = pInit->efp;
|
||||
|
||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
|
|
|
@ -17,11 +17,6 @@
|
|||
|
||||
#include "transComm.h"
|
||||
|
||||
// Normal(default): send/recv msg
|
||||
// Quit: quit rpc inst
|
||||
// Release: release handle to rpc inst
|
||||
typedef enum { Normal, Quit, Release } SCliMsgType;
|
||||
|
||||
typedef struct SCliConn {
|
||||
T_REF_DECLARE()
|
||||
uv_connect_t connReq;
|
||||
|
@ -36,7 +31,8 @@ typedef struct SCliConn {
|
|||
int hThrdIdx;
|
||||
bool broken; // link broken or not
|
||||
|
||||
int persist; //
|
||||
ConnStatus status; //
|
||||
int release; // 1: release
|
||||
// spi configure
|
||||
char spi;
|
||||
char secured;
|
||||
|
@ -55,7 +51,7 @@ typedef struct SCliMsg {
|
|||
STransMsg msg;
|
||||
queue q;
|
||||
uint64_t st;
|
||||
SCliMsgType type;
|
||||
STransMsgType type;
|
||||
} SCliMsg;
|
||||
|
||||
typedef struct SCliThrdObj {
|
||||
|
@ -113,10 +109,12 @@ static void cliSend(SCliConn* pConn);
|
|||
static void cliHandleResp(SCliConn* conn);
|
||||
// handle except about conn
|
||||
static void cliHandleExcept(SCliConn* conn);
|
||||
|
||||
// handle req from app
|
||||
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrdObj* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease};
|
||||
|
||||
static void cliSendQuit(SCliThrdObj* thrd);
|
||||
static void destroyUserdata(STransMsg* userdata);
|
||||
|
@ -133,6 +131,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
|||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
do { \
|
||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
||||
conn->status = ConnRelease; \
|
||||
transClearBuffer(&conn->readBuf); \
|
||||
transFreeMsg(transContFromHead((char*)head)); \
|
||||
if (T_REF_VAL_GET(conn) == 1) { \
|
||||
SCliThrdObj* thrd = conn->hostThrd; \
|
||||
addConnToPool(thrd->pool, conn); \
|
||||
} \
|
||||
goto _RETURN; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
|
||||
do { \
|
||||
if (thrd->quit) { \
|
||||
|
@ -151,14 +163,15 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
|||
|
||||
#define CONN_SET_PERSIST_BY_APP(conn) \
|
||||
do { \
|
||||
if (conn->persist == false) { \
|
||||
conn->persist = true; \
|
||||
if (conn->status == ConnNormal) { \
|
||||
conn->status = ConnAcquire; \
|
||||
transRefCliHandle(conn); \
|
||||
} \
|
||||
} while (0)
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->status == ConnNormal && T_REF_VAL_GET(conn) == 1)
|
||||
|
||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||
|
||||
static void* cliWorkThread(void* arg);
|
||||
|
||||
|
@ -177,7 +190,6 @@ void cliHandleResp(SCliConn* conn) {
|
|||
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
|
||||
pHead->code = htonl(pHead->code);
|
||||
pHead->msgLen = htonl(pHead->msgLen);
|
||||
|
||||
STransMsg transMsg = {0};
|
||||
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||
transMsg.pCont = transContFromHead((char*)pHead);
|
||||
|
@ -185,6 +197,8 @@ void cliHandleResp(SCliConn* conn) {
|
|||
transMsg.msgType = pHead->msgType;
|
||||
transMsg.ahandle = NULL;
|
||||
|
||||
CONN_SHOULD_RELEASE(conn, pHead);
|
||||
|
||||
SCliMsg* pMsg = NULL;
|
||||
if (taosArrayGetSize(conn->cliMsgs) > 0) {
|
||||
pMsg = taosArrayGetP(conn->cliMsgs, 0);
|
||||
|
@ -200,9 +214,8 @@ void cliHandleResp(SCliConn* conn) {
|
|||
// buf's mem alread translated to transMsg.pCont
|
||||
transClearBuffer(&conn->readBuf);
|
||||
|
||||
if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, transMsg.msgType)) {
|
||||
if (!CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
transMsg.handle = conn;
|
||||
CONN_SET_PERSIST_BY_APP(conn);
|
||||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
|
||||
|
@ -241,6 +254,8 @@ void cliHandleResp(SCliConn* conn) {
|
|||
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
|
||||
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||
}
|
||||
_RETURN:
|
||||
return;
|
||||
}
|
||||
|
||||
void cliHandleExcept(SCliConn* pConn) {
|
||||
|
@ -367,6 +382,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
|
||||
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
conn->status = ConnNormal;
|
||||
// list already create before
|
||||
assert(plist != NULL);
|
||||
QUEUE_PUSH(&plist->conn, &conn->conn);
|
||||
|
@ -423,8 +439,8 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
|
|||
|
||||
QUEUE_INIT(&conn->conn);
|
||||
conn->hostThrd = pThrd;
|
||||
conn->persist = false;
|
||||
conn->broken = false;
|
||||
conn->status = ConnNormal;
|
||||
conn->broken = 0;
|
||||
transRefCliHandle(conn);
|
||||
return conn;
|
||||
}
|
||||
|
@ -513,7 +529,9 @@ void cliSend(SCliConn* pConn) {
|
|||
msgLen += sizeof(STransUserMsg);
|
||||
}
|
||||
|
||||
pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0;
|
||||
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
|
||||
|
||||
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
|
||||
pHead->msgType = pMsg->msgType;
|
||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||
|
||||
|
@ -522,6 +540,9 @@ void cliSend(SCliConn* pConn) {
|
|||
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
|
||||
if (pHead->persist == 1) {
|
||||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
}
|
||||
pConn->writeReq.data = pConn;
|
||||
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
|
||||
|
||||
|
@ -571,12 +592,12 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
}
|
||||
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
if (conn->persist && T_REF_VAL_GET(conn) >= 2) {
|
||||
conn->persist = false;
|
||||
conn->status = ConnRelease;
|
||||
int ref = T_REF_VAL_GET(conn);
|
||||
if (ref == 2) {
|
||||
transUnrefCliHandle(conn);
|
||||
} else if (ref == 1) {
|
||||
addConnToPool(pThrd->pool, conn);
|
||||
} else {
|
||||
transUnrefCliHandle(conn);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -652,14 +673,10 @@ static void cliAsyncCb(uv_async_t* handle) {
|
|||
QUEUE_REMOVE(h);
|
||||
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
|
||||
if (pMsg->type == Normal) {
|
||||
cliHandleReq(pMsg, pThrd);
|
||||
} else if (pMsg->type == Quit) {
|
||||
cliHandleQuit(pMsg, pThrd);
|
||||
} else if (pMsg->type == Release) {
|
||||
cliHandleRelease(pMsg, pThrd);
|
||||
if (pMsg == NULL) {
|
||||
continue;
|
||||
}
|
||||
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
||||
count++;
|
||||
}
|
||||
if (count >= 2) {
|
||||
|
@ -802,8 +819,8 @@ void transReleaseCliHandle(void* handle) {
|
|||
|
||||
STransMsg tmsg = {.handle = handle};
|
||||
SCliMsg* cmsg = calloc(1, sizeof(SCliMsg));
|
||||
cmsg->type = Release;
|
||||
cmsg->msg = tmsg;
|
||||
cmsg->type = Release;
|
||||
|
||||
transSendAsync(thrd->asyncPool, &cmsg->q);
|
||||
}
|
||||
|
@ -833,6 +850,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
|
|||
cliMsg->ctx = pCtx;
|
||||
cliMsg->msg = *pMsg;
|
||||
cliMsg->st = taosGetTimestampUs();
|
||||
cliMsg->type = Normal;
|
||||
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||
|
@ -858,6 +876,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq
|
|||
cliMsg->ctx = pCtx;
|
||||
cliMsg->msg = *pReq;
|
||||
cliMsg->st = taosGetTimestampUs();
|
||||
cliMsg->type = Normal;
|
||||
|
||||
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
|
||||
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct SSrvConn {
|
|||
|
||||
bool broken; // conn broken;
|
||||
|
||||
ConnStatus status;
|
||||
struct sockaddr_in addr;
|
||||
struct sockaddr_in locaddr;
|
||||
|
||||
|
@ -47,18 +48,18 @@ typedef struct SSrvConn {
|
|||
} SSrvConn;
|
||||
|
||||
typedef struct SSrvMsg {
|
||||
SSrvConn* pConn;
|
||||
STransMsg msg;
|
||||
queue q;
|
||||
SSrvConn* pConn;
|
||||
STransMsg msg;
|
||||
queue q;
|
||||
STransMsgType type;
|
||||
} SSrvMsg;
|
||||
|
||||
typedef struct SWorkThrdObj {
|
||||
pthread_t thread;
|
||||
uv_pipe_t* pipe;
|
||||
uv_os_fd_t fd;
|
||||
uv_loop_t* loop;
|
||||
SAsyncPool* asyncPool;
|
||||
|
||||
pthread_t thread;
|
||||
uv_pipe_t* pipe;
|
||||
uv_os_fd_t fd;
|
||||
uv_loop_t* loop;
|
||||
SAsyncPool* asyncPool;
|
||||
queue msg;
|
||||
pthread_mutex_t msgMtx;
|
||||
|
||||
|
@ -113,6 +114,11 @@ static void destroySmsg(SSrvMsg* smsg);
|
|||
static SSrvConn* createConn(void* hThrd);
|
||||
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
|
||||
|
||||
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||
static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease};
|
||||
|
||||
static void uvDestroyConn(uv_handle_t* handle);
|
||||
|
||||
// server and worker thread
|
||||
|
@ -217,7 +223,6 @@ static void uvHandleReq(SSrvConn* pConn) {
|
|||
if (pHead->secured == 1) {
|
||||
pHead->msgLen -= sizeof(STransUserMsg);
|
||||
}
|
||||
//
|
||||
}
|
||||
|
||||
STransMsg transMsg;
|
||||
|
@ -230,24 +235,32 @@ static void uvHandleReq(SSrvConn* pConn) {
|
|||
|
||||
transClearBuffer(&pConn->readBuf);
|
||||
pConn->inType = pHead->msgType;
|
||||
|
||||
if (pHead->resflag == 0) {
|
||||
if (pConn->status == ConnNormal) {
|
||||
if (pHead->persist == 1) {
|
||||
pConn->status = ConnAcquire;
|
||||
transRefSrvHandle(pConn);
|
||||
}
|
||||
}
|
||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||
transRefSrvHandle(pConn);
|
||||
transMsg.handle = pConn;
|
||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
||||
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
||||
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||
} else {
|
||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn,
|
||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
|
||||
TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
|
||||
// no ref here
|
||||
}
|
||||
|
||||
if (pHead->noResp == 0) {
|
||||
transMsg.handle = pConn;
|
||||
}
|
||||
|
||||
STrans* pTransInst = (STrans*)p->shandle;
|
||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
||||
// auth
|
||||
// validate msg type
|
||||
}
|
||||
|
||||
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||
|
@ -272,7 +285,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
tError("server conn %p read error: %s", conn, uv_err_name(nread));
|
||||
if (nread < 0) {
|
||||
conn->broken = true;
|
||||
uvNotifyLinkBrokenToApp(conn);
|
||||
// uvNotifyLinkBrokenToApp(conn);
|
||||
|
||||
// STrans* pTransInst = conn->pTransInst;
|
||||
// if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) {
|
||||
|
@ -301,8 +314,11 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
|||
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
|
||||
tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
|
||||
taosArrayRemove(conn->srvMsgs, 0);
|
||||
if (msg->type == Release && conn->status != ConnNormal) {
|
||||
conn->status = ConnNormal;
|
||||
transUnrefSrvHandle(conn);
|
||||
}
|
||||
destroySmsg(msg);
|
||||
|
||||
// send second data, just use for push
|
||||
if (taosArrayGetSize(conn->srvMsgs) > 0) {
|
||||
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
|
||||
|
@ -339,6 +355,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
|||
|
||||
pHead->secured = pMsg->code == 0 ? 1 : 0; //
|
||||
pHead->msgType = smsg->pConn->inType + 1;
|
||||
pHead->release = smsg->type == Release ? 1 : 0;
|
||||
pHead->code = htonl(pMsg->code);
|
||||
// add more info
|
||||
char* msg = (char*)pHead;
|
||||
|
@ -371,10 +388,12 @@ static void uvStartSendResp(SSrvMsg* smsg) {
|
|||
transUnrefSrvHandle(pConn);
|
||||
return;
|
||||
}
|
||||
transUnrefSrvHandle(pConn);
|
||||
if (pConn->status == ConnNormal) {
|
||||
transUnrefSrvHandle(pConn);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pConn->srvMsgs) > 0) {
|
||||
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
|
||||
tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
|
||||
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
taosArrayPush(pConn->srvMsgs, &smsg);
|
||||
return;
|
||||
|
@ -408,6 +427,9 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
|
|||
QUEUE_INIT(h);
|
||||
|
||||
SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
|
||||
while (T_REF_VAL_GET(c) >= 2) {
|
||||
transUnrefSrvHandle(c);
|
||||
}
|
||||
transUnrefSrvHandle(c);
|
||||
}
|
||||
}
|
||||
|
@ -431,20 +453,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
|||
tError("unexcept occurred, continue");
|
||||
continue;
|
||||
}
|
||||
if (msg->pConn == NULL) {
|
||||
free(msg);
|
||||
bool noConn = QUEUE_IS_EMPTY(&pThrd->conn);
|
||||
if (noConn == true) {
|
||||
uv_loop_close(pThrd->loop);
|
||||
uv_stop(pThrd->loop);
|
||||
} else {
|
||||
destroyAllConn(pThrd);
|
||||
// uv_loop_close(pThrd->loop);
|
||||
pThrd->quit = true;
|
||||
}
|
||||
} else {
|
||||
uvStartSendResp(msg);
|
||||
}
|
||||
(*transAsyncHandle[msg->type])(msg, pThrd);
|
||||
}
|
||||
}
|
||||
static void uvAcceptAsyncCb(uv_async_t* async) {
|
||||
|
@ -633,6 +642,7 @@ static SSrvConn* createConn(void* hThrd) {
|
|||
tTrace("conn %p created", pConn);
|
||||
|
||||
pConn->broken = false;
|
||||
pConn->status = ConnNormal;
|
||||
|
||||
transRefSrvHandle(pConn);
|
||||
return pConn;
|
||||
|
@ -748,7 +758,38 @@ End:
|
|||
transCloseServer(srv);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||
uv_loop_close(thrd->loop);
|
||||
uv_stop(thrd->loop);
|
||||
} else {
|
||||
destroyAllConn(thrd);
|
||||
thrd->quit = true;
|
||||
}
|
||||
free(msg);
|
||||
}
|
||||
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||
// release handle to rpc init
|
||||
SSrvConn* conn = msg->pConn;
|
||||
if (conn->status == ConnAcquire) {
|
||||
if (taosArrayGetSize(conn->srvMsgs) > 0) {
|
||||
taosArrayPush(conn->srvMsgs, &msg);
|
||||
}
|
||||
taosArrayPush(conn->srvMsgs, &msg);
|
||||
uvStartSendRespInternal(msg);
|
||||
return;
|
||||
} else if (conn->status == ConnRelease) {
|
||||
// already release by server app, do nothing
|
||||
} else if (conn->status == ConnNormal) {
|
||||
// no nothing
|
||||
// user should not call this rpcRelease handle;
|
||||
}
|
||||
free(msg);
|
||||
}
|
||||
void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||
// send msg to client
|
||||
uvStartSendResp(msg);
|
||||
}
|
||||
void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||
if (pThrd == NULL) {
|
||||
return;
|
||||
|
@ -759,10 +800,10 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
|||
free(pThrd);
|
||||
}
|
||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||
SSrvMsg* msg = calloc(1, sizeof(SSrvMsg));
|
||||
msg->type = Quit;
|
||||
tDebug("server send quit msg to work thread");
|
||||
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
transSendAsync(pThrd->asyncPool, &msg->q);
|
||||
}
|
||||
|
||||
void transCloseServer(void* arg) {
|
||||
|
@ -813,8 +854,21 @@ void transUnrefSrvHandle(void* handle) {
|
|||
}
|
||||
|
||||
void transReleaseSrvHandle(void* handle) {
|
||||
// do nothing currently
|
||||
//
|
||||
if (handle == NULL) {
|
||||
return;
|
||||
}
|
||||
SSrvConn* pConn = handle;
|
||||
SWorkThrdObj* pThrd = pConn->hostThrd;
|
||||
|
||||
STransMsg tmsg = {.handle = handle, .code = 0};
|
||||
|
||||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||
srvMsg->msg = tmsg;
|
||||
srvMsg->type = Release;
|
||||
srvMsg->pConn = pConn;
|
||||
|
||||
tTrace("server conn %p start to release", pConn);
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
}
|
||||
void transSendResponse(const STransMsg* pMsg) {
|
||||
if (pMsg->handle == NULL) {
|
||||
|
@ -826,6 +880,7 @@ void transSendResponse(const STransMsg* pMsg) {
|
|||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||
srvMsg->pConn = pConn;
|
||||
srvMsg->msg = *pMsg;
|
||||
srvMsg->type = Normal;
|
||||
tTrace("server conn %p start to send resp", pConn);
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
}
|
||||
|
|
|
@ -31,11 +31,6 @@ class Server;
|
|||
int port = 7000;
|
||||
// server process
|
||||
|
||||
static bool cliPersistHandle(void *parent, tmsg_t msgType) {
|
||||
// client persist handle
|
||||
return msgType == 2 || msgType == 4;
|
||||
}
|
||||
|
||||
typedef struct CbArgs {
|
||||
tmsg_t msgType;
|
||||
} CbArgs;
|
||||
|
@ -93,7 +88,6 @@ class Client {
|
|||
}
|
||||
void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
||||
rpcClose(this->transCli);
|
||||
rpcInit_.pfp = pfp;
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
}
|
||||
void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
|
@ -103,8 +97,6 @@ class Client {
|
|||
}
|
||||
void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
rpcClose(this->transCli);
|
||||
|
||||
rpcInit_.pfp = pfp;
|
||||
rpcInit_.mfp = mfp;
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
}
|
||||
|
@ -149,7 +141,6 @@ class Server {
|
|||
rpcInit_.label = (char *)label;
|
||||
rpcInit_.numOfThreads = 5;
|
||||
rpcInit_.cfp = processReq;
|
||||
rpcInit_.efp = NULL;
|
||||
rpcInit_.user = (char *)user;
|
||||
rpcInit_.secret = (char *)secret;
|
||||
rpcInit_.ckey = (char *)ckey;
|
||||
|
@ -167,7 +158,6 @@ class Server {
|
|||
}
|
||||
void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
|
||||
this->Stop();
|
||||
rpcInit_.efp = efp;
|
||||
this->Start();
|
||||
}
|
||||
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||
|
@ -358,10 +348,10 @@ TEST_F(TransEnv, clientUserDefined) {
|
|||
}
|
||||
|
||||
TEST_F(TransEnv, cliPersistHandle) {
|
||||
tr->SetCliPersistFp(cliPersistHandle);
|
||||
// tr->SetCliPersistFp(cliPersistHandle);
|
||||
SRpcMsg resp = {0};
|
||||
for (int i = 0; i < 10; i++) {
|
||||
SRpcMsg req = {.handle = resp.handle, .noResp = 0};
|
||||
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
|
||||
req.msgType = 1;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
|
@ -378,11 +368,9 @@ TEST_F(TransEnv, cliPersistHandle) {
|
|||
}
|
||||
|
||||
TEST_F(TransEnv, cliReleaseHandle) {
|
||||
tr->SetCliPersistFp(cliPersistHandle);
|
||||
|
||||
SRpcMsg resp = {0};
|
||||
for (int i = 0; i < 10; i++) {
|
||||
SRpcMsg req = {.handle = resp.handle};
|
||||
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
|
||||
req.msgType = 1;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
|
@ -398,7 +386,7 @@ TEST_F(TransEnv, cliReleaseHandle) {
|
|||
//////////////////
|
||||
}
|
||||
TEST_F(TransEnv, cliReleaseHandleExcept) {
|
||||
tr->SetCliPersistFp(cliPersistHandle);
|
||||
// tr->SetCliPersistFp(cliPersistHandle);
|
||||
|
||||
SRpcMsg resp = {0};
|
||||
for (int i = 0; i < 10; i++) {
|
||||
|
@ -431,7 +419,7 @@ TEST_F(TransEnv, srvContinueSend) {
|
|||
|
||||
TEST_F(TransEnv, srvPersistHandleExcept) {
|
||||
tr->SetSrvContinueSend(processContinueSend);
|
||||
tr->SetCliPersistFp(cliPersistHandle);
|
||||
// tr->SetCliPersistFp(cliPersistHandle);
|
||||
SRpcMsg resp = {0};
|
||||
for (int i = 0; i < 5; i++) {
|
||||
SRpcMsg req = {.handle = resp.handle};
|
||||
|
@ -450,7 +438,6 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
|
|||
}
|
||||
TEST_F(TransEnv, cliPersistHandleExcept) {
|
||||
tr->SetSrvContinueSend(processContinueSend);
|
||||
tr->SetCliPersistFp(cliPersistHandle);
|
||||
SRpcMsg resp = {0};
|
||||
for (int i = 0; i < 5; i++) {
|
||||
SRpcMsg req = {.handle = resp.handle};
|
||||
|
|
Loading…
Reference in New Issue