Merge pull request #10849 from taosdata/feature/supportQ

Feature/support q
This commit is contained in:
Yihao Deng 2022-03-20 21:05:27 +08:00 committed by GitHub
commit 524edc1552
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 148 additions and 181 deletions

View File

@ -144,7 +144,7 @@ void rpcUnrefHandle(void* handle, int8_t type) {
(*taosUnRefHandle[type])(handle); (*taosUnRefHandle[type])(handle);
} }
void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { rpcSendResponse(msg); } void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { transRegisterMsg(msg); }
void rpcReleaseHandle(void* handle, int8_t type) { void rpcReleaseHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*transReleaseHandle[type])(handle); (*transReleaseHandle[type])(handle);

View File

@ -138,7 +138,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
conn->status = ConnRelease; \ conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \ transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \ transFreeMsg(transContFromHead((char*)head)); \
tDebug("cli conn %p receive release request", conn); \ tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \
while (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
if (T_REF_VAL_GET(conn) == 1) { \ if (T_REF_VAL_GET(conn) == 1) { \
SCliThrdObj* thrd = conn->hostThrd; \ SCliThrdObj* thrd = conn->hostThrd; \
addConnToPool(thrd->pool, conn); \ addConnToPool(thrd->pool, conn); \
@ -147,10 +150,9 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} \ } \
} while (0) } while (0)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ #define CONN_HANDLE_THREAD_QUIT(thrd) \
do { \ do { \
if (thrd->quit) { \ if (thrd->quit) { \
cliHandleExcept(conn); \
return; \ return; \
} \ } \
} while (0) } while (0)
@ -375,6 +377,9 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
return conn; return conn;
} }
static void addConnToPool(void* pool, SCliConn* conn) { static void addConnToPool(void* pool, SCliConn* conn) {
SCliThrdObj* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
char key[128] = {0}; char key[128] = {0};
transCtxDestroy(&conn->ctx); transCtxDestroy(&conn->ctx);
@ -539,7 +544,6 @@ void cliSend(SCliConn* pConn) {
} }
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0; pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0; pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
@ -594,12 +598,17 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle; SCliConn* conn = pMsg->msg.handle;
tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn);
if (T_REF_VAL_GET(conn) == 2) {
transUnrefCliHandle(conn); transUnrefCliHandle(conn);
taosArrayPush(conn->cliMsgs, &pMsg); taosArrayPush(conn->cliMsgs, &pMsg);
if (taosArrayGetSize(conn->cliMsgs) >= 2) { if (taosArrayGetSize(conn->cliMsgs) >= 2) {
return; // send one by one return; // send one by one
} }
cliSend(conn); cliSend(conn);
} else {
// conn already broken down
transUnrefCliHandle(conn);
}
} }
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
@ -836,11 +845,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
if (index == -1) { if (index == -1) {
index = cliRBChoseIdx(pTransInst); index = cliRBChoseIdx(pTransInst);
} }
int32_t flen = 0;
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// imp later
}
tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port);
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->ahandle = pMsg->ahandle; pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType; pCtx->msgType = pMsg->msgType;
@ -851,9 +856,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
if (ctx != NULL) { if (ctx != NULL) {
pCtx->appCtx = *ctx; pCtx->appCtx = *ctx;
} }
assert(pTransInst->connType == TAOS_CONN_CLIENT); assert(pTransInst->connType == TAOS_CONN_CLIENT);
// atomic or not
SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg)); SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
@ -862,6 +865,8 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
cliMsg->type = Normal; cliMsg->type = Normal;
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index];
tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port);
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
} }

View File

