Merge pull request #14385 from taosdata/feature/queryredirect

feat: support query redirect
This commit is contained in:
dapan1121 2022-06-30 18:03:17 +08:00 committed by GitHub
commit 068924cc62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 673 additions and 372 deletions

View File

@ -1495,6 +1495,7 @@ typedef struct SSubQueryMsg {
uint64_t queryId;
uint64_t taskId;
int64_t refId;
int32_t execId;
int8_t taskType;
int8_t explain;
uint32_t sqlLen; // the query sql,
@ -1514,6 +1515,7 @@ typedef struct {
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int32_t execId;
} SQueryContinueReq;
typedef struct {
@ -1535,6 +1537,7 @@ typedef struct {
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int32_t execId;
} SResFetchReq;
typedef struct {
@ -1546,6 +1549,7 @@ typedef struct {
uint64_t queryId;
uint64_t taskId;
int64_t refId;
int32_t execId;
int8_t status;
} STaskStatus;
@ -1591,6 +1595,7 @@ typedef struct {
uint64_t queryId;
uint64_t taskId;
int64_t refId;
int32_t execId;
} STaskCancelReq;
typedef struct {
@ -1603,6 +1608,7 @@ typedef struct {
uint64_t queryId;
uint64_t taskId;
int64_t refId;
int32_t execId;
} STaskDropReq;
typedef struct {

View File

@ -167,10 +167,6 @@ enum {
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "create-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "alter-table", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TABLE, "drop-table", NULL, NULL)
@ -184,12 +180,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
@ -206,6 +199,17 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_QUERY, "merge-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
@ -214,9 +218,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)
TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_VM_INFO, "monitor-vinfo", NULL, NULL)

View File

@ -347,6 +347,7 @@ typedef struct SDownstreamSourceNode {
SQueryNodeAddr addr;
uint64_t taskId;
uint64_t schedId;
int32_t execId;
} SDownstreamSourceNode;
typedef struct SExchangePhysiNode {

View File

@ -135,9 +135,11 @@ typedef struct STableMetaOutput {
} STableMetaOutput;
typedef struct SDataBuf {
int32_t msgType;
void* pData;
uint32_t len;
void* handle;
SEpSet* pEpSet;
} SDataBuf;
typedef struct STargetInfo {
@ -146,7 +148,7 @@ typedef struct STargetInfo {
int32_t vgId;
} STargetInfo;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SRequestConnInfo {
@ -234,13 +236,25 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_VND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_VND_DROP_STB)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
(_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_SCH_TIMEOUT_ERROR || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define REQUEST_TOTAL_EXEC_TIMES 2

View File

@ -86,6 +86,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x0102)
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0103)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0104)
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0105)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)

View File

@ -89,8 +89,10 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
}
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
if (NEED_REDIRECT_ERROR(code)) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
return false;
}
return true;
} else {
return false;

View File

@ -262,7 +262,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
return TSDB_CODE_SUCCESS;
}
static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) {
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
static int32_t emptyRspNum = 0;
if (code != 0) {
taosMemoryFreeClear(param);

View File

@ -769,7 +769,8 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
case TDMT_VND_QUERY: {
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY: {
code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
break;
}
@ -1255,10 +1256,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
*/
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
tscDebug("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
}
@ -1268,7 +1269,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle};
SDataBuf buf = {.msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet};
if (pMsg->contLen > 0) {
buf.pData = taosMemoryCalloc(1, pMsg->contLen);

View File

@ -26,7 +26,7 @@ static void setErrno(SRequestObj* pRequest, int32_t code) {
terrno = code;
}
int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
setErrno(pRequest, code);
@ -39,7 +39,7 @@ int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pMsg->pData);
@ -116,7 +116,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
return pMsgSendInfo;
}
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list
SRequestObj* pRequest = param;
taosMemoryFree(pMsg->pData);
@ -132,7 +132,7 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
@ -211,7 +211,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
@ -229,7 +229,7 @@ int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code)
return code;
}
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
@ -250,7 +250,7 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
@ -356,7 +356,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
return TSDB_CODE_SUCCESS;
}
int32_t processShowVariablesRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);

View File

@ -384,7 +384,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
#endif
int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array
@ -819,7 +819,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
}
}
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code;
/*tmq_t* tmq = pParam->tmq;*/
@ -1073,7 +1073,7 @@ int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
}
#endif
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic;
@ -1330,7 +1330,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
}
#endif
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
int8_t async = pParam->async;

View File

@ -4309,6 +4309,7 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR
if (tEncodeU64(&encoder, status->queryId) < 0) return -1;
if (tEncodeU64(&encoder, status->taskId) < 0) return -1;
if (tEncodeI64(&encoder, status->refId) < 0) return -1;
if (tEncodeI32(&encoder, status->execId) < 0) return -1;
if (tEncodeI8(&encoder, status->status) < 0) return -1;
}
} else {
@ -4339,6 +4340,7 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
if (tDecodeU64(&decoder, &status.queryId) < 0) return -1;
if (tDecodeU64(&decoder, &status.taskId) < 0) return -1;
if (tDecodeI64(&decoder, &status.refId) < 0) return -1;
if (tDecodeI32(&decoder, &status.execId) < 0) return -1;
if (tDecodeI8(&decoder, &status.status) < 0) return -1;
taosArrayPush(pRsp->taskStatus, &status);
}

View File

@ -209,10 +209,11 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -220,7 +221,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -107,14 +107,15 @@ SArray *qmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_QM_INFO, qmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
// Requests handled by VNODE
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -324,16 +324,17 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
@ -349,7 +350,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -41,6 +41,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
}
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
@ -86,7 +87,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
return;
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
case TDMT_VND_FETCH_RSP:
case TDMT_SCH_FETCH_RSP:
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
return;
case TDMT_MND_STATUS_RSP:
@ -250,7 +251,10 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
return false;
}
return true;
} else {
return false;

View File

@ -527,9 +527,9 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->msgType == TDMT_VND_QUERY || pMsg->msgType == TDMT_VND_QUERY_CONTINUE ||
pMsg->msgType == TDMT_VND_QUERY_HEARTBEAT || pMsg->msgType == TDMT_VND_FETCH ||
pMsg->msgType == TDMT_VND_DROP_TASK) {
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
return 0;
}
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
@ -552,6 +552,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->info.rsp = rpcMallocCont(contLen);
pMsg->info.hasEpSet = 1;
if (pMsg->info.rsp != NULL) {
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
pMsg->info.rspLen = contLen;

View File

@ -19,13 +19,13 @@
#include "qworker.h"
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) return 0;
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return 0;
SMnode *pMnode = pMsg->info.node;
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
}
void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) return;
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return;
SMnode *pMnode = pMsg->info.node;
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
}
@ -37,19 +37,20 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
mTrace("msg:%p, in query queue is processing", pMsg);
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY:
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_QUERY_CONTINUE:
case TDMT_SCH_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
case TDMT_VND_QUERY_HEARTBEAT:
case TDMT_SCH_QUERY_HEARTBEAT:
code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
default:
@ -67,11 +68,12 @@ int32_t mndInitQuery(SMnode *pMnode) {
return -1;
}
mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
return 0;
}

View File

@ -65,7 +65,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) {
}
int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) {
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
return 0;
}
@ -78,28 +78,29 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
qTrace("message in qnode queue is processing");
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY:
code = qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_QUERY_CONTINUE:
case TDMT_SCH_QUERY_CONTINUE:
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_FETCH_RSP:
case TDMT_SCH_FETCH_RSP:
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_CANCEL_TASK:
case TDMT_SCH_CANCEL_TASK:
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
code = qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_VND_CONSUME:
// code = tqProcessConsumeReq(pQnode->pTq, pMsg);
// break;
case TDMT_VND_QUERY_HEARTBEAT:
case TDMT_SCH_QUERY_HEARTBEAT:
code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
default:

View File

@ -216,7 +216,7 @@ _err:
}
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) {
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) {
return 0;
}
@ -227,9 +227,10 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing");
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY:
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_CONTINUE:
case TDMT_SCH_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
@ -243,15 +244,15 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSP:
case TDMT_SCH_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_CANCEL_TASK:
case TDMT_SCH_CANCEL_TASK:
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_QUERY_HEARTBEAT:
case TDMT_SCH_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg);

View File

@ -178,7 +178,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
}
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet);
} else {

View File

@ -241,7 +241,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
}
int32_t ctgHandleMsgCallback(void *param, const SDataBuf *pMsg, int32_t rspCode) {
int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
int32_t code = 0;

View File

@ -1933,7 +1933,7 @@ typedef struct SFetchRspHandleWrapper {
int32_t sourceIndex;
} SFetchRspHandleWrapper;
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
@ -2010,13 +2010,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo),
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, GET_TASKID(pTaskInfo),
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
pMsg->execId = htonl(pSource->execId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
@ -2034,7 +2035,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH;
pMsgSendInfo->msgType = TDMT_SCH_FETCH;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
@ -2145,9 +2146,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SSDataBlock* pRes = pExchangeInfo->pResult;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1;
@ -2165,17 +2166,17 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d"
" index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
completed += 1;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pLoadInfo->totalRows,
pLoadInfo->totalSize);
}
@ -2249,8 +2250,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
qError("%s vgId:%d, taskID:0x%" PRIx64 " error happens, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
pSource->taskId, tstrerror(pDataInfo->code));
qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
pOperator->pTaskInfo->code = pDataInfo->code;
return NULL;
}
@ -2258,9 +2259,9 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
" try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
@ -2276,17 +2277,17 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
pExchangeInfo->current += 1;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
", totalBytes:%" PRIu64,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pLoadInfo->totalRows,
pLoadInfo->totalSize);
}
@ -2378,7 +2379,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
}
for (int32_t i = 0; i < numOfSources; ++i) {
SNodeListNode* pNode = (SNodeListNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
taosArrayPush(pInfo->pSources, pNode);
}

View File

@ -1362,7 +1362,7 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
}
static int32_t loadSysTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
SOperatorInfo* operator=(SOperatorInfo*) param;
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*)operator->info;
if (TSDB_CODE_SUCCESS == code) {

View File

@ -548,8 +548,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
}
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) {
if (msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH) {
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
return false;
}
return true;

View File

@ -582,6 +582,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
COPY_OBJECT_FIELD(addr, sizeof(SQueryNodeAddr));
COPY_SCALAR_FIELD(taskId);
COPY_SCALAR_FIELD(schedId);
COPY_SCALAR_FIELD(execId);
return TSDB_CODE_SUCCESS;
}

View File

@ -3430,6 +3430,7 @@ static int32_t jsonToSlotDescNode(const SJson* pJson, void* pObj) {
static const char* jkDownstreamSourceAddr = "Addr";
static const char* jkDownstreamSourceTaskId = "TaskId";
static const char* jkDownstreamSourceSchedId = "SchedId";
static const char* jkDownstreamSourceExecId = "ExecId";
static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
@ -3441,6 +3442,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->schedId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId);
}
return code;
}
@ -3455,6 +3459,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSchedId, &pNode->schedId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId);
}
return code;
}

View File

@ -6011,7 +6011,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_EXPLAIN_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = true;
pQuery->msgType = TDMT_VND_QUERY;
pQuery->msgType = TDMT_SCH_QUERY;
break;
case QUERY_NODE_DELETE_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;

View File

@ -1555,7 +1555,11 @@ static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogic
if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
} else {
pSubplan->msgType = TDMT_VND_QUERY;
if (SUBPLAN_TYPE_SCAN == pSubplan->subplanType) {
pSubplan->msgType = TDMT_SCH_QUERY;
} else {
pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
}
code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);

View File

@ -216,7 +216,8 @@ void destroyQueryExecRes(SQueryExecRes* pRes) {
tFreeSSubmitRsp((SSubmitRsp*)pRes->res);
break;
}
case TDMT_VND_QUERY: {
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY: {
taosArrayDestroy((SArray*)pRes->res);
break;
}

View File

@ -75,6 +75,7 @@ typedef struct SQWDebug {
bool lockEnable;
bool statusEnable;
bool dumpEnable;
bool tmp;
} SQWDebug;
extern SQWDebug gQWDebug;
@ -82,6 +83,7 @@ extern SQWDebug gQWDebug;
typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
char *msg;
int32_t msgLen;
SRpcHandleInfo connInfo;
@ -100,6 +102,7 @@ typedef struct SQWHbInfo {
typedef struct SQWPhaseInput {
int32_t code;
int32_t msgType;
} SQWPhaseInput;
typedef struct SQWPhaseOutput {
@ -119,6 +122,8 @@ typedef struct SQWTaskCtx {
int8_t phase;
int8_t taskType;
int8_t explain;
int32_t queryType;
int32_t execId;
bool queryFetched;
bool queryEnd;
@ -197,8 +202,8 @@ typedef struct SQWorkerMgmt {
int32_t paramIdx;
} SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId
#define QW_IDS() sId, qId, tId, rId, eId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
@ -223,15 +228,18 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) \
#define QW_SET_QTID(id, qId, tId, eId) \
do { \
*(uint64_t *)(id) = (qId); \
*(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \
*(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)) = (eId); \
} while (0)
#define QW_GET_QTID(id, qId, tId) \
#define QW_GET_QTID(id, qId, tId, eId) \
do { \
(qId) = *(uint64_t *)(id); \
(tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \
(eId) = *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)); \
} while (0)
#define QW_ERR_RET(c) \
@ -273,22 +281,22 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
qDebugL("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_TASK_ELOG_E(param) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
#define QW_TASK_WLOG_E(param) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
#define QW_TASK_DLOG_E(param) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, eId)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, tId, eId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) \
do { \
@ -362,6 +370,8 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet);
int32_t qwAddTaskCtx(QW_FPARAMS_DEF);
#ifdef __cplusplus

View File

@ -39,7 +39,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);

View File

@ -9,7 +9,7 @@
#include "tmsg.h"
#include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false};
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false};
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
if (!gQWDebug.statusEnable) {
@ -121,3 +121,90 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
}
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
int32_t contLen = 0;
char* rsp = NULL;
if (pEpSet) {
contLen = tSerializeSEpSet(NULL, 0, pEpSet);
rsp = rpcMallocCont(contLen);
tSerializeSEpSet(rsp, contLen, pEpSet);
}
SRpcMsg rpcRsp = {
.msgType = rspType,
.pCont = rsp,
.contLen = contLen,
.code = code,
.info = *pConn,
};
rpcRsp.info.hasEpSet = 1;
tmsgSendRsp(&rpcRsp);
qDebug("response %s msg, code: %s", TMSG_INFO(rspType), tstrerror(code));
return TSDB_CODE_SUCCESS;
}
int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
if (gQWDebug.tmp) {
if (TDMT_SCH_QUERY == qwMsg->msgType) {
SEpSet epSet = {0};
epSet.inUse = 1;
epSet.numOfEps = 3;
strcpy(epSet.eps[0].fqdn, "localhost");
epSet.eps[0].port = 7100;
strcpy(epSet.eps[1].fqdn, "localhost");
epSet.eps[1].port = 7200;
strcpy(epSet.eps[2].fqdn, "localhost");
epSet.eps[2].port = 7300;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS;
}
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) {
ctx->phase = QW_PHASE_POST_QUERY;
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
gQWDebug.tmp = false;
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t qwDbgEnableDebug(char *option) {
if (0 == strcasecmp(option, "lock")) {
gQWDebug.lockEnable = true;
qDebug("qw lock debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "status")) {
gQWDebug.statusEnable = true;
qDebug("qw status debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "dump")) {
gQWDebug.dumpEnable = true;
qDebug("qw dump debug enabled");
return TSDB_CODE_SUCCESS;
}
if (0 == strcasecmp(option, "tmp")) {
gQWDebug.tmp = true;
qDebug("qw tmp debug enabled");
return TSDB_CODE_SUCCESS;
}
qError("invalid qw debug option:%s", option);
return TSDB_CODE_APP_ERROR;
}

View File

@ -43,7 +43,7 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
if (tbInfo) {
@ -53,7 +53,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code, STbVerInfo*
}
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP,
.msgType = rspType,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
@ -73,7 +73,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
tSerializeSExplainRsp(pRsp, contLen, &rsp);
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_EXPLAIN_RSP,
.msgType = TDMT_SCH_EXPLAIN_RSP,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
@ -92,7 +92,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
.contLen = contLen,
.pCont = pRsp,
.code = code,
@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
}
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP,
.msgType = TDMT_SCH_FETCH_RSP,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
@ -129,7 +129,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.msgType = TDMT_SCH_CANCEL_TASK_RSP,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
@ -145,7 +145,7 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
pRsp->code = code;
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP,
.msgType = TDMT_SCH_DROP_TASK_RSP,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
@ -156,6 +156,41 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
req->refId = rId;
req->execId = eId;
SRpcMsg pNewMsg = {
.msgType = TDMT_SCH_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
.code = 0,
.info = *pConn,
};
int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
rpcFreeCont(req);
QW_ERR_RET(code);
}
QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId);
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
@ -167,9 +202,10 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
req->execId = eId;
SRpcMsg pNewMsg = {
.msgType = TDMT_VND_QUERY_CONTINUE,
.msgType = TDMT_SCH_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
@ -202,10 +238,10 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->refId = htobe64(rId);
SRpcMsg brokenMsg = {
.msgType = TDMT_VND_DROP_TASK,
.msgType = TDMT_SCH_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
.code = TSDB_CODE_RPC_BROKEN_LINK,
.info = *pConn,
};
@ -236,10 +272,10 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
}
SRpcMsg brokenMsg = {
.msgType = TDMT_VND_QUERY_HEARTBEAT,
.msgType = TDMT_SCH_QUERY_HEARTBEAT,
.pCont = msg,
.contLen = msgSize,
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
.code = TSDB_CODE_RPC_BROKEN_LINK,
.info = *pConn,
};
@ -266,6 +302,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
msg->execId = ntohl(msg->execId);
msg->phyLen = ntohl(msg->phyLen);
msg->sqlLen = ntohl(msg->sqlLen);
@ -273,6 +310,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
int32_t eId = msg->execId;
SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
@ -295,6 +333,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
int32_t eId = msg->execId;
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
qwAbortPrerocessQuery(QW_FPARAMS());
@ -324,10 +363,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
@ -356,6 +396,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = 0;
int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
@ -387,11 +428,13 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->execId = ntohl(msg->execId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = 0;
int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
@ -437,11 +480,13 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
msg->execId = ntohl(msg->execId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
@ -476,15 +521,17 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->refId = be64toh(msg->refId);
msg->execId = ntohl(msg->execId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
}
@ -522,7 +569,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
}
@ -553,6 +600,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SR
uint64_t qId = req.queryId;
uint64_t tId = req.taskId;
int64_t rId = 0;
int32_t eId = -1;
SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);

View File

@ -135,8 +135,8 @@ int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchS
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
QW_LOCK(rwType, &sch->tasksLock);
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
@ -151,8 +151,8 @@ int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, S
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
SQWTaskStatus ntask = {0};
ntask.status = status;
@ -207,8 +207,8 @@ int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); }
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
@ -220,8 +220,8 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
}
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
@ -233,8 +233,8 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
}
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
SQWTaskCtx nctx = {0};
@ -314,8 +314,8 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
}
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
SQWTaskCtx octx;
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
@ -348,8 +348,8 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
SQWTaskStatus *task = NULL;
int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, tId, eId);
if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
QW_TASK_WLOG_E("scheduler does not exist");

View File

@ -168,7 +168,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
// TODO GET EXECUTOR API TO GET MORE INFO
QW_GET_QTID(key, status.queryId, status.taskId);
QW_GET_QTID(key, status.queryId, status.taskId, status.execId);
status.status = taskStatus->status;
status.refId = taskStatus->refId;
@ -417,17 +417,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
}
if (QW_PHASE_POST_QUERY == phase) {
#if 0
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
readyConnection = &ctx->connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
}
#else
connInfo = ctx->ctrlConnInfo;
rspConnection = &connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
#endif
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
@ -458,8 +451,8 @@ _return:
}
if (rspConnection) {
qwBuildAndSendQueryRsp(rspConnection, code, ctx ? &ctx->tbInfo : NULL);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx ? &ctx->tbInfo : NULL);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
}
if (ctx) {
@ -500,7 +493,9 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
ctx->ctrlConnInfo = qwMsg->connInfo;
@ -530,8 +525,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->taskType, taskType);
atomic_store_8(&ctx->explain, explain);
ctx->taskType = taskType;
ctx->explain = explain;
ctx->queryType = qwMsg->msgType;
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
@ -571,6 +567,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
_return:
input.code = code;
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
// if (!queryRsped) {
@ -739,8 +736,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWTaskCtx *ctx = NULL;
bool locked = false;
// TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);

View File

