From b5c1b672a4d6dd2405cc37025d806f08e47ddbdd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 16 Sep 2024 09:29:43 +0800 Subject: [PATCH] Merge branch '3.0' into enh/opt-transport --- include/libs/transport/trpc.h | 2 +- source/libs/transport/inc/transComm.h | 2 +- source/libs/transport/src/transCli.c | 78 ++++++--------------------- source/libs/transport/src/transSvr.c | 33 ++++++------ 4 files changed, 34 insertions(+), 81 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 72e66e268f..d9712dde4a 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -63,7 +63,7 @@ typedef struct SRpcHandleInfo { int8_t forbiddenIp; int8_t notFreeAhandle; int8_t compressed; - int32_t seqNum; // msg seq + int64_t seqNum; // msg seq int64_t qId; // queryId Get from client, other req's qId = -1; int32_t refIdMgt; int32_t msgType; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 985ec1cb6a..7964a9479d 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -185,7 +185,7 @@ typedef struct { uint32_t code; // del later uint32_t msgType; int32_t msgLen; - int32_t seqNum; + int64_t seqNum; uint8_t content[0]; // message body starts from here } STransMsgHead; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c9a23d76be..c2ce95b651 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -92,7 +92,7 @@ typedef struct SCliConn { int32_t port; int64_t refId; - int32_t seq; + int64_t seq; int8_t registered; int8_t connnected; @@ -127,7 +127,7 @@ typedef struct SCliReq { uint64_t st; int sent; //(0: no send, 1: alread sent) queue seqq; - int32_t seq; + int64_t seq; queue qlist; } SCliReq; @@ -375,7 +375,7 @@ void destroyCliConnQTable(SCliConn* conn) { conn->pQTable = NULL; } bool filteBySeq(void* key, void* arg) { - int32_t* seq = arg; + int64_t* seq = arg; SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); if (pReq->seq == *seq) { return true; @@ -383,7 +383,7 @@ bool filteBySeq(void* key, void* arg) { return false; } } -int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { +int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, SCliReq** pReq) { int32_t code = 0; queue set; QUEUE_INIT(&set) @@ -432,7 +432,7 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe pResp->info.traceId = pHead->traceId; pResp->info.hasEpSet = pHead->hasEpSet; pResp->info.cliVer = htonl(pHead->compatibilityVer); - pResp->info.seqNum = htonl(pHead->seqNum); + pResp->info.seqNum = taosHton64(pHead->seqNum); int64_t qid = taosHton64(pHead->qid); pResp->info.handle = (void*)qid; @@ -444,8 +444,8 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) { int64_t qId = taosHton64(pHead->qid); STraceId* trace = &pHead->traceId; - int32_t seqNum = htonl(pHead->seqNum); - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%d, qid:%" PRId64 "", + int64_t seqNum = taosHton64(pHead->seqNum); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%" PRId64 ", qid:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum, qId); @@ -541,7 +541,7 @@ void cliHandleResp(SCliConn* conn) { int64_t qId = taosHton64(pHead->qid); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); - int32_t seq = htonl(pHead->seqNum); + int64_t seq = taosHton64(pHead->seqNum); STransMsg resp = {0}; if (cliHandleState_mayHandleReleaseResp(conn, pHead)) { @@ -558,12 +558,13 @@ void cliHandleResp(SCliConn* conn) { code = cliNotifyCb(conn, NULL, &resp); return; } else { - tDebug("%s conn %p recv unexpected packet, seqNum:%d,qid:%" PRId64 " reason:%s", CONN_GET_INST_LABEL(conn), conn, - seq, qId, tstrerror(code)); + tDebug("%s conn %p recv unexpected packet, seqNum:%" PRId64 ",qid:%" PRId64 " reason:%s", + CONN_GET_INST_LABEL(conn), conn, seq, qId, tstrerror(code)); } if (code != 0) { - tDebug("%s conn %p recv unexpected packet, seqNum:%d, qId:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq, - qId, tstrerror(code)); + tDebug("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64 + ", the sever sends repeated response,reason:%s", + CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code)); // TODO: notify cb if (cliMayRecycleConn(conn)) { return; @@ -603,7 +604,7 @@ void cliConnTimeout(uv_timer_t* handle) { return; } - tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn)); + tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn), conn); } void* createConnPool(int size) { @@ -631,50 +632,6 @@ void* destroyConnPool(SCliThrd* pThrd) { return NULL; } -// static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { -// void* pool = pThrd->pool; -// STrans* pTranInst = pThrd->pInst; -// size_t klen = strlen(key); -// SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); -// if (plist == NULL) { -// SConnList list = {0}; -// (void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); -// plist = taosHashGet(pool, key, klen); - -// // SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); -// // QUEUE_INIT(&nList->msgQ); -// // nList->numOfConn++; - -// QUEUE_INIT(&plist->conns); -// //plist->list = nList; -// } - -// if (QUEUE_IS_EMPTY(&plist->conns)) { -// if (plist->list->numOfConn >= pTranInst->connLimitNum) { -// *exceed = true; -// return NULL; -// } -// plist->list->numOfConn++; -// return NULL; -// } - -// queue* h = QUEUE_TAIL(&plist->conns); -// QUEUE_REMOVE(h); -// plist->size -= 1; - -// SCliConn* conn = QUEUE_DATA(h, SCliConn, q); -// conn->status = ConnNormal; -// QUEUE_INIT(&conn->q); -// tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); - -// if (conn->task != NULL) { -// transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); -// conn->task = NULL; -// } -// conn->seq++; -// return conn; -// } - static int32_t getOrCreateConnList(SCliThrd* pThrd, const char* key, SConnList** ppList) { int32_t code = 0; void* pool = pThrd->pool; @@ -724,7 +681,6 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; QUEUE_INIT(&conn->q); - conn->seq = 0; conn->list = plist; if (conn->task != NULL) { @@ -773,8 +729,6 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->list->size += 1; tDebug("conn %p added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr); - conn->seq = 0; - if (conn->list->size >= 10) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); if (arg == NULL) return; @@ -1197,7 +1151,7 @@ int32_t cliBatchSend(SCliConn* pConn) { pHead->compatibilityVer = htonl(pInst->compatibilityVer); } pHead->timestamp = taosHton64(taosGetTimestampUs()); - pHead->seqNum = htonl(pConn->seq); + pHead->seqNum = taosHton64(pConn->seq); pHead->qid = taosHton64(pReq->info.qId); if (pHead->comp == 0) { @@ -3210,7 +3164,7 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { } int32_t code = transHeapDelete(p, pConn); if (code != 0) { - tDebug("%s conn failed to delete conn %p from heap cache since %s", pConn, tstrerror(code)); + tDebug("conn %p failed delete from heap cache since %s", pConn, tstrerror(code)); } return code; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 1e9162de9a..46bc1bfd2d 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -71,7 +71,7 @@ typedef struct SSvrRespMsg { STransMsg msg; queue q; STransMsgType type; - int32_t seqNum; + int64_t seqNum; void* arg; FilteFunc func; int8_t sent; @@ -393,13 +393,12 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* if (pConn->status == ConnNormal && pHead->noResp == 0) { if (cost >= EXCEPTION_LIMIT_US) { - tGDebug( - "%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%d, qid:%" PRId64 - "", - transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, - (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%" PRId64 + ", qid:%" PRId64 "", + transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); } else { - tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%d, qid:%" PRId64 "", + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%" PRId64 ", qid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, (int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId); } @@ -407,15 +406,15 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* if (cost >= EXCEPTION_LIMIT_US) { tGDebug( "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, " - "seqNum:%d, qid:%" PRId64 "", + "seqNum:%" PRId64 ", qid:%" PRId64 "", transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); } else { - tGDebug( - "%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%d, " - "qid:%" PRId64 "", - transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, - pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); + tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%" PRId64 + ", " + "qid:%" PRId64 "", + transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, + pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId); } } tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pInst), pTransMsg->info.handle, pConn, @@ -457,7 +456,7 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { .msgType = pHead->msgType + 1, .info.qId = qId, .info.traceId = pHead->traceId, - .info.seqNum = htonl(pHead->seqNum)}; + .info.seqNum = taosHton64(pHead->seqNum)}; SSvrRespMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrRespMsg)); srvMsg->msg = tmsg; @@ -573,7 +572,7 @@ static bool uvHandleReq(SSvrConn* pConn) { transMsg.info.cliVer = htonl(pHead->compatibilityVer); transMsg.info.forbiddenIp = forbiddenIp; transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0; - transMsg.info.seqNum = htonl(pHead->seqNum); + transMsg.info.seqNum = taosHton64(pHead->seqNum); transMsg.info.qId = taosHton64(pHead->qid); transMsg.info.msgType = pHead->msgType; @@ -671,7 +670,7 @@ void uvOnSendCb(uv_write_t* req, int status) { SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q); STraceId* trace = &smsg->msg.info.traceId; - tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%" PRId64 "", transLabel(conn->pInst), conn, + tGDebug("%s conn %p msg already send out, seqNum:%" PRId64 ", qid:%" PRId64 "", transLabel(conn->pInst), conn, smsg->msg.info.seqNum, smsg->msg.info.qId); destroySmsg(smsg); } @@ -723,7 +722,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->compatibilityVer = htonl(((STrans*)pConn->pInst)->compatibilityVer); pHead->version = TRANS_VER; - pHead->seqNum = htonl(pMsg->info.seqNum); + pHead->seqNum = taosHton64(pMsg->info.seqNum); pHead->qid = taosHton64(pMsg->info.qId); pHead->withUserInfo = pConn->userInited == 0 ? 1 : 0;