diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 8dfd736df6..b5b8d6ab66 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -29,7 +29,6 @@ extern "C" { extern int tsRpcHeadSize; - typedef struct SRpcConnInfo { uint32_t clientIp; uint16_t clientPort; @@ -46,7 +45,6 @@ typedef struct SRpcMsg { void * ahandle; // app handle set by client } SRpcMsg; - typedef struct SRpcInit { uint16_t localPort; // local port char * label; // for debug purpose @@ -71,9 +69,11 @@ typedef struct SRpcInit { // call back to keep conn or not bool (*pfp)(void *parent, tmsg_t msgType); - // to support Send messages multiple times on a link - // - void* (*mfp)(void *parent, tmsg_t msgType); + // to support Send messages multiple times on a link + void *(*mfp)(void *parent, tmsg_t msgType); + + // call back to handle except when query/fetch in progress + void (*efp)(void *parent, tmsg_t msgType); void *parent; } SRpcInit; @@ -94,7 +94,7 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); // just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle); +void rpcReleaseHandle(void *handle, int8_t type); void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 99f890d3a0..8ea65b193d 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -125,9 +125,8 @@ typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; typedef struct { - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - // struct SRpcConn* pConn; // pConn allocated + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app tmsg_t msgType; // message type uint8_t* pCont; // content provided by app int32_t contLen; // content length @@ -135,7 +134,7 @@ typedef struct { // int16_t numOfTry; // number of try for different servers // int8_t oldInUse; // server EP inUse passed by app // int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type + int8_t connType; // connection type cli/srv int64_t rid; // refId returned by taosAddRef STransMsg* pRsp; // for synchronous API @@ -253,6 +252,9 @@ void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); +void transReleaseCliHandle(void* handle); +void transReleaseSrvHandle(void* handle); + void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* pMsg); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 3924a5cf1a..e760fe1de9 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -65,6 +65,7 @@ typedef struct { int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); bool (*pfp)(void* parent, tmsg_t msgType); void* (*mfp)(void* parent, tmsg_t msgType); + void (*efp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 58809ee3be..2cab03f133 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -22,6 +22,11 @@ void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThre void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; +void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; +void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; + +void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; + void* rpcOpen(const SRpcInit* pInit) { SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { @@ -36,6 +41,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->afp = pInit->afp; pRpc->pfp = pInit->pfp; pRpc->mfp = pInit->mfp; + pRpc->efp = pInit->efp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -126,9 +132,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } -void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; -void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; - void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*taosRefHandle[type])(handle); @@ -139,6 +142,11 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } +void rpcReleaseHandle(void* handle, int8_t type) { + assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); + (*transReleaseHandle[type])(handle); +} + int32_t rpcInit() { // impl later return 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5be1b7ea90..b7bc428901 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,6 +17,11 @@ #include "transComm.h" +// Normal(default): send/recv msg +// Quit: quit rpc inst +// Release: release handle to rpc inst +typedef enum { Normal, Quit, Release } SCliMsgType; + typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -34,6 +39,10 @@ typedef struct SCliConn { // spi configure char spi; char secured; + + char* ip; + uint32_t port; + // debug and log info struct sockaddr_in addr; struct sockaddr_in locaddr; @@ -45,6 +54,7 @@ typedef struct SCliMsg { STransMsg msg; queue q; uint64_t st; + SCliMsgType type; } SCliMsg; typedef struct SCliThrdObj { @@ -79,7 +89,7 @@ typedef struct SConnList { static void* createConnPool(int size); static void* destroyConnPool(void* pool); static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); -static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); +static void addConnToPool(void* pool, SCliConn* conn); // register timer in each thread to clear expire conn static void cliTimeoutCb(uv_timer_t* handle); @@ -104,6 +114,8 @@ static void cliHandleExcept(SCliConn* conn); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); + static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); @@ -117,8 +129,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) - -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) +#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ @@ -188,6 +200,12 @@ void cliHandleResp(SCliConn* conn) { conn->secured = pHead->secured; + if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { + tTrace("except, server continue send while cli ignore it"); + // transUnrefCliHandle(conn); + return; + } + if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); @@ -197,14 +215,13 @@ void cliHandleResp(SCliConn* conn) { tsem_post(pCtx->pSem); } - uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); - if (CONN_NO_PERSIST_BY_APP(conn)) { - addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + addConnToPool(pThrd->pool, conn); } destroyCmsg(conn->data); conn->data = NULL; + uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); @@ -318,11 +335,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { QUEUE_INIT(&conn->conn); return conn; } -static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { +static void addConnToPool(void* pool, SCliConn* conn) { char key[128] = {0}; - tstrncpy(key, ip, strlen(ip)); - tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); + tstrncpy(key, conn->ip, strlen(conn->ip)); + tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; @@ -336,6 +353,8 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; + // avoid conn + QUEUE_REMOVE(&conn->conn); transAllocBuffer(pBuf, buf); } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { @@ -396,7 +415,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { } static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; - + free(conn->ip); free(conn->stream); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); @@ -498,6 +517,22 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { pThrd->quit = true; uv_stop(pThrd->loop); } +static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { + SCliConn* conn = pMsg->msg.handle; + tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); + + destroyCmsg(pMsg); + conn->data = NULL; + + transDestroyBuffer(&conn->readBuf); + if (conn->persist && T_REF_VAL_GET(conn) >= 2) { + conn->persist = false; + transUnrefCliHandle(conn); + addConnToPool(pThrd->pool, conn); + } else { + transUnrefCliHandle(conn); + } +} SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; @@ -509,7 +544,9 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } else { STransConnCtx* pCtx = pMsg->ctx; conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); - if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); + if (conn != NULL) { + tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); + } } return conn; } @@ -525,11 +562,16 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { conn->data = pMsg; + conn->hThrdIdx = pCtx->hThrdIdx; + transDestroyBuffer(&conn->readBuf); cliSend(conn); } else { conn = cliCreateConn(pThrd); conn->data = pMsg; + conn->hThrdIdx = pCtx->hThrdIdx; + conn->ip = strdup(pMsg->ctx->ip); + conn->port = pMsg->ctx->port; int ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret) { @@ -541,8 +583,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); } - - conn->hThrdIdx = pCtx->hThrdIdx; } static void cliAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; @@ -561,10 +601,13 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - if (pMsg->ctx == NULL) { - cliHandleQuit(pMsg, pThrd); - } else { + + if (pMsg->type == Normal) { cliHandleReq(pMsg, pThrd); + } else if (pMsg->type == Quit) { + cliHandleQuit(pMsg, pThrd); + } else if (pMsg->type == Release) { + cliHandleRelease(pMsg, pThrd); } count++; } @@ -662,8 +705,10 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { void cliSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); + msg->type = Quit; transSendAsync(thrd->asyncPool, &msg->q); } + int cliRBChoseIdx(STrans* pTransInst) { int64_t index = pTransInst->index; if (pTransInst->index++ >= pTransInst->numOfThreads) { @@ -693,10 +738,24 @@ void transUnrefCliHandle(void* handle) { return; } int ref = T_REF_DEC((SCliConn*)handle); + tDebug("%s cli conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref); if (ref == 0) { cliDestroyConn((SCliConn*)handle, true); } } +void transReleaseCliHandle(void* handle) { + SCliThrdObj* thrd = CONN_GET_HOST_THREAD(handle); + if (thrd == NULL) { + return; + } + + STransMsg tmsg = {.handle = handle}; + SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); + cmsg->type = Release; + cmsg->msg = tmsg; + + transSendAsync(thrd->asyncPool, &cmsg->q); +} void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { STrans* pTransInst = (STrans*)shandle; @@ -719,7 +778,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not - SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); + SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); @@ -744,7 +803,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq pCtx->pRsp = pRsp; tsem_init(pCtx->pSem, 0, 0); - SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); + SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index d26c5ec0f4..c1fbb50590 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -289,11 +289,13 @@ void uvOnSendCb(uv_write_t* req, int status) { if (conn->srvMsgs != NULL) { assert(taosArrayGetSize(conn->srvMsgs) >= 1); SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); + tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); taosArrayRemove(conn->srvMsgs, 0); destroySmsg(msg); // send second data, just use for push if (taosArrayGetSize(conn->srvMsgs) > 0) { + tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); uvStartSendRespInternal(msg); } @@ -643,7 +645,7 @@ static void uvDestroyConn(uv_handle_t* handle) { uv_timer_stop(&conn->pTimer); QUEUE_REMOVE(&conn->queue); free(conn->pTcp); - free(conn); + // free(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { uv_loop_close(thrd->loop); @@ -737,7 +739,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); - tDebug("send quit msg to work thread"); + tDebug("server send quit msg to work thread"); transSendAsync(pThrd->asyncPool, &srvMsg->q); } @@ -788,6 +790,11 @@ void transUnrefSrvHandle(void* handle) { } // unref srv handle } + +void transReleaseSrvHandle(void* handle) { + // do nothing currently + // +} void transSendResponse(const STransMsg* pMsg) { if (pMsg->handle == NULL) { return; diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index fa20327003..46f6424b0b 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -29,7 +29,29 @@ const char *ckey = "ckey"; class Server; int port = 7000; // server process + +static bool cliPersistHandle(void *parent, tmsg_t msgType) { + // client persist handle + return msgType == 2 || msgType == 4; +} + +typedef struct CbArgs { + tmsg_t msgType; +} CbArgs; + +static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) { + if (msgType == 1 || msgType == 2) { + CbArgs *args = (CbArgs *)calloc(1, sizeof(CbArgs)); + args->msgType = msgType; + return args; + } + return NULL; +} +// server except +static void NotifyAppLinkBroken(void *parent, tmsg_t msgType) {} typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); + +static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); // client process; static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -61,17 +83,17 @@ class Client { rpcInit_.cfp = cb; this->transCli = rpcOpen(&rpcInit_); } - void setPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { + void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); rpcInit_.pfp = pfp; this->transCli = rpcOpen(&rpcInit_); } - void setConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { + void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); rpcInit_.mfp = mfp; this->transCli = rpcOpen(&rpcInit_); } - void setPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { + void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); rpcInit_.pfp = pfp; @@ -88,6 +110,15 @@ class Client { SemWait(); *resp = this->resp; } + void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { + if (req->handle != NULL) { + rpcReleaseHandle(req->handle, TAOS_CONN_CLIENT); + req->handle = NULL; + } + SendAndRecv(req, resp); + } + + void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {} void SemWait() { tsem_wait(&this->sem); } void SemPost() { tsem_post(&this->sem); } void Reset() {} @@ -105,19 +136,20 @@ class Client { class Server { public: Server() { - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = port; - rpcInit.label = (char *)label; - rpcInit.numOfThreads = 5; - rpcInit.cfp = processReq; - rpcInit.user = (char *)user; - rpcInit.secret = (char *)secret; - rpcInit.ckey = (char *)ckey; - rpcInit.spi = 1; - rpcInit.connType = TAOS_CONN_SERVER; + memset(&rpcInit_, 0, sizeof(rpcInit_)); + rpcInit_.localPort = port; + rpcInit_.label = (char *)label; + rpcInit_.numOfThreads = 5; + rpcInit_.cfp = processReq; + rpcInit_.efp = NULL; + rpcInit_.user = (char *)user; + rpcInit_.secret = (char *)secret; + rpcInit_.ckey = (char *)ckey; + rpcInit_.spi = 1; + rpcInit_.connType = TAOS_CONN_SERVER; } void Start() { - this->transSrv = rpcOpen(&this->rpcInit); + this->transSrv = rpcOpen(&this->rpcInit_); taosMsleep(1000); } void Stop() { @@ -125,6 +157,16 @@ class Server { rpcClose(this->transSrv); this->transSrv = NULL; } + void SetExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { + this->Stop(); + rpcInit_.efp = efp; + this->Start(); + } + void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { + this->Stop(); + rpcInit_.cfp = cfp; + this->Start(); + } void Restart() { this->Stop(); this->Start(); @@ -135,7 +177,7 @@ class Server { } private: - SRpcInit rpcInit; + SRpcInit rpcInit_; void * transSrv; }; static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -146,6 +188,20 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { rpcMsg.code = 0; rpcSendResponse(&rpcMsg); } + +static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + for (int i = 0; i < 9; i++) { + rpcRefHandle(pMsg->handle, TAOS_CONN_SERVER); + } + for (int i = 0; i < 10; i++) { + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = rpcMallocCont(100); + rpcMsg.contLen = 100; + rpcMsg.handle = pMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); + } +} // client process; static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { Client *client = (Client *)parent; @@ -170,7 +226,7 @@ static void initEnv() { tsAsyncLog = 0; std::string path = "/tmp/transport"; - taosRemoveDir(path.c_str()); + // taosRemoveDir(path.c_str()); taosMkDir(path.c_str()); tstrncpy(tsLogDir, path.c_str(), PATH_MAX); @@ -178,6 +234,7 @@ static void initEnv() { printf("failed to init log file\n"); } } + class TransObj { public: TransObj() { @@ -188,22 +245,39 @@ class TransObj { srv->Start(); } - void RestartCli(CB cb) { cli->Restart(cb); } - void StopSrv() { srv->Stop(); } + void RestartCli(CB cb) { + // + cli->Restart(cb); + } + void StopSrv() { + // + srv->Stop(); + } void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) { // do nothing - cli->setPersistFP(pfp); + cli->SetPersistFP(pfp); } void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { // do nothing - cli->setConstructFP(mfp); + cli->SetConstructFP(mfp); } - void SetMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { + void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { // do nothing - cli->setPAndMFp(pfp, mfp); + cli->SetPAndMFp(pfp, mfp); + } + // call when link broken, and notify query or fetch stop + void SetSrvExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { + //////// + srv->SetExceptFp(efp); + } + void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { + /////// + srv->SetSrvContinueSend(cfp); } void RestartSrv() { srv->Restart(); } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } + void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); } + ~TransObj() { delete cli; delete srv; @@ -256,20 +330,97 @@ TEST_F(TransEnv, 02StopServer) { tr->cliSendAndRecv(&req, &resp); assert(resp.code != 0); } -TEST_F(TransEnv, clientUserDefined) {} +TEST_F(TransEnv, clientUserDefined) { + tr->RestartSrv(); + for (int i = 0; i < 10; i++) { + SRpcMsg req = {0}, resp = {0}; + req.msgType = 0; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + assert(resp.code == 0); + } + + ////////////////// +} TEST_F(TransEnv, cliPersistHandle) { - // impl late -} -TEST_F(TransEnv, srvPersistHandle) { - // impl later + tr->SetCliPersistFp(cliPersistHandle); + SRpcMsg resp = {0}; + for (int i = 0; i < 10; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + if (i == 5) { + std::cout << "stop server" << std::endl; + tr->StopSrv(); + } + if (i >= 6) { + EXPECT_TRUE(resp.code != 0); + } + } + ////////////////// } -TEST_F(TransEnv, srvPersisHandleExcept) { +TEST_F(TransEnv, cliReleaseHandle) { + tr->SetCliPersistFp(cliPersistHandle); + + SRpcMsg resp = {0}; + for (int i = 0; i < 10; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecvNoHandle(&req, &resp); + // if (i == 5) { + // std::cout << "stop server" << std::endl; + // tr->StopSrv(); + //} + // if (i >= 6) { + EXPECT_TRUE(resp.code == 0); + //} + } + ////////////////// +} +TEST_F(TransEnv, cliReleaseHandleExcept) { + tr->SetCliPersistFp(cliPersistHandle); + + SRpcMsg resp = {0}; + for (int i = 0; i < 10; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecvNoHandle(&req, &resp); + if (i == 5) { + std::cout << "stop server" << std::endl; + tr->StopSrv(); + } + if (i >= 6) { + EXPECT_TRUE(resp.code != 0); + } + } + ////////////////// +} +TEST_F(TransEnv, srvContinueSend) { + tr->SetSrvContinueSend(processContinueSend); + for (int i = 0; i < 10; i++) { + SRpcMsg req = {0}, resp = {0}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + } + taosMsleep(2000); +} + +TEST_F(TransEnv, srvPersistHandleExcept) { // conn breken // } -TEST_F(TransEnv, cliPersisHandleExcept) { +TEST_F(TransEnv, cliPersistHandleExcept) { // conn breken } @@ -282,3 +433,6 @@ TEST_F(TransEnv, multiSrvPersisHandleExcept) { TEST_F(TransEnv, queryExcept) { // query and conn is broken } +TEST_F(TransEnv, noResp) { + // no resp +}