feat: query redirect
This commit is contained in:
parent
4a55ed07b3
commit
6d8fd7e506
|
@ -201,12 +201,13 @@ enum {
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
|
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_QUERY, "merge-query", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "vnode-drop-task", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "vnode-explain", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
|
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||||
|
|
|
@ -138,6 +138,7 @@ typedef struct SDataBuf {
|
||||||
void* pData;
|
void* pData;
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
void* handle;
|
void* handle;
|
||||||
|
SEpSet* pEpSet;
|
||||||
} SDataBuf;
|
} SDataBuf;
|
||||||
|
|
||||||
typedef struct STargetInfo {
|
typedef struct STargetInfo {
|
||||||
|
@ -234,13 +235,24 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
||||||
#define NEED_CLIENT_HANDLE_ERROR(_code) \
|
#define NEED_CLIENT_HANDLE_ERROR(_code) \
|
||||||
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
|
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
|
||||||
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
|
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
|
||||||
|
#define NEED_REDIRECT_ERROR(_code) \
|
||||||
|
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
|
||||||
|
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \
|
||||||
|
(_code) == TSDB_CODE_APP_NOT_READY)
|
||||||
|
|
||||||
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
|
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
|
||||||
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
|
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
|
||||||
(_type) == TDMT_VND_DROP_STB)
|
(_type) == TDMT_VND_DROP_STB)
|
||||||
|
|
||||||
|
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
|
||||||
|
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
|
||||||
|
(_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY)
|
||||||
|
|
||||||
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
|
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
|
||||||
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
|
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
||||||
(_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#define REQUEST_TOTAL_EXEC_TIMES 2
|
#define REQUEST_TOTAL_EXEC_TIMES 2
|
||||||
|
|
||||||
|
|
|
@ -89,9 +89,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (NEED_REDIRECT_ERROR(code)) {
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -808,7 +808,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
||||||
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
|
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_QUERY: {
|
case TDMT_SCH_QUERY:
|
||||||
|
case TDMT_SCH_MERGE_QUERY: {
|
||||||
code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
|
code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1306,7 +1307,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
|
|
||||||
updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
|
updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
|
||||||
|
|
||||||
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle};
|
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet};
|
||||||
|
|
||||||
if (pMsg->contLen > 0) {
|
if (pMsg->contLen > 0) {
|
||||||
buf.pData = taosMemoryCalloc(1, pMsg->contLen);
|
buf.pData = taosMemoryCalloc(1, pMsg->contLen);
|
||||||
|
|
|
@ -210,6 +210,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
|
@ -108,6 +108,7 @@ SArray *qmGetMsgHandles() {
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
|
@ -325,6 +325,7 @@ SArray *vmGetMsgHandles() {
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -251,7 +251,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
||||||
static bool rpcRfp(int32_t code, tmsg_t msgType) {
|
static bool rpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -527,9 +527,9 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
||||||
if (!IsReq(pMsg)) return 0;
|
if (!IsReq(pMsg)) return 0;
|
||||||
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_QUERY_CONTINUE ||
|
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
|
||||||
pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || pMsg->msgType == TDMT_SCH_FETCH ||
|
pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
|
||||||
pMsg->msgType == TDMT_SCH_DROP_TASK) {
|
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
||||||
|
|
|
@ -19,13 +19,13 @@
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
|
|
||||||
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
|
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
|
||||||
if (TDMT_SCH_QUERY != pMsg->msgType) return 0;
|
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return 0;
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
|
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
|
void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
|
||||||
if (TDMT_SCH_QUERY != pMsg->msgType) return;
|
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return;
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
|
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,7 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
|
||||||
mTrace("msg:%p, in query queue is processing", pMsg);
|
mTrace("msg:%p, in query queue is processing", pMsg);
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_SCH_QUERY:
|
case TDMT_SCH_QUERY:
|
||||||
|
case TDMT_SCH_MERGE_QUERY:
|
||||||
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
|
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_QUERY_CONTINUE:
|
case TDMT_SCH_QUERY_CONTINUE:
|
||||||
|
@ -68,6 +69,7 @@ int32_t mndInitQuery(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
|
||||||
|
|
|
@ -65,7 +65,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) {
|
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) {
|
||||||
if (TDMT_SCH_QUERY != pMsg->msgType) {
|
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,6 +79,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_SCH_QUERY:
|
case TDMT_SCH_QUERY:
|
||||||
|
case TDMT_SCH_MERGE_QUERY:
|
||||||
code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
|
code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_QUERY_CONTINUE:
|
case TDMT_SCH_QUERY_CONTINUE:
|
||||||
|
|
|
@ -216,7 +216,7 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
if (TDMT_SCH_QUERY != pMsg->msgType) {
|
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,6 +228,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_SCH_QUERY:
|
case TDMT_SCH_QUERY:
|
||||||
|
case TDMT_SCH_MERGE_QUERY:
|
||||||
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
|
||||||
case TDMT_SCH_QUERY_CONTINUE:
|
case TDMT_SCH_QUERY_CONTINUE:
|
||||||
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
|
||||||
|
|
|
@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
|
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_FETCH) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -1555,7 +1555,11 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic
|
||||||
if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
|
if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
|
||||||
code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
|
code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
|
||||||
} else {
|
} else {
|
||||||
pSubplan->msgType = TDMT_SCH_QUERY;
|
if (SUBPLAN_TYPE_SCAN == pSubplan->subplanType) {
|
||||||
|
pSubplan->msgType = TDMT_SCH_QUERY;
|
||||||
|
} else {
|
||||||
|
pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
|
||||||
|
}
|
||||||
code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
|
code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
|
||||||
code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
|
code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
|
||||||
|
|
|
@ -216,7 +216,8 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
|
||||||
tFreeSSubmitRsp((SSubmitRsp*)pRes->res);
|
tFreeSSubmitRsp((SSubmitRsp*)pRes->res);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_QUERY: {
|
case TDMT_SCH_QUERY:
|
||||||
|
case TDMT_SCH_MERGE_QUERY: {
|
||||||
taosArrayDestroy((SArray*)pRes->res);
|
taosArrayDestroy((SArray*)pRes->res);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,6 +383,7 @@ char* schGetOpStr(SCH_OP_TYPE type);
|
||||||
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
|
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
|
||||||
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob);
|
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob);
|
||||||
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
|
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
|
||||||
|
int32_t schUpdateTaskCandidateAddr(SSchTask *pTask, SEpSet* pEpSet);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -682,6 +682,25 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schUpdateTaskCandidateAddr(SSchTask *pTask, SEpSet* pEpSet) {
|
||||||
|
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
|
||||||
|
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0));
|
||||||
|
SCH_ERR_RET(TSDB_CODE_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
SQueryNodeAddr* pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
||||||
|
|
||||||
|
SEp* pOld = &pAddr->epSet.eps[pAddr->epSet.inUse];
|
||||||
|
SEp* pNew = &pEpSet->eps[pEpSet->inUse];
|
||||||
|
|
||||||
|
SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port);
|
||||||
|
|
||||||
|
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
|
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
|
||||||
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
|
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -821,7 +840,6 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO CHECK epList/condidateList
|
|
||||||
if (SCH_IS_DATA_SRC_TASK(pTask)) {
|
if (SCH_IS_DATA_SRC_TASK(pTask)) {
|
||||||
if ((pTask->execIdx + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
|
if ((pTask->execIdx + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
|
||||||
*needRetry = false;
|
*needRetry = false;
|
||||||
|
@ -853,7 +871,6 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
|
||||||
|
|
||||||
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
||||||
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
|
|
||||||
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
|
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1237,7 +1254,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_LOCK_TASK(pTask);
|
SCH_LOCK_TASK(pTask);
|
||||||
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
|
if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXECUTING == pTask->status &&
|
||||||
|
pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
|
||||||
SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx);
|
SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx);
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
taosHashClear(pTask->execNodes);
|
taosHashClear(pTask->execNodes);
|
||||||
|
@ -1281,7 +1299,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskStatus->status == JOB_TASK_STATUS_NOT_START && SCH_TASK_TIMEOUT(pTask)) {
|
if (taskStatus->status == JOB_TASK_STATUS_NOT_START) {
|
||||||
schRescheduleTask(pJob, pTask);
|
schRescheduleTask(pJob, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1657,5 +1675,86 @@ _return:
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if ((pTask->execIdx + 1) >= pTask->maxExecTimes) {
|
||||||
|
SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx);
|
||||||
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
schProcessOnJobFailure(pJob, rspCode);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_TASK_DLOG("task will be redirected now, status:%d", SCH_GET_TASK_STATUS_STR(pTask));
|
||||||
|
|
||||||
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
|
taosHashClear(pTask->execNodes);
|
||||||
|
SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask));
|
||||||
|
schDeregisterTaskHb(pJob, pTask);
|
||||||
|
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
||||||
|
taosMemoryFreeClear(pTask->msg);
|
||||||
|
pTask->msgLen = 0;
|
||||||
|
pTask->lastMsgType = 0;
|
||||||
|
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
||||||
|
|
||||||
|
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
|
||||||
|
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
||||||
|
if (JOB_TASK_STATUS_EXECUTING == SCH_GET_TASK_STATUS(pTask)) {
|
||||||
|
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pTask->childReady = 0;
|
||||||
|
|
||||||
|
int32_t childrenNum = taosArrayGetSize(pTask->children);
|
||||||
|
for (int32_t i = 0; i < childrenNum; ++i) {
|
||||||
|
SSchTask* pChild = taosArrayGetP(pTask->children, i);
|
||||||
|
SCH_LOCK_TASK(pChild);
|
||||||
|
code = schDoTaskRedirect(pJob, pChild, rspCode);
|
||||||
|
SCH_UNLOCK_TASK(pChild);
|
||||||
|
SCH_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
qClearSubplanExecutionNode(pTask->plan);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
|
||||||
|
|
||||||
|
SCH_ERR_JRET(schLaunchTask(pJob, pTask));
|
||||||
|
|
||||||
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
code = schProcessOnTaskFailure(pJob, pTask, code);
|
||||||
|
|
||||||
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
|
||||||
|
SCH_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SDataBuf* pData, int32_t rspCode) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
|
||||||
|
if (NULL == pData->pEpSet) {
|
||||||
|
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
|
||||||
|
SCH_ERR_JRET(rspCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pTask, pData->pEpSet));
|
||||||
|
}
|
||||||
|
|
||||||
|
schDoTaskRedirect(pJob, pTask, rspCode);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
schProcessOnTaskFailure(pJob, pTask, code);
|
||||||
|
|
||||||
|
SCH_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -90,15 +90,6 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
|
||||||
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
|
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
|
||||||
int32_t rspCode) {
|
int32_t rspCode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int8_t status = 0;
|
|
||||||
|
|
||||||
if (schJobNeedToStop(pJob, &status)) {
|
|
||||||
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
|
|
||||||
taosMemoryFreeClear(msg);
|
|
||||||
SCH_RET(atomic_load_32(&pJob->errCode));
|
|
||||||
}
|
|
||||||
|
|
||||||
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
|
|
||||||
|
|
||||||
switch (msgType) {
|
switch (msgType) {
|
||||||
case TDMT_VND_CREATE_TABLE_RSP: {
|
case TDMT_VND_CREATE_TABLE_RSP: {
|
||||||
|
@ -392,8 +383,23 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
|
||||||
|
|
||||||
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||||
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx));
|
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx));
|
||||||
|
|
||||||
|
int8_t status = 0;
|
||||||
|
if (schJobNeedToStop(pJob, &status)) {
|
||||||
|
SCH_TASK_ELOG("rsp will not be processed cause of job status %s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
|
||||||
|
code = atomic_load_32(&pJob->errCode);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
|
||||||
|
|
||||||
|
if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || ((rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgSize > 0)) {
|
||||||
|
code = schHandleRedirect(pJob, pTask, msgType, pMsg, rspCode);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
||||||
|
pMsg->pData = NULL;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
@ -405,6 +411,7 @@ _return:
|
||||||
schReleaseJob(pParam->refId);
|
schReleaseJob(pParam->refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pMsg->pData);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
@ -569,6 +576,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
||||||
*fp = schHandleSubmitCallback;
|
*fp = schHandleSubmitCallback;
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_QUERY:
|
case TDMT_SCH_QUERY:
|
||||||
|
case TDMT_SCH_MERGE_QUERY:
|
||||||
*fp = schHandleQueryCallback;
|
*fp = schHandleQueryCallback;
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_DELETE:
|
case TDMT_VND_DELETE:
|
||||||
|
@ -1032,7 +1040,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
tSerializeSVDeleteReq(msg, msgSize, &req);
|
tSerializeSVDeleteReq(msg, msgSize, &req);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_QUERY: {
|
case TDMT_SCH_QUERY:
|
||||||
|
case TDMT_SCH_MERGE_QUERY: {
|
||||||
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
|
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
|
||||||
|
|
||||||
uint32_t len = strlen(pJob->sql);
|
uint32_t len = strlen(pJob->sql);
|
||||||
|
@ -1135,7 +1144,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle,
|
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle,
|
||||||
(rpcCtx.args ? &rpcCtx : NULL)));
|
(rpcCtx.args ? &rpcCtx : NULL)));
|
||||||
|
|
||||||
if (msgType == TDMT_SCH_QUERY) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
|
||||||
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx));
|
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue