enh: add bypassFlag to facilitate performance testing
This commit is contained in:
parent
7e17f6366b
commit
3ebb664953
|
@ -3108,9 +3108,8 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
|
||||||
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
|
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
|
||||||
pRequest->inCallback = true;
|
pRequest->inCallback = true;
|
||||||
int64_t this = pRequest->self;
|
int64_t this = pRequest->self;
|
||||||
if ((tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
|
if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
|
||||||
(code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) ||
|
(code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
|
||||||
((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (code == TSDB_CODE_RPC_BYPASS_SEND))) {
|
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,6 +122,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi
|
||||||
tb_uid_t suid = pSubmitTbData->suid;
|
tb_uid_t suid = pSubmitTbData->suid;
|
||||||
tb_uid_t uid = pSubmitTbData->uid;
|
tb_uid_t uid = pSubmitTbData->uid;
|
||||||
|
|
||||||
|
if (tsBypassFlag & TSDB_BYPASS_RB_TSDB_WRITE_MEM) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
// create/get STbData to op
|
// create/get STbData to op
|
||||||
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
|
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -1791,10 +1791,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
if (tsBypassFlag & TSDB_BYPASS_RA_RPC_RECV_SUBMIT) {
|
|
||||||
return TSDB_CODE_MSG_PREPROCESSED;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
|
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
|
||||||
SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
|
SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
|
||||||
SArray *newTbUids = NULL;
|
SArray *newTbUids = NULL;
|
||||||
|
|
|
@ -1021,12 +1021,10 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if(code != TSDB_CODE_RPC_BYPASS_SEND) {
|
if (pJob) {
|
||||||
if (pJob) {
|
SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
|
||||||
SCH_TASK_ELOG("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
|
} else {
|
||||||
} else {
|
qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
|
||||||
qError("fail to send msg, type:%d, %s, error:%s", msgType, TMSG_INFO(msgType), tstrerror(code));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsgSendInfo) {
|
if (pMsgSendInfo) {
|
||||||
|
@ -1347,30 +1345,19 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1
|
if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (TDMT_VND_SUBMIT == msgType)) {
|
||||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
taosMemoryFree(msg);
|
||||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||||
msg = NULL;
|
} else {
|
||||||
SCH_ERR_JRET(code);
|
|
||||||
|
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
|
|
||||||
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
if (TDMT_VND_SUBMIT != msgType) {
|
|
||||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||||
msg = NULL;
|
msg = NULL;
|
||||||
SCH_ERR_JRET(code);
|
SCH_ERR_JRET(code);
|
||||||
|
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
|
||||||
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
|
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
taosMemoryFree(msg);
|
|
||||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
|
|
@ -190,11 +190,6 @@ int32_t rpcSendRequest(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t
|
||||||
return transSendRequest(pInit, pEpSet, pMsg, NULL);
|
return transSendRequest(pInit, pEpSet, pMsg, NULL);
|
||||||
}
|
}
|
||||||
int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
int32_t rpcSendRequestWithCtx(void* pInit, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
|
||||||
if ((tsBypassFlag & TSDB_BYPASS_RB_RPC_SEND_SUBMIT) && (pMsg->msgType == TDMT_VND_SUBMIT)) {
|
|
||||||
transFreeMsg(pMsg->pCont);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
return TSDB_CODE_RPC_BYPASS_SEND;
|
|
||||||
}
|
|
||||||
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
|
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
|
||||||
return transSendRequest(pInit, pEpSet, pMsg, pCtx);
|
return transSendRequest(pInit, pEpSet, pMsg, pCtx);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue