opt transport
This commit is contained in:
parent
59946626d9
commit
ccea816fb9
|
@ -443,6 +443,7 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
|
||||||
void transDestroyExHandle(void* handle);
|
void transDestroyExHandle(void* handle);
|
||||||
|
|
||||||
int32_t transGetRefMgt();
|
int32_t transGetRefMgt();
|
||||||
|
int32_t transGetSvrRefMgt();
|
||||||
int32_t transGetInstMgt();
|
int32_t transGetInstMgt();
|
||||||
int32_t transGetSyncMsgMgt();
|
int32_t transGetSyncMsgMgt();
|
||||||
|
|
||||||
|
|
|
@ -464,7 +464,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
if (pHead->noResp == 1) {
|
if (pHead->noResp == 1) {
|
||||||
transMsg.info.handle = NULL;
|
transMsg.info.handle = NULL;
|
||||||
} else {
|
} else {
|
||||||
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
|
transMsg.info.handle = (void*)transAcquireExHandle(transGetSvrRefMgt(), pConn->refId);
|
||||||
acquire = 1;
|
acquire = 1;
|
||||||
}
|
}
|
||||||
transMsg.info.refId = pConn->refId;
|
transMsg.info.refId = pConn->refId;
|
||||||
|
@ -490,7 +490,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
pConnInfo->clientPort = pConn->port;
|
pConnInfo->clientPort = pConn->port;
|
||||||
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
|
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
|
||||||
|
|
||||||
if (acquire) transReleaseExHandle(transGetRefMgt(), pConn->refId);
|
if (acquire) transReleaseExHandle(transGetSvrRefMgt(), pConn->refId);
|
||||||
|
|
||||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||||
return true;
|
return true;
|
||||||
|
@ -777,15 +777,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
|
|
||||||
SExHandle* exh1 = transMsg.info.handle;
|
SExHandle* exh1 = transMsg.info.handle;
|
||||||
int64_t refId = transMsg.info.refId;
|
int64_t refId = transMsg.info.refId;
|
||||||
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
|
SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId);
|
||||||
if (exh2 == NULL || exh1 != exh2) {
|
if (exh2 == NULL || exh1 != exh2) {
|
||||||
tTrace("handle except msg %p, ignore it", exh1);
|
tTrace("handle except msg %p, ignore it", exh1);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
msg->pConn = exh1->handle;
|
msg->pConn = exh1->handle;
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
(*transAsyncHandle[msg->type])(msg, pThrd);
|
(*transAsyncHandle[msg->type])(msg, pThrd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -881,15 +881,15 @@ static void uvPrepareCb(uv_prepare_t* handle) {
|
||||||
|
|
||||||
SExHandle* exh1 = transMsg.info.handle;
|
SExHandle* exh1 = transMsg.info.handle;
|
||||||
int64_t refId = transMsg.info.refId;
|
int64_t refId = transMsg.info.refId;
|
||||||
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
|
SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId);
|
||||||
if (exh2 == NULL || exh1 != exh2) {
|
if (exh2 == NULL || exh1 != exh2) {
|
||||||
tTrace("handle except msg %p, ignore it", exh1);
|
tTrace("handle except msg %p, ignore it", exh1);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
msg->pConn = exh1->handle;
|
msg->pConn = exh1->handle;
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
(*transAsyncHandle[msg->type])(msg, pThrd);
|
(*transAsyncHandle[msg->type])(msg, pThrd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1222,14 +1222,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
|
||||||
|
|
||||||
exh->handle = pConn;
|
exh->handle = pConn;
|
||||||
exh->pThrd = pThrd;
|
exh->pThrd = pThrd;
|
||||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
exh->refId = transAddExHandle(transGetSvrRefMgt(), exh);
|
||||||
if (exh->refId < 0) {
|
if (exh->refId < 0) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
|
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
|
|
||||||
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId);
|
SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId);
|
||||||
if (pSelf != exh) {
|
if (pSelf != exh) {
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
|
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
|
||||||
}
|
}
|
||||||
|
@ -1291,8 +1291,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
|
||||||
}
|
}
|
||||||
static int32_t reallocConnRef(SSvrConn* conn) {
|
static int32_t reallocConnRef(SSvrConn* conn) {
|
||||||
if (conn->refId > 0) {
|
if (conn->refId > 0) {
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId);
|
||||||
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
|
(void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId);
|
||||||
}
|
}
|
||||||
// avoid app continue to send msg on invalid handle
|
// avoid app continue to send msg on invalid handle
|
||||||
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
||||||
|
@ -1302,14 +1302,14 @@ static int32_t reallocConnRef(SSvrConn* conn) {
|
||||||
|
|
||||||
exh->handle = conn;
|
exh->handle = conn;
|
||||||
exh->pThrd = conn->hostThrd;
|
exh->pThrd = conn->hostThrd;
|
||||||
exh->refId = transAddExHandle(transGetRefMgt(), exh);
|
exh->refId = transAddExHandle(transGetSvrRefMgt(), exh);
|
||||||
if (exh->refId < 0) {
|
if (exh->refId < 0) {
|
||||||
taosMemoryFree(exh);
|
taosMemoryFree(exh);
|
||||||
return TSDB_CODE_REF_INVALID_ID;
|
return TSDB_CODE_REF_INVALID_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
QUEUE_INIT(&exh->q);
|
QUEUE_INIT(&exh->q);
|
||||||
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId);
|
SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId);
|
||||||
if (pSelf != exh) {
|
if (pSelf != exh) {
|
||||||
tError("conn %p failed to acquire handle", conn);
|
tError("conn %p failed to acquire handle", conn);
|
||||||
taosMemoryFree(exh);
|
taosMemoryFree(exh);
|
||||||
|
@ -1328,8 +1328,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
}
|
}
|
||||||
SWorkThrd* thrd = conn->hostThrd;
|
SWorkThrd* thrd = conn->hostThrd;
|
||||||
|
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId);
|
||||||
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
|
(void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId);
|
||||||
|
|
||||||
STrans* pTransInst = thrd->pTransInst;
|
STrans* pTransInst = thrd->pTransInst;
|
||||||
tDebug("%s conn %p destroy", transLabel(pTransInst), conn);
|
tDebug("%s conn %p destroy", transLabel(pTransInst), conn);
|
||||||
|
@ -1759,15 +1759,15 @@ int32_t transReleaseSrvHandle(void* handle) {
|
||||||
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
|
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
|
||||||
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return 0;
|
return 0;
|
||||||
_return1:
|
_return1:
|
||||||
tDebug("handle %p failed to send to release handle", exh);
|
tDebug("handle %p failed to send to release handle", exh);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return code;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to send to release handle", exh);
|
tDebug("handle %p failed to send to release handle", exh);
|
||||||
|
@ -1810,17 +1810,17 @@ int32_t transSendResponse(const STransMsg* msg) {
|
||||||
tGDebug("conn %p start to send resp (1/2)", exh->handle);
|
tGDebug("conn %p start to send resp (1/2)", exh->handle);
|
||||||
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_return1:
|
_return1:
|
||||||
tDebug("handle %p failed to send resp", exh);
|
tDebug("handle %p failed to send resp", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return code;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to send resp", exh);
|
tDebug("handle %p failed to send resp", exh);
|
||||||
|
@ -1855,17 +1855,17 @@ int32_t transRegisterMsg(const STransMsg* msg) {
|
||||||
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
||||||
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
|
||||||
destroySmsg(m);
|
destroySmsg(m);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_return1:
|
_return1:
|
||||||
tDebug("handle %p failed to register brokenlink", exh);
|
tDebug("handle %p failed to register brokenlink", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
(void)transReleaseExHandle(transGetRefMgt(), refId);
|
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
|
||||||
return code;
|
return code;
|
||||||
_return2:
|
_return2:
|
||||||
tDebug("handle %p failed to register brokenlink", exh);
|
tDebug("handle %p failed to register brokenlink", exh);
|
||||||
|
|
Loading…
Reference in New Issue