Merge pull request #23329 from taosdata/opt/addStatusSend
Opt/add status send
This commit is contained in:
commit
9f9fae3b99
|
@ -163,6 +163,7 @@ int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc in
|
||||||
// These functions will not be called in the child process
|
// These functions will not be called in the child process
|
||||||
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
|
||||||
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
|
int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int32_t timeoutMs);
|
||||||
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
|
||||||
void *rpcAllocHandle();
|
void *rpcAllocHandle();
|
||||||
void rpcSetIpWhite(void *thandl, void *arg);
|
void rpcSetIpWhite(void *thandl, void *arg);
|
||||||
|
|
|
@ -249,7 +249,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_PASSWORD_LEN 32
|
#define TSDB_PASSWORD_LEN 32
|
||||||
#define TSDB_USET_PASSWORD_LEN 129
|
#define TSDB_USET_PASSWORD_LEN 129
|
||||||
#define TSDB_VERSION_LEN 32
|
#define TSDB_VERSION_LEN 32
|
||||||
#define TSDB_LABEL_LEN 8
|
#define TSDB_LABEL_LEN 12
|
||||||
#define TSDB_JOB_STATUS_LEN 32
|
#define TSDB_JOB_STATUS_LEN 32
|
||||||
|
|
||||||
#define TSDB_CLUSTER_ID_LEN 40
|
#define TSDB_CLUSTER_ID_LEN 40
|
||||||
|
|
|
@ -160,7 +160,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
|
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
||||||
rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp);
|
rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, 5000);
|
||||||
if (rpcRsp.code != 0) {
|
if (rpcRsp.code != 0) {
|
||||||
dmRotateMnodeEpSet(pMgmt->pData);
|
dmRotateMnodeEpSet(pMgmt->pData);
|
||||||
char tbuf[256];
|
char tbuf[256];
|
||||||
|
|
|
@ -115,7 +115,9 @@ int32_t dmRunDnode(SDnode *pDnode);
|
||||||
int32_t dmInitServer(SDnode *pDnode);
|
int32_t dmInitServer(SDnode *pDnode);
|
||||||
void dmCleanupServer(SDnode *pDnode);
|
void dmCleanupServer(SDnode *pDnode);
|
||||||
int32_t dmInitClient(SDnode *pDnode);
|
int32_t dmInitClient(SDnode *pDnode);
|
||||||
|
int32_t dmInitStatusClient(SDnode *pDnode);
|
||||||
void dmCleanupClient(SDnode *pDnode);
|
void dmCleanupClient(SDnode *pDnode);
|
||||||
|
void dmCleanupStatusClient(SDnode *pDnode);
|
||||||
SMsgCb dmGetMsgcb(SDnode *pDnode);
|
SMsgCb dmGetMsgcb(SDnode *pDnode);
|
||||||
int32_t dmInitMsgHandle(SDnode *pDnode);
|
int32_t dmInitMsgHandle(SDnode *pDnode);
|
||||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -20,8 +20,8 @@
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
#ifdef TD_TSZ
|
#ifdef TD_TSZ
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tcompression.h"
|
#include "tcompression.h"
|
||||||
|
#include "tglobal.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t dmInitDnode(SDnode *pDnode) {
|
int32_t dmInitDnode(SDnode *pDnode) {
|
||||||
|
@ -66,7 +66,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(dmInitModule(pDnode) != 0) {
|
if (dmInitModule(pDnode) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,6 +91,7 @@ void dmCleanupDnode(SDnode *pDnode) {
|
||||||
if (pDnode == NULL) return;
|
if (pDnode == NULL) return;
|
||||||
|
|
||||||
dmCleanupClient(pDnode);
|
dmCleanupClient(pDnode);
|
||||||
|
dmCleanupStatusClient(pDnode);
|
||||||
dmCleanupServer(pDnode);
|
dmCleanupServer(pDnode);
|
||||||
dmClearVars(pDnode);
|
dmClearVars(pDnode);
|
||||||
rpcCleanup();
|
rpcCleanup();
|
||||||
|
|
|
@ -358,6 +358,50 @@ int32_t dmInitClient(SDnode *pDnode) {
|
||||||
dDebug("dnode rpc client is initialized");
|
dDebug("dnode rpc client is initialized");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
int32_t dmInitStatusClient(SDnode *pDnode) {
|
||||||
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
|
||||||
|
SRpcInit rpcInit = {0};
|
||||||
|
rpcInit.label = "DND-STATUS";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
|
||||||
|
rpcInit.sessions = 1024;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit.user = TSDB_DEFAULT_USER;
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
|
rpcInit.parent = pDnode;
|
||||||
|
rpcInit.rfp = rpcRfp;
|
||||||
|
rpcInit.compressSize = tsCompressMsgSize;
|
||||||
|
|
||||||
|
rpcInit.retryMinInterval = tsRedirectPeriod;
|
||||||
|
rpcInit.retryStepFactor = tsRedirectFactor;
|
||||||
|
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
|
||||||
|
rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
|
||||||
|
|
||||||
|
rpcInit.failFastInterval = 5000; // interval threshold(ms)
|
||||||
|
rpcInit.failFastThreshold = 3; // failed threshold
|
||||||
|
rpcInit.ffp = dmFailFastFp;
|
||||||
|
|
||||||
|
int32_t connLimitNum = 100;
|
||||||
|
connLimitNum = TMAX(connLimitNum, 10);
|
||||||
|
connLimitNum = TMIN(connLimitNum, 500);
|
||||||
|
|
||||||
|
rpcInit.connLimitNum = connLimitNum;
|
||||||
|
rpcInit.connLimitLock = 1;
|
||||||
|
rpcInit.supportBatch = 1;
|
||||||
|
rpcInit.batchSize = 8 * 1024;
|
||||||
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
|
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||||
|
|
||||||
|
// pTrans->statusClientRpc = rpcOpen(&rpcInit);
|
||||||
|
// if (pTrans->statusClientRpc == NULL) {
|
||||||
|
// dError("failed to init dnode rpc status client");
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
|
||||||
|
dDebug("dnode rpc status client is initialized");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void dmCleanupClient(SDnode *pDnode) {
|
void dmCleanupClient(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
@ -367,6 +411,14 @@ void dmCleanupClient(SDnode *pDnode) {
|
||||||
dDebug("dnode rpc client is closed");
|
dDebug("dnode rpc client is closed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
void dmCleanupStatusClient(SDnode *pDnode) {
|
||||||
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
// if (pTrans->statusClientRpc) {
|
||||||
|
// rpcClose(pTrans->statusClientRpc);
|
||||||
|
// pTrans->statusClientRpc = NULL;
|
||||||
|
// dDebug("dnode rpc status client is closed");
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dmInitServer(SDnode *pDnode) {
|
int32_t dmInitServer(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
|
|
@ -119,6 +119,13 @@ typedef struct SExHandle {
|
||||||
void* pThrd;
|
void* pThrd;
|
||||||
} SExHandle;
|
} SExHandle;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
STransMsg* pRsp;
|
||||||
|
tsem_t* pSem;
|
||||||
|
int8_t inited;
|
||||||
|
SRWLatch latch;
|
||||||
|
} STransSyncMsg;
|
||||||
|
|
||||||
/*convet from fqdn to ip */
|
/*convet from fqdn to ip */
|
||||||
typedef struct SCvtAddr {
|
typedef struct SCvtAddr {
|
||||||
char ip[TSDB_FQDN_LEN];
|
char ip[TSDB_FQDN_LEN];
|
||||||
|
@ -133,11 +140,13 @@ typedef struct {
|
||||||
tmsg_t msgType; // message type
|
tmsg_t msgType; // message type
|
||||||
int8_t connType; // connection type cli/srv
|
int8_t connType; // connection type cli/srv
|
||||||
|
|
||||||
STransCtx appCtx; //
|
STransCtx appCtx; //
|
||||||
STransMsg* pRsp; // for synchronous API
|
STransMsg* pRsp; // for synchronous API
|
||||||
tsem_t* pSem; // for synchronous API
|
tsem_t* pSem; // for synchronous API
|
||||||
SCvtAddr cvtAddr;
|
STransSyncMsg* pSyncMsg; // for syncchronous with timeout API
|
||||||
bool setMaxRetry;
|
int64_t syncMsgRef;
|
||||||
|
SCvtAddr cvtAddr;
|
||||||
|
bool setMaxRetry;
|
||||||
|
|
||||||
int32_t retryMinInterval;
|
int32_t retryMinInterval;
|
||||||
int32_t retryMaxInterval;
|
int32_t retryMaxInterval;
|
||||||
|
@ -307,6 +316,7 @@ int transReleaseSrvHandle(void* handle);
|
||||||
|
|
||||||
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
|
||||||
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
|
||||||
|
int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int32_t timeoutMs);
|
||||||
int transSendResponse(const STransMsg* msg);
|
int transSendResponse(const STransMsg* msg);
|
||||||
int transRegisterMsg(const STransMsg* msg);
|
int transRegisterMsg(const STransMsg* msg);
|
||||||
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
||||||
|
@ -432,10 +442,11 @@ int64_t transAddExHandle(int32_t refMgt, void* p);
|
||||||
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId);
|
int32_t transRemoveExHandle(int32_t refMgt, int64_t refId);
|
||||||
void* transAcquireExHandle(int32_t refMgt, int64_t refId);
|
void* transAcquireExHandle(int32_t refMgt, int64_t refId);
|
||||||
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
|
int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
|
||||||
void transDestoryExHandle(void* handle);
|
void transDestroyExHandle(void* handle);
|
||||||
|
|
||||||
int32_t transGetRefMgt();
|
int32_t transGetRefMgt();
|
||||||
int32_t transGetInstMgt();
|
int32_t transGetInstMgt();
|
||||||
|
int32_t transGetSyncMsgMgt();
|
||||||
|
|
||||||
void transHttpEnvDestroy();
|
void transHttpEnvDestroy();
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,8 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (pInit->label) {
|
if (pInit->label) {
|
||||||
tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
|
int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label);
|
||||||
|
memcpy(pRpc->label, pInit->label, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRpc->compressSize = pInit->compressSize;
|
pRpc->compressSize = pInit->compressSize;
|
||||||
|
@ -168,6 +169,9 @@ int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, in
|
||||||
int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||||
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
|
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
|
||||||
}
|
}
|
||||||
|
int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int32_t timeoutMs) {
|
||||||
|
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, timeoutMs);
|
||||||
|
}
|
||||||
|
|
||||||
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
|
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
|
||||||
|
|
||||||
|
|
|
@ -2411,15 +2411,26 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pCtx->pSem != NULL) {
|
if (pCtx->pSem || pCtx->syncMsgRef != 0) {
|
||||||
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
if (pCtx->pRsp == NULL) {
|
if (pCtx->pSem) {
|
||||||
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
|
if (pCtx->pRsp == NULL) {
|
||||||
|
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
|
} else {
|
||||||
|
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
|
||||||
|
}
|
||||||
|
tsem_post(pCtx->pSem);
|
||||||
|
pCtx->pRsp = NULL;
|
||||||
} else {
|
} else {
|
||||||
memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp));
|
STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
|
||||||
|
if (pSyncMsg != NULL) {
|
||||||
|
memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp));
|
||||||
|
tsem_post(pSyncMsg->pSem);
|
||||||
|
taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
|
||||||
|
} else {
|
||||||
|
rpcFreeCont(pResp->pCont);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tsem_post(pCtx->pSem);
|
|
||||||
pCtx->pRsp = NULL;
|
|
||||||
} else {
|
} else {
|
||||||
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
if (retry == false && hasEpSet == true) {
|
if (retry == false && hasEpSet == true) {
|
||||||
|
@ -2563,15 +2574,18 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
|
||||||
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg));
|
||||||
if (pTransInst == NULL) {
|
if (pTransInst == NULL) {
|
||||||
transFreeMsg(pReq->pCont);
|
transFreeMsg(pReq->pCont);
|
||||||
|
taosMemoryFree(pTransRsp);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
|
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
|
||||||
if (pThrd == NULL) {
|
if (pThrd == NULL) {
|
||||||
transFreeMsg(pReq->pCont);
|
transFreeMsg(pReq->pCont);
|
||||||
|
taosMemoryFree(pTransRsp);
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||||
}
|
}
|
||||||
|
@ -2587,7 +2601,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
|
||||||
pCtx->ahandle = pReq->info.ahandle;
|
pCtx->ahandle = pReq->info.ahandle;
|
||||||
pCtx->msgType = pReq->msgType;
|
pCtx->msgType = pReq->msgType;
|
||||||
pCtx->pSem = sem;
|
pCtx->pSem = sem;
|
||||||
pCtx->pRsp = pRsp;
|
pCtx->pRsp = pTransRsp;
|
||||||
|
|
||||||
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
cliMsg->ctx = pCtx;
|
cliMsg->ctx = pCtx;
|
||||||
|
@ -2607,10 +2621,86 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
|
||||||
}
|
}
|
||||||
tsem_wait(sem);
|
tsem_wait(sem);
|
||||||
|
|
||||||
|
memcpy(pRsp, pTransRsp, sizeof(STransMsg));
|
||||||
|
|
||||||
_RETURN:
|
_RETURN:
|
||||||
tsem_destroy(sem);
|
tsem_destroy(sem);
|
||||||
taosMemoryFree(sem);
|
taosMemoryFree(sem);
|
||||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
taosMemoryFree(pTransRsp);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
|
||||||
|
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
|
||||||
|
tsem_init(sem, 0, 0);
|
||||||
|
|
||||||
|
STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg));
|
||||||
|
|
||||||
|
taosInitRWLatch(&pSyncMsg->latch);
|
||||||
|
pSyncMsg->inited = 0;
|
||||||
|
pSyncMsg->pRsp = pTransMsg;
|
||||||
|
pSyncMsg->pSem = sem;
|
||||||
|
|
||||||
|
return taosAddRef(transGetSyncMsgMgt(), pSyncMsg);
|
||||||
|
}
|
||||||
|
int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int32_t timeoutMs) {
|
||||||
|
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
|
||||||
|
if (pTransInst == NULL) {
|
||||||
|
transFreeMsg(pReq->pCont);
|
||||||
|
taosMemoryFree(pTransMsg);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
|
||||||
|
if (pThrd == NULL) {
|
||||||
|
transFreeMsg(pReq->pCont);
|
||||||
|
taosMemoryFree(pTransMsg);
|
||||||
|
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||||
|
}
|
||||||
|
|
||||||
|
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||||
|
|
||||||
|
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||||
|
epsetAssign(&pCtx->epSet, pEpSet);
|
||||||
|
epsetAssign(&pCtx->origEpSet, pEpSet);
|
||||||
|
pCtx->ahandle = pReq->info.ahandle;
|
||||||
|
pCtx->msgType = pReq->msgType;
|
||||||
|
pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg);
|
||||||
|
|
||||||
|
int64_t ref = pCtx->syncMsgRef;
|
||||||
|
STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref);
|
||||||
|
|
||||||
|
SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg));
|
||||||
|
cliMsg->ctx = pCtx;
|
||||||
|
cliMsg->msg = *pReq;
|
||||||
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
cliMsg->type = Normal;
|
||||||
|
cliMsg->refId = (int64_t)shandle;
|
||||||
|
|
||||||
|
STraceId* trace = &pReq->info.traceId;
|
||||||
|
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
|
||||||
|
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
|
||||||
|
|
||||||
|
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
|
||||||
|
if (ret != 0) {
|
||||||
|
destroyCmsg(cliMsg);
|
||||||
|
goto _RETURN;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = tsem_timewait(pSyncMsg->pSem, timeoutMs);
|
||||||
|
if (ret < 0) {
|
||||||
|
pRsp->code = TSDB_CODE_TIMEOUT_ERROR;
|
||||||
|
ret = TSDB_CODE_TIMEOUT_ERROR;
|
||||||
|
} else {
|
||||||
|
memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg));
|
||||||
|
ret = 0;
|
||||||
|
}
|
||||||
|
_RETURN:
|
||||||
|
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
|
taosReleaseRef(transGetSyncMsgMgt(), ref);
|
||||||
|
taosRemoveRef(transGetSyncMsgMgt(), ref);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -21,6 +21,9 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
static int32_t refMgt;
|
static int32_t refMgt;
|
||||||
static int32_t instMgt;
|
static int32_t instMgt;
|
||||||
|
static int32_t transSyncMsgMgt;
|
||||||
|
|
||||||
|
void transDestroySyncMsg(void* msg);
|
||||||
|
|
||||||
int32_t transCompressMsg(char* msg, int32_t len) {
|
int32_t transCompressMsg(char* msg, int32_t len) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
@ -601,13 +604,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void transInitEnv() {
|
static void transInitEnv() {
|
||||||
refMgt = transOpenRefMgt(50000, transDestoryExHandle);
|
refMgt = transOpenRefMgt(50000, transDestroyExHandle);
|
||||||
instMgt = taosOpenRef(50, rpcCloseImpl);
|
instMgt = taosOpenRef(50, rpcCloseImpl);
|
||||||
|
transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg);
|
||||||
uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
|
uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
|
||||||
}
|
}
|
||||||
static void transDestroyEnv() {
|
static void transDestroyEnv() {
|
||||||
transCloseRefMgt(refMgt);
|
transCloseRefMgt(refMgt);
|
||||||
transCloseRefMgt(instMgt);
|
transCloseRefMgt(instMgt);
|
||||||
|
transCloseRefMgt(transSyncMsgMgt);
|
||||||
}
|
}
|
||||||
|
|
||||||
void transInit() {
|
void transInit() {
|
||||||
|
@ -617,6 +622,7 @@ void transInit() {
|
||||||
|
|
||||||
int32_t transGetRefMgt() { return refMgt; }
|
int32_t transGetRefMgt() { return refMgt; }
|
||||||
int32_t transGetInstMgt() { return instMgt; }
|
int32_t transGetInstMgt() { return instMgt; }
|
||||||
|
int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; }
|
||||||
|
|
||||||
void transCleanup() {
|
void transCleanup() {
|
||||||
// clean env
|
// clean env
|
||||||
|
@ -648,13 +654,24 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
|
||||||
// release extern handle
|
// release extern handle
|
||||||
return taosReleaseRef(refMgt, refId);
|
return taosReleaseRef(refMgt, refId);
|
||||||
}
|
}
|
||||||
void transDestoryExHandle(void* handle) {
|
void transDestroyExHandle(void* handle) {
|
||||||
if (handle == NULL) {
|
if (handle == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
taosMemoryFree(handle);
|
taosMemoryFree(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void transDestroySyncMsg(void* msg) {
|
||||||
|
if (msg == NULL) return;
|
||||||
|
|
||||||
|
STransSyncMsg* pSyncMsg = msg;
|
||||||
|
tsem_destroy(pSyncMsg->pSem);
|
||||||
|
taosMemoryFree(pSyncMsg->pSem);
|
||||||
|
|
||||||
|
taosMemoryFree(pSyncMsg->pRsp);
|
||||||
|
taosMemoryFree(pSyncMsg);
|
||||||
|
}
|
||||||
|
|
||||||
// void subnetIp2int(const char* const ip_addr, uint8_t* dst) {
|
// void subnetIp2int(const char* const ip_addr, uint8_t* dst) {
|
||||||
// char ip_addr_cpy[20];
|
// char ip_addr_cpy[20];
|
||||||
// char ip[5];
|
// char ip[5];
|
||||||
|
|
Loading…
Reference in New Issue