From 9cf07ff8f09bb091845e6acf6f1cc23e01f1cf15 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 21 Feb 2022 22:36:00 +0800 Subject: [PATCH] add more UT test --- source/libs/transport/src/transCli.c | 1 - source/libs/transport/src/transSrv.c | 19 ++- source/libs/transport/test/transUT.cc | 214 ++++++++++++++++++++------ 3 files changed, 184 insertions(+), 50 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4ed1cc2e73..f29c47c21e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -350,7 +350,6 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { - // uv_read_stop((uv_stream_t*)conn->stream); tTrace("client conn %p read complete", conn); clientHandleResp(conn); } else { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 5292bad209..a038f72adc 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -96,6 +96,7 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); +static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); @@ -446,6 +447,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) { free(msg); destroyAllConn(pThrd); + + uv_loop_close(pThrd->loop); uv_stop(pThrd->loop); } else { uvStartSendResp(msg); @@ -463,6 +466,12 @@ static void uvAcceptAsyncCb(uv_async_t* async) { uv_stop(srv->loop); } +static void uvShutDownCb(uv_shutdown_t* req, int status) { + tDebug("conn failed to shut down: %s", uv_err_name(status)); + uv_close((uv_handle_t*)req->handle, uvDestroyConn); + free(req); +} + void uvOnAcceptCb(uv_stream_t* stream, int status) { if (status == -1) { return; @@ -528,8 +537,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; - uv_tcp_nodelay(pConn->pTcp, 1); - uv_tcp_keepalive(pConn->pTcp, 1, 1); + // uv_tcp_nodelay(pConn->pTcp, 1); + // uv_tcp_keepalive(pConn->pTcp, 1, 1); // init write request, just pConn->pWriter = calloc(1, sizeof(uv_write_t)); @@ -656,7 +665,11 @@ static void destroyConn(SSrvConn* conn, bool clear) { QUEUE_REMOVE(&conn->queue); if (clear) { + tTrace("try to destroy conn %p", conn); uv_tcp_close_reset(conn->pTcp, uvDestroyConn); + // uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); + // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); + // uv_unref((uv_handle_t*)conn->pTcp); // uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } } @@ -665,7 +678,7 @@ static void uvDestroyConn(uv_handle_t* handle) { tDebug("server conn %p destroy", conn); uv_timer_stop(conn->pTimer); // free(conn->pTimer); - // free(conn->pTcp); + free(conn->pTcp); free(conn->pWriter); free(conn); } diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 6f80ea42ac..d1fefe2c72 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -16,69 +16,168 @@ #include #include #include "tep.h" +#include "tglobal.h" #include "trpc.h" +#include "ulog.h" using namespace std; -class TransObj { - public: - TransObj() { - const char *label = "APP"; - const char *secret = "secret"; - const char *user = "user"; - const char *ckey = "ckey"; +const char *label = "APP"; +const char *secret = "secret"; +const char *user = "user"; +const char *ckey = "ckey"; +class Server; +int port = 7000; +// server process +static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +// client process; +static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +class Client { + public: + void Init(int nThread) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = (char *)label; - rpcInit.numOfThreads = 5; - rpcInit.cfp = NULL; - rpcInit.sessions = 100; - rpcInit.idleTime = 100; + rpcInit.numOfThreads = nThread; + rpcInit.cfp = processResp; rpcInit.user = (char *)user; rpcInit.secret = (char *)secret; rpcInit.ckey = (char *)ckey; rpcInit.spi = 1; - } - bool startCli() { - trans = NULL; + rpcInit.parent = this; rpcInit.connType = TAOS_CONN_CLIENT; - trans = rpcOpen(&rpcInit); - return trans != NULL ? true : false; + this->transCli = rpcOpen(&rpcInit); + tsem_init(&this->sem, 0, 0); } - bool startSrv() { - trans = NULL; - rpcInit.connType = TAOS_CONN_SERVER; - trans = rpcOpen(&rpcInit); - return trans != NULL ? true : false; + void SetResp(SRpcMsg *pMsg) { + // set up resp; + this->resp = *pMsg; + } + SRpcMsg *Resp() { return &this->resp; } + + void Restart() { + rpcClose(this->transCli); + this->transCli = rpcOpen(&rpcInit); } - bool sendAndRecv() { + void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) { SEpSet epSet = {0}; epSet.inUse = 0; - addEpIntoEpSet(&epSet, "192.168.1.1", 7000); - addEpIntoEpSet(&epSet, "192.168.0.1", 7000); + addEpIntoEpSet(&epSet, "127.0.0.1", 7000); - if (trans == NULL) { - return false; - } - SRpcMsg rpcMsg = {0}, reqMsg = {0}; - reqMsg.pCont = rpcMallocCont(10); - reqMsg.contLen = 10; - reqMsg.ahandle = NULL; - rpcSendRecv(trans, &epSet, &reqMsg, &rpcMsg); - int code = rpcMsg.code; - std::cout << tstrerror(code) << std::endl; - return true; + rpcSendRequest(this->transCli, &epSet, req, NULL); + SemWait(); + *resp = this->resp; } - bool stop() { - rpcClose(trans); - trans = NULL; - return true; + void SemWait() { tsem_wait(&this->sem); } + void SemPost() { tsem_post(&this->sem); } + void Reset() {} + + ~Client() { + if (this->transCli) rpcClose(this->transCli); } private: - void * trans; + tsem_t sem; SRpcInit rpcInit; + void * transCli; + SRpcMsg resp; +}; +class Server { + public: + Server() { + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = port; + rpcInit.label = (char *)label; + rpcInit.numOfThreads = 5; + rpcInit.cfp = processReq; + rpcInit.user = (char *)user; + rpcInit.secret = (char *)secret; + rpcInit.ckey = (char *)ckey; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_SERVER; + } + void Start() { + this->transSrv = rpcOpen(&this->rpcInit); + taosMsleep(1000); + } + void Stop() { + if (this->transSrv == NULL) return; + rpcClose(this->transSrv); + this->transSrv = NULL; + } + void Restart() { + this->Stop(); + this->Start(); + } + ~Server() { + if (this->transSrv) rpcClose(this->transSrv); + this->transSrv = NULL; + } + + private: + SRpcInit rpcInit; + void * transSrv; +}; +static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = rpcMallocCont(100); + rpcMsg.contLen = 100; + rpcMsg.handle = pMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); +} +// client process; +static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + Client *client = (Client *)parent; + client->SetResp(pMsg); + client->SemPost(); +} +class TransObj { + public: + TransObj() { + dDebugFlag = 143; + vDebugFlag = 0; + mDebugFlag = 143; + cDebugFlag = 0; + jniDebugFlag = 0; + tmrDebugFlag = 143; + uDebugFlag = 143; + rpcDebugFlag = 143; + qDebugFlag = 0; + wDebugFlag = 0; + sDebugFlag = 0; + tsdbDebugFlag = 0; + cqDebugFlag = 0; + tscEmbeddedInUtil = 1; + tsAsyncLog = 0; + + std::string path = "/tmp/transport"; + taosRemoveDir(path.c_str()); + taosMkDir(path.c_str()); + + char temp[PATH_MAX]; + snprintf(temp, PATH_MAX, "%s/taosdlog", path.c_str()); + if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) { + printf("failed to init log file\n"); + } + cli = new Client; + cli->Init(1); + srv = new Server; + srv->Start(); + } + void RestartCli() { cli->Restart(); } + void StopSrv() { srv->Stop(); } + void RestartSrv() { srv->Restart(); } + void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } + ~TransObj() { + delete cli; + delete srv; + } + + private: + Client *cli; + Server *srv; }; class TransEnv : public ::testing::Test { protected: @@ -93,11 +192,34 @@ class TransEnv : public ::testing::Test { TransObj *tr = NULL; }; -TEST_F(TransEnv, test_start_stop) { - assert(tr->startCli()); - assert(tr->sendAndRecv()); - assert(tr->stop()); - assert(tr->startSrv()); - assert(tr->stop()); +// TEST_F(TransEnv, 01sendAndRec) { +// for (int i = 0; i < 1; i++) { +// SRpcMsg req = {0}, resp = {0}; +// req.msgType = 0; +// req.pCont = rpcMallocCont(10); +// req.contLen = 10; +// tr->cliSendAndRecv(&req, &resp); +// assert(resp.code == 0); +// } +//} + +TEST_F(TransEnv, 02StopServer) { + for (int i = 0; i < 1; i++) { + SRpcMsg req = {0}, resp = {0}; + req.msgType = 0; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + assert(resp.code == 0); + } + SRpcMsg req = {0}, resp = {0}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->StopSrv(); + // tr->RestartSrv(); + tr->cliSendAndRecv(&req, &resp); + + assert(resp.code != 0); }