enh: add bypassFlag to facilitate performance testing
This commit is contained in:
parent
08c431b035
commit
3225c381e2
|
@ -96,7 +96,8 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027)
|
||||
#define TSDB_CODE_RPC_ASYNC_IN_PROCESS TAOS_DEF_ERROR_CODE(0, 0x0028)
|
||||
#define TSDB_CODE_RPC_NO_STATE TAOS_DEF_ERROR_CODE(0, 0x0029)
|
||||
#define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A)
|
||||
#define TSDB_CODE_RPC_STATE_DROPED TAOS_DEF_ERROR_CODE(0, 0x002A)
|
||||
#define TSDB_CODE_RPC_BYPASS_SEND TAOS_DEF_ERROR_CODE(0, 0x002B)
|
||||
|
||||
//common & util
|
||||
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0100) //
|
||||
|
|
|
@ -623,10 +623,11 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };
|
|||
/**
|
||||
* RB: return before
|
||||
* RA: return after
|
||||
* NR: not return, skip and go on following code
|
||||
* NR: not return, skip and go on following steps
|
||||
*/
|
||||
#define TSDB_BYPASS_RB_RPC_SEND_SUBMIT 0x01u
|
||||
#define TSDB_BYPASS_RA_RPC_RECV_SUBMIT 0x02u
|
||||
#define TSDB_BYPASS_RB_TSDB_WRITE_MEM 0x04u
|
||||
#define TSDB_BYPASS_RB_TSDB_COMMIT 0x08u
|
||||
|
||||
#define DEFAULT_HANDLE 0
|
||||
|
|
|
@ -3108,8 +3108,9 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
|
|||
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
|
||||
pRequest->inCallback = true;
|
||||
int64_t this = pRequest->self;
|
||||
if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
|
||||
(code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
|
||||
if ((tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
|
||||
(code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) ||
|
||||
((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (code == TSDB_CODE_RPC_BYPASS_SEND))) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
}
|
||||
|
|
|
@ -362,6 +362,10 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) {
|
||||
return TSDB_CODE_MSG_PREPROCESSED;
|
||||
}
|
||||
|
||||
SDecoder *pCoder = &(SDecoder){0};
|
||||
|
||||
if (taosHton64(((SSubmitReq2Msg *)pMsg->pCont)->version) != 1) {
|
||||
|
@ -1787,6 +1791,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
|||
int32_t code = 0;
|
||||
terrno = 0;
|
||||
|
||||
if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) {
|
||||
return TSDB_CODE_MSG_PREPROCESSED;
|
||||
}
|
||||
|
||||
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
|
||||
SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
|
||||
SArray *newTbUids = NULL;
|
||||
|
|
|
@ -1021,10 +1021,12 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
|||
|
||||
_return:
|
||||
|
||||
if (pJob) {
|
||||
SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
|
||||
} else {
|
||||
qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
|
||||
if(code != TSDB_CODE_RPC_BYPASS_SEND) {
|
||||
if (pJob) {
|
||||
SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
|
||||
} else {
|
||||
qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
if (pMsgSendInfo) {
|
||||
|
|
|
@ -172,7 +172,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
|
|||
SSchJob *pJob = NULL;
|
||||
(void)schAcquireJob(*jobId, &pJob);
|
||||
if (NULL == pJob) {
|
||||
qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
|
||||
qTrace("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -190,6 +190,11 @@ int32_t rpcSendRequest(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t
|
|||
return transSendRequest(pInit, pEpSet, pMsg, NULL);
|
||||
}
|
||||
int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
||||
if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (pMsg->msgType == TDMT_VND_SUBMIT)) {
|
||||
transFreeMsg(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
return TSDB_CODE_RPC_BYPASS_SEND;
|
||||
}
|
||||
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
|
||||
return transSendRequest(pInit, pEpSet, pMsg, pCtx);
|
||||
} else {
|
||||
|
|
|
@ -61,7 +61,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already qu
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_IN_PROCESS, "rpc async in process")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NO_STATE, "rpc no state")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_STATE_DROPED, "rpc state already dropped")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BYPASS_SEND, "rpc bypass send")
|
||||
|
||||
//common & util
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
|
||||
|
|
Loading…
Reference in New Issue