refactor transport

This commit is contained in:
Yihao Deng 2024-07-03 06:46:38 +00:00
parent 93d391beb1
commit 620f4c5806
1 changed files with 66 additions and 15 deletions

View File

@ -85,6 +85,7 @@ typedef struct SCliConn {
int64_t refId; int64_t refId;
int32_t seq; int32_t seq;
int32_t shareCnt;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
@ -230,6 +231,10 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key);
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn);
static void delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn);
// thread obj // thread obj
static SCliThrd* createThrdObj(void* trans); static SCliThrd* createThrdObj(void* trans);
static void destroyThrdObj(SCliThrd* pThrd); static void destroyThrdObj(SCliThrd* pThrd);
@ -420,6 +425,15 @@ SCliMsg* cliFindMsgBySeqnum(SCliConn* conn, int32_t seqNum) {
} }
return pMsg; return pMsg;
} }
bool cliShouldAddConnToPool(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
bool empty = transQueueEmpty(&conn->cliMsgs);
if (empty) {
delConnFromHeapCache(pThrd->connHeapCache, conn);
}
return empty;
}
void cliHandleResp_shareConn(SCliConn* conn) { void cliHandleResp_shareConn(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
@ -1113,12 +1127,42 @@ static void cliSendCb(uv_write_t* req, int status) {
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
int32_t code = -1;
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) {
SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, i);
ASSERT(pMsg->type != Release);
ASSERT(REQUEST_NO_RESP(&pMsg->msg) == 0);
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0};
transMsg.code = code == -1 ? (conn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.info.ahandle = NULL;
transMsg.info.cliVer = pTransInst->compatibilityVer;
transMsg.info.ahandle = pCtx->ahandle;
int32_t ret = cliAppCb(conn, &transMsg, pMsg);
}
// SCliConn* conn = req->data;
// if (status != 0) {
// tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
// return;
// }
// uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
// taosMemoryFree(req);
}
static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
SCliConn* conn = req->data; SCliConn* conn = req->data;
conn->shareCnt -= 1;
if (status != 0) { if (status != 0) {
tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
if (!uv_is_closing((uv_handle_t*)&conn->stream)) { if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
cliHandleExcept(conn); cliHandleBatch_shareConnExcept(conn);
} }
return; return;
} }
@ -1189,6 +1233,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
} }
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn; req->data = pConn;
pConn->shareCnt += 1;
tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, tDebug("%s conn %p start to send batch msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size,
totalLen); totalLen);
uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb); uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb);
@ -1359,7 +1404,12 @@ static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip);
if (ipaddr == (uint32_t)(-1)) { if (ipaddr == (uint32_t)(-1)) {
cliResetConnTimer(conn); cliResetConnTimer(conn);
if (conn->pBatch != NULL) {
cliHandleFastFail(conn, -1); cliHandleFastFail(conn, -1);
} else {
cliHandleBatch_shareConnExcept(conn);
}
return; return;
} }
@ -1774,19 +1824,17 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
return pConn; return pConn;
} }
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, char* key, SCliConn* pConn) { static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
SHeap* p = getOrCreateHeapIfNotExist(pConnHeapCacahe, key); SHeap* p = getOrCreateHeapIfNotExist(pConnHeapCacahe, pConn->dstAddr);
if (p == NULL) { if (p == NULL) {
return 0; return 0;
} }
return transHeapInsert(p, pConn); return transHeapInsert(p, pConn);
} }
static void delConnFromHeapCache(SHashObj* pConnHeapCache, char* key, SCliConn* pConn) { static void delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
size_t klen = strlen(key); SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr));
SHeap* p = taosHashGet(pConnHeapCache, key, klen);
if (p == NULL) { if (p == NULL) {
tDebug("failed to get heap cache for key:%s, no need to del", key); tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr);
return; return;
} }
int ret = transHeapDelete(p, pConn); int ret = transHeapDelete(p, pConn);
@ -1814,19 +1862,22 @@ void cliHandleReq__shareConn(SCliMsg* pMsg, SCliThrd* pThrd) {
bool ignore = false; bool ignore = false;
pConn = getConnFromPool(pThrd, addr, &ignore); pConn = getConnFromPool(pThrd, addr, &ignore);
if (pConn != NULL) { if (pConn != NULL) {
addConnToHeapCache(pThrd->connHeapCache, pConn);
transQueuePush(&pConn->cliMsgs, pMsg);
return cliSendBatch_shareConn(pConn); return cliSendBatch_shareConn(pConn);
} }
} else {
tGTrace("%s conn %p get from heap cache", CONN_GET_INST_LABEL(pConn), pConn);
transQueuePush(&pConn->cliMsgs, pMsg);
cliSendBatch_shareConn(pConn);
return;
} }
pConn = cliCreateConn(pThrd); pConn = cliCreateConn(pThrd);
pConn->dstAddr = taosStrdup(addr); pConn->dstAddr = taosStrdup(addr);
code = addConnToHeapCache(pThrd->connHeapCache, addr, pConn); code = addConnToHeapCache(pThrd->connHeapCache, pConn);
if (code != 0) {
// do nothing
} else {
// do nothing
}
transQueuePush(&pConn->cliMsgs, pMsg);
return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet)); return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pMsg->ctx->epSet), EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet));
} }
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {