Merge pull request #10783 from taosdata/feature/supportQuery

Feature/support query
This commit is contained in:
Yihao Deng 2022-03-16 23:09:14 +08:00 committed by GitHub
commit a6e2e85367
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 9 deletions

View File

@ -38,11 +38,14 @@ typedef struct SRpcConnInfo {
typedef struct SRpcMsg { typedef struct SRpcMsg {
tmsg_t msgType; tmsg_t msgType;
tmsg_t expectMsgType;
void * pCont; void * pCont;
int contLen; int contLen;
int32_t code; int32_t code;
void * handle; // rpc handle returned to app void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client void * ahandle; // app handle set by client
int noResp; // has response or not(default 0 indicate resp);
} SRpcMsg; } SRpcMsg;
typedef struct SRpcInit { typedef struct SRpcInit {

View File

@ -158,9 +158,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} while (0) } while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
bool cliMayContinueSendMsg(SCliConn* conn) { bool cliMaySendCachedMsg(SCliConn* conn) {
if (taosArrayGetSize(conn->cliMsgs) > 0) { if (taosArrayGetSize(conn->cliMsgs) > 0) {
cliSend(conn); cliSend(conn);
return true; return true;
@ -226,7 +228,7 @@ void cliHandleResp(SCliConn* conn) {
} }
destroyCmsg(pMsg); destroyCmsg(pMsg);
if (cliMayContinueSendMsg(conn) == true) { if (cliMaySendCachedMsg(conn) == true) {
return; return;
} }
@ -441,7 +443,25 @@ static void cliDestroy(uv_handle_t* handle) {
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
free(conn); free(conn);
} }
static bool cliHandleNoResp(SCliConn* conn) {
bool res = false;
SArray* msgs = conn->cliMsgs;
if (taosArrayGetSize(msgs) > 0) {
SCliMsg* pMsg = taosArrayGetP(msgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) {
taosArrayRemove(msgs, 0);
destroyCmsg(pMsg);
res = true;
}
if (res == true) {
if (cliMaySendCachedMsg(conn) == false) {
SCliThrdObj* thrd = conn->hostThrd;
addConnToPool(thrd->pool, conn);
}
}
}
return res;
}
static void cliSendCb(uv_write_t* req, int status) { static void cliSendCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
@ -452,6 +472,10 @@ static void cliSendCb(uv_write_t* req, int status) {
cliHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
if (cliHandleNoResp(pConn) == true) {
tTrace("%s cli conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
return;
}
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
@ -489,6 +513,7 @@ void cliSend(SCliConn* pConn) {
msgLen += sizeof(STransUserMsg); msgLen += sizeof(STransUserMsg);
} }
pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);

View File

@ -226,15 +226,22 @@ static void uvHandleReq(SSrvConn* pConn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
transMsg.ahandle = NULL; transMsg.ahandle = NULL;
transMsg.handle = pConn; transMsg.handle = NULL;
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
transRefSrvHandle(pConn);
if (pHead->resflag == 0) {
transRefSrvHandle(pConn);
transMsg.handle = pConn;
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port), transMsg.contLen); ntohs(pConn->locaddr.sin_port), transMsg.contLen);
} else {
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn,
TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
}
STrans* pTransInst = (STrans*)p->shandle; STrans* pTransInst = (STrans*)p->shandle;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);

View File

@ -361,7 +361,7 @@ TEST_F(TransEnv, cliPersistHandle) {
tr->SetCliPersistFp(cliPersistHandle); tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle}; SRpcMsg req = {.handle = resp.handle, .noResp = 0};
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
@ -448,6 +448,25 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
// conn broken // conn broken
// //
} }
TEST_F(TransEnv, cliPersistHandleExcept) {
tr->SetSrvContinueSend(processContinueSend);
tr->SetCliPersistFp(cliPersistHandle);
SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
if (i > 2) {
tr->StopSrv();
break;
}
}
taosMsleep(2000);
// conn broken
//
}
TEST_F(TransEnv, multiCliPersistHandleExcept) { TEST_F(TransEnv, multiCliPersistHandleExcept) {
// conn broken // conn broken
@ -458,5 +477,15 @@ TEST_F(TransEnv, queryExcept) {
// query and conn is broken // query and conn is broken
} }
TEST_F(TransEnv, noResp) { TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {.noResp = 1};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
}
taosMsleep(2000);
// no resp // no resp
} }