From 3ebb664953c3129fa7f62c64fcca100b4a629c92 Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 18 Nov 2024 18:12:39 +0800 Subject: [PATCH] enh: add bypassFlag to facilitate performance testing --- source/client/src/clientImpl.c | 5 ++-- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 4 +++ source/dnode/vnode/src/vnd/vnodeSvr.c | 4 --- source/libs/scheduler/src/schRemote.c | 31 +++++++--------------- source/libs/transport/src/trans.c | 5 ---- 5 files changed, 15 insertions(+), 34 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index bc1e834ed8..8a0b1ddaab 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -3108,9 +3108,8 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param void doRequestCallback(SRequestObj* pRequest, int32_t code) { pRequest->inCallback = true; int64_t this = pRequest->self; - if ((tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && - (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) || - ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (code == TSDB_CODE_RPC_BYPASS_SEND))) { + if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && + (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) { code = TSDB_CODE_SUCCESS; pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index eb22335311..5b26d17519 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -122,6 +122,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi tb_uid_t suid = pSubmitTbData->suid; tb_uid_t uid = pSubmitTbData->uid; + if (tsBypassFlag & TSDB_BYPASS_RB_TSDB_WRITE_MEM) { + goto _err; + } + // create/get STbData to op code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData); if (code) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c0d0f5a6c4..16c5e026d1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1791,10 +1791,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in int32_t code = 0; terrno = 0; - if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) { - return TSDB_CODE_MSG_PREPROCESSED; - } - SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0}; SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0}; SArray *newTbUids = NULL; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index ced5c6f4a3..3321fdb4b5 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1021,12 +1021,10 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery _return: - if(code != TSDB_CODE_RPC_BYPASS_SEND) { - if (pJob) { - SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code)); - } else { - qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code)); - } + if (pJob) { + SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code)); + } else { + qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code)); } if (pMsgSendInfo) { @@ -1347,30 +1345,19 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } -#if 1 - SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; - code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); - msg = NULL; - SCH_ERR_JRET(code); - - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { - SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); - } -#else - if (TDMT_VND_SUBMIT != msgType) { + if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (TDMT_VND_SUBMIT == msgType)) { + taosMemoryFree(msg); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + } else { SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; - code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); + code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL; SCH_ERR_JRET(code); if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); } - } else { - taosMemoryFree(msg); - SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); } -#endif return TSDB_CODE_SUCCESS; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index a1e0342d83..de129773a0 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -190,11 +190,6 @@ int32_t rpcSendRequest(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t return transSendRequest(pInit, pEpSet, pMsg, NULL); } int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { - if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (pMsg->msgType == TDMT_VND_SUBMIT)) { - transFreeMsg(pMsg->pCont); - pMsg->pCont = NULL; - return TSDB_CODE_RPC_BYPASS_SEND; - } if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) { return transSendRequest(pInit, pEpSet, pMsg, pCtx); } else {