opt transport

This commit is contained in:
yihaoDeng 2024-09-14 11:12:41 +08:00
parent 45bb65336c
commit 7dc3db8b38
1 changed files with 68 additions and 95 deletions

View File

@ -25,9 +25,8 @@ typedef struct {
} SMsgList; } SMsgList;
typedef struct SConnList { typedef struct SConnList {
queue conns; queue conns;
int32_t size; int32_t size;
SMsgList* list;
} SConnList; } SConnList;
typedef struct { typedef struct {
@ -180,18 +179,18 @@ typedef struct {
// conn pool // conn pool
// add expire timeout and capacity limit // add expire timeout and capacity limit
static void* createConnPool(int size); static void* createConnPool(int size);
static void* destroyConnPool(SCliThrd* thread); static void* destroyConnPool(SCliThrd* thread);
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); // static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int port); static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int port);
static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn); static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn);
static void cliSendBatch_shareConnCb(uv_write_t* req, int status); static void cliBatchSendCb(uv_write_t* req, int status);
void cliSendBatch_shareConn(SCliConn* pConn); void cliBatchSendImpl(SCliConn* pConn);
int32_t cliSend2(SCliConn* conn); static int32_t cliBatchSend(SCliConn* conn);
bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead); bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead);
// register conn timer // register conn timer
static void cliConnTimeout(uv_timer_t* handle); static void cliConnTimeout(uv_timer_t* handle);
// register timer for read // register timer for read
@ -207,7 +206,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
// callback after conn to server // callback after conn to server
static void cliConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
static void cliSendBatchCb(uv_write_t* req, int status);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList); SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
@ -574,7 +572,6 @@ void cliHandleResp(SCliConn* conn) {
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
STraceId* trace = &resp.info.traceId; STraceId* trace = &resp.info.traceId;
tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn)); tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn));
// retry, notify
} else { } else {
destroyReq(pReq); destroyReq(pReq);
} }
@ -610,9 +607,6 @@ void* destroyConnPool(SCliThrd* pThrd) {
SCliConn* c = QUEUE_DATA(h, SCliConn, q); SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true); cliDestroyConn(c, true);
} }
SMsgList* msglist = connList->list;
taosMemoryFree(msglist);
connList = taosHashIterate((SHashObj*)pool, connList); connList = taosHashIterate((SHashObj*)pool, connList);
} }
taosHashCleanup(pool); taosHashCleanup(pool);
@ -620,57 +614,58 @@ void* destroyConnPool(SCliThrd* pThrd) {
return NULL; return NULL;
} }
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { // static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
void* pool = pThrd->pool; // void* pool = pThrd->pool;
STrans* pTranInst = pThrd->pInst; // STrans* pTranInst = pThrd->pInst;
size_t klen = strlen(key); // size_t klen = strlen(key);
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); // SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
if (plist == NULL) { // if (plist == NULL) {
SConnList list = {0}; // SConnList list = {0};
(void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); // (void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, klen); // plist = taosHashGet(pool, key, klen);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); // // SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ); // // QUEUE_INIT(&nList->msgQ);
nList->numOfConn++; // // nList->numOfConn++;
QUEUE_INIT(&plist->conns); // QUEUE_INIT(&plist->conns);
plist->list = nList; // //plist->list = nList;
} // }
if (QUEUE_IS_EMPTY(&plist->conns)) { // if (QUEUE_IS_EMPTY(&plist->conns)) {
if (plist->list->numOfConn >= pTranInst->connLimitNum) { // if (plist->list->numOfConn >= pTranInst->connLimitNum) {
*exceed = true; // *exceed = true;
return NULL; // return NULL;
} // }
plist->list->numOfConn++; // plist->list->numOfConn++;
return NULL; // return NULL;
} // }
queue* h = QUEUE_TAIL(&plist->conns); // queue* h = QUEUE_TAIL(&plist->conns);
QUEUE_REMOVE(h); // QUEUE_REMOVE(h);
plist->size -= 1; // plist->size -= 1;
SCliConn* conn = QUEUE_DATA(h, SCliConn, q); // SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal; // conn->status = ConnNormal;
QUEUE_INIT(&conn->q); // QUEUE_INIT(&conn->q);
tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); // tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
if (conn->task != NULL) { // if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); // transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL; // conn->task = NULL;
} // }
conn->seq++; // conn->seq++;
return conn; // return conn;
} // }
static int32_t getOrCreateMsgList(SCliThrd* pThrd, const char* key, SConnList** ppList) { static int32_t getOrCreateConnList(SCliThrd* pThrd, const char* key, SConnList** ppList) {
int32_t code = 0; int32_t code = 0;
void* pool = pThrd->pool; void* pool = pThrd->pool;
size_t klen = strlen(key); size_t klen = strlen(key);
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
if (plist == NULL) { if (plist == NULL) {
SConnList list = {0}; SConnList list = {0};
QUEUE_INIT(&list.conns);
code = taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); code = taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
if (code != 0) { if (code != 0) {
return code; return code;
@ -680,16 +675,7 @@ static int32_t getOrCreateMsgList(SCliThrd* pThrd, const char* key, SConnList**
if (plist == NULL) { if (plist == NULL) {
return TSDB_CODE_INVALID_PTR; return TSDB_CODE_INVALID_PTR;
} }
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
if (nList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
QUEUE_INIT(&nList->msgQ);
nList->numOfConn++;
QUEUE_INIT(&plist->conns); QUEUE_INIT(&plist->conns);
plist->list = nList;
*ppList = plist; *ppList = plist;
} else { } else {
*ppList = plist; *ppList = plist;
@ -702,13 +688,13 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
SConnList* plist = NULL; SConnList* plist = NULL;
code = getOrCreateMsgList(pThrd, key, &plist); code = getOrCreateConnList(pThrd, key, &plist);
if (code != 0) { if (code != 0) {
return code; return code;
} }
if (QUEUE_IS_EMPTY(&plist->conns)) { if (QUEUE_IS_EMPTY(&plist->conns)) {
if (plist->list->numOfConn >= pInst->connLimitNum) { if (plist->size >= pInst->connLimitNum) {
return TSDB_CODE_RPC_MAX_SESSIONS; return TSDB_CODE_RPC_MAX_SESSIONS;
} }
return TSDB_CODE_RPC_NETWORK_BUSY; return TSDB_CODE_RPC_NETWORK_BUSY;
@ -721,6 +707,8 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
SCliConn* conn = QUEUE_DATA(h, SCliConn, q); SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal; conn->status = ConnNormal;
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
conn->seq = 0;
conn->list = plist;
if (conn->task != NULL) { if (conn->task != NULL) {
SDelayTask* task = conn->task; SDelayTask* task = conn->task;
@ -756,7 +744,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
return; return;
} }
uv_read_stop(conn->stream); uv_read_stop(conn->stream);
conn->seq = 0; conn->seq = 0;
SCliThrd* thrd = conn->hostThrd; SCliThrd* thrd = conn->hostThrd;
@ -1080,7 +1067,7 @@ static void cliConnRmReqs(SCliConn* conn) {
return; return;
} }
static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { static void cliBatchSendCb(uv_write_t* req, int status) {
SCliConn* conn = req->data; SCliConn* conn = req->data;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
@ -1101,7 +1088,7 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
taosMemoryFree(req); taosMemoryFree(req);
if (!cliMayRecycleConn(conn)) { if (!cliMayRecycleConn(conn)) {
cliSendBatch_shareConn(conn); cliBatchSend(conn);
} }
} }
bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) { bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) {
@ -1126,7 +1113,7 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg
pConn->userInited = 1; pConn->userInited = 1;
return true; return true;
} }
void cliSendBatch_shareConn(SCliConn* pConn) { int32_t cliBatchSend(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
int32_t size = transQueueSize(&pConn->reqsToSend); int32_t size = transQueueSize(&pConn->reqsToSend);
@ -1134,7 +1121,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
int32_t totalLen = 0; int32_t totalLen = 0;
if (size == 0) { if (size == 0) {
tDebug("%s conn %p not msg to send", pInst->label, pConn); tDebug("%s conn %p not msg to send", pInst->label, pConn);
return; return 0;
} }
uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t));
@ -1191,38 +1178,25 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
STraceId* trace = &pCliMsg->msg.info.traceId; STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn, tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); transQueuePush(&pConn->reqsSentOut, &pCliMsg->q);
} }
if (j == 0) {
taosMemoryFree(wb);
return;
}
transRefCliHandle(pConn); transRefCliHandle(pConn);
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = pConn; req->data = pConn;
tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen); tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen);
uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb); uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb);
taosMemoryFree(wb); taosMemoryFree(wb);
return 0;
} }
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0; int32_t code = 0;
transQueuePush(&pConn->reqsToSend, &pCliMsg->q); transQueuePush(&pConn->reqsToSend, &pCliMsg->q);
if (pConn->connnected) { code = cliBatchSend(pConn);
code = cliSend2(pConn);
} else {
// do nothing
}
return code; return code;
} }
int32_t cliSend2(SCliConn* pConn) {
cliSendBatch_shareConn(pConn);
return 0;
}
static void cliDestroyBatch(SCliBatch* pBatch) { static void cliDestroyBatch(SCliBatch* pBatch) {
if (pBatch == NULL) return; if (pBatch == NULL) return;
while (!QUEUE_IS_EMPTY(&pBatch->wq)) { while (!QUEUE_IS_EMPTY(&pBatch->wq)) {
@ -1368,7 +1342,7 @@ void cliConnCb(uv_connect_t* req, int status) {
cliConnSetSockInfo(pConn); cliConnSetSockInfo(pConn);
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
(void)cliSend2(pConn); (void)cliBatchSend(pConn);
} }
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) {
@ -1599,7 +1573,7 @@ int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn*
(void)cliHandleState_mayUpdateStateCtx(pConn, pReq); (void)cliHandleState_mayUpdateStateCtx(pConn, pReq);
return code; return code;
} }
void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
int32_t lino = 0; int32_t lino = 0;
STransMsg resp = {0}; STransMsg resp = {0};
int32_t code = (pThrd->initCb)(pThrd, pReq, NULL); int32_t code = (pThrd->initCb)(pThrd, pReq, NULL);
@ -1651,7 +1625,7 @@ _exception:
return; return;
} }
void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { return cliHandleReq__noShareConn(pThrd, pReq); } void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { return cliHandleBatchReq(pThrd, pReq); }
static void cliDoReq(queue* wq, SCliThrd* pThrd) { static void cliDoReq(queue* wq, SCliThrd* pThrd) {
int count = 0; int count = 0;
@ -2182,8 +2156,6 @@ static FORCE_INLINE void doCloseIdleConn(void* param) {
taosMemoryFree(arg); taosMemoryFree(arg);
return; return;
} }
cliDestroyConn(conn, true);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
static FORCE_INLINE void cliPerfLog_schedMsg(SCliReq* pReq, char* label) { static FORCE_INLINE void cliPerfLog_schedMsg(SCliReq* pReq, char* label) {
@ -2440,6 +2412,7 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
cliRetryUpdateRule(pCtx, noDelay); cliRetryUpdateRule(pCtx, noDelay);
pReq->sent = 0; pReq->sent = 0;
pReq->seq = 0;
code = cliRetryDoSched(pReq, pThrd); code = cliRetryDoSched(pReq, pThrd);
if (code != 0) { if (code != 0) {