enh: add rpc msg abort preprocess
This commit is contained in:
parent
a8694bd863
commit
ef21c6b339
|
@ -96,6 +96,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
|
|||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
|
||||
int32_t mndPreProcessMsg(SRpcMsg *pMsg);
|
||||
void mndAbortPreprocessMsg(SRpcMsg *pMsg);
|
||||
|
||||
/**
|
||||
* @brief Generate machine code
|
||||
|
|
|
@ -64,6 +64,8 @@ typedef struct {
|
|||
|
||||
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
|
||||
|
||||
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
|
||||
|
|
|
@ -550,6 +550,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
|||
pMsg->msgType != TDMT_MND_TRANS_TIMER) {
|
||||
mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
|
||||
|
||||
mndAbortPreprocessMsg(pMsg);
|
||||
|
||||
SEpSet epSet = {0};
|
||||
mndGetMnodeEpSet(pMsg->info.node, &epSet);
|
||||
|
||||
|
|
|
@ -25,6 +25,13 @@ int32_t mndPreProcessMsg(SRpcMsg *pMsg) {
|
|||
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
|
||||
}
|
||||
|
||||
void mndAbortPreprocessMsg(SRpcMsg *pMsg) {
|
||||
if (TDMT_VND_QUERY != pMsg->msgType) return;
|
||||
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
|
||||
}
|
||||
|
||||
int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
|
||||
int32_t code = -1;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
|
|
|
@ -23,6 +23,7 @@ extern "C" {
|
|||
#include "qwInt.h"
|
||||
#include "dataSinkMgt.h"
|
||||
|
||||
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF);
|
||||
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql);
|
||||
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
|
|
|
@ -283,6 +283,26 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
uint64_t tId = msg->taskId;
|
||||
int64_t rId = msg->refId;
|
||||
|
||||
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
|
||||
qwAbortPrerocessQuery(QW_FPARAMS());
|
||||
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
|
|
|
@ -482,6 +482,13 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
|
||||
QW_ERR_RET(qwDropTask(QW_FPARAMS()));
|
||||
|
||||
QW_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
|
||||
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||
int32_t code = 0;
|
||||
bool queryRsped = false;
|
||||
|
|
Loading…
Reference in New Issue