Merge pull request #10761 from taosdata/feature/supportQuery
handle except and update UT
This commit is contained in:
commit
82431b840b
|
@ -29,7 +29,6 @@ extern "C" {
|
||||||
|
|
||||||
extern int tsRpcHeadSize;
|
extern int tsRpcHeadSize;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SRpcConnInfo {
|
typedef struct SRpcConnInfo {
|
||||||
uint32_t clientIp;
|
uint32_t clientIp;
|
||||||
uint16_t clientPort;
|
uint16_t clientPort;
|
||||||
|
@ -46,7 +45,6 @@ typedef struct SRpcMsg {
|
||||||
void * ahandle; // app handle set by client
|
void * ahandle; // app handle set by client
|
||||||
} SRpcMsg;
|
} SRpcMsg;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SRpcInit {
|
typedef struct SRpcInit {
|
||||||
uint16_t localPort; // local port
|
uint16_t localPort; // local port
|
||||||
char * label; // for debug purpose
|
char * label; // for debug purpose
|
||||||
|
@ -71,9 +69,11 @@ typedef struct SRpcInit {
|
||||||
// call back to keep conn or not
|
// call back to keep conn or not
|
||||||
bool (*pfp)(void *parent, tmsg_t msgType);
|
bool (*pfp)(void *parent, tmsg_t msgType);
|
||||||
|
|
||||||
// to support Send messages multiple times on a link
|
// to support Send messages multiple times on a link
|
||||||
//
|
void *(*mfp)(void *parent, tmsg_t msgType);
|
||||||
void* (*mfp)(void *parent, tmsg_t msgType);
|
|
||||||
|
// call back to handle except when query/fetch in progress
|
||||||
|
void (*efp)(void *parent, tmsg_t msgType);
|
||||||
|
|
||||||
void *parent;
|
void *parent;
|
||||||
} SRpcInit;
|
} SRpcInit;
|
||||||
|
@ -94,7 +94,7 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
||||||
void rpcCancelRequest(int64_t rid);
|
void rpcCancelRequest(int64_t rid);
|
||||||
|
|
||||||
// just release client conn to rpc instance, no close sock
|
// just release client conn to rpc instance, no close sock
|
||||||
void rpcReleaseHandle(void *handle);
|
void rpcReleaseHandle(void *handle, int8_t type);
|
||||||
|
|
||||||
void rpcRefHandle(void *handle, int8_t type);
|
void rpcRefHandle(void *handle, int8_t type);
|
||||||
void rpcUnrefHandle(void *handle, int8_t type);
|
void rpcUnrefHandle(void *handle, int8_t type);
|
||||||
|
|
|
@ -125,9 +125,8 @@ typedef SRpcInfo STrans;
|
||||||
typedef SRpcConnInfo STransHandleInfo;
|
typedef SRpcConnInfo STransHandleInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SEpSet epSet; // ip list provided by app
|
SEpSet epSet; // ip list provided by app
|
||||||
void* ahandle; // handle provided by app
|
void* ahandle; // handle provided by app
|
||||||
// struct SRpcConn* pConn; // pConn allocated
|
|
||||||
tmsg_t msgType; // message type
|
tmsg_t msgType; // message type
|
||||||
uint8_t* pCont; // content provided by app
|
uint8_t* pCont; // content provided by app
|
||||||
int32_t contLen; // content length
|
int32_t contLen; // content length
|
||||||
|
@ -135,7 +134,7 @@ typedef struct {
|
||||||
// int16_t numOfTry; // number of try for different servers
|
// int16_t numOfTry; // number of try for different servers
|
||||||
// int8_t oldInUse; // server EP inUse passed by app
|
// int8_t oldInUse; // server EP inUse passed by app
|
||||||
// int8_t redirect; // flag to indicate redirect
|
// int8_t redirect; // flag to indicate redirect
|
||||||
int8_t connType; // connection type
|
int8_t connType; // connection type cli/srv
|
||||||
int64_t rid; // refId returned by taosAddRef
|
int64_t rid; // refId returned by taosAddRef
|
||||||
|
|
||||||
STransMsg* pRsp; // for synchronous API
|
STransMsg* pRsp; // for synchronous API
|
||||||
|
@ -253,6 +252,9 @@ void transUnrefSrvHandle(void* handle);
|
||||||
void transRefCliHandle(void* handle);
|
void transRefCliHandle(void* handle);
|
||||||
void transUnrefCliHandle(void* handle);
|
void transUnrefCliHandle(void* handle);
|
||||||
|
|
||||||
|
void transReleaseCliHandle(void* handle);
|
||||||
|
void transReleaseSrvHandle(void* handle);
|
||||||
|
|
||||||
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg);
|
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg);
|
||||||
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
|
void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp);
|
||||||
void transSendResponse(const STransMsg* pMsg);
|
void transSendResponse(const STransMsg* pMsg);
|
||||||
|
|
|
@ -65,6 +65,7 @@ typedef struct {
|
||||||
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
||||||
bool (*pfp)(void* parent, tmsg_t msgType);
|
bool (*pfp)(void* parent, tmsg_t msgType);
|
||||||
void* (*mfp)(void* parent, tmsg_t msgType);
|
void* (*mfp)(void* parent, tmsg_t msgType);
|
||||||
|
void (*efp)(void* parent, tmsg_t msgType);
|
||||||
|
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
void* parent;
|
void* parent;
|
||||||
|
|
|
@ -22,6 +22,11 @@ void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThre
|
||||||
|
|
||||||
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
|
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
|
||||||
|
|
||||||
|
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
|
||||||
|
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
|
||||||
|
|
||||||
|
void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
|
||||||
|
|
||||||
void* rpcOpen(const SRpcInit* pInit) {
|
void* rpcOpen(const SRpcInit* pInit) {
|
||||||
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
|
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
|
||||||
if (pRpc == NULL) {
|
if (pRpc == NULL) {
|
||||||
|
@ -36,6 +41,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
pRpc->afp = pInit->afp;
|
pRpc->afp = pInit->afp;
|
||||||
pRpc->pfp = pInit->pfp;
|
pRpc->pfp = pInit->pfp;
|
||||||
pRpc->mfp = pInit->mfp;
|
pRpc->mfp = pInit->mfp;
|
||||||
|
pRpc->efp = pInit->efp;
|
||||||
|
|
||||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
|
@ -126,9 +132,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||||
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
|
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
|
||||||
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
|
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
|
||||||
|
|
||||||
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
|
|
||||||
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
|
|
||||||
|
|
||||||
void rpcRefHandle(void* handle, int8_t type) {
|
void rpcRefHandle(void* handle, int8_t type) {
|
||||||
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
||||||
(*taosRefHandle[type])(handle);
|
(*taosRefHandle[type])(handle);
|
||||||
|
@ -139,6 +142,11 @@ void rpcUnrefHandle(void* handle, int8_t type) {
|
||||||
(*taosUnRefHandle[type])(handle);
|
(*taosUnRefHandle[type])(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void rpcReleaseHandle(void* handle, int8_t type) {
|
||||||
|
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
|
||||||
|
(*transReleaseHandle[type])(handle);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t rpcInit() {
|
int32_t rpcInit() {
|
||||||
// impl later
|
// impl later
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -17,6 +17,11 @@
|
||||||
|
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
|
||||||
|
// Normal(default): send/recv msg
|
||||||
|
// Quit: quit rpc inst
|
||||||
|
// Release: release handle to rpc inst
|
||||||
|
typedef enum { Normal, Quit, Release } SCliMsgType;
|
||||||
|
|
||||||
typedef struct SCliConn {
|
typedef struct SCliConn {
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
uv_connect_t connReq;
|
uv_connect_t connReq;
|
||||||
|
@ -34,6 +39,10 @@ typedef struct SCliConn {
|
||||||
// spi configure
|
// spi configure
|
||||||
char spi;
|
char spi;
|
||||||
char secured;
|
char secured;
|
||||||
|
|
||||||
|
char* ip;
|
||||||
|
uint32_t port;
|
||||||
|
|
||||||
// debug and log info
|
// debug and log info
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
struct sockaddr_in locaddr;
|
struct sockaddr_in locaddr;
|
||||||
|
@ -45,6 +54,7 @@ typedef struct SCliMsg {
|
||||||
STransMsg msg;
|
STransMsg msg;
|
||||||
queue q;
|
queue q;
|
||||||
uint64_t st;
|
uint64_t st;
|
||||||
|
SCliMsgType type;
|
||||||
} SCliMsg;
|
} SCliMsg;
|
||||||
|
|
||||||
typedef struct SCliThrdObj {
|
typedef struct SCliThrdObj {
|
||||||
|
@ -79,7 +89,7 @@ typedef struct SConnList {
|
||||||
static void* createConnPool(int size);
|
static void* createConnPool(int size);
|
||||||
static void* destroyConnPool(void* pool);
|
static void* destroyConnPool(void* pool);
|
||||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
||||||
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
|
static void addConnToPool(void* pool, SCliConn* conn);
|
||||||
|
|
||||||
// register timer in each thread to clear expire conn
|
// register timer in each thread to clear expire conn
|
||||||
static void cliTimeoutCb(uv_timer_t* handle);
|
static void cliTimeoutCb(uv_timer_t* handle);
|
||||||
|
@ -104,6 +114,8 @@ static void cliHandleExcept(SCliConn* conn);
|
||||||
// handle req from app
|
// handle req from app
|
||||||
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
|
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd);
|
||||||
|
|
||||||
static void cliSendQuit(SCliThrdObj* thrd);
|
static void cliSendQuit(SCliThrdObj* thrd);
|
||||||
static void destroyUserdata(STransMsg* userdata);
|
static void destroyUserdata(STransMsg* userdata);
|
||||||
|
|
||||||
|
@ -117,8 +129,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
|
|
||||||
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||||
|
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
|
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
|
||||||
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
|
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
|
||||||
do { \
|
do { \
|
||||||
if (thrd->quit) { \
|
if (thrd->quit) { \
|
||||||
|
@ -188,6 +200,12 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
conn->secured = pHead->secured;
|
conn->secured = pHead->secured;
|
||||||
|
|
||||||
|
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
|
tTrace("except, server continue send while cli ignore it");
|
||||||
|
// transUnrefCliHandle(conn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (pCtx == NULL || pCtx->pSem == NULL) {
|
if (pCtx == NULL || pCtx->pSem == NULL) {
|
||||||
tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
|
tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
|
||||||
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||||
|
@ -197,14 +215,13 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
tsem_post(pCtx->pSem);
|
tsem_post(pCtx->pSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
|
||||||
|
|
||||||
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
addConnToPool(pThrd->pool, conn);
|
||||||
}
|
}
|
||||||
destroyCmsg(conn->data);
|
destroyCmsg(conn->data);
|
||||||
conn->data = NULL;
|
conn->data = NULL;
|
||||||
|
|
||||||
|
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
||||||
// start thread's timer of conn pool if not active
|
// start thread's timer of conn pool if not active
|
||||||
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
|
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
|
||||||
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||||
|
@ -318,11 +335,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||||
QUEUE_INIT(&conn->conn);
|
QUEUE_INIT(&conn->conn);
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
char key[128] = {0};
|
char key[128] = {0};
|
||||||
|
|
||||||
tstrncpy(key, ip, strlen(ip));
|
tstrncpy(key, conn->ip, strlen(conn->ip));
|
||||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
|
||||||
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||||
|
|
||||||
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
||||||
|
@ -336,6 +353,8 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
||||||
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
|
// avoid conn
|
||||||
|
QUEUE_REMOVE(&conn->conn);
|
||||||
transAllocBuffer(pBuf, buf);
|
transAllocBuffer(pBuf, buf);
|
||||||
}
|
}
|
||||||
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
@ -396,7 +415,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
||||||
}
|
}
|
||||||
static void cliDestroy(uv_handle_t* handle) {
|
static void cliDestroy(uv_handle_t* handle) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
|
free(conn->ip);
|
||||||
free(conn->stream);
|
free(conn->stream);
|
||||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
free(conn);
|
free(conn);
|
||||||
|
@ -498,6 +517,22 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
pThrd->quit = true;
|
pThrd->quit = true;
|
||||||
uv_stop(pThrd->loop);
|
uv_stop(pThrd->loop);
|
||||||
}
|
}
|
||||||
|
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
SCliConn* conn = pMsg->msg.handle;
|
||||||
|
tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
conn->data = NULL;
|
||||||
|
|
||||||
|
transDestroyBuffer(&conn->readBuf);
|
||||||
|
if (conn->persist && T_REF_VAL_GET(conn) >= 2) {
|
||||||
|
conn->persist = false;
|
||||||
|
transUnrefCliHandle(conn);
|
||||||
|
addConnToPool(pThrd->pool, conn);
|
||||||
|
} else {
|
||||||
|
transUnrefCliHandle(conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
SCliConn* conn = NULL;
|
SCliConn* conn = NULL;
|
||||||
|
@ -509,7 +544,9 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
} else {
|
} else {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
|
||||||
if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
if (conn != NULL) {
|
||||||
|
tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
@ -525,11 +562,16 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
SCliConn* conn = cliGetConn(pMsg, pThrd);
|
SCliConn* conn = cliGetConn(pMsg, pThrd);
|
||||||
if (conn != NULL) {
|
if (conn != NULL) {
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
|
|
||||||
transDestroyBuffer(&conn->readBuf);
|
transDestroyBuffer(&conn->readBuf);
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
} else {
|
} else {
|
||||||
conn = cliCreateConn(pThrd);
|
conn = cliCreateConn(pThrd);
|
||||||
conn->data = pMsg;
|
conn->data = pMsg;
|
||||||
|
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||||
|
conn->ip = strdup(pMsg->ctx->ip);
|
||||||
|
conn->port = pMsg->ctx->port;
|
||||||
|
|
||||||
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
|
@ -541,8 +583,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
||||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
|
||||||
}
|
}
|
||||||
static void cliAsyncCb(uv_async_t* handle) {
|
static void cliAsyncCb(uv_async_t* handle) {
|
||||||
SAsyncItem* item = handle->data;
|
SAsyncItem* item = handle->data;
|
||||||
|
@ -561,10 +601,13 @@ static void cliAsyncCb(uv_async_t* handle) {
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
|
|
||||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||||
if (pMsg->ctx == NULL) {
|
|
||||||
cliHandleQuit(pMsg, pThrd);
|
if (pMsg->type == Normal) {
|
||||||
} else {
|
|
||||||
cliHandleReq(pMsg, pThrd);
|
cliHandleReq(pMsg, pThrd);
|
||||||
|
} else if (pMsg->type == Quit) {
|
||||||
|
cliHandleQuit(pMsg, pThrd);
|
||||||
|
} else if (pMsg->type == Release) {
|
||||||
|
cliHandleRelease(pMsg, pThrd);
|
||||||
}
|
}
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
|
@ -662,8 +705,10 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
|
||||||
void cliSendQuit(SCliThrdObj* thrd) {
|
void cliSendQuit(SCliThrdObj* thrd) {
|
||||||
// cli can stop gracefully
|
// cli can stop gracefully
|
||||||
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
|
||||||
|
msg->type = Quit;
|
||||||
transSendAsync(thrd->asyncPool, &msg->q);
|
transSendAsync(thrd->asyncPool, &msg->q);
|
||||||
}
|
}
|
||||||
|
|
||||||
int cliRBChoseIdx(STrans* pTransInst) {
|
int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
int64_t index = pTransInst->index;
|
int64_t index = pTransInst->index;
|
||||||
if (pTransInst->index++ >= pTransInst->numOfThreads) {
|
if (pTransInst->index++ >= pTransInst->numOfThreads) {
|
||||||
|
@ -693,10 +738,24 @@ void transUnrefCliHandle(void* handle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int ref = T_REF_DEC((SCliConn*)handle);
|
int ref = T_REF_DEC((SCliConn*)handle);
|
||||||
|
tDebug("%s cli conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref);
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
cliDestroyConn((SCliConn*)handle, true);
|
cliDestroyConn((SCliConn*)handle, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
void transReleaseCliHandle(void* handle) {
|
||||||
|
SCliThrdObj* thrd = CONN_GET_HOST_THREAD(handle);
|
||||||
|
if (thrd == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STransMsg tmsg = {.handle = handle};
|
||||||
|
SCliMsg* cmsg = calloc(1, sizeof(SCliMsg));
|
||||||
|
cmsg->type = Release;
|
||||||
|
cmsg->msg = tmsg;
|
||||||
|
|
||||||
|
transSendAsync(thrd->asyncPool, &cmsg->q);
|
||||||
|
}
|
||||||
|
|
||||||
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) {
|
void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) {
|
||||||
STrans* pTransInst = (STrans*)shandle;
|
STrans* pTransInst = (STrans*)shandle;
|
||||||
|
@ -719,7 +778,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
|
||||||
assert(pTransInst->connType == TAOS_CONN_CLIENT);
|
assert(pTransInst->connType == TAOS_CONN_CLIENT);
|
||||||
// atomic or not
|
// atomic or not
|
||||||
|
|
||||||
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
|
SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg));
|
||||||
cliMsg->ctx = pCtx;
|
cliMsg->ctx = pCtx;
|
||||||
cliMsg->msg = *pMsg;
|
cliMsg->msg = *pMsg;
|
||||||
cliMsg->st = taosGetTimestampUs();
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
@ -744,7 +803,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq
|
||||||
pCtx->pRsp = pRsp;
|
pCtx->pRsp = pRsp;
|
||||||
tsem_init(pCtx->pSem, 0, 0);
|
tsem_init(pCtx->pSem, 0, 0);
|
||||||
|
|
||||||
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
|
SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg));
|
||||||
cliMsg->ctx = pCtx;
|
cliMsg->ctx = pCtx;
|
||||||
cliMsg->msg = *pReq;
|
cliMsg->msg = *pReq;
|
||||||
cliMsg->st = taosGetTimestampUs();
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
|
|
@ -289,11 +289,13 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
||||||
if (conn->srvMsgs != NULL) {
|
if (conn->srvMsgs != NULL) {
|
||||||
assert(taosArrayGetSize(conn->srvMsgs) >= 1);
|
assert(taosArrayGetSize(conn->srvMsgs) >= 1);
|
||||||
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
|
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
|
||||||
|
tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
|
||||||
taosArrayRemove(conn->srvMsgs, 0);
|
taosArrayRemove(conn->srvMsgs, 0);
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
|
|
||||||
// send second data, just use for push
|
// send second data, just use for push
|
||||||
if (taosArrayGetSize(conn->srvMsgs) > 0) {
|
if (taosArrayGetSize(conn->srvMsgs) > 0) {
|
||||||
|
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
|
||||||
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
|
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
|
||||||
uvStartSendRespInternal(msg);
|
uvStartSendRespInternal(msg);
|
||||||
}
|
}
|
||||||
|
@ -643,7 +645,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
uv_timer_stop(&conn->pTimer);
|
uv_timer_stop(&conn->pTimer);
|
||||||
QUEUE_REMOVE(&conn->queue);
|
QUEUE_REMOVE(&conn->queue);
|
||||||
free(conn->pTcp);
|
free(conn->pTcp);
|
||||||
free(conn);
|
// free(conn);
|
||||||
|
|
||||||
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
uv_loop_close(thrd->loop);
|
uv_loop_close(thrd->loop);
|
||||||
|
@ -737,7 +739,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
}
|
}
|
||||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||||
tDebug("send quit msg to work thread");
|
tDebug("server send quit msg to work thread");
|
||||||
|
|
||||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||||
}
|
}
|
||||||
|
@ -788,6 +790,11 @@ void transUnrefSrvHandle(void* handle) {
|
||||||
}
|
}
|
||||||
// unref srv handle
|
// unref srv handle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void transReleaseSrvHandle(void* handle) {
|
||||||
|
// do nothing currently
|
||||||
|
//
|
||||||
|
}
|
||||||
void transSendResponse(const STransMsg* pMsg) {
|
void transSendResponse(const STransMsg* pMsg) {
|
||||||
if (pMsg->handle == NULL) {
|
if (pMsg->handle == NULL) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -29,7 +29,29 @@ const char *ckey = "ckey";
|
||||||
class Server;
|
class Server;
|
||||||
int port = 7000;
|
int port = 7000;
|
||||||
// server process
|
// server process
|
||||||
|
|
||||||
|
static bool cliPersistHandle(void *parent, tmsg_t msgType) {
|
||||||
|
// client persist handle
|
||||||
|
return msgType == 2 || msgType == 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct CbArgs {
|
||||||
|
tmsg_t msgType;
|
||||||
|
} CbArgs;
|
||||||
|
|
||||||
|
static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) {
|
||||||
|
if (msgType == 1 || msgType == 2) {
|
||||||
|
CbArgs *args = (CbArgs *)calloc(1, sizeof(CbArgs));
|
||||||
|
args->msgType = msgType;
|
||||||
|
return args;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
// server except
|
||||||
|
static void NotifyAppLinkBroken(void *parent, tmsg_t msgType) {}
|
||||||
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
|
||||||
|
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
// client process;
|
// client process;
|
||||||
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
|
@ -61,17 +83,17 @@ class Client {
|
||||||
rpcInit_.cfp = cb;
|
rpcInit_.cfp = cb;
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
}
|
}
|
||||||
void setPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
||||||
rpcClose(this->transCli);
|
rpcClose(this->transCli);
|
||||||
rpcInit_.pfp = pfp;
|
rpcInit_.pfp = pfp;
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
}
|
}
|
||||||
void setConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||||
rpcClose(this->transCli);
|
rpcClose(this->transCli);
|
||||||
rpcInit_.mfp = mfp;
|
rpcInit_.mfp = mfp;
|
||||||
this->transCli = rpcOpen(&rpcInit_);
|
this->transCli = rpcOpen(&rpcInit_);
|
||||||
}
|
}
|
||||||
void setPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||||
rpcClose(this->transCli);
|
rpcClose(this->transCli);
|
||||||
|
|
||||||
rpcInit_.pfp = pfp;
|
rpcInit_.pfp = pfp;
|
||||||
|
@ -88,6 +110,15 @@ class Client {
|
||||||
SemWait();
|
SemWait();
|
||||||
*resp = this->resp;
|
*resp = this->resp;
|
||||||
}
|
}
|
||||||
|
void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) {
|
||||||
|
if (req->handle != NULL) {
|
||||||
|
rpcReleaseHandle(req->handle, TAOS_CONN_CLIENT);
|
||||||
|
req->handle = NULL;
|
||||||
|
}
|
||||||
|
SendAndRecv(req, resp);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {}
|
||||||
void SemWait() { tsem_wait(&this->sem); }
|
void SemWait() { tsem_wait(&this->sem); }
|
||||||
void SemPost() { tsem_post(&this->sem); }
|
void SemPost() { tsem_post(&this->sem); }
|
||||||
void Reset() {}
|
void Reset() {}
|
||||||
|
@ -105,19 +136,20 @@ class Client {
|
||||||
class Server {
|
class Server {
|
||||||
public:
|
public:
|
||||||
Server() {
|
Server() {
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit_, 0, sizeof(rpcInit_));
|
||||||
rpcInit.localPort = port;
|
rpcInit_.localPort = port;
|
||||||
rpcInit.label = (char *)label;
|
rpcInit_.label = (char *)label;
|
||||||
rpcInit.numOfThreads = 5;
|
rpcInit_.numOfThreads = 5;
|
||||||
rpcInit.cfp = processReq;
|
rpcInit_.cfp = processReq;
|
||||||
rpcInit.user = (char *)user;
|
rpcInit_.efp = NULL;
|
||||||
rpcInit.secret = (char *)secret;
|
rpcInit_.user = (char *)user;
|
||||||
rpcInit.ckey = (char *)ckey;
|
rpcInit_.secret = (char *)secret;
|
||||||
rpcInit.spi = 1;
|
rpcInit_.ckey = (char *)ckey;
|
||||||
rpcInit.connType = TAOS_CONN_SERVER;
|
rpcInit_.spi = 1;
|
||||||
|
rpcInit_.connType = TAOS_CONN_SERVER;
|
||||||
}
|
}
|
||||||
void Start() {
|
void Start() {
|
||||||
this->transSrv = rpcOpen(&this->rpcInit);
|
this->transSrv = rpcOpen(&this->rpcInit_);
|
||||||
taosMsleep(1000);
|
taosMsleep(1000);
|
||||||
}
|
}
|
||||||
void Stop() {
|
void Stop() {
|
||||||
|
@ -125,6 +157,16 @@ class Server {
|
||||||
rpcClose(this->transSrv);
|
rpcClose(this->transSrv);
|
||||||
this->transSrv = NULL;
|
this->transSrv = NULL;
|
||||||
}
|
}
|
||||||
|
void SetExceptFp(void (*efp)(void *parent, tmsg_t msgType)) {
|
||||||
|
this->Stop();
|
||||||
|
rpcInit_.efp = efp;
|
||||||
|
this->Start();
|
||||||
|
}
|
||||||
|
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||||
|
this->Stop();
|
||||||
|
rpcInit_.cfp = cfp;
|
||||||
|
this->Start();
|
||||||
|
}
|
||||||
void Restart() {
|
void Restart() {
|
||||||
this->Stop();
|
this->Stop();
|
||||||
this->Start();
|
this->Start();
|
||||||
|
@ -135,7 +177,7 @@ class Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit_;
|
||||||
void * transSrv;
|
void * transSrv;
|
||||||
};
|
};
|
||||||
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
@ -146,6 +188,20 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
rpcMsg.code = 0;
|
rpcMsg.code = 0;
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
for (int i = 0; i < 9; i++) {
|
||||||
|
rpcRefHandle(pMsg->handle, TAOS_CONN_SERVER);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = rpcMallocCont(100);
|
||||||
|
rpcMsg.contLen = 100;
|
||||||
|
rpcMsg.handle = pMsg->handle;
|
||||||
|
rpcMsg.code = 0;
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
// client process;
|
// client process;
|
||||||
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
Client *client = (Client *)parent;
|
Client *client = (Client *)parent;
|
||||||
|
@ -170,7 +226,7 @@ static void initEnv() {
|
||||||
tsAsyncLog = 0;
|
tsAsyncLog = 0;
|
||||||
|
|
||||||
std::string path = "/tmp/transport";
|
std::string path = "/tmp/transport";
|
||||||
taosRemoveDir(path.c_str());
|
// taosRemoveDir(path.c_str());
|
||||||
taosMkDir(path.c_str());
|
taosMkDir(path.c_str());
|
||||||
|
|
||||||
tstrncpy(tsLogDir, path.c_str(), PATH_MAX);
|
tstrncpy(tsLogDir, path.c_str(), PATH_MAX);
|
||||||
|
@ -178,6 +234,7 @@ static void initEnv() {
|
||||||
printf("failed to init log file\n");
|
printf("failed to init log file\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TransObj {
|
class TransObj {
|
||||||
public:
|
public:
|
||||||
TransObj() {
|
TransObj() {
|
||||||
|
@ -188,22 +245,39 @@ class TransObj {
|
||||||
srv->Start();
|
srv->Start();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RestartCli(CB cb) { cli->Restart(cb); }
|
void RestartCli(CB cb) {
|
||||||
void StopSrv() { srv->Stop(); }
|
//
|
||||||
|
cli->Restart(cb);
|
||||||
|
}
|
||||||
|
void StopSrv() {
|
||||||
|
//
|
||||||
|
srv->Stop();
|
||||||
|
}
|
||||||
void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
cli->setPersistFP(pfp);
|
cli->SetPersistFP(pfp);
|
||||||
}
|
}
|
||||||
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
cli->setConstructFP(mfp);
|
cli->SetConstructFP(mfp);
|
||||||
}
|
}
|
||||||
void SetMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
cli->setPAndMFp(pfp, mfp);
|
cli->SetPAndMFp(pfp, mfp);
|
||||||
|
}
|
||||||
|
// call when link broken, and notify query or fetch stop
|
||||||
|
void SetSrvExceptFp(void (*efp)(void *parent, tmsg_t msgType)) {
|
||||||
|
////////
|
||||||
|
srv->SetExceptFp(efp);
|
||||||
|
}
|
||||||
|
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||||
|
///////
|
||||||
|
srv->SetSrvContinueSend(cfp);
|
||||||
}
|
}
|
||||||
void RestartSrv() { srv->Restart(); }
|
void RestartSrv() { srv->Restart(); }
|
||||||
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||||
|
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
||||||
|
|
||||||
~TransObj() {
|
~TransObj() {
|
||||||
delete cli;
|
delete cli;
|
||||||
delete srv;
|
delete srv;
|
||||||
|
@ -256,20 +330,97 @@ TEST_F(TransEnv, 02StopServer) {
|
||||||
tr->cliSendAndRecv(&req, &resp);
|
tr->cliSendAndRecv(&req, &resp);
|
||||||
assert(resp.code != 0);
|
assert(resp.code != 0);
|
||||||
}
|
}
|
||||||
TEST_F(TransEnv, clientUserDefined) {}
|
TEST_F(TransEnv, clientUserDefined) {
|
||||||
|
tr->RestartSrv();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SRpcMsg req = {0}, resp = {0};
|
||||||
|
req.msgType = 0;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendAndRecv(&req, &resp);
|
||||||
|
assert(resp.code == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(TransEnv, cliPersistHandle) {
|
TEST_F(TransEnv, cliPersistHandle) {
|
||||||
// impl late
|
tr->SetCliPersistFp(cliPersistHandle);
|
||||||
}
|
SRpcMsg resp = {0};
|
||||||
TEST_F(TransEnv, srvPersistHandle) {
|
for (int i = 0; i < 10; i++) {
|
||||||
// impl later
|
SRpcMsg req = {.handle = resp.handle};
|
||||||
|
req.msgType = 1;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendAndRecv(&req, &resp);
|
||||||
|
if (i == 5) {
|
||||||
|
std::cout << "stop server" << std::endl;
|
||||||
|
tr->StopSrv();
|
||||||
|
}
|
||||||
|
if (i >= 6) {
|
||||||
|
EXPECT_TRUE(resp.code != 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//////////////////
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(TransEnv, srvPersisHandleExcept) {
|
TEST_F(TransEnv, cliReleaseHandle) {
|
||||||
|
tr->SetCliPersistFp(cliPersistHandle);
|
||||||
|
|
||||||
|
SRpcMsg resp = {0};
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SRpcMsg req = {.handle = resp.handle};
|
||||||
|
req.msgType = 1;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendAndRecvNoHandle(&req, &resp);
|
||||||
|
// if (i == 5) {
|
||||||
|
// std::cout << "stop server" << std::endl;
|
||||||
|
// tr->StopSrv();
|
||||||
|
//}
|
||||||
|
// if (i >= 6) {
|
||||||
|
EXPECT_TRUE(resp.code == 0);
|
||||||
|
//}
|
||||||
|
}
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, cliReleaseHandleExcept) {
|
||||||
|
tr->SetCliPersistFp(cliPersistHandle);
|
||||||
|
|
||||||
|
SRpcMsg resp = {0};
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SRpcMsg req = {.handle = resp.handle};
|
||||||
|
req.msgType = 1;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendAndRecvNoHandle(&req, &resp);
|
||||||
|
if (i == 5) {
|
||||||
|
std::cout << "stop server" << std::endl;
|
||||||
|
tr->StopSrv();
|
||||||
|
}
|
||||||
|
if (i >= 6) {
|
||||||
|
EXPECT_TRUE(resp.code != 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//////////////////
|
||||||
|
}
|
||||||
|
TEST_F(TransEnv, srvContinueSend) {
|
||||||
|
tr->SetSrvContinueSend(processContinueSend);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SRpcMsg req = {0}, resp = {0};
|
||||||
|
req.msgType = 1;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendAndRecv(&req, &resp);
|
||||||
|
}
|
||||||
|
taosMsleep(2000);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TransEnv, srvPersistHandleExcept) {
|
||||||
// conn breken
|
// conn breken
|
||||||
//
|
//
|
||||||
}
|
}
|
||||||
TEST_F(TransEnv, cliPersisHandleExcept) {
|
TEST_F(TransEnv, cliPersistHandleExcept) {
|
||||||
// conn breken
|
// conn breken
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,3 +433,6 @@ TEST_F(TransEnv, multiSrvPersisHandleExcept) {
|
||||||
TEST_F(TransEnv, queryExcept) {
|
TEST_F(TransEnv, queryExcept) {
|
||||||
// query and conn is broken
|
// query and conn is broken
|
||||||
}
|
}
|
||||||
|
TEST_F(TransEnv, noResp) {
|
||||||
|
// no resp
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue