refactor transport

This commit is contained in:
Yihao Deng 2024-08-28 01:01:59 +00:00
parent a367e6a0d3
commit c2cccbcf40
1 changed files with 162 additions and 130 deletions

View File

@ -162,7 +162,7 @@ static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param);
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn);
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int port);
static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port);
// register conn timer
@ -195,12 +195,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
void cliResetConnTimer(SCliConn* conn);
static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static void doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq);
@ -413,6 +412,20 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
return false;
}
int32_t cliGetTimerFrom(SCliThrd* pThrd, SCliConn* pConn) {
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) {
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
if (timer == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tDebug("no available timer, create a timer %p", timer);
(void)uv_timer_init(pThrd->loop, timer);
}
timer->data = pConn;
pConn->timer = timer;
return 0;
}
void cliResetConnTimer(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
if (conn->timer) {
@ -837,7 +850,6 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
int32_t code = 0;
void* pool = pThrd->pool;
STrans* pInst = pThrd->pInst;
// size_t klen = strlen(key);
SConnList* plist = NULL;
code = getOrCreateMsgList(pThrd, key, &plist);
@ -1163,31 +1175,35 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
}
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) {
int32_t code = 0;
SCliConn* pConn = NULL;
char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet);
int32_t code = cliCreateConn(pThrd, &pConn);
if (code != 0) {
return code;
}
char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet));
pConn->dstAddr = taosStrdup(addr);
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception);
transQueuePush(&pConn->reqs, pReq);
return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet));
return cliDoConn(pThrd, pConn, ip, port);
_exception:
// free conn
return code;
}
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) {
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int32_t port) {
int32_t code = 0;
int32_t lino = 0;
SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
if (conn == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _failed);
}
char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, ip, port);
conn->dstAddr = taosStrdup(addr);
transReqQueueInit(&conn->wreqQueue);
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
@ -1215,6 +1231,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) {
TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed);
TAOS_CHECK_GOTO(cliGetTimerFrom(pThrd, conn), &lino, _failed);
// read/write stream handle
conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
if (conn->stream == NULL) {
@ -1240,6 +1258,7 @@ _failed:
(void)transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->reqs);
}
tError("failed to create conn, code:%d", code);
taosMemoryFree(conn);
return code;
}
@ -1412,7 +1431,6 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
if (size == 0) {
tError("%s conn %p not msg to send", pInst->label, pConn);
ASSERT(0);
// cliHandleExcept(pConn);
return;
}
uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t));
@ -1657,8 +1675,9 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
}
static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
int32_t lino = 0;
STrans* pInst = pThrd->pInst;
int32_t lino = 0;
STrans* pInst = pThrd->pInst;
uint32_t ipaddr;
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr);
if (code != 0) {
@ -1688,25 +1707,11 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception);
return code;
}
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) {
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
if (timer == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
}
tDebug("no available timer, create a timer %p", timer);
(void)uv_timer_init(pThrd->loop, timer);
}
timer->data = conn;
conn->timer = timer;
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) {
// cliResetConnTimer(conn);
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, -1);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
tError("failed connect to %s, reason:%s", conn->dstAddr, uv_err_name(ret));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception);
}
ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
@ -1747,7 +1752,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
return;
}
if (conn == NULL) {
code = cliCreateConn(pThrd, &conn);
code = cliCreateConn(pThrd, &conn, pList->ip, pList->port);
if (code != 0) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label,
pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(code));
@ -1757,7 +1762,13 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
conn->pBatch = pBatch;
conn->dstAddr = taosStrdup(pList->dst);
(void)cliDoConn(pThrd, conn, pList->ip, pList->port);
if (conn->dstAddr == NULL) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label,
pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
cliDestroyBatch(pBatch);
return;
}
code = cliDoConn(pThrd, conn, pList->ip, pList->port);
}
conn->pBatch = pBatch;
@ -1826,6 +1837,25 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
cliHandleExcept(pConn, status);
}
int32_t cliConnSetSockInfo(SCliConn* pConn) {
struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
(void)uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
(void)transSockInfo2Str(&peername, pConn->dst);
addrlen = sizeof(sockname);
(void)uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
(void)transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
struct sockaddr_in saddr = *(struct sockaddr_in*)&peername;
pConn->clientIp = addr.sin_addr.s_addr;
pConn->serverIp = saddr.sin_addr.s_addr;
return 0;
};
void cliConnCb(uv_connect_t* req, int status) {
SCliConn* pConn = req->data;
SCliThrd* pThrd = pConn->hostThrd;
@ -1840,29 +1870,22 @@ void cliConnCb(uv_connect_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
if (status != 0) {
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr);
if (timeout == false) {
cliHandleFastFail(pConn, status);
} else if (timeout == true) {
// already deal by timeout
}
return;
tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr,
uv_strerror(status));
// handle err
// 1. update statis
// 2. notifyCb or retry
// 3. clear conn and
// cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr);
// if (timeout == false) {
// cliHandleFastFail(pConn, status);
// } else if (timeout == true) {
// // already deal by timeout
// }
// return;
}
struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
(void)uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
(void)transSockInfo2Str(&peername, pConn->dst);
addrlen = sizeof(sockname);
(void)uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
(void)transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
struct sockaddr_in saddr = *(struct sockaddr_in*)&peername;
pConn->clientIp = addr.sin_addr.s_addr;
pConn->serverIp = saddr.sin_addr.s_addr;
cliConnSetSockInfo(pConn);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
if (pConn->pBatch != NULL) {
@ -1906,6 +1929,7 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) {
}
pThrd->stopMsg = NULL;
pThrd->quit = true;
tDebug("cli work thread %p start to quit", pThrd);
destroyReq(pReq);
@ -2147,8 +2171,10 @@ void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) {
STraceId* trace = &pReq->msg.info.traceId;
STrans* pInst = pThrd->pInst;
char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet));
char addr[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet);
CONN_CONSTRUCT_HASH_KEY(addr, ip, port);
SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr);
if (pConn == NULL) {
@ -2167,13 +2193,13 @@ void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) {
return;
}
code = cliCreateConn(pThrd, &pConn);
pConn->dstAddr = taosStrdup(addr);
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception);
transQueuePush(&pConn->reqs, pReq);
cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet));
code = cliDoConn(pThrd, pConn, ip, port);
_exception:
resp.code = code;
@ -2248,9 +2274,68 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
return batch;
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq);
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port);
static void destroyBatchList(SCliBatchList* pList);
static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
int32_t code = 0;
STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq->ctx;
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
size_t klen = strlen(key);
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = NULL;
code = createBatchList(&pBatchList, key, ip, port);
if (code != 0) {
destroyReq(pReq);
return;
}
pBatchList->batchLenLimit = pInst->batchSize;
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, pBatchList, pReq);
if (code != 0) {
destroyBatchList(pBatchList);
destroyReq(pReq);
return;
}
code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
if (code != 0) {
destroyBatchList(pBatchList);
}
} else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
cliDestroyBatch(pBatch);
}
} else {
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->batchSize + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->batchSize += pReq->msg.contLen;
pBatch->wLen += 1;
} else {
SCliBatch* tBatch = NULL;
code = createBatch(&tBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
}
}
}
}
return;
}
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) {
@ -2329,64 +2414,6 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
cliBuildBatch(pReq, h, pThrd);
continue;
}
if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) {
SReqCtx* pCtx = pReq->ctx;
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
size_t klen = strlen(key);
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = NULL;
code = createBatchList(&pBatchList, key, ip, port);
if (code != 0) {
destroyReq(pReq);
continue;
}
pBatchList->batchLenLimit = pInst->batchSize;
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, pBatchList, pReq);
if (code != 0) {
destroyBatchList(pBatchList);
destroyReq(pReq);
continue;
}
code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
if (code != 0) {
destroyBatchList(pBatchList);
}
} else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
cliDestroyBatch(pBatch);
}
} else {
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->batchSize + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->batchSize += pReq->msg.contLen;
pBatch->wLen += 1;
} else {
SCliBatch* tBatch = NULL;
code = createBatch(&tBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
}
}
}
}
continue;
}
(*cliAsyncHandle[pReq->type])(pThrd, pReq);
count++;
}
@ -2896,6 +2923,10 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) {
int32_t len = pResp->contLen - tlen;
if (len != 0) {
buf = rpcMallocCont(len);
if (buf == NULL) {
pResp->code = TSDB_CODE_OUT_OF_MEMORY;
return false;
}
// TODO: check buf
memcpy(buf, (char*)pResp->pCont + tlen, len);
}
@ -2909,6 +2940,7 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) {
epsetAssign(dst, &epset);
return true;
}
bool cliResetEpset(SReqCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
bool noDelay = true;
if (hasEpSet == false) {
@ -3749,7 +3781,7 @@ _exception:
return code;
}
static int32_t getOrCreateHeapIfNotExist(SHashObj* pConnHeapCache, char* key, SHeap** pHeap) {
static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHeap) {
int32_t code = 0;
size_t klen = strlen(key);
@ -3780,7 +3812,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
int code = 0;
SHeap* pHeap = NULL;
SCliConn* pConn = NULL;
code = getOrCreateHeapIfNotExist(pConnHeapCache, key, &pHeap);
code = getOrCreateHeap(pConnHeapCache, key, &pHeap);
if (code != 0) {
tDebug("failed to get conn heap from cache for key:%s", key);
return NULL;
@ -3795,7 +3827,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
SHeap* p = NULL;
int32_t code = getOrCreateHeapIfNotExist(pConnHeapCacahe, pConn->dstAddr, &p);
int32_t code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p);
if (code != 0) {
return code;
}