Merge pull request #19046 from taosdata/fix/TD-21180-3.0

feat(rpc):  return new error code TSDB_CODE_RPC_VGROUP_NOT_CONNECTED
This commit is contained in:
dapan1121 2022-12-26 18:08:48 +08:00 committed by GitHub
commit 66cece62de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 33 additions and 4 deletions

View File

@ -268,7 +268,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND)
#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define NEED_REDIRECT_ERROR(_code) \
(NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \

View File

@ -65,6 +65,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017) //
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018) //
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019) //
#define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected"
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
//common & util
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //

View File

@ -1430,6 +1430,21 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
}
// pMsg is response msg
if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
// restore origin code
if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
} else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
}
} else {
// uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
}
}
AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
arg->msg = *pMsg;
arg->pEpset = tEpSet;

View File

@ -248,6 +248,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_MNODE_NOT_FOUND ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||

View File

@ -957,7 +957,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
for (int32_t i = 0; i < size; ++i) {
SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
if (pInfo->handle != NULL) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
}
if (code == TSDB_CODE_SYN_TIMEOUT) {

View File

@ -606,6 +606,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
}
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||

View File

@ -375,7 +375,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect))
#define SCH_REDIRECT_MSGTYPE(_msgType) \

View File

@ -1665,11 +1665,20 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
int32_t code = pResp->code;
// return internal code app
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
pResp->code = pCtx->retryCode;
}
}
// check whole vnodes is offline on this vgroup
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps || pCtx->retryStep > 0) {
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
pResp->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
} else if (pResp->code == TSDB_CODE_RPC_BROKEN_LINK) {
pResp->code = TSDB_CODE_RPC_SOMENODE_BROKEN_LINK;
}
}
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) {

View File

@ -51,6 +51,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQD
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")