add rpc sync read timeout

This commit is contained in:
yihaoDeng 2023-10-19 10:15:04 +08:00
parent ef27c87756
commit 1910540a61
4 changed files with 131 additions and 16 deletions

View File

@ -119,6 +119,13 @@ typedef struct SExHandle {
void* pThrd;
} SExHandle;
typedef struct {
STransMsg* pRsp;
tsem_t* pSem;
int8_t inited;
SRWLatch latch;
} STransSyncMsg;
/*convet from fqdn to ip */
typedef struct SCvtAddr {
char ip[TSDB_FQDN_LEN];
@ -136,6 +143,8 @@ typedef struct {
STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
STransSyncMsg* pSyncMsg; // for syncchronous with timeout API
int64_t syncMsgRef;
SCvtAddr cvtAddr;
bool setMaxRetry;
@ -307,6 +316,7 @@ int transReleaseSrvHandle(void* handle);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int32_t timeoutMs);
int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
@ -432,10 +442,11 @@ int64_t transAddExHandle(int32_t refMgt, void* p);
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId);
void* transAcquireExHandle(int32_t refMgt, int64_t refId);
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
void transDestoryExHandle(void* handle);
void transDestroyExHandle(void* handle);
int32_t transGetRefMgt();
int32_t transGetInstMgt();
int32_t transGetSyncMsgMgt();
void transHttpEnvDestroy();

View File

@ -169,6 +169,9 @@ int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, in
int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
}
int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int32_t timeoutMs) {
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, timeoutMs);
}
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }

View File

@ -2411,8 +2411,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
}
}
if (pCtx->pSem != NULL) {
if (pCtx->pSem || pCtx->syncMsgRef != 0) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pSem) {
if (pCtx->pRsp == NULL) {
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
} else {
@ -2420,6 +2421,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
tsem_post(pCtx->pSem);
pCtx->pRsp = NULL;
} else {
STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
if (pSyncMsg != NULL) {
memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp));
tsem_post(pSyncMsg->pSem);
taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
}
}
} else {
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (retry == false && hasEpSet == true) {
@ -2564,6 +2573,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg));
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return -1;
@ -2587,7 +2597,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
pCtx->pSem = sem;
pCtx->pRsp = pRsp;
pCtx->pRsp = pTransRsp;
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx;
@ -2607,10 +2617,84 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
}
tsem_wait(sem);
memcpy(pRsp, pTransRsp, sizeof(STransMsg));
_RETURN:
tsem_destroy(sem);
taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
taosMemoryFree(pTransRsp);
return ret;
}
int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
tsem_init(sem, 0, 0);
STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg));
taosInitRWLatch(&pSyncMsg->latch);
pSyncMsg->inited = 0;
pSyncMsg->pRsp = pTransMsg;
pSyncMsg->pSem = sem;
return taosAddRef(transGetSyncMsgMgt(), pSyncMsg);
}
int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int32_t timeoutMs) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return -1;
}
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
}
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
epsetAssign(&pCtx->epSet, pEpSet);
epsetAssign(&pCtx->origEpSet, pEpSet);
pCtx->ahandle = pReq->info.ahandle;
pCtx->msgType = pReq->msgType;
pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg);
int64_t ref = pCtx->syncMsgRef;
STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref);
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cliMsg->ctx = pCtx;
cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs();
cliMsg->type = Normal;
cliMsg->refId = (int64_t)shandle;
STraceId* trace = &pReq->info.traceId;
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (ret != 0) {
destroyCmsg(cliMsg);
goto _RETURN;
}
ret = tsem_timewait(pSyncMsg->pSem, timeoutMs);
if (ret < 0) {
pRsp->code = TSDB_CODE_TIMEOUT_ERROR;
ret = TSDB_CODE_TIMEOUT_ERROR;
} else {
memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg));
ret = 0;
}
_RETURN:
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
taosReleaseRef(transGetSyncMsgMgt(), ref);
taosRemoveRef(transGetSyncMsgMgt(), ref);
return ret;
}
/*

View File

@ -21,6 +21,9 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt;
static int32_t instMgt;
static int32_t transSyncMsgMgt;
void transDestroySyncMsg(void* msg);
int32_t transCompressMsg(char* msg, int32_t len) {
int32_t ret = 0;
@ -601,13 +604,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
}
static void transInitEnv() {
refMgt = transOpenRefMgt(50000, transDestoryExHandle);
refMgt = transOpenRefMgt(50000, transDestroyExHandle);
instMgt = taosOpenRef(50, rpcCloseImpl);
transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg);
uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
}
static void transDestroyEnv() {
transCloseRefMgt(refMgt);
transCloseRefMgt(instMgt);
transCloseRefMgt(transSyncMsgMgt);
}
void transInit() {
@ -617,6 +622,7 @@ void transInit() {
int32_t transGetRefMgt() { return refMgt; }
int32_t transGetInstMgt() { return instMgt; }
int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; }
void transCleanup() {
// clean env
@ -648,13 +654,24 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
// release extern handle
return taosReleaseRef(refMgt, refId);
}
void transDestoryExHandle(void* handle) {
void transDestroyExHandle(void* handle) {
if (handle == NULL) {
return;
}
taosMemoryFree(handle);
}
void transDestroySyncMsg(void* msg) {
if (msg == NULL) return;
STransSyncMsg* pSyncMsg = msg;
tsem_destroy(pSyncMsg->pSem);
taosMemoryFree(pSyncMsg->pSem);
taosMemoryFree(pSyncMsg->pRsp);
taosMemoryFree(pSyncMsg);
}
// void subnetIp2int(const char* const ip_addr, uint8_t* dst) {
// char ip_addr_cpy[20];
// char ip[5];