opt transport

This commit is contained in:
yihaoDeng 2024-09-19 09:04:56 +08:00
parent 88c0724468
commit 6d3b8c0594
2 changed files with 28 additions and 27 deletions

View File

@ -141,28 +141,24 @@ typedef struct {
SEpSet origEpSet; SEpSet origEpSet;
void* ahandle; // handle provided by app void* ahandle; // handle provided by app
tmsg_t msgType; // message type tmsg_t msgType; // message type
int8_t connType; // connection type cli/srv
STransCtx userCtx; // STransCtx userCtx; //
STransMsg* pRsp; // for synchronous API STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API tsem_t* pSem; // for synchronous API
STransSyncMsg* pSyncMsg; // for syncchronous with timeout API STransSyncMsg* pSyncMsg; // for syncchronous with timeout API
int64_t syncMsgRef; int64_t syncMsgRef;
SCvtAddr cvtAddr; SCvtAddr* pCvtAddr;
int64_t retryInitTimestamp;
int64_t retryNextInterval;
int64_t retryMaxTimeout;
int32_t retryMinInterval; int32_t retryMinInterval;
int32_t retryMaxInterval; int32_t retryMaxInterval;
int32_t retryStepFactor; int32_t retryStepFactor;
int64_t retryMaxTimeout;
int64_t retryInitTimestamp;
int64_t retryNextInterval;
bool retryInit;
int32_t retryStep; int32_t retryStep;
int8_t epsetRetryCnt;
int32_t retryCode; int32_t retryCode;
int8_t retryInit;
void* task; int8_t epsetRetryCnt;
int hThrdIdx;
} SReqCtx; } SReqCtx;
#pragma pack(push, 1) #pragma pack(push, 1)

View File

@ -120,17 +120,13 @@ typedef struct {
typedef struct SCliReq { typedef struct SCliReq {
SReqCtx* ctx; SReqCtx* ctx;
STransMsg msg;
queue q; queue q;
STransMsgType type; STransMsgType type;
// int64_t refId;
uint64_t st; uint64_t st;
int sent; //(0: no send, 1: alread sent)
queue seqq;
int64_t seq; int64_t seq;
int32_t sent; //(0: no send, 1: alread sent)
STransMsg msg;
queue qlist;
} SCliReq; } SCliReq;
typedef struct SCliThrd { typedef struct SCliThrd {
@ -152,7 +148,7 @@ typedef struct SCliThrd {
void (*destroyAhandleFp)(void* ahandle); void (*destroyAhandleFp)(void* ahandle);
SHashObj* fqdn2ipCache; SHashObj* fqdn2ipCache;
SCvtAddr cvtAddr; SCvtAddr* pCvtAddr;
SHashObj* failFastCache; SHashObj* failFastCache;
SHashObj* batchCache; SHashObj* batchCache;
@ -1474,12 +1470,18 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) {
static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { return; } static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { return; }
static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) {
SReqCtx* pCtx = pReq->ctx; SReqCtx* pCtx = pReq->ctx;
pThrd->cvtAddr = pCtx->cvtAddr; if (pThrd->pCvtAddr != NULL) {
taosMemoryFreeClear(pThrd->pCvtAddr);
}
pThrd->pCvtAddr = pCtx->pCvtAddr;
destroyReq(pReq); destroyReq(pReq);
return; return;
} }
FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) { FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) {
if (pCvtAddr == NULL) {
return 0;
}
if (pCvtAddr->cvt == false) { if (pCvtAddr->cvt == false) {
if (EPSET_IS_VALID(pEpSet)) { if (EPSET_IS_VALID(pEpSet)) {
return 0; return 0;
@ -2001,7 +2003,7 @@ int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
if (pReq->ctx == NULL) { if (pReq->ctx == NULL) {
return 0; return 0;
} }
return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); return cliMayCvtFqdnToIp(&pReq->ctx->epSet, pThrd->pCvtAddr);
} }
int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = thrd; SCliThrd* pThrd = thrd;
@ -2201,6 +2203,8 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosHashCleanup(pThrd->pIdConnTable); taosHashCleanup(pThrd->pIdConnTable);
taosMemoryFree(pThrd->pCvtAddr);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
@ -2426,7 +2430,7 @@ void cliRetryMayInitCtx(STrans* pInst, SCliReq* pReq) {
pCtx->retryInitTimestamp = taosGetTimestampMs(); pCtx->retryInitTimestamp = taosGetTimestampMs();
pCtx->retryNextInterval = pCtx->retryMinInterval; pCtx->retryNextInterval = pCtx->retryMinInterval;
pCtx->retryStep = 0; pCtx->retryStep = 0;
pCtx->retryInit = true; pCtx->retryInit = 1;
pCtx->retryCode = TSDB_CODE_SUCCESS; pCtx->retryCode = TSDB_CODE_SUCCESS;
pReq->msg.info.handle = 0; pReq->msg.info.handle = 0;
} }
@ -2756,7 +2760,6 @@ static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pRe
pCliReq->msg = *pReq; pCliReq->msg = *pReq;
pCliReq->st = taosGetTimestampUs(); pCliReq->st = taosGetTimestampUs();
pCliReq->type = Normal; pCliReq->type = Normal;
QUEUE_INIT(&pCliReq->seqq);
*pCliMsg = pCliReq; *pCliMsg = pCliReq;
@ -3023,7 +3026,6 @@ int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq
pCliReq->msg = *pReq; pCliReq->msg = *pReq;
pCliReq->st = taosGetTimestampUs(); pCliReq->st = taosGetTimestampUs();
pCliReq->type = Normal; pCliReq->type = Normal;
// pCliReq->refId = (int64_t)pInstRef;
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid,
@ -3083,10 +3085,12 @@ int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) {
break; break;
} }
pCtx->cvtAddr = cvtAddr; pCtx->pCvtAddr = (SCvtAddr*)taosMemoryCalloc(1, sizeof(SCvtAddr));
memcpy(pCtx->pCvtAddr, &cvtAddr, sizeof(SCvtAddr));
SCliReq* pReq = taosMemoryCalloc(1, sizeof(SCliReq)); SCliReq* pReq = taosMemoryCalloc(1, sizeof(SCliReq));
if (pReq == NULL) { if (pReq == NULL) {
taosMemoryFree(pCtx->pCvtAddr);
taosMemoryFree(pCtx); taosMemoryFree(pCtx);
code = terrno; code = terrno;
break; break;
@ -3099,6 +3103,7 @@ int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) {
tDebug("%s update epset at thread:%08" PRId64, pInst->label, thrd->pid); tDebug("%s update epset at thread:%08" PRId64, pInst->label, thrd->pid);
if ((code = transAsyncSend(thrd->asyncPool, &(pReq->q))) != 0) { if ((code = transAsyncSend(thrd->asyncPool, &(pReq->q))) != 0) {
taosMemoryFree(pCtx->pCvtAddr);
destroyReq(pReq); destroyReq(pReq);
if (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT) { if (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT) {
code = TSDB_CODE_RPC_MODULE_QUIT; code = TSDB_CODE_RPC_MODULE_QUIT;