diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d47ad214e8..6719e5788b 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -172,8 +172,7 @@ typedef struct { char version : 4; // RPC version char comp : 2; // compression algorithm, 0:no compression 1:lz4 char noResp : 2; // noResp bits, 0: resp, 1: resp - char persist : 2; // persist handle,0: no persit, 1: persist handle - char release : 2; + char toInit : 2; // 0: sent user info or not char secured : 2; char spi : 2; char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 72f271a1f4..2bc2ceabf3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -100,6 +100,8 @@ typedef struct SCliConn { int8_t registered; int8_t connnected; SHashObj* pQTable; + int8_t inited; + void* initPacket; } SCliConn; // #define TRANS_CONN_REF_INC(tconn) \ @@ -1350,6 +1352,26 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { cliSendBatch_shareConn(conn); } } +bool cliConnMayBuildInitPacket(SCliConn* pConn, STransMsgHead** pHead, int32_t msgLen) { + SCliThrd* pThrd = pConn->hostThrd; + STrans* pInst = pThrd->pInst; + if (pConn->inited == 1) { + return false; + } + STransMsgHead* tHead = taosMemoryCalloc(1, msgLen + strlen(pInst->user)); + memcpy(tHead, pHead, TRANS_MSG_OVERHEAD); + memcpy(tHead + TRANS_MSG_OVERHEAD, pInst->user, strlen(pInst->user)); + + memcpy(tHead + TRANS_MSG_OVERHEAD + strlen(pInst->user), (char*)pHead + TRANS_MSG_OVERHEAD, + msgLen - TRANS_MSG_OVERHEAD); + + tHead->toInit = 1; + *pHead = tHead; + + pConn->initPacket = tHead; + pConn->inited = 1; + return true; +} void cliSendBatch_shareConn(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; @@ -1379,14 +1401,16 @@ void cliSendBatch_shareConn(SCliConn* pConn) { STransMsgHead* pHead = transHeadFromCont(pReq->pCont); + if (cliConnMayBuildInitPacket(pConn, &pHead, msgLen)) { + } else { + pHead->toInit = 0; + } + if (pHead->comp == 0) { - // pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; - pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; pHead->msgType = pReq->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; - memcpy(pHead->user, pInst->user, strlen(pInst->user)); + // memcpy(pHead->user, pInst->user, strlen(pInst->user)); pHead->traceId = pReq->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->version = TRANS_VER; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index fcaf82a824..35f81377a5 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -462,6 +462,11 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { } return 0; } + +bool uvExtractFisrtInitPacket() { + + return true; +} static bool uvHandleReq(SSvrConn* pConn) { STrans* pInst = pConn->pInst; SWorkThrd* pThrd = pConn->hostThrd; @@ -685,7 +690,6 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - // pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; pHead->magicNum = htonl(TRANS_MAGIC_NUM); @@ -704,7 +708,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); // pHead->msgType = pMsg->msgType; - pHead->release = smsg->type == Release ? 1 : 0; + // pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead));