Merge pull request #10768 from taosdata/feature/supportQuery
handle except and update UT
This commit is contained in:
commit
07dda4c8f8
|
@ -73,7 +73,7 @@ typedef struct SRpcInit {
|
|||
void *(*mfp)(void *parent, tmsg_t msgType);
|
||||
|
||||
// call back to handle except when query/fetch in progress
|
||||
void (*efp)(void *parent, tmsg_t msgType);
|
||||
bool (*efp)(void *parent, tmsg_t msgType);
|
||||
|
||||
void *parent;
|
||||
} SRpcInit;
|
||||
|
|
|
@ -65,7 +65,7 @@ typedef struct {
|
|||
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
||||
bool (*pfp)(void* parent, tmsg_t msgType);
|
||||
void* (*mfp)(void* parent, tmsg_t msgType);
|
||||
void (*efp)(void* parent, tmsg_t msgType);
|
||||
bool (*efp)(void* parent, tmsg_t msgType);
|
||||
|
||||
int32_t refCount;
|
||||
void* parent;
|
||||
|
|
|
@ -106,6 +106,8 @@ static void uvStartSendRespInternal(SSrvMsg* smsg);
|
|||
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
||||
static void uvStartSendResp(SSrvMsg* msg);
|
||||
|
||||
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);
|
||||
|
||||
static void destroySmsg(SSrvMsg* smsg);
|
||||
// check whether already read complete packet
|
||||
static SSrvConn* createConn(void* hThrd);
|
||||
|
@ -233,7 +235,7 @@ static void uvHandleReq(SSrvConn* pConn) {
|
|||
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||
|
||||
STrans* pTransInst = (STrans*)p->shandle;
|
||||
(*((STrans*)p->shandle)->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
||||
// auth
|
||||
// validate msg type
|
||||
|
@ -261,13 +263,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
tError("server conn %p read error: %s", conn, uv_err_name(nread));
|
||||
if (nread < 0) {
|
||||
conn->broken = true;
|
||||
transUnrefSrvHandle(conn);
|
||||
uvNotifyLinkBrokenToApp(conn);
|
||||
|
||||
// if (conn->ref > 1) {
|
||||
// conn->ref++; // ref > 1 signed that write is in progress
|
||||
// STrans* pTransInst = conn->pTransInst;
|
||||
// if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) {
|
||||
//}
|
||||
// tError("server conn %p read error: %s", conn, uv_err_name(nread));
|
||||
// destroyConn(conn, true);
|
||||
transUnrefSrvHandle(conn);
|
||||
}
|
||||
}
|
||||
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||
|
@ -373,6 +374,17 @@ static void uvStartSendResp(SSrvMsg* smsg) {
|
|||
uvStartSendRespInternal(smsg);
|
||||
return;
|
||||
}
|
||||
|
||||
static void uvNotifyLinkBrokenToApp(SSrvConn* conn) {
|
||||
STrans* pTransInst = conn->pTransInst;
|
||||
if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) {
|
||||
STransMsg transMsg = {0};
|
||||
transMsg.msgType = conn->inType;
|
||||
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
// transRefSrvHandle(conn);
|
||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, 0);
|
||||
}
|
||||
}
|
||||
static void destroySmsg(SSrvMsg* smsg) {
|
||||
if (smsg == NULL) {
|
||||
return;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include "rpcLog.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "tlog.h"
|
||||
|
@ -48,7 +49,9 @@ static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) {
|
|||
return NULL;
|
||||
}
|
||||
// server except
|
||||
static void NotifyAppLinkBroken(void *parent, tmsg_t msgType) {}
|
||||
static bool handleExcept(void *parent, tmsg_t msgType) {
|
||||
return msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP;
|
||||
}
|
||||
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
|
||||
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
|
@ -83,6 +86,10 @@ class Client {
|
|||
rpcInit_.cfp = cb;
|
||||
this->transCli = rpcOpen(&rpcInit_);
|
||||
}
|
||||
void Stop() {
|
||||
rpcClose(this->transCli);
|
||||
this->transCli = NULL;
|
||||
}
|
||||
void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) {
|
||||
rpcClose(this->transCli);
|
||||
rpcInit_.pfp = pfp;
|
||||
|
@ -157,7 +164,7 @@ class Server {
|
|||
rpcClose(this->transSrv);
|
||||
this->transSrv = NULL;
|
||||
}
|
||||
void SetExceptFp(void (*efp)(void *parent, tmsg_t msgType)) {
|
||||
void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
|
||||
this->Stop();
|
||||
rpcInit_.efp = efp;
|
||||
this->Start();
|
||||
|
@ -207,6 +214,7 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
Client *client = (Client *)parent;
|
||||
client->SetResp(pMsg);
|
||||
client->SemPost();
|
||||
tDebug("received resp");
|
||||
}
|
||||
|
||||
static void initEnv() {
|
||||
|
@ -266,7 +274,7 @@ class TransObj {
|
|||
cli->SetPAndMFp(pfp, mfp);
|
||||
}
|
||||
// call when link broken, and notify query or fetch stop
|
||||
void SetSrvExceptFp(void (*efp)(void *parent, tmsg_t msgType)) {
|
||||
void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) {
|
||||
////////
|
||||
srv->SetExceptFp(efp);
|
||||
}
|
||||
|
@ -275,6 +283,10 @@ class TransObj {
|
|||
srv->SetSrvContinueSend(cfp);
|
||||
}
|
||||
void RestartSrv() { srv->Restart(); }
|
||||
void cliStop() {
|
||||
///////
|
||||
cli->Stop();
|
||||
}
|
||||
void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); }
|
||||
void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); }
|
||||
|
||||
|
@ -417,20 +429,31 @@ TEST_F(TransEnv, srvContinueSend) {
|
|||
}
|
||||
|
||||
TEST_F(TransEnv, srvPersistHandleExcept) {
|
||||
// conn breken
|
||||
tr->SetSrvContinueSend(processContinueSend);
|
||||
tr->SetCliPersistFp(cliPersistHandle);
|
||||
SRpcMsg resp = {0};
|
||||
for (int i = 0; i < 5; i++) {
|
||||
SRpcMsg req = {.handle = resp.handle};
|
||||
req.msgType = 1;
|
||||
req.pCont = rpcMallocCont(10);
|
||||
req.contLen = 10;
|
||||
tr->cliSendAndRecv(&req, &resp);
|
||||
if (i > 2) {
|
||||
tr->cliStop();
|
||||
break;
|
||||
}
|
||||
}
|
||||
taosMsleep(2000);
|
||||
// conn broken
|
||||
//
|
||||
}
|
||||
TEST_F(TransEnv, cliPersistHandleExcept) {
|
||||
// conn breken
|
||||
}
|
||||
|
||||
TEST_F(TransEnv, multiCliPersisHandleExcept) {
|
||||
// conn breken
|
||||
}
|
||||
TEST_F(TransEnv, multiSrvPersisHandleExcept) {
|
||||
// conn breken
|
||||
TEST_F(TransEnv, multiCliPersistHandleExcept) {
|
||||
// conn broken
|
||||
}
|
||||
TEST_F(TransEnv, queryExcept) {
|
||||
tr->SetSrvExceptFp(handleExcept);
|
||||
|
||||
// query and conn is broken
|
||||
}
|
||||
TEST_F(TransEnv, noResp) {
|
||||
|
|
Loading…
Reference in New Issue