@ -122,7 +122,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
qwtqueryMsg.taskId = htobe64(1);
qwtqueryMsg.phyLen = htonl(100);
qwtqueryMsg.sqlLen = 0;
queryRpc->msgType = TDMT_VND_QUERY;
queryRpc->msgType = TDMT_SCH_QUERY;
queryRpc->pCont = &qwtqueryMsg;
queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
}
@ -131,7 +131,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchMsg->sId = htobe64(1);
fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
fetchMsg->taskId = htobe64(1);
fetchRpc->msgType = TDMT_VND_FETCH;
fetchRpc->msgType = TDMT_SCH_FETCH;
fetchRpc->pCont = fetchMsg;
fetchRpc->contLen = sizeof(SResFetchReq);
}
@ -140,7 +140,7 @@ void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = htobe64(1);
dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId));
dropMsg->taskId = htobe64(1);
dropRpc->msgType = TDMT_VND_DROP_TASK;
dropRpc->msgType = TDMT_SCH_DROP_TASK;
dropRpc->pCont = dropMsg;
dropRpc->contLen = sizeof(STaskDropReq);
}
@ -202,7 +202,8 @@ void qwtSendReqToDnode(void* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq)
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
switch (pRsp->msgType) {
case TDMT_VND_QUERY_RSP: {
case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
if (pRsp->code) {
@ -213,7 +214,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
rpcFreeCont(rsp);
break;
}
case TDMT_VND_FETCH_RSP: {
case TDMT_SCH_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
if (0 == pRsp->code && 0 == rsp->completed) {
@ -229,7 +230,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
break;
}
case TDMT_VND_DROP_TASK_RSP: {
case TDMT_SCH_DROP_TASK_RSP: {
STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont;
rpcFreeCont(rsp);
@ -756,9 +757,9 @@ void *queryQueueThread(void *param) {
}
}
if (TDMT_VND_QUERY == queryRpc->msgType) {
if (TDMT_SCH_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
} else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) {
} else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
} else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
@ -813,13 +814,13 @@ void *fetchQueueThread(void *param) {
}
switch (fetchRpc->msgType) {
case TDMT_VND_FETCH:
case TDMT_SCH_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_VND_CANCEL_TASK:
case TDMT_SCH_CANCEL_TASK:
qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
break;
default:

View File

@ -125,7 +125,7 @@ typedef struct SSchTaskCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
int32_t execIdx;
int32_t execId;
void *pTrans;
} SSchTaskCallbackParam;
@ -171,7 +171,7 @@ typedef struct SSchTask {
uint64_t taskId; // task id
SRWLatch lock; // task lock
int32_t maxExecTimes; // task may exec times
int32_t execIdx; // task current execute try index
int32_t execId; // task current execute try index
SSchLevel *level; // level
SRWLatch planLock; // task update plan lock
SSubplan *plan; // subplan
@ -244,9 +244,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us; \
if (0 == (_task)->execIdx) { \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
} while (0)
@ -254,7 +254,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \
} while (0)
@ -262,12 +262,12 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execId % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
@ -275,6 +275,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
@ -307,6 +308,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (((_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_BROKEN_LINK) && ((_len) > 0))
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
@ -317,13 +320,13 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__)
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
@ -354,7 +357,7 @@ void schFreeJobImpl(void *job);
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code);
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code);
void schFreeRpcCtx(SRpcCtx *pCtx);
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
@ -365,7 +368,7 @@ void schProcessOnDataFetched(SSchJob *job);
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx);
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync);
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
@ -377,13 +380,15 @@ int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob);
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode);
#ifdef __cplusplus

View File

@ -28,7 +28,7 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("sch release jobId:0x
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan;
pTask->level = pLevel;
pTask->execIdx = -1;
pTask->execId = -1;
pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES;
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
@ -429,59 +429,59 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx) {
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL};
if (taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) {
if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) {
SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task execNode added, execIdx:%d", execIdx);
SCH_TASK_DLOG("task execNode added, execId:%d", execId);
return TSDB_CODE_SUCCESS;
}
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) {
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
if (NULL == pTask->execNodes) {
return TSDB_CODE_SUCCESS;
}
if (taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx))) {
SCH_TASK_ELOG("fail to remove execIdx %d from execNodeList", execIdx);
if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
SCH_TASK_ELOG("fail to remove execId %d from execNodeList", execId);
} else {
SCH_TASK_DLOG("execIdx %d removed from execNodeList", execIdx);
SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
}
if (execIdx != pTask->execIdx) { // ignore it
SCH_TASK_DLOG("execIdx %d is not current execIdx %d", execIdx, pTask->execIdx);
if (execId != pTask->execId) { // ignore it
SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId);
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) {
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
if (taosHashGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx));
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
nodeInfo->handle = handle;
SCH_TASK_DLOG("handle updated to %p for execIdx %d", handle, execIdx);
SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx) {
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
if (dropExecNode) {
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx));
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
}
SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNode(pJob, pTask, handle, execIdx);
schUpdateTaskExecNode(pJob, pTask, handle, execId);
return TSDB_CODE_SUCCESS;
}
@ -683,6 +683,25 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet) {
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0));
SCH_ERR_RET(TSDB_CODE_APP_ERROR);
}
SQueryNodeAddr* pAddr = taosArrayGet(pTask->candidateAddrs, 0);
SEp* pOld = &pAddr->epSet.eps[pAddr->epSet.inUse];
SEp* pNew = &pEpSet->eps[pEpSet->inUse];
SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port);
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
return TSDB_CODE_SUCCESS;
}
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
if (code) {
@ -810,9 +829,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
}
}
if ((pTask->execIdx + 1) >= pTask->maxExecTimes) {
if ((pTask->execId + 1) >= pTask->maxExecTimes) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx);
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
return TSDB_CODE_SUCCESS;
}
@ -822,11 +841,10 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS;
}
// TODO CHECK epList/condidateList
if (SCH_IS_DATA_SRC_TASK(pTask)) {
if ((pTask->execIdx + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, execIdx:%d, epNum:%d", pTask->execIdx,
SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
return TSDB_CODE_SUCCESS;
}
@ -842,7 +860,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
}
*needRetry = true;
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execIdx + 1, errCode, tstrerror(errCode));
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));
return TSDB_CODE_SUCCESS;
}
@ -854,7 +872,6 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_RET(schDecTaskFlowQuota(pJob, pTask));
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
}
@ -1155,6 +1172,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
.taskId = pTask->taskId,
.schedId = schMgmt.sId,
.execId = pTask->execId,
.addr = pTask->succeedAddr};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->lock);
@ -1184,7 +1202,7 @@ int32_t schFetchFromRemote(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_SCH_FETCH));
return TSDB_CODE_SUCCESS;
@ -1223,7 +1241,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
while (nodeInfo) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK);
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
@ -1238,8 +1256,9 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
}
SCH_LOCK_TASK(pTask);
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx);
if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXECUTING == pTask->status &&
pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
@ -1258,6 +1277,9 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *taskStatus = taosArrayGet(pStatusList, i);
qDebug("QID:%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s",
taskStatus->queryId, taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status));
SSchJob *pJob = schAcquireJob(taskStatus->refId);
if (NULL == pJob) {
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
@ -1266,8 +1288,6 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
continue;
}
SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, jobTaskStatusStr(taskStatus->status));
pTask = NULL;
schGetTaskInJob(pJob, taskStatus->taskId, &pTask);
if (NULL == pTask) {
@ -1276,13 +1296,20 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
continue;
}
if (taskStatus->execId != pTask->execId) {
// TODO DROP TASK FROM SERVER!!!!
SCH_TASK_DLOG("EID %d in hb rsp mis-match", taskStatus->execId);
schReleaseJob(taskStatus->refId);
continue;
}
if (taskStatus->status == JOB_TASK_STATUS_FAILED) {
// RECORD AND HANDLE ERROR!!!!
schReleaseJob(taskStatus->refId);
continue;
}
if (taskStatus->status == JOB_TASK_STATUS_NOT_START && SCH_TASK_TIMEOUT(pTask)) {
if (taskStatus->status == JOB_TASK_STATUS_NOT_START) {
schRescheduleTask(pJob, pTask);
}
@ -1308,7 +1335,7 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) {
tbInfo.tversion = rsp->tversion;
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
pJob->execRes.msgType = TDMT_VND_QUERY;
pJob->execRes.msgType = TDMT_SCH_QUERY;
}
return TSDB_CODE_SUCCESS;
@ -1345,9 +1372,9 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execIdx++;
pTask->execId++;
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execIdx);
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execId);
SCH_LOG_TASK_START_TS(pTask);
@ -1658,5 +1685,94 @@ _return:
SCH_RET(code);
}
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0;
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("redirect will no continue cause of job status %s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
}
if ((pTask->execId + 1) >= pTask->maxExecTimes) {
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
schProcessOnJobFailure(pJob, rspCode);
return TSDB_CODE_SUCCESS;
}
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask));
schDeregisterTaskHb(pJob, pTask);
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
taosMemoryFreeClear(pTask->msg);
pTask->msgLen = 0;
pTask->lastMsgType = 0;
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
if (pData) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
}
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
if (JOB_TASK_STATUS_EXECUTING == SCH_GET_TASK_STATUS(pTask)) {
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
}
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
SCH_ERR_JRET(schLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
// merge plan
pTask->childReady = 0;
qClearSubplanExecutionNode(pTask->plan);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
int32_t childrenNum = taosArrayGetSize(pTask->children);
for (int32_t i = 0; i < childrenNum; ++i) {
SSchTask* pChild = taosArrayGetP(pTask->children, i);
SCH_LOCK_TASK(pChild);
schDoTaskRedirect(pJob, pChild, NULL, rspCode);
SCH_UNLOCK_TASK(pChild);
}
return TSDB_CODE_SUCCESS;
_return:
code = schProcessOnTaskFailure(pJob, pTask, code);
SCH_RET(code);
}
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0;
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
if (NULL == pData->pEpSet) {
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
SCH_ERR_JRET(rspCode);
}
}
SCH_RET(schDoTaskRedirect(pJob, pTask, pData, rspCode));
_return:
schProcessOnTaskFailure(pJob, pTask, code);
SCH_RET(code);
}

View File

