diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2c811495fd..863c212657 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -96,7 +96,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) #define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028) #define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029) -#define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A) +#define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A) +#define TSDB_CODE_RPC_BYPASS_SEND TAOS_DEF_ERROR_CODE(0, 0x002B) //common & util #define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) // diff --git a/include/util/tdef.h b/include/util/tdef.h index e869d993a3..0e2f7ed8a6 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -623,10 +623,11 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 }; /** * RB: return before * RA: return after - * NR: not return, skip and go on following code + * NR: not return, skip and go on following steps */ #define TSDB_BYPASS_RB_RPC_SEND_SUBMIT 0x01u #define TSDB_BYPASS_RA_RPC_RECV_SUBMIT 0x02u +#define TSDB_BYPASS_RB_TSDB_WRITE_MEM 0x04u #define TSDB_BYPASS_RB_TSDB_COMMIT 0x08u #define DEFAULT_HANDLE 0 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8a0b1ddaab..bc1e834ed8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -3108,8 +3108,9 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param void doRequestCallback(SRequestObj* pRequest, int32_t code) { pRequest->inCallback = true; int64_t this = pRequest->self; - if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && - (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) { + if ((tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && + (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) || + ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (code == TSDB_CODE_RPC_BYPASS_SEND))) { code = TSDB_CODE_SUCCESS; pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6702b8b588..c0d0f5a6c4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -362,6 +362,10 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; int32_t lino = 0; + if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) { + return TSDB_CODE_MSG_PREPROCESSED; + } + SDecoder *pCoder = &(SDecoder){0}; if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) { @@ -1787,6 +1791,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in int32_t code = 0; terrno = 0; + if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) { + return TSDB_CODE_MSG_PREPROCESSED; + } + SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0}; SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0}; SArray *newTbUids = NULL; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index eefb32f783..ced5c6f4a3 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1021,10 +1021,12 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery _return: - if (pJob) { - SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code)); - } else { - qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code)); + if(code != TSDB_CODE_RPC_BYPASS_SEND) { + if (pJob) { + SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code)); + } else { + qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code)); + } } if (pMsgSendInfo) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index db9ecd6025..73ac26495a 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -172,7 +172,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) { SSchJob *pJob = NULL; (void)schAcquireJob(*jobId, &pJob); if (NULL == pJob) { - qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); + qTrace("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); return; } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index de129773a0..a1e0342d83 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -190,6 +190,11 @@ int32_t rpcSendRequest(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t return transSendRequest(pInit, pEpSet, pMsg, NULL); } int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (pMsg->msgType == TDMT_VND_SUBMIT)) { + transFreeMsg(pMsg->pCont); + pMsg->pCont = NULL; + return TSDB_CODE_RPC_BYPASS_SEND; + } if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) { return transSendRequest(pInit, pEpSet, pMsg, pCtx); } else { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0d8a85155a..227c55e5c0 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -61,7 +61,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already qu TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state") -TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BYPASS_SEND, "rpc bypass send") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")