@ -114,10 +114,6 @@ static const char* notify = "a";
return; \ return; \
} \ } \
} while (0) } while (0)
// refactor later
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen);
static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
@ -144,9 +140,9 @@ static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/)
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleSendResp, uvHandleQuit, uvHandleRelease, static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister}; uvHandleRegister};
static void uvDestroyConn(uv_handle_t* handle); static void uvDestroyConn(uv_handle_t* handle);
@ -165,59 +161,6 @@ void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
} }
static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) {
STransMsgHead* pHead = (STransMsgHead*)msg;
int code = 0;
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
// secured link, or no authentication
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, secured link, no auth is required", pConn->info);
return 0;
}
if (!rpcIsReq(pHead->msgType)) {
// for response, if code is auth failure, it shall bypass the auth process
code = htonl(pHead->code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
return 0;
}
}
code = 0;
if (pHead->spi == pConn->spi) {
// authentication
SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
} else {
if (transAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
// tDebug("%s, authentication failed, msg discarded", pConn->info);
code = TSDB_CODE_RPC_AUTH_FAILURE;
} else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client
// tTrace("%s, message is authenticated", pConn->info);
}
}
} else {
tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
}
return code;
}
// refers specifically to query or insert timeout // refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) { static void uvHandleActivityTimeout(uv_timer_t* handle) {
SSrvConn* conn = handle->data; SSrvConn* conn = handle->data;
@ -225,35 +168,21 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
} }
static void uvHandleReq(SSrvConn* pConn) { static void uvHandleReq(SSrvConn* pConn) {
SRecvInfo info;
SRecvInfo* p = &info;
SConnBuffer* pBuf = &pConn->readBuf; SConnBuffer* pBuf = &pConn->readBuf;
p->msg = pBuf->buf; char* msg = pBuf->buf;
p->msgLen = pBuf->len; uint32_t msgLen = pBuf->len;
p->ip = 0;
p->port = 0;
p->shandle = pConn->pTransInst; //
p->thandle = pConn;
p->chandle = NULL;
STransMsgHead* pHead = (STransMsgHead*)p->msg; STransMsgHead* pHead = (STransMsgHead*)msg;
if (pHead->secured == 1) { if (pHead->secured == 1) {
STransUserMsg* uMsg = (STransUserMsg*)((char*)p->msg + p->msgLen - sizeof(STransUserMsg)); STransUserMsg* uMsg = (STransUserMsg*)((char*)msg + msgLen - sizeof(STransUserMsg));
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user)); memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret)); memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
} }
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
int32_t dlen = 0;
if (transDecompressMsg(NULL, 0, NULL)) {
// add compress later
// pHead = rpcDecompresSTransMsg(pHead);
} else {
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
if (pHead->secured == 1) { if (pHead->secured == 1) {
pHead->msgLen -= sizeof(STransUserMsg); pHead->msgLen -= sizeof(STransUserMsg);
} }
}
CONN_SHOULD_RELEASE(pConn, pHead); CONN_SHOULD_RELEASE(pConn, pHead);
@ -289,10 +218,9 @@ static void uvHandleReq(SSrvConn* pConn) {
transMsg.handle = pConn; transMsg.handle = pConn;
} }
STrans* pTransInst = (STrans*)p->shandle; STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
} }
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
@ -351,7 +279,13 @@ void uvOnSendCb(uv_write_t* req, int status) {
if (msg->type == Release && conn->status != ConnNormal) { if (msg->type == Release && conn->status != ConnNormal) {
conn->status = ConnNormal; conn->status = ConnNormal;
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} else if (msg->type == Register && conn->status == ConnAcquire) { }
destroySmsg(msg);
// send second data, just use for push
if (taosArrayGetSize(conn->srvMsgs) > 0) {
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
if (msg->type == Register && conn->status == ConnAcquire) {
conn->regArg.notifyCount = 0; conn->regArg.notifyCount = 0;
conn->regArg.init = 1; conn->regArg.init = 1;
conn->regArg.msg = msg->msg; conn->regArg.msg = msg->msg;
@ -360,17 +294,13 @@ void uvOnSendCb(uv_write_t* req, int status) {
(pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); (pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg)); memset(&conn->regArg, 0, sizeof(conn->regArg));
} }
taosArrayRemove(conn->srvMsgs, 0);
free(msg); free(msg);
return; } else {
}
destroySmsg(msg);
// send second data, just use for push
if (taosArrayGetSize(conn->srvMsgs) > 0) {
tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs));
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
uvStartSendRespInternal(msg); uvStartSendRespInternal(msg);
} }
} }
}
} else { } else {
tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
conn->broken = true; conn->broken = true;
@ -387,7 +317,6 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
} }
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later;
tTrace("server conn %p prepare to send resp", smsg->pConn); tTrace("server conn %p prepare to send resp", smsg->pConn);
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
@ -398,21 +327,27 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
} }
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
pHead->secured = pMsg->code == 0 ? 1 : 0; // // pHead->secured = pMsg->code == 0 ? 1 : 0; //
pHead->msgType = smsg->pConn->inType + 1; if (!pConn->secured) {
pConn->secured = pMsg->code == 0 ? 1 : 0;
}
pHead->secured = pConn->secured;
if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1;
} else {
pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType;
}
pHead->release = smsg->type == Release ? 1 : 0; pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code); pHead->code = htonl(pMsg->code);
// add more info
char* msg = (char*)pHead; char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen); int32_t len = transMsgLenFromCont(pMsg->contLen);
if (transCompressMsg(msg, len, NULL)) {
// impl later
}
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port)); ntohs(pConn->locaddr.sin_port));
pHead->msgLen = htonl(len); pHead->msgLen = htonl(len);
wb->base = msg; wb->base = msg;
wb->len = len; wb->len = len;
} }
@ -438,13 +373,12 @@ static void uvStartSendResp(SSrvMsg* smsg) {
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
} }
if (taosArrayGetSize(pConn->srvMsgs) > 0) { taosArrayPush(pConn->srvMsgs, &smsg);
if (taosArrayGetSize(pConn->srvMsgs) > 1) {
tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr), tDebug("server conn %p send data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
taosArrayPush(pConn->srvMsgs, &smsg);
return; return;
} }
taosArrayPush(pConn->srvMsgs, &smsg);
uvStartSendRespInternal(smsg); uvStartSendRespInternal(smsg);
return; return;
} }
@ -675,7 +609,7 @@ static SSrvConn* createConn(void* hThrd) {
QUEUE_PUSH(&pThrd->conn, &pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("conn %p created", pConn); tTrace("server conn %p created", pConn);
memset(&pConn->regArg, 0, sizeof(pConn->regArg)); memset(&pConn->regArg, 0, sizeof(pConn->regArg));
pConn->broken = false; pConn->broken = false;
@ -697,7 +631,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
} }
conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
if (clear) { if (clear) {
tTrace("try to destroy conn %p", conn); tTrace("server conn %p to be destroyed", conn);
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
} }
@ -720,25 +654,6 @@ static void uvDestroyConn(uv_handle_t* handle) {
uv_stop(thrd->loop); uv_stop(thrd->loop);
} }
} }
static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
STransMsgHead* pHead = (STransMsgHead*)msg;
if (pConn->spi && pConn->secured == 0) {
// add auth part
pHead->spi = pConn->spi;
STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(SRpcDigest);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
// transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
// transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHead->spi = 0;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
return msgLen;
}
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = calloc(1, sizeof(SServerObj)); SServerObj* srv = calloc(1, sizeof(SServerObj));
@ -815,20 +730,19 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
} }
uvStartSendRespInternal(msg); uvStartSendRespInternal(msg);
return; return;
} else if (conn->status == ConnRelease) { } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
// already release by server app, do nothing tDebug("server conn %p already released, ignore release-msg", conn);
} else if (conn->status == ConnNormal) {
// no nothing
// user should not call this rpcRelease handle;
} }
free(msg); destroySmsg(msg);
} }
void uvHandleSendResp(SSrvMsg* msg, SWorkThrdObj* thrd) { void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
// send msg to client // send msg to client
tDebug("server conn %p start to send resp", msg->pConn);
uvStartSendResp(msg); uvStartSendResp(msg);
} }
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn; SSrvConn* conn = msg->pConn;
tDebug("server conn %p register brokenlink callback", conn);
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
if (taosArrayGetSize(conn->srvMsgs) > 0) { if (taosArrayGetSize(conn->srvMsgs) > 0) {
taosArrayPush(conn->srvMsgs, &msg); taosArrayPush(conn->srvMsgs, &msg);
@ -901,12 +815,10 @@ void transUnrefSrvHandle(void* handle) {
return; return;
} }
int ref = T_REF_DEC((SSrvConn*)handle); int ref = T_REF_DEC((SSrvConn*)handle);
tDebug("handle %p ref count: %d", handle, ref); tDebug("server conn %p ref count: %d", handle, ref);
if (ref == 0) { if (ref == 0) {
destroyConn((SSrvConn*)handle, true); destroyConn((SSrvConn*)handle, true);
} }
// unref srv handle
} }
void transReleaseSrvHandle(void* handle) { void transReleaseSrvHandle(void* handle) {
@ -951,7 +863,7 @@ void transRegisterMsg(const STransMsg* msg) {
srvMsg->pConn = pConn; srvMsg->pConn = pConn;
srvMsg->msg = *msg; srvMsg->msg = *msg;
srvMsg->type = Register; srvMsg->type = Register;
tTrace("server conn %p start to send resp", pConn); tTrace("server conn %p start to register brokenlink callback", pConn);
transSendAsync(pThrd->asyncPool, &srvMsg->q); transSendAsync(pThrd->asyncPool, &srvMsg->q);
} }
int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {

View File

@ -35,6 +35,8 @@ int port = 7000;
typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
// client process; // client process;
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
@ -167,6 +169,35 @@ static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
} }
static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100);
rpcMsg.contLen = 100;
rpcMsg.handle = pMsg->handle;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
rpcReleaseHandle(pMsg->handle, TAOS_CONN_SERVER);
}
static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
void *handle = pMsg->handle;
{
SRpcMsg rpcMsg1 = {0};
rpcMsg1.pCont = rpcMallocCont(100);
rpcMsg1.contLen = 100;
rpcMsg1.handle = handle;
rpcMsg1.code = 0;
rpcRegisterBrokenLinkArg(&rpcMsg1);
}
taosMsleep(10);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = rpcMallocCont(100);
rpcMsg.contLen = 100;
rpcMsg.handle = pMsg->handle;
rpcMsg.code = 0;
rpcSendResponse(&rpcMsg);
}
// client process; // client process;
static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
Client *client = (Client *)parent; Client *client = (Client *)parent;
@ -225,7 +256,7 @@ class TransObj {
srv->SetSrvContinueSend(cfp); srv->SetSrvContinueSend(cfp);
} }
void RestartSrv() { srv->Restart(); } void RestartSrv() { srv->Restart(); }
void cliStop() { void StopCli() {
/////// ///////
cli->Stop(); cli->Stop();
} }
@ -329,32 +360,35 @@ TEST_F(TransEnv, cliPersistHandle) {
////////////////// //////////////////
} }
TEST_F(TransEnv, cliReleaseHandle) { TEST_F(TransEnv, srvReleaseHandle) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) { tr->SetSrvContinueSend(processReleaseHandleCb);
// tr->Restart(processReleaseHandleCb);
void *handle = NULL;
for (int i = 0; i < 1; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
tr->cliSendAndRecvNoHandle(&req, &resp); tr->cliSendAndRecv(&req, &resp);
// tr->cliSendAndRecvNoHandle(&req, &resp);
EXPECT_TRUE(resp.code == 0); EXPECT_TRUE(resp.code == 0);
//}
} }
////////////////// //////////////////
} }
TEST_F(TransEnv, cliReleaseHandleExcept) { TEST_F(TransEnv, cliReleaseHandleExcept) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 10; i++) { for (int i = 0; i < 3; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
tr->cliSendAndRecvNoHandle(&req, &resp); tr->cliSendAndRecv(&req, &resp);
if (i == 5) { if (i == 1) {
std::cout << "stop server" << std::endl; std::cout << "stop server" << std::endl;
tr->StopSrv(); tr->StopSrv();
} }
if (i >= 6) { if (i > 1) {
EXPECT_TRUE(resp.code != 0); EXPECT_TRUE(resp.code != 0);
} }
} }
@ -383,7 +417,7 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
req.contLen = 10; req.contLen = 10;
tr->cliSendAndRecv(&req, &resp); tr->cliSendAndRecv(&req, &resp);
if (i > 2) { if (i > 2) {
tr->cliStop(); tr->StopCli();
break; break;
} }
} }
@ -413,7 +447,23 @@ TEST_F(TransEnv, cliPersistHandleExcept) {
TEST_F(TransEnv, multiCliPersistHandleExcept) { TEST_F(TransEnv, multiCliPersistHandleExcept) {
// conn broken // conn broken
} }
TEST_F(TransEnv, queryExcept) {} TEST_F(TransEnv, queryExcept) {
tr->SetSrvContinueSend(processRegisterFailure);
SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1};
req.msgType = 1;
req.pCont = rpcMallocCont(10);
req.contLen = 10;
tr->cliSendAndRecv(&req, &resp);
if (i == 2) {
rpcReleaseHandle(resp.handle, TAOS_CONN_CLIENT);
tr->StopCli();
break;
}
}
taosMsleep(4 * 1000);
}
TEST_F(TransEnv, noResp) { TEST_F(TransEnv, noResp) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {