refactor transport

This commit is contained in:
yihaoDeng 2024-08-26 10:13:06 +08:00
parent 46447c2bb2
commit 4f5b6eb2c7
1 changed files with 57 additions and 36 deletions

View File

@ -90,6 +90,8 @@ typedef struct SCliConn {
int64_t refId; int64_t refId;
int32_t seq; int32_t seq;
int32_t shareCnt; int32_t shareCnt;
int8_t registered;
} SCliConn; } SCliConn;
typedef struct SCliReq { typedef struct SCliReq {
@ -198,7 +200,7 @@ void cliResetConnTimer(SCliConn* conn);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle); static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn); static int32_t cliSend(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
@ -324,6 +326,17 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p);
} \ } \
} while (0) } while (0)
static int32_t cliConnFindToSendMsg(SCliConn* pConn, SCliReq** pReq) {
int32_t code = 0;
for (int32_t i = 0; i < transQueueSize(&pConn->reqs); i++) {
SCliReq* p = transQueueGet(&pConn->reqs, i);
if (p->sent == 0) {
*pReq = p;
return 0;
}
}
return TSDB_CODE_OUT_OF_RANGE;
}
#define CONN_SET_PERSIST_BY_APP(conn) \ #define CONN_SET_PERSIST_BY_APP(conn) \
do { \ do { \
if (conn->status == ConnNormal) { \ if (conn->status == ConnNormal) { \
@ -377,7 +390,7 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->reqs)) { if (!transQueueEmpty(&conn->reqs)) {
SCliReq* pCliMsg = NULL; SCliReq* pCliMsg = NULL;
CONN_GET_NEXT_SENDMSG(conn); CONN_GET_NEXT_SENDMSG(conn);
cliSend(conn); (void)cliSend(conn);
return true; return true;
} }
return false; return false;
@ -404,7 +417,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
(void)transQueuePush(&conn->reqs, t); (void)transQueuePush(&conn->reqs, t);
tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetRefMgt(), refId);
cliSend(conn); (void)cliSend(conn);
return true; return true;
} }
taosWUnLockLatch(&exh->latch); taosWUnLockLatch(&exh->latch);
@ -1037,7 +1050,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
(void)transQueuePush(&conn->reqs, pReq); (void)transQueuePush(&conn->reqs, pReq);
conn->status = ConnNormal; conn->status = ConnNormal;
cliSend(conn); (void)cliSend(conn);
return; return;
} }
@ -1195,6 +1208,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
if (code != TSDB_CODE_RPC_ASYNC_IN_PROCESS) { if (code != TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
cliRmReqFromConn(pConn, NULL); cliRmReqFromConn(pConn, NULL);
cliDestroyConn2(pConn); cliDestroyConn2(pConn);
delConnFromHeapCache(pThrd->connHeapCache, pConn);
return code; return code;
} else { } else {
} }
@ -1202,7 +1216,15 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
} }
// not any ref, // not any ref,
static int32_t cliDestroyConn2(SCliConn* conn) { return 0; } static int32_t cliDestroyConn2(SCliConn* conn) {
if (conn->registered == 0) {
taosMemoryFree(conn->dstAddr);
taosMemoryFree(conn);
} else {
cliDestroyConn(conn, true);
}
return 0;
}
static int32_t cliCreateConn(SCliThrd* pThrd, const SCliReq* pReq, SCliConn** pCliConn) { static int32_t cliCreateConn(SCliThrd* pThrd, const SCliReq* pReq, SCliConn** pCliConn) {
char addr[TSDB_FQDN_LEN + 64] = {0}; char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet));
@ -1559,37 +1581,33 @@ _exception:
return; return;
} }
// int32_t cliSend2(SCliConn* pConn) {}
int32_t cliSend2(SCliConn* pConn) {
}
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0; int32_t code = 0;
transQueuePush(&pConn->reqs, pCliMsg); transQueuePush(&pConn->reqs, pCliMsg);
cliSend(pConn); code = cliSend(pConn);
return code;
return 0;
} }
void cliSend(SCliConn* pConn) { int32_t cliSend(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
SCliReq* pCliReq = NULL;
int32_t code = cliConnFindToSendMsg(pConn, &pCliReq);
if (transQueueEmpty(&pConn->reqs)) { if (code != 0) {
tError("%s conn %p not msg to send", pInst->label, pConn); return code;
cliHandleExcept(pConn, -1);
return;
} }
SCliReq* pCliMsg = NULL; SReqCtx* pCtx = pCliReq->ctx;
CONN_GET_NEXT_SENDMSG(pConn);
pCliMsg->sent = 1;
SReqCtx* pCtx = pCliMsg->ctx; STransMsg* pReq = (STransMsg*)(&pCliReq->msg);
STransMsg* pReq = (STransMsg*)(&pCliMsg->msg);
if (pReq->pCont == 0) { if (pReq->pCont == 0) {
pReq->pCont = (void*)rpcMallocCont(0); pReq->pCont = (void*)rpcMallocCont(0);
if (pReq->pCont == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tDebug("malloc memory: %p", pReq->pCont); tDebug("malloc memory: %p", pReq->pCont);
pReq->contLen = 0; pReq->contLen = 0;
} }
@ -1603,7 +1621,7 @@ void cliSend(SCliConn* pConn) {
pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0;
pHead->msgType = pReq->msgType; pHead->msgType = pReq->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; pHead->release = REQUEST_RELEASE_HANDLE(pCliReq) ? 1 : 0;
memcpy(pHead->user, pInst->user, strlen(pInst->user)); memcpy(pHead->user, pInst->user, strlen(pInst->user));
pHead->traceId = pReq->info.traceId; pHead->traceId = pReq->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->magicNum = htonl(TRANS_MAGIC_NUM);
@ -1636,18 +1654,19 @@ void cliSend(SCliConn* pConn) {
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tstrerror(TSDB_CODE_OUT_OF_MEMORY));
cliHandleExcept(pConn, -1); cliHandleExcept(pConn, -1);
return; return TSDB_CODE_OUT_OF_MEMORY;
} }
pCliReq->sent = 1;
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
if (status != 0) { if (status != 0) {
tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType),
uv_err_name(status)); uv_err_name(status));
cliHandleExcept(pConn, -1); cliHandleExcept(pConn, -1);
return TSDB_CODE_THIRDPARTY_ERROR;
} }
return;
_RETURN: return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
return;
} }
static void cliDestroyBatch(SCliBatch* pBatch) { static void cliDestroyBatch(SCliBatch* pBatch) {
@ -1720,7 +1739,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2); TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2);
} }
conn->registered = 1;
return TSDB_CODE_RPC_ASYNC_IN_PROCESS; return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
_exception1: _exception1:
@ -1889,7 +1908,7 @@ void cliConnCb(uv_connect_t* req, int status) {
return cliSendBatch_shareConn(pConn); return cliSendBatch_shareConn(pConn);
} }
return cliSend(pConn); (void)cliSend(pConn);
} }
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) {
@ -1950,7 +1969,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) {
if (!transQueuePush(&conn->reqs, pReq)) { if (!transQueuePush(&conn->reqs, pReq)) {
return; return;
} }
cliSend(conn); (void)cliSend(conn);
} else { } else {
tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
destroyReq(pReq); destroyReq(pReq);
@ -2211,7 +2230,7 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
if (code == TSDB_CODE_RPC_MAX_SESSIONS) { if (code == TSDB_CODE_RPC_MAX_SESSIONS) {
TAOS_CHECK_GOTO(code, &lino, _exception); TAOS_CHECK_GOTO(code, &lino, _exception);
} else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { } else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
// do nothing, notifyCb // do nothing, notiy
return; return;
} else { } else {
code = cliSendReq(pConn, pReq); code = cliSendReq(pConn, pReq);
@ -3526,6 +3545,7 @@ _RETURN1:
pReq->pCont = NULL; pReq->pCont = NULL;
return code; return code;
} }
int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) { int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) {
int32_t code = 0; int32_t code = 0;
tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t)); tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t));
@ -3562,6 +3582,7 @@ _EXIT:
taosMemoryFree(pSyncMsg); taosMemoryFree(pSyncMsg);
return code; return code;
} }
int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs) { int32_t timeoutMs) {
int32_t code = 0; int32_t code = 0;