add user req

This commit is contained in:
yihaoDeng 2024-09-29 12:56:55 +08:00
parent 8276cd5cbe
commit 836523bf18
3 changed files with 170 additions and 60 deletions

View File

@ -137,8 +137,17 @@ typedef struct SCvtAddr {
} SCvtAddr; } SCvtAddr;
typedef struct { typedef struct {
SEpSet epSet; // ip list provided by app int32_t inUse;
SEpSet origEpSet; int32_t numOfEps;
SEp eps[];
} SReqEpSet;
int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet);
int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet);
typedef struct {
SReqEpSet* epSet; // ip list provided by app
SReqEpSet* origEpSet;
void* ahandle; // handle provided by app void* ahandle; // handle provided by app
tmsg_t msgType; // message type tmsg_t msgType; // message type
@ -438,9 +447,9 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
void transDQCancel(SDelayQueue* queue, SDelayTask* task); void transDQCancel(SDelayQueue* queue, SDelayTask* task);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b); bool transRepEpsetIsEqual(SReqEpSet* a, SReqEpSet* b);
bool transEpSetIsEqual2(SEpSet* a, SEpSet* b); bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b);
/* /*
* init global func * init global func
*/ */

View File

@ -230,7 +230,7 @@ static void doFreeTimeoutMsg(void* param);
static void cliDestroyBatch(SCliBatch* pBatch); static void cliDestroyBatch(SCliBatch* pBatch);
// cli util func // cli util func
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx); static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx);
static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SReqEpSet* pEpSet, const SCvtAddr* pCvtAddr);
static FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* thrd, SCliReq* pReq, STransMsg* resp); static FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* thrd, SCliReq* pReq, STransMsg* resp);
@ -831,8 +831,8 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
// code // code
static int32_t cliGetOrCreateConn(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { static int32_t cliGetOrCreateConn(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) {
// impl later // impl later
char* fqdn = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); char* fqdn = EPSET_GET_INUSE_IP(pReq->ctx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); uint16_t port = EPSET_GET_INUSE_PORT(pReq->ctx->epSet);
char addr[TSDB_FQDN_LEN + 64] = {0}; char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
@ -932,8 +932,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) {
int32_t code = 0; int32_t code = 0;
SCliConn* pConn = NULL; SCliConn* pConn = NULL;
char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); char* ip = EPSET_GET_INUSE_IP(pReq->ctx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); int32_t port = EPSET_GET_INUSE_PORT(pReq->ctx->epSet);
TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception);
@ -1559,7 +1559,7 @@ static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) {
return; return;
} }
FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) { FORCE_INLINE int32_t cliMayCvtFqdnToIp(SReqEpSet* pEpSet, const SCvtAddr* pCvtAddr) {
if (pCvtAddr == NULL) { if (pCvtAddr == NULL) {
return 0; return 0;
} }
@ -1583,7 +1583,7 @@ FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr)
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx) { FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx) {
if (code != 0) return false; if (code != 0) return false;
return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true; return transRepEpsetIsEqual(pCtx->epSet, pCtx->origEpSet) ? false : true;
} }
FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* pThrd, SCliReq* pReq, STransMsg* pResp) { FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* pThrd, SCliReq* pReq, STransMsg* pResp) {
@ -1770,8 +1770,8 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) {
if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { if (code == TSDB_CODE_RPC_NO_STATE || code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
char addr[TSDB_FQDN_LEN + 64] = {0}; char addr[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); char* ip = EPSET_GET_INUSE_IP(pReq->ctx->epSet);
int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); int32_t port = EPSET_GET_INUSE_PORT(pReq->ctx->epSet);
CONN_CONSTRUCT_HASH_KEY(addr, ip, port); CONN_CONSTRUCT_HASH_KEY(addr, ip, port);
pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); pConn = getConnFromHeapCache(pThrd->connHeapCache, addr);
@ -1860,8 +1860,8 @@ static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq->ctx; SReqCtx* pCtx = pReq->ctx;
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); char* ip = EPSET_GET_INUSE_IP(pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0}; char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port); CONN_CONSTRUCT_HASH_KEY(key, ip, port);
size_t klen = strlen(key); size_t klen = strlen(key);
@ -2007,7 +2007,9 @@ static FORCE_INLINE void destroyReq(void* arg) {
STraceId* trace = &pReq->msg.info.traceId; STraceId* trace = &pReq->msg.info.traceId;
tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx);
if (pReq->ctx) destroyReqCtx(pReq->ctx); if (pReq->ctx) {
destroyReqCtx(pReq->ctx);
}
transFreeMsg(pReq->msg.pCont); transFreeMsg(pReq->msg.pCont);
taosMemoryFree(pReq); taosMemoryFree(pReq);
} }
@ -2102,7 +2104,7 @@ int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
if (pReq->ctx == NULL) { if (pReq->ctx == NULL) {
return 0; return 0;
} }
return cliMayCvtFqdnToIp(&pReq->ctx->epSet, pThrd->pCvtAddr); return cliMayCvtFqdnToIp(pReq->ctx->epSet, pThrd->pCvtAddr);
} }
int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = thrd; SCliThrd* pThrd = thrd;
@ -2319,7 +2321,13 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx) { taosMemoryFree(ctx); } static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx) {
if (ctx) {
taosMemoryFree(ctx->epSet);
taosMemoryFree(ctx->origEpSet);
taosMemoryFree(ctx);
}
}
int32_t cliSendQuit(SCliThrd* thrd) { int32_t cliSendQuit(SCliThrd* thrd) {
// cli can stop gracefully // cli can stop gracefully
@ -2391,7 +2399,7 @@ static FORCE_INLINE void cliPerfLog_schedMsg(SCliReq* pReq, char* label) {
STraceId* trace = &pReq->msg.info.traceId; STraceId* trace = &pReq->msg.info.traceId;
char tbuf[512] = {0}; char tbuf[512] = {0};
code = epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); code = epsetToStr((SEpSet*)pCtx->epSet, tbuf, tListLen(tbuf));
if (code != 0) { if (code != 0) {
tWarn("failed to debug epset since %s", tstrerror(code)); tWarn("failed to debug epset since %s", tstrerror(code));
return; return;
@ -2410,7 +2418,7 @@ static FORCE_INLINE void cliPerfLog_epset(SCliConn* pConn, SCliReq* pReq) {
char tbuf[512] = {0}; char tbuf[512] = {0};
code = epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); code = epsetToStr((SEpSet*)pCtx->epSet, tbuf, tListLen(tbuf));
if (code != 0) { if (code != 0) {
tWarn("failed to debug epset since %s", tstrerror(code)); tWarn("failed to debug epset since %s", tstrerror(code));
return; return;
@ -2442,7 +2450,6 @@ static FORCE_INLINE int32_t cliSchedMsgToNextNode(SCliReq* pReq, SCliThrd* pThrd
FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) {
int32_t code = 0; int32_t code = 0;
SReqCtx* ctx = pReq->ctx; SReqCtx* ctx = pReq->ctx;
SEpSet* dst = &ctx->epSet;
if ((pResp == NULL || pResp->info.hasEpSet == 0)) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
return false; return false;
@ -2453,7 +2460,8 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) {
tError("failed to deserialize epset, code:%d", code); tError("failed to deserialize epset, code:%d", code);
return false; return false;
} }
int32_t tlen = tSerializeSEpSet(NULL, 0, dst); SEpSet tepset;
int32_t tlen = tSerializeSEpSet(NULL, 0, &tepset);
char* buf = NULL; char* buf = NULL;
int32_t len = pResp->contLen - tlen; int32_t len = pResp->contLen - tlen;
@ -2473,7 +2481,7 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) {
pResp->info.hasEpSet = 1; pResp->info.hasEpSet = 1;
epsetAssign(dst, &epset); transCreateReqEpsetFromUserEpset(&epset, &ctx->epSet);
return true; return true;
} }
@ -2481,34 +2489,35 @@ bool cliResetEpset(SReqCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
bool noDelay = true; bool noDelay = true;
if (hasEpSet == false) { if (hasEpSet == false) {
if (pResp->contLen == 0) { if (pResp->contLen == 0) {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) {
noDelay = false; noDelay = false;
} else { } else {
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(pCtx->epSet);
} }
} else if (pResp->contLen != 0) { } else if (pResp->contLen != 0) {
SEpSet epSet; SEpSet epSet;
int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
if (valid < 0) { if (valid < 0) {
tDebug("get invalid epset, epset equal, continue"); tDebug("get invalid epset, epset equal, continue");
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) {
noDelay = false; noDelay = false;
} else { } else {
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(pCtx->epSet);
} }
} else { } else {
if (!transEpSetIsEqual2(&pCtx->epSet, &epSet)) { if (!transCompareReqAndUserEpset(pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset1"); tDebug("epset not equal, retry new epset1");
transPrintEpSet(&pCtx->epSet); transPrintEpSet((SEpSet*)pCtx->epSet);
transPrintEpSet(&epSet); transPrintEpSet(&epSet);
epsetAssign(&pCtx->epSet, &epSet);
transCreateReqEpsetFromUserEpset(&epSet, &pCtx->epSet);
noDelay = false; noDelay = false;
} else { } else {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) {
noDelay = false; noDelay = false;
} else { } else {
tDebug("epset equal, continue"); tDebug("epset equal, continue");
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(pCtx->epSet);
} }
} }
} }
@ -2518,24 +2527,24 @@ bool cliResetEpset(SReqCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
if (valid < 0) { if (valid < 0) {
tDebug("get invalid epset, epset equal, continue"); tDebug("get invalid epset, epset equal, continue");
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) {
noDelay = false; noDelay = false;
} else { } else {
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(pCtx->epSet);
} }
} else { } else {
if (!transEpSetIsEqual2(&pCtx->epSet, &epSet)) { if (!transCompareReqAndUserEpset(pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset2"); tDebug("epset not equal, retry new epset2");
transPrintEpSet(&pCtx->epSet); transPrintEpSet((SEpSet*)pCtx->epSet);
transPrintEpSet(&epSet); transPrintEpSet(&epSet);
epsetAssign(&pCtx->epSet, &epSet); transCreateReqEpsetFromUserEpset(&epSet, &pCtx->epSet);
noDelay = false; noDelay = false;
} else { } else {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) { if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps) {
noDelay = false; noDelay = false;
} else { } else {
tDebug("epset equal, continue"); tDebug("epset equal, continue");
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(pCtx->epSet);
} }
} }
} }
@ -2675,7 +2684,7 @@ void cliMayResetRespCode(SCliReq* pReq, STransMsg* pResp) {
} }
// check whole vnodes is offline on this vgroup // check whole vnodes is offline on this vgroup
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) { if (pCtx->epsetRetryCnt >= pCtx->epSet->numOfEps || pCtx->retryStep > 0) {
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED; pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
} else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) { } else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
@ -2710,7 +2719,10 @@ int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp)); memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp));
if (cliIsEpsetUpdated(pResp->code, pCtx)) { if (cliIsEpsetUpdated(pResp->code, pCtx)) {
pSyncMsg->hasEpSet = 1; pSyncMsg->hasEpSet = 1;
epsetAssign(&pSyncMsg->epSet, &pCtx->epSet);
SEpSet epset = {0};
transCreateUserEpsetFromReqEpset(pCtx->epSet, &epset);
epsetAssign(&pSyncMsg->epSet, &epset);
} }
TAOS_UNUSED(tsem2_post(pSyncMsg->pSem)); TAOS_UNUSED(tsem2_post(pSyncMsg->pSem));
TAOS_UNUSED(taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef)); TAOS_UNUSED(taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef));
@ -2721,12 +2733,16 @@ int32_t cliNotifyImplCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
} else { } else {
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pResp->info.hasEpSet == 1) { if (pResp->info.hasEpSet == 1) {
pInst->cfp(pInst->parent, pResp, &pCtx->epSet); SEpSet epset = {0};
transCreateUserEpsetFromReqEpset(pCtx->epSet, &epset);
pInst->cfp(pInst->parent, pResp, &epset);
} else { } else {
if (!cliIsEpsetUpdated(pResp->code, pCtx)) { if (!cliIsEpsetUpdated(pResp->code, pCtx)) {
pInst->cfp(pInst->parent, pResp, NULL); pInst->cfp(pInst->parent, pResp, NULL);
} else { } else {
pInst->cfp(pInst->parent, pResp, &pCtx->epSet); SEpSet epset = {0};
transCreateUserEpsetFromReqEpset(pCtx->epSet, &epset);
pInst->cfp(pInst->parent, pResp, &epset);
} }
} }
} }
@ -2862,6 +2878,7 @@ int32_t transReleaseCliHandle(void* handle) {
} }
static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliReq** pCliMsg) { static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliReq** pCliMsg) {
int32_t code = 0;
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx));
@ -2869,8 +2886,18 @@ static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pRe
return terrno; return terrno;
} }
epsetAssign(&pCtx->epSet, pEpSet); code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->epSet);
epsetAssign(&pCtx->origEpSet, pEpSet); if (code != 0) {
taosMemoryFree(pCtx);
return code;
}
code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->origEpSet);
if (code != 0) {
taosMemoryFree(pCtx);
taosMemoryFree(pCtx->epSet);
return code;
}
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
@ -2984,6 +3011,8 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr
return TSDB_CODE_RPC_MODULE_QUIT; return TSDB_CODE_RPC_MODULE_QUIT;
} }
int32_t code = 0; int32_t code = 0;
SCliReq* pCliReq = NULL;
SReqCtx* pCtx = NULL;
STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg));
if (pTransRsp == NULL) { if (pTransRsp == NULL) {
@ -3008,25 +3037,36 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr
if (pReq->info.traceId.msgId == 0) TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); if (pReq->info.traceId.msgId == 0) TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
SReqCtx* pCtx = taosMemoryCalloc(1, sizeof(SReqCtx)); pCtx = taosMemoryCalloc(1, sizeof(SReqCtx));
if (pCtx == NULL) { if (pCtx == NULL) {
TAOS_UNUSED(tsem_destroy(sem)); TAOS_UNUSED(tsem_destroy(sem));
taosMemoryFree(sem); taosMemoryFree(sem);
TAOS_CHECK_GOTO(terrno, NULL, _RETURN1); TAOS_CHECK_GOTO(terrno, NULL, _RETURN1);
} }
epsetAssign(&pCtx->epSet, pEpSet); code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->epSet);
epsetAssign(&pCtx->origEpSet, pEpSet); if (code != 0) {
(TAOS_UNUSED(tsem_destroy(sem)));
taosMemoryFree(sem);
TAOS_CHECK_GOTO(code, NULL, _RETURN1);
}
code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->origEpSet);
if (code != 0) {
(TAOS_UNUSED(tsem_destroy(sem)));
taosMemoryFree(sem);
TAOS_CHECK_GOTO(code, NULL, _RETURN1);
}
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->pSem = sem; pCtx->pSem = sem;
pCtx->pRsp = pTransRsp; pCtx->pRsp = pTransRsp;
SCliReq* pCliReq = taosMemoryCalloc(1, sizeof(SCliReq)); pCliReq = taosMemoryCalloc(1, sizeof(SCliReq));
if (pCliReq == NULL) { if (pCliReq == NULL) {
(TAOS_UNUSED(tsem_destroy(sem))); (TAOS_UNUSED(tsem_destroy(sem)));
taosMemoryFree(sem); taosMemoryFree(sem);
taosMemoryFree(pCtx);
TAOS_CHECK_GOTO(terrno, NULL, _RETURN1); TAOS_CHECK_GOTO(terrno, NULL, _RETURN1);
} }
@ -3037,7 +3077,7 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(pCtx->epSet), EPSET_GET_INUSE_PORT(pCtx->epSet), pReq->info.ahandle);
code = transAsyncSend(pThrd->asyncPool, &pCliReq->q); code = transAsyncSend(pThrd->asyncPool, &pCliReq->q);
if (code != 0) { if (code != 0) {
@ -3059,6 +3099,11 @@ _RETURN1:
taosMemoryFree(pTransRsp); taosMemoryFree(pTransRsp);
taosMemoryFree(pReq->pCont); taosMemoryFree(pReq->pCont);
pReq->pCont = NULL; pReq->pCont = NULL;
if (pCtx != NULL) {
taosMemoryFree(pCtx->epSet);
taosMemoryFree(pCtx->origEpSet);
taosMemoryFree(pCtx);
}
return code; return code;
} }
@ -3126,8 +3171,16 @@ int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq
TAOS_CHECK_GOTO(terrno, NULL, _RETURN2); TAOS_CHECK_GOTO(terrno, NULL, _RETURN2);
} }
epsetAssign(&pCtx->epSet, pEpSet); code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->epSet);
epsetAssign(&pCtx->origEpSet, pEpSet); if (code != 0) {
taosMemoryFreeClear(pCtx->epSet);
TAOS_CHECK_GOTO(code, NULL, _RETURN2);
}
code = transCreateReqEpsetFromUserEpset(pEpSet, &pCtx->origEpSet);
if (code != 0) {
taosMemoryFreeClear(pCtx->epSet);
TAOS_CHECK_GOTO(code, NULL, _RETURN2);
}
pCtx->ahandle = pReq->info.ahandle; pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
@ -3156,7 +3209,7 @@ int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(pCtx->epSet), EPSET_GET_INUSE_PORT(pCtx->epSet), pReq->info.ahandle);
code = transAsyncSend(pThrd->asyncPool, &pCliReq->q); code = transAsyncSend(pThrd->asyncPool, &pCliReq->q);
if (code != 0) { if (code != 0) {
@ -3183,6 +3236,12 @@ _RETURN:
return code; return code;
_RETURN2: _RETURN2:
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
if (pCtx != NULL) {
taosMemoryFree(pCtx->epSet);
taosMemoryFree(pCtx->origEpSet);
taosMemoryFree(pCtx);
}
pReq->pCont = NULL; pReq->pCont = NULL;
taosMemoryFree(pTransMsg); taosMemoryFree(pTransMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef);

View File

@ -667,7 +667,7 @@ void transPrintEpSet(SEpSet* pEpSet) {
len += snprintf(buf + len, sizeof(buf) - len, "}"); len += snprintf(buf + len, sizeof(buf) - len, "}");
tTrace("%s, inUse:%d", buf, pEpSet->inUse); tTrace("%s, inUse:%d", buf, pEpSet->inUse);
} }
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { bool transRepEpsetIsEqual(SReqEpSet* a, SReqEpSet* b) {
if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) { if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {
return false; return false;
} }
@ -678,7 +678,7 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
} }
return true; return true;
} }
bool transEpSetIsEqual2(SEpSet* a, SEpSet* b) { bool transCompareReqAndUserEpset(SReqEpSet* a, SEpSet* b) {
if (a->numOfEps != b->numOfEps) { if (a->numOfEps != b->numOfEps) {
return false; return false;
} }
@ -935,3 +935,45 @@ int32_t transSetReadOption(uv_handle_t* handle) {
code = taosSetSockOpt2(fd); code = taosSetSockOpt2(fd);
return code; return code;
} }
int32_t transCreateReqEpsetFromUserEpset(const SEpSet* pEpset, SReqEpSet** pReqEpSet) {
if (pEpset == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pReqEpSet == NULL) {
return TSDB_CODE_INVALID_PARA;
}
taosMemoryFree(*pReqEpSet);
int32_t size = sizeof(SReqEpSet) + sizeof(SEp) * pEpset->numOfEps;
SReqEpSet* pReq = (SReqEpSet*)taosMemoryCalloc(1, size);
if (pReq == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->inUse = pEpset->inUse;
pReq->numOfEps = pEpset->numOfEps;
for (int32_t i = 0; i < pEpset->numOfEps; i++) {
pReq->eps[i].port = pEpset->eps[i].port;
strcpy(pReq->eps[i].fqdn, pEpset->eps[i].fqdn);
}
*pReqEpSet = pReq;
return TSDB_CODE_SUCCESS;
}
int32_t transCreateUserEpsetFromReqEpset(const SReqEpSet* pReqEpSet, SEpSet* pEpSet) {
if (pReqEpSet == NULL) {
return TSDB_CODE_INVALID_PARA;
}
pEpSet->inUse = pReqEpSet->inUse;
pEpSet->numOfEps = pReqEpSet->numOfEps;
for (int32_t i = 0; i < pReqEpSet->numOfEps; i++) {
pEpSet->eps[i].port = pReqEpSet->eps[i].port;
strcpy(pEpSet->eps[i].fqdn, pReqEpSet->eps[i].fqdn);
}
return TSDB_CODE_SUCCESS;
}