@ -28,9 +28,10 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
int32_t reqMsgType = msgType - 1;
switch (msgType) {
case TDMT_SCH_LINK_BROKEN:
case TDMT_VND_EXPLAIN_RSP:
case TDMT_SCH_EXPLAIN_RSP:
return TSDB_CODE_SUCCESS;
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
case TDMT_SCH_MERGE_QUERY_RSP:
case TDMT_SCH_QUERY_RSP: // query_rsp may be processed later than ready_rsp
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
@ -43,7 +44,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
case TDMT_VND_FETCH_RSP:
case TDMT_SCH_FETCH_RSP:
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
@ -90,15 +91,6 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
int32_t rspCode) {
int32_t code = 0;
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
taosMemoryFreeClear(msg);
SCH_RET(atomic_load_32(&pJob->errCode));
}
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: {
@ -247,7 +239,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break;
}
case TDMT_VND_QUERY_RSP: {
case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
SCH_ERR_JRET(rspCode);
@ -264,7 +257,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break;
}
case TDMT_VND_EXPLAIN_RSP: {
case TDMT_SCH_EXPLAIN_RSP: {
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
@ -294,7 +287,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
break;
}
case TDMT_VND_FETCH_RSP: {
case TDMT_SCH_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
SCH_ERR_JRET(rspCode);
@ -342,8 +335,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
schProcessOnDataFetched(pJob);
break;
}
case TDMT_VND_DROP_TASK_RSP: {
// SHOULD NEVER REACH HERE
case TDMT_SCH_DROP_TASK_RSP: {
// NEVER REACH HERE
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
@ -367,8 +360,9 @@ _return:
}
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
int32_t msgType = pMsg->msgType;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
SSchTask *pTask = NULL;
@ -385,15 +379,30 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
if (pParam->execIdx != pTask->execIdx) {
SCH_TASK_DLOG("execIdx %d mis-match current execIdx %d", pParam->execIdx, pTask->execIdx);
if (pParam->execId != pTask->execId) {
SCH_TASK_DLOG("execId %d mis-match current execId %d", pParam->execId, pTask->execId);
goto _return;
}
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL);
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx));
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execId));
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp will not be processed cause of job status %s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
code = atomic_load_32(&pJob->errCode);
goto _return;
}
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
if (NEED_SCHEDULER_REDIRECT_ERROR(rspCode) || SCH_SUB_TASK_NETWORK_ERR(rspCode, pMsg->len > 0)) {
code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode);
goto _return;
}
code = schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode);
pMsg->pData = NULL;
_return:
@ -405,50 +414,19 @@ _return:
schReleaseJob(pParam->refId);
}
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(param);
SCH_RET(code);
}
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
int32_t schHandleCreateTbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}
int32_t schHandleDropTbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code);
}
int32_t schHandleAlterTbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_ALTER_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
int32_t schHandleDeleteCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_DELETE_RSP, code);
}
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code);
}
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, code);
taosMemoryFreeClear(param);
return TSDB_CODE_SUCCESS;
}
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
@ -461,7 +439,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL));
} else {
SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code));
SCH_ERR_RET(schHandleCallback(param, pMsg, code));
}
return TSDB_CODE_SUCCESS;
@ -480,7 +458,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo
param->refId = pJob->refId;
param->taskId = SCH_TASK_ID(pTask);
param->pTrans = pJob->conn.pTrans;
param->execIdx = pTask->execIdx;
param->execId = pTask->execId;
*pParam = param;
return TSDB_CODE_SUCCESS;
@ -557,33 +535,20 @@ _return:
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
*fp = schHandleCreateTbCallback;
break;
case TDMT_VND_DROP_TABLE:
*fp = schHandleDropTbCallback;
break;
case TDMT_VND_ALTER_TABLE:
*fp = schHandleAlterTbCallback;
break;
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
case TDMT_VND_QUERY:
*fp = schHandleQueryCallback;
break;
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY:
case TDMT_VND_DELETE:
*fp = schHandleDeleteCallback;
case TDMT_SCH_EXPLAIN:
case TDMT_SCH_FETCH:
*fp = schHandleCallback;
break;
case TDMT_VND_EXPLAIN:
*fp = schHandleExplainCallback;
break;
case TDMT_VND_FETCH:
*fp = schHandleFetchCallback;
break;
case TDMT_VND_DROP_TASK:
case TDMT_SCH_DROP_TASK:
*fp = schHandleDropCallback;
break;
case TDMT_VND_QUERY_HEARTBEAT:
case TDMT_SCH_QUERY_HEARTBEAT:
*fp = schHandleHbCallback;
break;
case TDMT_SCH_LINK_BROKEN:
@ -692,9 +657,9 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT_RSP;
int32_t msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP;
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));
SCH_ERR_JRET(schGetCallbackFp(TDMT_SCH_QUERY_HEARTBEAT, &fp));
param->nodeEpId = epId;
param->pTrans = pJob->conn.pTrans;
@ -722,7 +687,7 @@ _return:
SCH_RET(code);
}
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
@ -784,9 +749,9 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
}
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_VND_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, NULL, 0, TDMT_SCH_EXPLAIN, &trans, false, &pExplainMsgSendInfo));
int32_t msgType = TDMT_VND_EXPLAIN_RSP;
int32_t msgType = TDMT_SCH_EXPLAIN_RSP;
SRpcCtxVal ctxVal = {.val = pExplainMsgSendInfo, .clone = schCloneSMsgSendInfo};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
@ -882,7 +847,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
SEpSet *epSet = &addr->epSet;
SMsgSendInfo *pMsgSendInfo = NULL;
bool isHb = (TDMT_VND_QUERY_HEARTBEAT == msgType);
bool isHb = (TDMT_SCH_QUERY_HEARTBEAT == msgType);
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
@ -926,7 +891,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
int32_t code = 0;
SRpcCtx rpcCtx = {0};
SSchTrans trans = {0};
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT;
int32_t msgType = TDMT_SCH_QUERY_HEARTBEAT;
req.header.vgId = nodeEpId->nodeId;
req.sId = schMgmt.sId;
@ -1032,7 +997,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
tSerializeSVDeleteReq(msg, msgSize, &req);
break;
}
case TDMT_VND_QUERY: {
case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY: {
SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx));
uint32_t len = strlen(pJob->sql);
@ -1049,6 +1015,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->refId = htobe64(pJob->refId);
pMsg->execId = htonl(pTask->execId);
pMsg->taskType = TASK_TYPE_TEMP;
pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
pMsg->phyLen = htonl(pTask->msgLen);
@ -1060,7 +1027,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
persistHandle = true;
break;
}
case TDMT_VND_FETCH: {
case TDMT_SCH_FETCH: {
msgSize = sizeof(SResFetchReq);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
@ -1075,10 +1042,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->execId = htonl(pTask->execId);
break;
}
case TDMT_VND_DROP_TASK: {
case TDMT_SCH_DROP_TASK: {
msgSize = sizeof(STaskDropReq);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
@ -1094,9 +1062,10 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->refId = htobe64(pJob->refId);
pMsg->execId = htonl(pTask->execId);
break;
}
case TDMT_VND_QUERY_HEARTBEAT: {
case TDMT_SCH_QUERY_HEARTBEAT: {
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
SSchedulerHbReq req = {0};
@ -1135,8 +1104,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle,
(rpcCtx.args ? &rpcCtx : NULL)));
if (msgType == TDMT_VND_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx));
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
return TSDB_CODE_SUCCESS;

View File

@ -118,7 +118,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->level = 1;
scanPlan->pParents = nodesMakeList();
scanPlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
scanPlan->msgType = TDMT_VND_QUERY;
scanPlan->msgType = TDMT_SCH_QUERY;
mergePlan->id.queryId = qId;
mergePlan->id.groupId = schtMergeTemplateId;
@ -130,7 +130,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
mergePlan->pChildren = nodesMakeList();
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY;
mergePlan->msgType = TDMT_SCH_QUERY;
merge->pNodeList = nodesMakeList();
scan->pNodeList = nodesMakeList();
@ -181,7 +181,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
scanPlan[i].level = 1;
scanPlan[i].pParents = nodesMakeList();
scanPlan[i].pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
scanPlan[i].msgType = TDMT_VND_QUERY;
scanPlan[i].msgType = TDMT_SCH_QUERY;
nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i));
@ -198,7 +198,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY;
mergePlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
@ -412,7 +412,7 @@ void *schtCreateFetchRspThread(void *param) {
rsp->completed = 1;
rsp->numOfRows = 10;
code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_SCH_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
schReleaseJob(job);
@ -445,7 +445,7 @@ void *schtFetchRspThread(void *aa) {
dataBuf.pData = rsp;
dataBuf.len = sizeof(*rsp);
code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0);
code = schHandleCallback(param, &dataBuf, TDMT_SCH_FETCH_RSP, 0);
assert(code == 0 || code);
}
@ -547,7 +547,7 @@ void* schtRunJobThread(void *aa) {
dataBuf.pData = &rsp;
dataBuf.len = sizeof(rsp);
code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
assert(code == 0 || code);
pIter = taosHashIterate(execTasks, pIter);
@ -566,7 +566,7 @@ void* schtRunJobThread(void *aa) {
dataBuf.pData = &rsp;
dataBuf.len = sizeof(rsp);
code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0);
code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0);
assert(code == 0 || code);
pIter = taosHashIterate(execTasks, pIter);
@ -677,7 +677,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
@ -688,7 +688,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
@ -780,7 +780,7 @@ TEST(queryTest, readyFirstCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
@ -791,7 +791,7 @@ TEST(queryTest, readyFirstCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
@ -896,9 +896,9 @@ TEST(queryTest, flowCtrlCase) {
taosHashCancelIterate(pJob->execTasks, pIter);
if (task->lastMsgType == TDMT_VND_QUERY) {
if (task->lastMsgType == TDMT_SCH_QUERY) {
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
} else {

View File

@ -329,7 +329,7 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
transMsg.info.ahandle);
@ -400,7 +400,7 @@ void cliHandleExcept(SCliConn* pConn) {
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0};
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.code = pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.info.ahandle = NULL;
@ -1023,19 +1023,28 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
}
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) {
if (pResp == NULL || pResp->info.hasEpSet == 0) {
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
return false;
}
// rebuild resp msg
SEpSet epset;
if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epset) < 0) {
return false;
}
tDeserializeSEpSet(pResp->pCont, pResp->contLen, dst);
int32_t tlen = tSerializeSEpSet(NULL, 0, dst);
int32_t bufLen = pResp->contLen - tlen;
char* buf = rpcMallocCont(bufLen);
memcpy(buf, (char*)pResp->pCont + tlen, bufLen);
char* buf = NULL;
int32_t len = pResp->contLen - tlen;
if (len != 0) {
buf = rpcMallocCont(len);
memcpy(buf, (char*)pResp->pCont + tlen, len);
}
rpcFreeCont(pResp->pCont);
pResp->pCont = buf;
pResp->contLen = bufLen;
pResp->contLen = len;
*dst = epset;
return true;
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
@ -1054,7 +1063,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
*/
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
if (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) {
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
if (retry) {
pMsg->sent = 0;
pCtx->retryCnt += 1;
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
@ -1083,11 +1093,13 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId* trace = &pResp->info.traceId;
if (cliTryToExtractEpSet(pResp, &pCtx->epSet)) {
bool hasEpSet = cliTryToExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) {
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
if (pCtx->pSem != NULL) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pRsp == NULL) {
@ -1099,12 +1111,16 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pCtx->pRsp = NULL;
} else {
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (retry == false && hasEpSet == true) {
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
} else {
if (!cliIsEpsetUpdated(code, pCtx)) {
pTransInst->cfp(pTransInst->parent, pResp, NULL);
} else {
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
}
}
}
return 0;
}

View File

@ -139,8 +139,6 @@ static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd);
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister, NULL};
static void uvDestroyConn(uv_handle_t* handle);
// server and worker thread
@ -1116,4 +1114,6 @@ _return2:
rpcFreeCont(msg->pCont);
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
#endif

View File

@ -89,8 +89,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Redirect")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failure")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")