diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d7aab8357d..db444bb078 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -25,9 +25,8 @@ typedef struct { } SMsgList; typedef struct SConnList { - queue conns; - int32_t size; - SMsgList* list; + queue conns; + int32_t size; } SConnList; typedef struct { @@ -180,18 +179,18 @@ typedef struct { // conn pool // add expire timeout and capacity limit -static void* createConnPool(int size); -static void* destroyConnPool(SCliThrd* thread); -static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); -static void addConnToPool(void* pool, SCliConn* conn); -static void doCloseIdleConn(void* param); -static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); -static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int port); -static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn); -static void cliSendBatch_shareConnCb(uv_write_t* req, int status); -void cliSendBatch_shareConn(SCliConn* pConn); -int32_t cliSend2(SCliConn* conn); -bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead); +static void* createConnPool(int size); +static void* destroyConnPool(SCliThrd* thread); +// static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); +static void addConnToPool(void* pool, SCliConn* conn); +static void doCloseIdleConn(void* param); +static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); +static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int port); +static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn); +static void cliBatchSendCb(uv_write_t* req, int status); +void cliBatchSendImpl(SCliConn* pConn); +static int32_t cliBatchSend(SCliConn* conn); +bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead); // register conn timer static void cliConnTimeout(uv_timer_t* handle); // register timer for read @@ -207,7 +206,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after conn to server static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -static void cliSendBatchCb(uv_write_t* req, int status); SCliBatch* cliGetHeadFromList(SCliBatchList* pList); @@ -574,7 +572,6 @@ void cliHandleResp(SCliConn* conn) { if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { STraceId* trace = &resp.info.traceId; tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn)); - // retry, notify } else { destroyReq(pReq); } @@ -610,9 +607,6 @@ void* destroyConnPool(SCliThrd* pThrd) { SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } - SMsgList* msglist = connList->list; - taosMemoryFree(msglist); - connList = taosHashIterate((SHashObj*)pool, connList); } taosHashCleanup(pool); @@ -620,57 +614,58 @@ void* destroyConnPool(SCliThrd* pThrd) { return NULL; } -static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { - void* pool = pThrd->pool; - STrans* pTranInst = pThrd->pInst; - size_t klen = strlen(key); - SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); - if (plist == NULL) { - SConnList list = {0}; - (void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, klen); +// static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { +// void* pool = pThrd->pool; +// STrans* pTranInst = pThrd->pInst; +// size_t klen = strlen(key); +// SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); +// if (plist == NULL) { +// SConnList list = {0}; +// (void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); +// plist = taosHashGet(pool, key, klen); - SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); - QUEUE_INIT(&nList->msgQ); - nList->numOfConn++; +// // SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); +// // QUEUE_INIT(&nList->msgQ); +// // nList->numOfConn++; - QUEUE_INIT(&plist->conns); - plist->list = nList; - } +// QUEUE_INIT(&plist->conns); +// //plist->list = nList; +// } - if (QUEUE_IS_EMPTY(&plist->conns)) { - if (plist->list->numOfConn >= pTranInst->connLimitNum) { - *exceed = true; - return NULL; - } - plist->list->numOfConn++; - return NULL; - } +// if (QUEUE_IS_EMPTY(&plist->conns)) { +// if (plist->list->numOfConn >= pTranInst->connLimitNum) { +// *exceed = true; +// return NULL; +// } +// plist->list->numOfConn++; +// return NULL; +// } - queue* h = QUEUE_TAIL(&plist->conns); - QUEUE_REMOVE(h); - plist->size -= 1; +// queue* h = QUEUE_TAIL(&plist->conns); +// QUEUE_REMOVE(h); +// plist->size -= 1; - SCliConn* conn = QUEUE_DATA(h, SCliConn, q); - conn->status = ConnNormal; - QUEUE_INIT(&conn->q); - tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); +// SCliConn* conn = QUEUE_DATA(h, SCliConn, q); +// conn->status = ConnNormal; +// QUEUE_INIT(&conn->q); +// tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); - if (conn->task != NULL) { - transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); - conn->task = NULL; - } - conn->seq++; - return conn; -} +// if (conn->task != NULL) { +// transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); +// conn->task = NULL; +// } +// conn->seq++; +// return conn; +// } -static int32_t getOrCreateMsgList(SCliThrd* pThrd, const char* key, SConnList** ppList) { +static int32_t getOrCreateConnList(SCliThrd* pThrd, const char* key, SConnList** ppList) { int32_t code = 0; void* pool = pThrd->pool; size_t klen = strlen(key); SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { SConnList list = {0}; + QUEUE_INIT(&list.conns); code = taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); if (code != 0) { return code; @@ -680,16 +675,7 @@ static int32_t getOrCreateMsgList(SCliThrd* pThrd, const char* key, SConnList** if (plist == NULL) { return TSDB_CODE_INVALID_PTR; } - - SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); - if (nList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - QUEUE_INIT(&nList->msgQ); - nList->numOfConn++; - QUEUE_INIT(&plist->conns); - plist->list = nList; *ppList = plist; } else { *ppList = plist; @@ -702,13 +688,13 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p STrans* pInst = pThrd->pInst; SConnList* plist = NULL; - code = getOrCreateMsgList(pThrd, key, &plist); + code = getOrCreateConnList(pThrd, key, &plist); if (code != 0) { return code; } if (QUEUE_IS_EMPTY(&plist->conns)) { - if (plist->list->numOfConn >= pInst->connLimitNum) { + if (plist->size >= pInst->connLimitNum) { return TSDB_CODE_RPC_MAX_SESSIONS; } return TSDB_CODE_RPC_NETWORK_BUSY; @@ -721,6 +707,8 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; QUEUE_INIT(&conn->q); + conn->seq = 0; + conn->list = plist; if (conn->task != NULL) { SDelayTask* task = conn->task; @@ -756,7 +744,6 @@ static void addConnToPool(void* pool, SCliConn* conn) { return; } uv_read_stop(conn->stream); - conn->seq = 0; SCliThrd* thrd = conn->hostThrd; @@ -1080,7 +1067,7 @@ static void cliConnRmReqs(SCliConn* conn) { return; } -static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { +static void cliBatchSendCb(uv_write_t* req, int status) { SCliConn* conn = req->data; SCliThrd* pThrd = conn->hostThrd; @@ -1101,7 +1088,7 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { taosMemoryFree(req); if (!cliMayRecycleConn(conn)) { - cliSendBatch_shareConn(conn); + cliBatchSend(conn); } } bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) { @@ -1126,7 +1113,7 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg pConn->userInited = 1; return true; } -void cliSendBatch_shareConn(SCliConn* pConn) { +int32_t cliBatchSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; int32_t size = transQueueSize(&pConn->reqsToSend); @@ -1134,7 +1121,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { int32_t totalLen = 0; if (size == 0) { tDebug("%s conn %p not msg to send", pInst->label, pConn); - return; + return 0; } uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); @@ -1191,38 +1178,25 @@ void cliSendBatch_shareConn(SCliConn* pConn) { STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); - transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); } - if (j == 0) { - taosMemoryFree(wb); - return; - } transRefCliHandle(pConn); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = pConn; tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen); - uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliSendBatch_shareConnCb); + uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb); taosMemoryFree(wb); + return 0; } int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t code = 0; transQueuePush(&pConn->reqsToSend, &pCliMsg->q); - if (pConn->connnected) { - code = cliSend2(pConn); - } else { - // do nothing - } + code = cliBatchSend(pConn); return code; } -int32_t cliSend2(SCliConn* pConn) { - cliSendBatch_shareConn(pConn); - return 0; -} - static void cliDestroyBatch(SCliBatch* pBatch) { if (pBatch == NULL) return; while (!QUEUE_IS_EMPTY(&pBatch->wq)) { @@ -1368,7 +1342,7 @@ void cliConnCb(uv_connect_t* req, int status) { cliConnSetSockInfo(pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); - (void)cliSend2(pConn); + (void)cliBatchSend(pConn); } static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { @@ -1599,7 +1573,7 @@ int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* (void)cliHandleState_mayUpdateStateCtx(pConn, pReq); return code; } -void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { +void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) { int32_t lino = 0; STransMsg resp = {0}; int32_t code = (pThrd->initCb)(pThrd, pReq, NULL); @@ -1651,7 +1625,7 @@ _exception: return; } -void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { return cliHandleReq__noShareConn(pThrd, pReq); } +void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) { return cliHandleBatchReq(pThrd, pReq); } static void cliDoReq(queue* wq, SCliThrd* pThrd) { int count = 0; @@ -2182,8 +2156,6 @@ static FORCE_INLINE void doCloseIdleConn(void* param) { taosMemoryFree(arg); return; } - - cliDestroyConn(conn, true); taosMemoryFree(arg); } static FORCE_INLINE void cliPerfLog_schedMsg(SCliReq* pReq, char* label) { @@ -2440,6 +2412,7 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { cliRetryUpdateRule(pCtx, noDelay); pReq->sent = 0; + pReq->seq = 0; code = cliRetryDoSched(pReq, pThrd); if (code != 0) {