enh: support random execution node

This commit is contained in:
dapan1121 2023-06-30 15:57:58 +08:00
parent 4d8fabfefc
commit ffd306f2b9
3 changed files with 5 additions and 4 deletions

View File

@ -111,7 +111,7 @@ int32_t tmqMaxTopicNum = 20;
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0; int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = false; bool tsEnableQueryHb = true;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;

View File

@ -57,7 +57,7 @@ typedef enum {
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ #define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20 #define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 60000000 #define SCH_DEFAULT_TASK_TIMEOUT_USEC 5000000
#define SCH_MAX_TASK_TIMEOUT_USEC 300000000 #define SCH_MAX_TASK_TIMEOUT_USEC 300000000
#define SCH_DEFAULT_MAX_RETRY_NUM 6 #define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_MIN_AYSNC_EXEC_NUM 3 #define SCH_MIN_AYSNC_EXEC_NUM 3
@ -239,7 +239,7 @@ typedef struct SSchTask {
int32_t lastMsgType; // last sent msg type int32_t lastMsgType; // last sent msg type
int64_t timeoutUsec; // task timeout useconds before reschedule int64_t timeoutUsec; // task timeout useconds before reschedule
SQueryNodeAddr succeedAddr; // task executed success node address SQueryNodeAddr succeedAddr; // task executed success node address
int8_t candidateIdx; // current try condidation index int32_t candidateIdx; // current try condidation index
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo
SSchTaskProfile profile; // task execution profile SSchTaskProfile profile; // task execution profile

View File

@ -745,7 +745,6 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pTask->candidateIdx = 0;
pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr)); pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->candidateAddrs) { if (NULL == pTask->candidateAddrs) {
SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM); SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM);
@ -770,6 +769,8 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask)); SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
pTask->candidateIdx = abs(taosRand() % taosArrayGetSize(pTask->candidateAddrs));
/* /*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));