From 8823358ca7c9f8f713306ecf7ded80c1c317cc92 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 12 Sep 2024 14:38:21 +0800 Subject: [PATCH] opt parameter --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 6 +- source/dnode/mnode/impl/src/mndMain.c | 3 +- source/libs/transport/inc/transComm.h | 9 ++- source/libs/transport/src/transCli.c | 68 ++++++++++++--------- source/libs/transport/src/transSvr.c | 36 +++++++++-- 5 files changed, 78 insertions(+), 44 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 70f258a362..b695457801 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -64,7 +64,7 @@ static void dmMayShouldUpdateIpWhiteList(SDnodeMgmt *pMgmt, int64_t ver) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_RETRIEVE_IP_WHITE, - .info.ahandle = (void *)0x9527, + .info.ahandle = 0, .info.refId = 0, .info.noResp = 0, .info.handle = 0}; @@ -180,7 +180,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, - .info.ahandle = (void *)0x9527, + .info.ahandle = 0, .info.refId = 0, .info.noResp = 0, .info.handle = 0}; @@ -229,7 +229,7 @@ void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_NOTIFY, - .info.ahandle = (void *)0x9527, + .info.ahandle = 0, .info.refId = 0, .info.noResp = 1, .info.handle = 0}; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 25872801e4..37e54d58ae 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -210,8 +210,7 @@ static void mndPullupGrant(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527}; + SRpcMsg rpcMsg = {.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = 0}; // TODO check return value (void)tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 6719e5788b..d54db8ab5c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -169,16 +169,15 @@ typedef struct { #define TRANS_VER 2 typedef struct { - char version : 4; // RPC version - char comp : 2; // compression algorithm, 0:no compression 1:lz4 - char noResp : 2; // noResp bits, 0: resp, 1: resp - char toInit : 2; // 0: sent user info or not + char version : 4; // RPC version + char comp : 2; // compression algorithm, 0:no compression 1:lz4 + char noResp : 2; // noResp bits, 0: resp, 1: resp + char withUserInfo : 2; // 0: sent user info or not char secured : 2; char spi : 2; char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset uint64_t timestamp; - char user[TSDB_UNI_LEN]; int32_t compatibilityVer; uint32_t magicNum; STraceId traceId; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2bc2ceabf3..3718331358 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -100,8 +100,8 @@ typedef struct SCliConn { int8_t registered; int8_t connnected; SHashObj* pQTable; - int8_t inited; - void* initPacket; + int8_t userInited; + void* pInitUserReq; } SCliConn; // #define TRANS_CONN_REF_INC(tconn) \ @@ -523,6 +523,10 @@ void cliHandleResp2(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; + if (conn->pInitUserReq) { + taosMemoryFree(conn->pInitUserReq); + conn->pInitUserReq = NULL; + } cliResetConnTimer(conn); SCliReq* pReq = NULL; @@ -1123,8 +1127,8 @@ _exception: void cliDestroyMsg(void* arg) { queue* e = arg; SCliReq* pReq = QUEUE_DATA(e, SCliReq, q); - if (pReq->msg.info.notFreeAhandle == 0) { - taosMemoryFree(pReq->ctx->ahandle); + if (pReq->msg.info.notFreeAhandle == 0 && pReq->ctx->ahandle != 0) { + // taosMemoryFree(pReq->ctx->ahandle); } destroyReq(pReq); } @@ -1233,7 +1237,11 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { transDQCancel(pThrd->timeoutQueue, conn->task); conn->task = NULL; } - // cliResetTimer(pThrd, conn); + + if (conn->pInitUserReq) { + taosMemoryFree(conn->pInitUserReq); + conn->pInitUserReq = NULL; + } if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { @@ -1270,6 +1278,10 @@ static void cliDestroy(uv_handle_t* handle) { cliDestroyConnMsgs(conn, true); destroyCliConnQTable(conn); + if (conn->pInitUserReq) { + taosMemoryFree(conn->pInitUserReq); + conn->pInitUserReq = NULL; + } tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); (void)transDestroyBuffer(&conn->readBuf); @@ -1352,24 +1364,26 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { cliSendBatch_shareConn(conn); } } -bool cliConnMayBuildInitPacket(SCliConn* pConn, STransMsgHead** pHead, int32_t msgLen) { +bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - if (pConn->inited == 1) { + if (pConn->userInited == 1) { return false; } - STransMsgHead* tHead = taosMemoryCalloc(1, msgLen + strlen(pInst->user)); - memcpy(tHead, pHead, TRANS_MSG_OVERHEAD); - memcpy(tHead + TRANS_MSG_OVERHEAD, pInst->user, strlen(pInst->user)); + STransMsgHead* pHead = *ppHead; + STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user)); + memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD); + memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user)); - memcpy(tHead + TRANS_MSG_OVERHEAD + strlen(pInst->user), (char*)pHead + TRANS_MSG_OVERHEAD, - msgLen - TRANS_MSG_OVERHEAD); + memcpy((char*)tHead + TRANS_MSG_OVERHEAD + sizeof(pInst->user), (char*)pHead + TRANS_MSG_OVERHEAD, + *msgLen - TRANS_MSG_OVERHEAD); - tHead->toInit = 1; - *pHead = tHead; + tHead->withUserInfo = 1; + *ppHead = tHead; + *msgLen += sizeof(pInst->user); - pConn->initPacket = tHead; - pConn->inited = 1; + pConn->pInitUserReq = tHead; + pConn->userInited = 1; return true; } void cliSendBatch_shareConn(SCliConn* pConn) { @@ -1397,20 +1411,20 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pReq->contLen = 0; } - int msgLen = transMsgLenFromCont(pReq->contLen); + int32_t msgLen = transMsgLenFromCont(pReq->contLen); STransMsgHead* pHead = transHeadFromCont(pReq->pCont); - if (cliConnMayBuildInitPacket(pConn, &pHead, msgLen)) { - } else { - pHead->toInit = 0; + char* content = pReq->pCont; + int32_t contLen = pReq->contLen; + if (cliConnMayAddUserInfo(pConn, &pHead, &msgLen)) { + content = transContFromHead(pHead); + contLen = transContLenFromMsg(msgLen); } - if (pHead->comp == 0) { pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; pHead->msgType = pReq->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - // memcpy(pHead->user, pInst->user, strlen(pInst->user)); pHead->traceId = pReq->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; @@ -1421,8 +1435,8 @@ void cliSendBatch_shareConn(SCliConn* pConn) { pHead->qid = taosHton64(pReq->info.qId); if (pHead->comp == 0) { - if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) { - msgLen = transCompressMsg(pReq->pCont, pReq->contLen) + sizeof(STransMsgHead); + if (pInst->compressSize != -1 && pInst->compressSize < contLen) { + msgLen = transCompressMsg(content, contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } } else { @@ -2984,10 +2998,8 @@ int32_t transReleaseCliHandle(void* handle) { return TSDB_CODE_RPC_BROKEN_LINK; } - STransMsg tmsg = {.msgType = TDMT_SCH_TASK_RELEASE, - .info.handle = handle, - .info.ahandle = (void*)0x9527, - .info.qId = (int64_t)handle}; + STransMsg tmsg = { + .msgType = TDMT_SCH_TASK_RELEASE, .info.handle = handle, .info.ahandle = (void*)0, .info.qId = (int64_t)handle}; TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 35f81377a5..96e369af9a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -54,6 +54,7 @@ typedef struct SSvrConn { int spi; char info[64]; char user[TSDB_UNI_LEN]; // user ID for the link + int8_t userInited; char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; // ciphering key @@ -463,10 +464,31 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { return 0; } -bool uvExtractFisrtInitPacket() { - - return true; -} +bool uvConnMayGetUserInfo(SSvrConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) { + if (pConn->userInited) { + return false; + } + + STrans* pInst = pConn->pInst; + STransMsgHead* pHead = *ppHead; + int32_t len = *msgLen; + if (pHead->withUserInfo) { + STransMsgHead* tHead = taosMemoryCalloc(1, len - sizeof(pInst->user)); + memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD); + memcpy((char*)tHead + TRANS_MSG_OVERHEAD, (char*)pHead + TRANS_MSG_OVERHEAD + sizeof(pInst->user), + len - sizeof(STransMsgHead) - sizeof(pInst->user)); + tHead->msgLen = htonl(htonl(pHead->msgLen) - sizeof(pInst->user)); + + memcpy(pConn->user, (char*)pHead + TRANS_MSG_OVERHEAD, sizeof(pConn->user)); + pConn->userInited = 1; + + taosMemoryFree(pHead); + *ppHead = tHead; + *msgLen = len - sizeof(pInst->user); + return true; + } + return false; +} static bool uvHandleReq(SSvrConn* pConn) { STrans* pInst = pConn->pInst; SWorkThrd* pThrd = pConn->hostThrd; @@ -479,6 +501,9 @@ static bool uvHandleReq(SSvrConn* pConn) { tError("%s conn %p read invalid packet", transLabel(pInst), pConn); return false; } + if (uvConnMayGetUserInfo(pConn, &pHead, &msgLen) == true) { + } + if (resetBuf == 0) { tTrace("%s conn %p not reset read buf", transLabel(pInst), pConn); } @@ -487,12 +512,10 @@ static bool uvHandleReq(SSvrConn* pConn) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn); return false; } - // pHead->ahandle = htole64(pHead->ahandle); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); pConn->inType = pHead->msgType; - memcpy(pConn->user, pHead->user, strlen(pHead->user)); int8_t forbiddenIp = 0; if (pThrd->enableIpWhiteList && tsEnableWhiteList) { @@ -697,6 +720,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { pHead->version = TRANS_VER; pHead->seqNum = htonl(pMsg->info.seqNum); pHead->qid = taosHton64(pMsg->info.qId); + pHead->withUserInfo = pConn->userInited == 0 ? 1 : 0; // handle invalid drop_task resp, TD-20098 if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {