refactor transport

This commit is contained in:
Yihao Deng 2024-07-23 11:09:13 +00:00
parent 4a5c2f661c
commit 4561a888b6
5 changed files with 116 additions and 40 deletions

View File

@ -1783,7 +1783,6 @@ void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) {
} }
} }
taosMemoryFree(pReq->pUserIpWhite); taosMemoryFree(pReq->pUserIpWhite);
// impl later
return; return;
} }
int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) { int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) {

View File

@ -299,10 +299,10 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
} \ } \
} while (0) } while (0)
int transInitBuffer(SConnBuffer* buf); int32_t transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf); int32_t transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf); int32_t transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf); bool transReadComplete(SConnBuffer* connBuf);
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);

View File

@ -197,7 +197,7 @@ static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr);
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
// process data read from server, add decompress etc later // process data read from server, add decompress etc later
@ -1628,8 +1628,9 @@ static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, u
} }
return 0; return 0;
} }
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
// impl later // impl later
int32_t code = 0;
uint32_t addr = taosGetIpv4FromFqdn(fqdn); uint32_t addr = taosGetIpv4FromFqdn(fqdn);
if (addr != 0xffffffff) { if (addr != 0xffffffff) {
size_t len = strlen(fqdn); size_t len = strlen(fqdn);
@ -1639,10 +1640,12 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
tinet_ntoa(old, *v); tinet_ntoa(old, *v);
tinet_ntoa(new, addr); tinet_ntoa(new, addr);
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); code = taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
} }
} else {
code = TSDB_CODE_RPC_FQDN_ERROR; // TSDB_CODE_RPC_INVALID_FQDN;
} }
return; return code;
} }
static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) { static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) {

View File

@ -102,7 +102,7 @@ int transSockInfo2Str(struct sockaddr* sockname, char* dst) {
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r; return r;
} }
int transInitBuffer(SConnBuffer* buf) { int32_t transInitBuffer(SConnBuffer* buf) {
buf->buf = taosMemoryCalloc(1, BUFFER_CAP); buf->buf = taosMemoryCalloc(1, BUFFER_CAP);
if (buf->buf == NULL) { if (buf->buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;

View File

@ -164,7 +164,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd);
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
static int reallocConnRef(SSvrConn* conn); static int32_t reallocConnRef(SSvrConn* conn);
static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd);
@ -798,7 +798,11 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
} }
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
reallocConnRef(pConn); int32_t code = reallocConnRef(pConn);
if (code != 0) {
destroyConn(pConn, true);
return true;
}
tTrace("conn %p received release request", pConn); tTrace("conn %p received release request", pConn);
STraceId traceId = pHead->traceId; STraceId traceId = pHead->traceId;
@ -969,23 +973,21 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
// uv_handle_type pending = uv_pipe_pending_type(pipe);
SSvrConn* pConn = createConn(pThrd); SSvrConn* pConn = createConn(pThrd);
if (pConn == NULL) {
uv_close((uv_handle_t*)q, NULL);
return;
}
pConn->pTransInst = pThrd->pTransInst; // pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/ // /* init conn timer*/
// uv_timer_init(pThrd->loop, &pConn->pTimer); // // uv_timer_init(pThrd->loop, &pConn->pTimer);
// pConn->pTimer.data = pConn; // // pConn->pTimer.data = pConn;
// pConn->hostThrd = pThrd;
pConn->hostThrd = pThrd; // // init client handle
// pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
// init client handle // uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); // pConn->pTcp->data = pConn;
uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp->data = pConn;
// transSetConnOption((uv_tcp_t*)pConn->pTcp);
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd; uv_os_fd_t fd;
@ -1169,38 +1171,84 @@ void* transWorkerThread(void* arg) {
} }
static FORCE_INLINE SSvrConn* createConn(void* hThrd) { static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
int32_t code = 0;
SWorkThrd* pThrd = hThrd; SWorkThrd* pThrd = hThrd;
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
if (pConn == NULL) { if (pConn == NULL) {
return NULL; TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
} }
transReqQueueInit(&pConn->wreqQueue); transReqQueueInit(&pConn->wreqQueue);
QUEUE_INIT(&pConn->queue); QUEUE_INIT(&pConn->queue);
QUEUE_PUSH(&pThrd->conn, &pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue);
transQueueInit(&pConn->srvMsgs, NULL); if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
if ((code = transInitBuffer(&pConn->readBuf)) != 0) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
memset(&pConn->regArg, 0, sizeof(pConn->regArg)); memset(&pConn->regArg, 0, sizeof(pConn->regArg));
pConn->broken = false; pConn->broken = false;
pConn->status = ConnNormal; pConn->status = ConnNormal;
transInitBuffer(&pConn->readBuf);
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
if (exh == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
exh->handle = pConn; exh->handle = pConn;
exh->pThrd = pThrd; exh->pThrd = pThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetRefMgt(), exh);
if (exh->refId < 0) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
}
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
transAcquireExHandle(transGetRefMgt(), exh->refId);
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId);
if (pSelf != exh) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
}
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
pConn->refId = exh->refId; pConn->refId = exh->refId;
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId);
pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/
// uv_timer_init(pThrd->loop, &pConn->pTimer);
// pConn->pTimer.data = pConn;
pConn->hostThrd = pThrd;
// init client handle
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
if (pConn->pTcp == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
code = uv_tcp_init(pThrd->loop, pConn->pTcp);
if (code != 0) {
tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
}
pConn->pTcp->data = pConn;
return pConn; return pConn;
_end:
if (pConn) {
transQueueDestroy(&pConn->srvMsgs);
transDestroyBuffer(&pConn->readBuf);
taosMemoryFree(pConn->pTcp);
taosMemoryFree(pConn);
pConn = NULL;
}
tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), tstrerror(code));
return NULL;
} }
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) { static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) {
@ -1221,16 +1269,33 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
conn->regArg.init = 0; conn->regArg.init = 0;
} }
} }
static int reallocConnRef(SSvrConn* conn) { static int32_t reallocConnRef(SSvrConn* conn) {
if (conn->refId > 0) {
transReleaseExHandle(transGetRefMgt(), conn->refId); transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
}
// avoid app continue to send msg on invalid handle // avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
if (exh == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
exh->handle = conn; exh->handle = conn;
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetRefMgt(), exh);
if (exh->refId < 0) {
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;
}
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
transAcquireExHandle(transGetRefMgt(), exh->refId); SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId);
if (pSelf != exh) {
tError("conn %p failed to acquire handle", conn);
taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID;
}
conn->refId = exh->refId; conn->refId = exh->refId;
return 0; return 0;
@ -1483,9 +1548,14 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) {
taosMemoryFree(msg); taosMemoryFree(msg);
} }
void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
int32_t code = 0;
SSvrConn* conn = msg->pConn; SSvrConn* conn = msg->pConn;
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
reallocConnRef(conn); code = reallocConnRef(conn);
if (code != 0) {
destroyConn(conn, true);
return;
}
if (!transQueuePush(&conn->srvMsgs, msg)) { if (!transQueuePush(&conn->srvMsgs, msg)) {
return; return;
} }
@ -1794,6 +1864,7 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
SUpdateIpWhite* pReq = NULL; SUpdateIpWhite* pReq = NULL;
code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq); code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq);
if (code != 0) { if (code != 0) {
@ -1806,6 +1877,9 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
if ((code = transAsyncSend(pThrd->asyncPool, &msg->q)) != 0) { if ((code = transAsyncSend(pThrd->asyncPool, &msg->q)) != 0) {
code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code);
tFreeSUpdateIpWhiteReq(pReq);
taosMemoryFree(pReq);
taosMemoryFree(msg);
break; break;
} }
} }