opt transport

This commit is contained in:
yihaoDeng 2024-09-10 10:54:04 +08:00
parent b719f2258a
commit 56793dd89b
3 changed files with 11 additions and 14 deletions

View File

@ -183,7 +183,6 @@ typedef struct {
int32_t compatibilityVer; int32_t compatibilityVer;
uint32_t magicNum; uint32_t magicNum;
STraceId traceId; STraceId traceId;
uint64_t ahandle; // ahandle assigned by client
int64_t qid; int64_t qid;
uint32_t code; // del later uint32_t code; // del later
uint32_t msgType; uint32_t msgType;

View File

@ -1517,7 +1517,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
STransMsgHead* pHead = transHeadFromCont(pReq->pCont); STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
if (pHead->comp == 0) { if (pHead->comp == 0) {
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0; // pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0;
pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0;
pHead->msgType = pReq->msgType; pHead->msgType = pReq->msgType;

View File

@ -484,7 +484,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn); tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pInst), pConn);
return false; return false;
} }
pHead->ahandle = htole64(pHead->ahandle); // pHead->ahandle = htole64(pHead->ahandle);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
@ -763,12 +763,16 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
if (pConn->broken) { if (pConn->broken) {
return; return;
} }
queue sendReqNode;
QUEUE_INIT(&sendReqNode); SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq));
pWreq->conn = pConn;
QUEUE_INIT(&pWreq->q);
QUEUE_INIT(&pWreq->node);
pWreq->req.data = pWreq;
uv_buf_t* pBuf = NULL; uv_buf_t* pBuf = NULL;
int32_t bufNum = 0; int32_t bufNum = 0;
code = uvBuildToSendData(pConn, &pBuf, &bufNum, &sendReqNode); code = uvBuildToSendData(pConn, &pBuf, &bufNum, &pWreq->node);
if (code != 0) { if (code != 0) {
tError("%s conn %p failed to send data", transLabel(pConn->pInst), pConn); tError("%s conn %p failed to send data", transLabel(pConn->pInst), pConn);
return; return;
@ -780,12 +784,6 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq));
pWreq->conn = pConn;
QUEUE_INIT(&pWreq->q);
QUEUE_MOVE(&sendReqNode, &pWreq->node);
pWreq->req.data = pWreq;
uv_write_t* req = &pWreq->req; uv_write_t* req = &pWreq->req;
if (req == NULL) { if (req == NULL) {
if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) { if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) {
@ -1711,7 +1709,7 @@ void transRefSrvHandle(void* handle) {
} }
SSvrConn* pConn = handle; SSvrConn* pConn = handle;
pConn->ref++; pConn->ref++;
tTrace("conn %p ref count:%d", handle, pConn->ref); tTrace("conn %p ref count:%d", pConn, pConn->ref);
} }
void transUnrefSrvHandle(void* handle) { void transUnrefSrvHandle(void* handle) {
@ -1720,7 +1718,7 @@ void transUnrefSrvHandle(void* handle) {
} }
SSvrConn* pConn = handle; SSvrConn* pConn = handle;
pConn->ref--; pConn->ref--;
tTrace("conn %p ref count:%d", handle, pConn->ref); tTrace("conn %p ref count:%d", pConn, pConn->ref);
if (pConn->ref == 0) { if (pConn->ref == 0) {
destroyConn((SSvrConn*)handle, true); destroyConn((SSvrConn*)handle, true);
} }