refactor tranport

This commit is contained in:
Yihao Deng 2024-05-23 06:49:24 +00:00
parent 81a4ef73f1
commit a0abe67432
2 changed files with 57 additions and 49 deletions

View File

@ -69,6 +69,7 @@ typedef struct {
int8_t connLimitLock; // 0: no lock. 1. lock int8_t connLimitLock; // 0: no lock. 1. lock
int8_t supportBatch; // 0: no batch, 1: support batch int8_t supportBatch; // 0: no batch, 1: support batch
int32_t batchSize; int32_t batchSize;
int8_t optBatchFetch;
int32_t timeToGetConn; int32_t timeToGetConn;
int index; int index;
void* parent; void* parent;

View File

@ -1213,23 +1213,67 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
p->sending -= 1; p->sending -= 1;
taosMemoryFree(pBatch); taosMemoryFree(pBatch);
} }
static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
STrans* pTransInst = pThrd->pTransInst;
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip);
if (ipaddr == 0xffffffff) {
cliResetConnTimer(conn);
cliHandleFastFail(conn, -1);
return;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons(port);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd == -1) {
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
cliHandleFastFail(conn, -1);
return;
}
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) {
tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleFastFail(conn, -1);
return;
}
ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
if (ret != 0) {
tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleFastFail(conn, -1);
return;
}
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) {
cliResetConnTimer(conn);
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, -1);
return;
}
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
return;
}
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
return;
}
if (pThrd->quit == true) { if (pThrd->quit == true) {
cliDestroyBatch(pBatch); cliDestroyBatch(pBatch);
return; return;
} }
if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) {
return;
}
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliBatchList* pList = pBatch->pList; SCliBatchList* pList = pBatch->pList;
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);
bool exceed = false; bool exceed = false;
SCliConn* conn = getConnFromPool(pThrd, key, &exceed); SCliConn* conn = getConnFromPool(pThrd, pList->dst, &exceed);
if (conn == NULL && exceed) { if (conn == NULL && exceed) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen, tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen,
@ -1241,48 +1285,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
conn = cliCreateConn(pThrd); conn = cliCreateConn(pThrd);
conn->pBatch = pBatch; conn->pBatch = pBatch;
conn->dstAddr = taosStrdup(pList->dst); conn->dstAddr = taosStrdup(pList->dst);
return cliDoConn(pThrd, conn, pList->ip, pList->port);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
if (ipaddr == 0xffffffff) {
cliResetConnTimer(conn);
cliHandleFastFail(conn, -1);
return;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ipaddr;
addr.sin_port = (uint16_t)htons(pList->port);
tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst);
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd == -1) {
tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
tstrerror(TAOS_SYSTEM_ERROR(errno)));
cliHandleFastFail(conn, -1);
return;
}
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) {
tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleFastFail(conn, -1);
return;
}
ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
if (ret != 0) {
tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleFastFail(conn, -1);
return;
}
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) {
cliResetConnTimer(conn);
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, -1);
return;
}
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
return;
} }
conn->pBatch = pBatch; conn->pBatch = pBatch;
@ -1803,6 +1806,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg->type == Normal) {
cliBuildBatch(pMsg, h, pThrd);
count++;
}
if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) {
cliBuildBatch(pMsg, h, pThrd); cliBuildBatch(pMsg, h, pThrd);
continue; continue;