feat: add no retry to query
This commit is contained in:
parent
f6f1855da8
commit
14209eeec6
|
@ -69,7 +69,7 @@ typedef struct SRpcMsg {
|
||||||
} SRpcMsg;
|
} SRpcMsg;
|
||||||
|
|
||||||
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
|
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
|
||||||
typedef bool (*RpcRfp)(int32_t code);
|
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
|
||||||
|
|
||||||
typedef struct SRpcInit {
|
typedef struct SRpcInit {
|
||||||
char localFqdn[TSDB_FQDN_LEN];
|
char localFqdn[TSDB_FQDN_LEN];
|
||||||
|
|
|
@ -84,9 +84,12 @@ void closeTransporter(STscObj *pTscObj) {
|
||||||
rpcClose(pTscObj->pAppInfo->pTransporter);
|
rpcClose(pTscObj->pAppInfo->pTransporter);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool clientRpcRfp(int32_t code) {
|
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||||
|
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -248,9 +248,12 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool rpcRfp(int32_t code) {
|
static bool rpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||||
|
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -110,7 +110,7 @@ static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
|
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
|
||||||
static int32_t udfdConnectToMnode();
|
static int32_t udfdConnectToMnode();
|
||||||
static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
|
static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
|
||||||
static bool udfdRpcRfp(int32_t code);
|
static bool udfdRpcRfp(int32_t code, tmsg_t msgType);
|
||||||
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
|
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
|
||||||
static int32_t udfdOpenClientRpc();
|
static int32_t udfdOpenClientRpc();
|
||||||
static int32_t udfdCloseClientRpc();
|
static int32_t udfdCloseClientRpc();
|
||||||
|
@ -546,9 +546,12 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static bool udfdRpcRfp(int32_t code) {
|
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||||
|
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -52,7 +52,7 @@ typedef struct {
|
||||||
char user[TSDB_UNI_LEN]; // meter ID
|
char user[TSDB_UNI_LEN]; // meter ID
|
||||||
|
|
||||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||||
bool (*retry)(int32_t code);
|
bool (*retry)(int32_t code, tmsg_t msgType);
|
||||||
int index;
|
int index;
|
||||||
|
|
||||||
void* parent;
|
void* parent;
|
||||||
|
|
|
@ -164,11 +164,6 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
|
||||||
transSetDefaultAddr(thandle, ip, fqdn);
|
transSetDefaultAddr(thandle, ip, fqdn);
|
||||||
}
|
}
|
||||||
|
|
||||||
// void rpcSetMsgTraceId(SRpcMsg* pMsg, STraceId uid) {
|
|
||||||
// SRpcHandleInfo* pInfo = &pMsg->info;
|
|
||||||
// pInfo->traceId = uid;
|
|
||||||
//}
|
|
||||||
|
|
||||||
int32_t rpcInit() {
|
int32_t rpcInit() {
|
||||||
// impl later
|
// impl later
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -1030,7 +1030,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
*/
|
*/
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
int32_t code = pResp->code;
|
int32_t code = pResp->code;
|
||||||
if (pTransInst->retry != NULL && pTransInst->retry(code)) {
|
if (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) {
|
||||||
pMsg->sent = 0;
|
pMsg->sent = 0;
|
||||||
pCtx->retryCnt += 1;
|
pCtx->retryCnt += 1;
|
||||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
|
|
Loading…
Reference in New Issue