opt transport

This commit is contained in:
yihaoDeng 2024-09-14 14:37:16 +08:00
parent 76cb30080f
commit 80aaa07b26
4 changed files with 102 additions and 25 deletions

View File

@ -864,7 +864,6 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(pCfg, "numOfRpcThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
// tsNumOfRpcThreads = numOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
pItem->i32 = tsNumOfRpcThreads;
pItem->stype = stype;

View File

@ -496,6 +496,19 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf);
enum { REQ_STATUS_INIT = 0, REQ_STATUS_PROCESSING };
#define BUFFER_LIMIT 8
typedef struct {
queue q;
uv_write_t wreq;
void* arg;
} SWReqsWrapper;
int32_t initWQ(queue* wq);
void destroyWQ(queue* wq);
uv_write_t* allocWReqFromWQ(queue* wq, void* arg);
void freeWReqToWQ(queue* wq, SWReqsWrapper* w);
#ifdef __cplusplus
}
#endif

View File

@ -100,6 +100,13 @@ typedef struct SCliConn {
SHashObj* pQTable;
int8_t userInited;
void* pInitUserReq;
void* heap; // point to req conn heap
uv_buf_t* buf;
int32_t bufSize;
queue wq; // uv_write_t queue
} SCliConn;
// #define TRANS_CONN_REF_INC(tconn) ((tconn) ? (tconn)->ref++ : 0)
@ -744,7 +751,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
return;
}
uv_read_stop(conn->stream);
conn->seq = 0;
SCliThrd* thrd = conn->hostThrd;
cliResetConnTimer(conn);
@ -757,6 +763,9 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->list->size += 1;
tDebug("conn %p added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
conn->heap = NULL;
conn->seq = 0;
if (conn->list->size >= 10) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
if (arg == NULL) return;
@ -931,10 +940,16 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
code = TSDB_CODE_THIRDPARTY_ERROR;
TAOS_CHECK_GOTO(code, NULL, _failed);
}
conn->bufSize = BUFFER_LIMIT;
conn->buf = (uv_buf_t*)taosMemoryCalloc(1, BUFFER_LIMIT * sizeof(uv_buf_t));
initWQ(&conn->wq);
conn->stream->data = conn;
conn->connReq.data = conn;
*pCliConn = conn;
return code;
_failed:
if (conn) {
@ -986,6 +1001,11 @@ static void cliDestroy(uv_handle_t* handle) {
taosMemoryFree(conn->pInitUserReq);
conn->pInitUserReq = NULL;
}
taosMemoryFree(conn->buf);
destroyWQ(&conn->wq);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
transReqQueueClear(&conn->wreqQueue);
(void)transDestroyBuffer(&conn->readBuf);
@ -1068,12 +1088,15 @@ static void cliConnRmReqs(SCliConn* conn) {
}
static void cliBatchSendCb(uv_write_t* req, int status) {
SCliConn* conn = req->data;
SWReqsWrapper* wrapper = (SWReqsWrapper*)req->data;
SCliConn* conn = wrapper->arg;
SCliThrd* pThrd = conn->hostThrd;
freeWReqToWQ(&conn->wq, wrapper);
int32_t ref = transUnrefCliHandle(conn);
if (ref <= 0) {
taosMemoryFree(req);
return;
}
@ -1085,7 +1108,6 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
}
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
taosMemoryFree(req);
if (!cliMayRecycleConn(conn)) {
cliBatchSend(conn);
@ -1123,7 +1145,15 @@ int32_t cliBatchSend(SCliConn* pConn) {
tDebug("%s conn %p not msg to send", pInst->label, pConn);
return 0;
}
uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t));
uv_buf_t* wb = NULL;
if (pConn->bufSize < size) {
pConn->buf = taosMemoryRealloc(pConn->buf, size * sizeof(uv_buf_t));
pConn->bufSize = size;
taosMemoryFree(wb);
return TSDB_CODE_OUT_OF_MEMORY;
}
wb = pConn->buf;
int j = 0;
while (!transQueueEmpty(&pConn->reqsToSend)) {
@ -1182,11 +1212,9 @@ int32_t cliBatchSend(SCliConn* pConn) {
}
transRefCliHandle(pConn);
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn;
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen);
uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
taosMemoryFree(wb);
return 0;
}
@ -1526,10 +1554,10 @@ int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq) {
STransCtx* pUserCtx = taosHashGet(pConn->pQTable, &qid, sizeof(qid));
if (pUserCtx == NULL) {
code = taosHashPut(pConn->pQTable, &qid, sizeof(qid), &pCtx->userCtx, sizeof(pCtx->userCtx));
tDebug("%s conn %p add statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %p succ to add statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
} else {
transCtxMerge(pUserCtx, &pCtx->userCtx);
tDebug("%s conn %s update statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
tDebug("%s conn %s succ to update statue ctx, qid:%ld", transLabel(pThrd->pInst), pConn, qid);
}
return 0;
}
@ -2561,7 +2589,7 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t
if (exh == NULL) {
return NULL;
} else {
tDebug("conn %p got", exh->handle);
tDebug("%s conn %p got", trans->label, exh->handle);
}
taosWLockLatch(&exh->latch);
if (exh->pThrd == NULL && trans != NULL) {
@ -3118,7 +3146,6 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
return NULL;
}
code = transHeapGet(pHeap, &pConn);
if (code != 0) {
tDebug("failed to get conn from heap cache for key:%s", key);
return NULL;
@ -3137,12 +3164,18 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
return pConn;
}
static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
SHeap* p = NULL;
SHeap* p = NULL;
int32_t code = 0;
int32_t code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p);
if (code != 0) {
return code;
if (pConn->heap != NULL) {
p = pConn->heap;
} else {
code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p);
if (code != 0) {
return code;
}
}
code = transHeapInsert(p, pConn);
tDebug("add conn %p to heap cache for key:%s,status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
pConn->reqRefCnt);
@ -3150,6 +3183,10 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
}
static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
if (pConn->heap != NULL) {
return transHeapDelete(pConn->heap, pConn);
}
SHeap* p = taosHashGet(pConnHeapCache, pConn->dstAddr, strlen(pConn->dstAddr));
if (p == NULL) {
tDebug("failed to get heap cache for key:%s, no need to del", pConn->dstAddr);
@ -3207,6 +3244,7 @@ int32_t transHeapInsert(SHeap* heap, SCliConn* p) {
heapInsert(heap->heap, &p->node);
p->inHeap = 1;
p->heap = heap;
return 0;
}
int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
@ -3219,6 +3257,7 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) {
p->reqRefCnt--;
if (p->reqRefCnt == 0) {
heapRemove(heap->heap, &p->node);
p->heap = NULL;
tDebug("delete conn %p delete from heap", p);
} else if (p->reqRefCnt < 0) {
tDebug("conn %p has %d reqs, not delete from heap,assert", p, p->reqRefCnt);

View File

@ -62,6 +62,9 @@ typedef struct SSvrConn {
// state req dict
SHashObj* pQTable;
uv_buf_t* buf;
int32_t bufSize;
queue wq; // uv_write_t queue
} SSvrConn;
typedef struct SSvrRespMsg {
@ -645,10 +648,14 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
SWriteReq* userReq = req->data;
SWReqsWrapper* wrapper = req->data;
SWriteReq* userReq = wrapper->arg;
SSvrConn* conn = userReq->conn;
queue* src = &userReq->node;
freeWReqToWQ(&conn->wq, wrapper);
tDebug("%s conn %p send data out ", transLabel(conn->pInst), conn);
if (status == 0) {
while (!QUEUE_IS_EMPTY(src)) {
@ -752,12 +759,14 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf
if (size == 0) {
return 0;
}
int32_t count = 0;
uv_buf_t* pWb = taosMemoryCalloc(size, sizeof(uv_buf_t));
if (pWb == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
if (pConn->bufSize < size) {
pConn->buf = taosMemoryRealloc(pConn->buf, size * sizeof(uv_buf_t));
pConn->bufSize = size;
}
uv_buf_t* pWb = pConn->buf;
int32_t count = 0;
while (transQueueSize(&pConn->resps) > 0) {
queue* el = transQueuePop(&pConn->resps);
@ -787,11 +796,17 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
if (pConn->broken) {
return;
}
int32_t size = transQueueSize(&pConn->resps);
if (size == 0) {
tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size);
return;
}
SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq));
pWreq->conn = pConn;
QUEUE_INIT(&pWreq->node);
pWreq->req.data = pWreq;
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pWreq);
uv_buf_t* pBuf = NULL;
int32_t bufNum = 0;
@ -807,17 +822,17 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
transRefSrvHandle(pConn);
uv_write_t* req = &pWreq->req;
if (req == NULL) {
if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) {
tError("conn %p failed to write data, reason:%s", pConn, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
pConn->broken = true;
freeWReqToWQ(&pConn->wq, req->data);
taosMemoryFree(pWreq);
transUnrefSrvHandle(pConn);
return;
}
}
(void)uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb);
taosMemoryFree(pBuf);
}
int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) {
SSvrConn* pConn = pMsg->pConn;
@ -1258,12 +1273,18 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
}
code = initWQ(&pConn->wq);
TAOS_CHECK_GOTO(code, &lino, _end);
// init client handle
pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
if (pConn->pTcp == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end);
}
pConn->bufSize = BUFFER_LIMIT;
pConn->buf = taosMemoryCalloc(1, sizeof(uv_buf_t));
code = uv_tcp_init(pThrd->loop, pConn->pTcp);
if (code != 0) {
tError("%s failed to create conn since %s" PRId64, transLabel(pInst), uv_strerror(code));
@ -1282,6 +1303,8 @@ _end:
(void)transDestroyBuffer(&pConn->readBuf);
taosHashCleanup(pConn->pQTable);
taosMemoryFree(pConn->pTcp);
destroyWQ(&pConn->wq);
taosMemoryFree(pConn->buf);
taosMemoryFree(pConn);
pConn = NULL;
}
@ -1345,6 +1368,9 @@ static void uvDestroyConn(uv_handle_t* handle) {
uvConnDestroyAllState(conn);
(void)transDestroyBuffer(&conn->readBuf);
destroyWQ(&conn->wq);
taosMemoryFree(conn->buf);
taosMemoryFree(conn);
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {