handle except and update UT
This commit is contained in:
parent
af3a8be54e
commit
607a7ac025
|
@ -29,7 +29,6 @@ extern "C" {
|
|||
|
||||
extern int tsRpcHeadSize;
|
||||
|
||||
|
||||
typedef struct SRpcConnInfo {
|
||||
uint32_t clientIp;
|
||||
uint16_t clientPort;
|
||||
|
@ -46,7 +45,6 @@ typedef struct SRpcMsg {
|
|||
void * ahandle; // app handle set by client
|
||||
} SRpcMsg;
|
||||
|
||||
|
||||
typedef struct SRpcInit {
|
||||
uint16_t localPort; // local port
|
||||
char * label; // for debug purpose
|
||||
|
@ -72,8 +70,10 @@ typedef struct SRpcInit {
|
|||
bool (*pfp)(void *parent, tmsg_t msgType);
|
||||
|
||||
// to support Send messages multiple times on a link
|
||||
//
|
||||
void* (*mfp)(void *parent, tmsg_t msgType);
|
||||
void *(*mfp)(void *parent, tmsg_t msgType);
|
||||
|
||||
// call back to handle except when query/fetch in progress
|
||||
void (*efp)(void *parent, tmsg_t msgType);
|
||||
|
||||
void *parent;
|
||||
} SRpcInit;
|
||||
|
|
|
@ -125,9 +125,8 @@ typedef SRpcInfo STrans;
|
|||
typedef SRpcConnInfo STransHandleInfo;
|
||||
|
||||
typedef struct {
|
||||
SEpSet epSet; // ip list provided by app
|
||||
void* ahandle; // handle provided by app
|
||||
// struct SRpcConn* pConn; // pConn allocated
|
||||
SEpSet epSet; // ip list provided by app
|
||||
void* ahandle; // handle provided by app
|
||||
tmsg_t msgType; // message type
|
||||
uint8_t* pCont; // content provided by app
|
||||
int32_t contLen; // content length
|
||||
|
@ -135,7 +134,7 @@ typedef struct {
|
|||
// int16_t numOfTry; // number of try for different servers
|
||||
// int8_t oldInUse; // server EP inUse passed by app
|
||||
// int8_t redirect; // flag to indicate redirect
|
||||
int8_t connType; // connection type
|
||||
int8_t connType; // connection type cli/srv
|
||||
int64_t rid; // refId returned by taosAddRef
|
||||
|
||||
STransMsg* pRsp; // for synchronous API
|
||||
|
|
|
@ -65,6 +65,7 @@ typedef struct {
|
|||
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
||||
bool (*pfp)(void* parent, tmsg_t msgType);
|
||||
void* (*mfp)(void* parent, tmsg_t msgType);
|
||||
void (*efp)(void* parent, tmsg_t msgType);
|
||||
|
||||
int32_t refCount;
|
||||
void* parent;
|
||||
|
|
|
@ -36,6 +36,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->afp = pInit->afp;
|
||||
pRpc->pfp = pInit->pfp;
|
||||
pRpc->mfp = pInit->mfp;
|
||||
pRpc->efp = pInit->efp;
|
||||
|
||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
|
|
|
@ -34,6 +34,10 @@ typedef struct SCliConn {
|
|||
// spi configure
|
||||
char spi;
|
||||
char secured;
|
||||
|
||||
char* ip;
|
||||
uint32_t port;
|
||||
|
||||
// debug and log info
|
||||
struct sockaddr_in addr;
|
||||
struct sockaddr_in locaddr;
|
||||
|
@ -79,7 +83,7 @@ typedef struct SConnList {
|
|||
static void* createConnPool(int size);
|
||||
static void* destroyConnPool(void* pool);
|
||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
||||
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn);
|
||||
static void addConnToPool(void* pool, SCliConn* conn);
|
||||
|
||||
// register timer in each thread to clear expire conn
|
||||
static void cliTimeoutCb(uv_timer_t* handle);
|
||||
|
@ -188,6 +192,12 @@ void cliHandleResp(SCliConn* conn) {
|
|||
|
||||
conn->secured = pHead->secured;
|
||||
|
||||
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
tTrace("except, server continue send while cli ignore it");
|
||||
// transUnrefCliHandle(conn);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pCtx == NULL || pCtx->pSem == NULL) {
|
||||
tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
|
||||
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||
|
@ -197,14 +207,13 @@ void cliHandleResp(SCliConn* conn) {
|
|||
tsem_post(pCtx->pSem);
|
||||
}
|
||||
|
||||
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
||||
|
||||
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
||||
addConnToPool(pThrd->pool, conn);
|
||||
}
|
||||
destroyCmsg(conn->data);
|
||||
conn->data = NULL;
|
||||
|
||||
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
||||
// start thread's timer of conn pool if not active
|
||||
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
|
||||
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||
|
@ -317,11 +326,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
|||
QUEUE_INIT(&conn->conn);
|
||||
return conn;
|
||||
}
|
||||
static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
|
||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||
char key[128] = {0};
|
||||
|
||||
tstrncpy(key, ip, strlen(ip));
|
||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||
tstrncpy(key, conn->ip, strlen(conn->ip));
|
||||
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
|
||||
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||
|
||||
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
|
||||
|
@ -395,7 +404,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
|||
}
|
||||
static void cliDestroy(uv_handle_t* handle) {
|
||||
SCliConn* conn = handle->data;
|
||||
|
||||
free(conn->ip);
|
||||
free(conn->stream);
|
||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||
free(conn);
|
||||
|
@ -524,11 +533,16 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
SCliConn* conn = cliGetConn(pMsg, pThrd);
|
||||
if (conn != NULL) {
|
||||
conn->data = pMsg;
|
||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||
|
||||
transDestroyBuffer(&conn->readBuf);
|
||||
cliSend(conn);
|
||||
} else {
|
||||
conn = cliCreateConn(pThrd);
|
||||
conn->data = pMsg;
|
||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||
conn->ip = strdup(pMsg->ctx->ip);
|
||||
conn->port = pMsg->ctx->port;
|
||||
|
||||
int ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
||||
if (ret) {
|
||||
|
@ -540,8 +554,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||
}
|
||||
|
||||
conn->hThrdIdx = pCtx->hThrdIdx;
|
||||
}
|
||||
static void cliAsyncCb(uv_async_t* handle) {
|
||||
SAsyncItem* item = handle->data;
|
||||
|
|
|
@ -289,11 +289,13 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
|||
if (conn->srvMsgs != NULL) {
|
||||
assert(taosArrayGetSize(conn->srvMsgs) >= 1);
|
||||
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
|
||||
tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
|
||||
taosArrayRemove(conn->srvMsgs, 0);
|
||||
destroySmsg(msg);
|
||||
|
||||
// send second data, just use for push
|
||||
if (taosArrayGetSize(conn->srvMsgs) > 0) {
|
||||
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
|
||||
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
|
||||
uvStartSendRespInternal(msg);
|
||||
}
|
||||
|
@ -733,7 +735,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
|||
}
|
||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
|
||||
tDebug("send quit msg to work thread");
|
||||
tDebug("server send quit msg to work thread");
|
||||
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
}
|
||||
|
|
|
@ -29,7 +29,29 @@ const char *ckey = "ckey";
|
|||
class Server;
|
||||
int port = 7000;
|
||||
// server process
|
||||
|
||||
static bool cliPersistHandle(void *parent, tmsg_t msgType) {
|
||||
// client persist handle
|
||||
return msgType == 2 || msgType == 4;
|
||||
}
|
||||
|
||||
typedef struct CbArgs {
|
||||
tmsg_t msgType;
|
||||
} CbArgs;
|
||||
|
||||
static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) {
|
||||
if (msgType == 1 || msgType == 2) {
|
||||
CbArgs *args = (CbArgs *)calloc(1, sizeof(CbArgs));
|
||||
args->msgType = msgType;
|
||||
return args;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
// server except
|
||||
static void NotifyAppLinkBroken(void *parent, tmsg_t msgType) {}
|
||||
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
|
||||
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
// client process;
|
||||
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
|
@ -61,17 +83,17 @@ class Client {
|
|||
rpcInit_.cfp = cb;
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
}
|
||||
void setPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
||||
void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
||||
rpcClose(this->transCli);
|
||||
rpcInit_.pfp = pfp;
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
}
|
||||
void setConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
rpcClose(this->transCli);
|
||||
rpcInit_.mfp = mfp;
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
}
|
||||
void setPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
rpcClose(this->transCli);
|
||||
|
||||
rpcInit_.pfp = pfp;
|
||||
|
@ -88,6 +110,7 @@ class Client {
|
|||
SemWait();
|
||||
*resp = this->resp;
|
||||
}
|
||||
void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {}
|
||||
void SemWait() { tsem_wait(&this->sem); }
|
||||
void SemPost() { tsem_post(&this->sem); }
|
||||
void Reset() {}
|
||||
|
@ -105,19 +128,20 @@ class Client {
|
|||
class Server {
|
||||
public:
|
||||
Server() {
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = port;
|
||||
rpcInit.label = (char *)label;
|
||||
rpcInit.numOfThreads = 5;
|
||||
rpcInit.cfp = processReq;
|
||||
rpcInit.user = (char *)user;
|
||||
rpcInit.secret = (char *)secret;
|
||||
rpcInit.ckey = (char *)ckey;
|
||||
rpcInit.spi = 1;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
memset(&rpcInit_, 0, sizeof(rpcInit_));
|
||||
rpcInit_.localPort = port;
|
||||
rpcInit_.label = (char *)label;
|
||||
rpcInit_.numOfThreads = 5;
|
||||
rpcInit_.cfp = processReq;
|
||||
rpcInit_.efp = NULL;
|
||||
rpcInit_.user = (char *)user;
|
||||
rpcInit_.secret = (char *)secret;
|
||||
rpcInit_.ckey = (char *)ckey;
|
||||
rpcInit_.spi = 1;
|
||||
rpcInit_.connType = TAOS_CONN_SERVER;
|
||||
}
|
||||
void Start() {
|
||||
this->transSrv = rpcOpen(&this->rpcInit);
|
||||
this->transSrv = rpcOpen(&this->rpcInit_);
|
||||
taosMsleep(1000);
|
||||
}
|
||||
void Stop() {
|
||||
|
@ -125,6 +149,16 @@ class Server {
|
|||
rpcClose(this->transSrv);
|
||||
this->transSrv = NULL;
|
||||
}
|
||||
void SetExceptFp(void (*efp)(void *parent, tmsg_t msgType)) {
|
||||
this->Stop();
|
||||
rpcInit_.efp = efp;
|
||||
this->Start();
|
||||
}
|
||||
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||
this->Stop();
|
||||
rpcInit_.cfp = cfp;
|
||||
this->Start();
|
||||
}
|
||||
void Restart() {
|
||||
this->Stop();
|
||||
this->Start();
|
||||
|
@ -135,7 +169,7 @@ class Server {
|
|||
}
|
||||
|
||||
private:
|
||||
SRpcInit rpcInit;
|
||||
SRpcInit rpcInit_;
|
||||
void * transSrv;
|
||||
};
|
||||
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
|
@ -146,6 +180,20 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
rpcMsg.code = 0;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
|
||||
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
for (int i = 0; i < 9; i++) {
|
||||
rpcRefHandle(pMsg->handle, TAOS_CONN_SERVER);
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = rpcMallocCont(100);
|
||||
rpcMsg.contLen = 100;
|
||||
rpcMsg.handle = pMsg->handle;
|
||||
rpcMsg.code = 0;
|
||||
rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
}
|
||||
// client process;
|
||||
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
Client *client = (Client *)parent;
|
||||
|
@ -170,7 +218,7 @@ static void initEnv() {
|
|||
tsAsyncLog = 0;
|
||||
|
||||
std::string path = "/tmp/transport";
|
||||
taosRemoveDir(path.c_str());
|
||||
// taosRemoveDir(path.c_str());
|
||||
taosMkDir(path.c_str());
|
||||
|
||||
tstrncpy(tsLogDir, path.c_str(), PATH_MAX);
|
||||
|
@ -178,6 +226,7 @@ static void initEnv() {
|
|||
printf("failed to init log file\n");
|
||||
}
|
||||
}
|
||||
|
||||
class TransObj {
|
||||
public:
|
||||
TransObj() {
|
||||
|
@ -188,22 +237,38 @@ class TransObj {
|
|||
srv->Start();
|
||||
}
|
||||
|
||||
void RestartCli(CB cb) { cli->Restart(cb); }
|
||||
void StopSrv() { srv->Stop(); }
|
||||
void RestartCli(CB cb) {
|
||||
//
|
||||
cli->Restart(cb);
|
||||
}
|
||||
void StopSrv() {
|
||||
//
|
||||
srv->Stop();
|
||||
}
|
||||
void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
||||
// do nothing
|
||||
cli->setPersistFP(pfp);
|
||||
cli->SetPersistFP(pfp);
|
||||
}
|
||||
void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
// do nothing
|
||||
cli->setConstructFP(mfp);
|
||||
cli->SetConstructFP(mfp);
|
||||
}
|
||||
void SetMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) {
|
||||
// do nothing
|
||||
cli->setPAndMFp(pfp, mfp);
|
||||
cli->SetPAndMFp(pfp, mfp);
|
||||
}
|
||||
// call when link broken, and notify query or fetch stop
|
||||
void SetSrvExceptFp(void (*efp)(void *parent, tmsg_t msgType)) {
|
||||
////////
|
||||
srv->SetExceptFp(efp);
|
||||
}
|
||||
void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) {
|
||||
///////
|
||||
srv->SetSrvContinueSend(cfp);
|
||||
}
|
||||
void RestartSrv() { srv->Restart(); }
|
||||
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||
|
||||
~TransObj() {
|
||||
delete cli;
|
||||
delete srv;
|
||||
|
@ -256,13 +321,50 @@ TEST_F(TransEnv, 02StopServer) {
|
|||
tr->cliSendAndRecv(&req, &resp);
|
||||
assert(resp.code != 0);
|
||||
}
|
||||
TEST_F(TransEnv, clientUserDefined) {}
|
||||
TEST_F(TransEnv, clientUserDefined) {
|
||||
tr->RestartSrv();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
SRpcMsg req = {0}, resp = {0};
|
||||
req.msgType = 0;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
tr->cliSendAndRecv(&req, &resp);
|
||||
assert(resp.code == 0);
|
||||
}
|
||||
|
||||
//////////////////
|
||||
}
|
||||
|
||||
TEST_F(TransEnv, cliPersistHandle) {
|
||||
// impl late
|
||||
tr->SetCliPersistFp(cliPersistHandle);
|
||||
SRpcMsg resp = {0};
|
||||
for (int i = 0; i < 10; i++) {
|
||||
SRpcMsg req = {.handle = resp.handle};
|
||||
req.msgType = 1;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
tr->cliSendAndRecv(&req, &resp);
|
||||
if (i == 5) {
|
||||
std::cout << "stop server" << std::endl;
|
||||
tr->StopSrv();
|
||||
}
|
||||
if (i >= 6) {
|
||||
EXPECT_TRUE(resp.code != 0);
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////
|
||||
}
|
||||
TEST_F(TransEnv, srvPersistHandle) {
|
||||
// impl later
|
||||
TEST_F(TransEnv, srvContinueSend) {
|
||||
tr->SetSrvContinueSend(processContinueSend);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
SRpcMsg req = {0}, resp = {0};
|
||||
req.msgType = 1;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
tr->cliSendAndRecv(&req, &resp);
|
||||
}
|
||||
taosMsleep(2000);
|
||||
}
|
||||
|
||||
TEST_F(TransEnv, srvPersisHandleExcept) {
|
||||
|
@ -282,3 +384,6 @@ TEST_F(TransEnv, multiSrvPersisHandleExcept) {
|
|||
TEST_F(TransEnv, queryExcept) {
|
||||
// query and conn is broken
|
||||
}
|
||||
TEST_F(TransEnv, noResp) {
|
||||
// no resp
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue