more code format

This commit is contained in:
Hongze Cheng 2022-10-13 16:58:43 +08:00
parent 260ca17337
commit 5f010843a6
12 changed files with 732 additions and 743 deletions

View File

@ -101,8 +101,8 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
int32_t code = 0; int32_t code = 0;
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo; SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockIndex[0] == pIter->iSttBlk) { if (pInfo->blockIndex[0] == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 0) { if (pInfo->currentLoadBlockIndex != 0) {
tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s", tsdbDebug("current load index is set to 0, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
pIter->iSttBlk, pIter->iStt, pIter->uid, idStr); pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
@ -113,7 +113,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
if (pInfo->blockIndex[1] == pIter->iSttBlk) { if (pInfo->blockIndex[1] == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 1) { if (pInfo->currentLoadBlockIndex != 1) {
tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%"PRIu64", load data, %s", tsdbDebug("current load index is set to 1, block index:%d, file index:%d, due to uid:%" PRIu64 ", load data, %s",
pIter->iSttBlk, pIter->iStt, pIter->uid, idStr); pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
pInfo->currentLoadBlockIndex = 1; pInfo->currentLoadBlockIndex = 1;
} }
@ -140,8 +140,10 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pInfo->elapsedTime += el; pInfo->elapsedTime += el;
pInfo->loadBlocks += 1; pInfo->loadBlocks += 1;
tsdbDebug("read last block, total load:%d, trigger by uid:%"PRIu64", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s", tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el, idStr); ", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _exit; goto _exit;
} }
@ -336,7 +338,7 @@ _exit:
void tLDataIterClose(SLDataIter *pIter) { taosMemoryFree(pIter); } void tLDataIterClose(SLDataIter *pIter) { taosMemoryFree(pIter); }
void tLDataIterNextBlock(SLDataIter *pIter, const char* idStr) { void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
int32_t step = pIter->backward ? -1 : 1; int32_t step = pIter->backward ? -1 : 1;
int32_t oldIndex = pIter->iSttBlk; int32_t oldIndex = pIter->iSttBlk;
@ -386,10 +388,10 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char* idStr) {
if (index != -1) { if (index != -1) {
pIter->iSttBlk = index; pIter->iSttBlk = index;
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk); pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
tsdbDebug("try next last file block:%d from %d, trigger by uid:%"PRIu64", file index:%d, %s", pIter->iSttBlk, oldIndex, pIter->uid, pIter->iStt, tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
idStr); oldIndex, pIter->uid, pIter->iStt, idStr);
} else { } else {
tsdbDebug("no more last block qualified, uid:%"PRIu64", file index::%d, %s", pIter->uid, oldIndex, idStr); tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index::%d, %s", pIter->uid, oldIndex, idStr);
} }
} }

View File

@ -1645,7 +1645,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbTrace("fRow ptr:%p, %d, uid:%"PRIu64", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr); tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);
// only last block exists // only last block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {

View File

@ -20,13 +20,13 @@
extern "C" { extern "C" {
#endif #endif
#include "command.h"
#include "os.h" #include "os.h"
#include "tarray.h"
#include "planner.h" #include "planner.h"
#include "scheduler.h" #include "scheduler.h"
#include "tarray.h"
#include "thash.h" #include "thash.h"
#include "trpc.h" #include "trpc.h"
#include "command.h"
enum { enum {
SCH_READ = 1, SCH_READ = 1,
@ -51,20 +51,20 @@ typedef enum {
SCH_ALL, SCH_ALL,
} SCH_POLICY; } SCH_POLICY;
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 #define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT #define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ #define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20 #define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000 #define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_DEFAULT_MAX_RETRY_NUM 6 #define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_MIN_AYSNC_EXEC_NUM 3 #define SCH_MIN_AYSNC_EXEC_NUM 3
typedef struct SSchDebug { typedef struct SSchDebug {
bool lockEnable; bool lockEnable;
bool apiEnable; bool apiEnable;
} SSchDebug; } SSchDebug;
typedef struct SSchTrans { typedef struct SSchTrans {
@ -80,7 +80,6 @@ typedef struct SSchHbTrans {
} SSchHbTrans; } SSchHbTrans;
typedef struct SSchApiStat { typedef struct SSchApiStat {
#if defined(WINDOWS) || defined(_TD_DARWIN_64) #if defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t avoidCompilationErrors; size_t avoidCompilationErrors;
#endif #endif
@ -88,7 +87,6 @@ typedef struct SSchApiStat {
} SSchApiStat; } SSchApiStat;
typedef struct SSchRuntimeStat { typedef struct SSchRuntimeStat {
#if defined(WINDOWS) || defined(_TD_DARWIN_64) #if defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t avoidCompilationErrors; size_t avoidCompilationErrors;
#endif #endif
@ -96,7 +94,6 @@ typedef struct SSchRuntimeStat {
} SSchRuntimeStat; } SSchRuntimeStat;
typedef struct SSchJobStat { typedef struct SSchJobStat {
#if defined(WINDOWS) || defined(_TD_DARWIN_64) #if defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t avoidCompilationErrors; size_t avoidCompilationErrors;
#endif #endif
@ -104,17 +101,17 @@ typedef struct SSchJobStat {
} SSchJobStat; } SSchJobStat;
typedef struct SSchStat { typedef struct SSchStat {
SSchApiStat api; SSchApiStat api;
SSchRuntimeStat runtime; SSchRuntimeStat runtime;
SSchJobStat job; SSchJobStat job;
} SSchStat; } SSchStat;
typedef struct SSchResInfo { typedef struct SSchResInfo {
SExecResult* execRes; SExecResult *execRes;
void** fetchRes; void **fetchRes;
schedulerExecFp execFp; schedulerExecFp execFp;
schedulerFetchFp fetchFp; schedulerFetchFp fetchFp;
void* cbParam; void *cbParam;
} SSchResInfo; } SSchResInfo;
typedef struct SSchOpEvent { typedef struct SSchOpEvent {
@ -123,9 +120,9 @@ typedef struct SSchOpEvent {
SSchedulerReq *pReq; SSchedulerReq *pReq;
} SSchOpEvent; } SSchOpEvent;
typedef int32_t (*schStatusEnterFp)(void* pHandle, void* pParam); typedef int32_t (*schStatusEnterFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusLeaveFp)(void* pHandle, void* pParam); typedef int32_t (*schStatusLeaveFp)(void *pHandle, void *pParam);
typedef int32_t (*schStatusEventFp)(void* pHandle, void* pParam, void* pEvent); typedef int32_t (*schStatusEventFp)(void *pHandle, void *pParam, void *pEvent);
typedef struct SSchStatusFps { typedef struct SSchStatusFps {
EJobTaskType status; EJobTaskType status;
@ -142,16 +139,16 @@ typedef struct SSchedulerCfg {
} SSchedulerCfg; } SSchedulerCfg;
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId uint64_t sId; // schedulerId
SSchedulerCfg cfg; SSchedulerCfg cfg;
bool exit; bool exit;
int32_t jobRef; int32_t jobRef;
int32_t jobNum; int32_t jobNum;
SSchStat stat; SSchStat stat;
SRWLatch hbLock; SRWLatch hbLock;
SHashObj *hbConnections; SHashObj *hbConnections;
void *queryMgmt; void *queryMgmt;
} SSchedulerMgmt; } SSchedulerMgmt;
typedef struct SSchCallbackParamHeader { typedef struct SSchCallbackParamHeader {
@ -174,11 +171,11 @@ typedef struct SSchHbCallbackParam {
} SSchHbCallbackParam; } SSchHbCallbackParam;
typedef struct SSchFlowControl { typedef struct SSchFlowControl {
SRWLatch lock; SRWLatch lock;
bool sorted; bool sorted;
int32_t tableNumSum; int32_t tableNumSum;
uint32_t execTaskNum; uint32_t execTaskNum;
SArray *taskList; // Element is SSchTask* SArray *taskList; // Element is SSchTask*
} SSchFlowControl; } SSchFlowControl;
typedef struct SSchNodeInfo { typedef struct SSchNodeInfo {
@ -187,50 +184,50 @@ typedef struct SSchNodeInfo {
} SSchNodeInfo; } SSchNodeInfo;
typedef struct SSchLevel { typedef struct SSchLevel {
int32_t level; int32_t level;
int8_t status; int8_t status;
SRWLatch lock; SRWLatch lock;
int32_t taskFailed; int32_t taskFailed;
int32_t taskSucceed; int32_t taskSucceed;
int32_t taskNum; int32_t taskNum;
int32_t taskLaunchedNum; int32_t taskLaunchedNum;
int32_t taskDoneNum; int32_t taskDoneNum;
SArray *subTasks; // Element is SSchTask SArray *subTasks; // Element is SSchTask
} SSchLevel; } SSchLevel;
typedef struct SSchTaskProfile { typedef struct SSchTaskProfile {
int64_t startTs; int64_t startTs;
SArray* execTime; SArray *execTime;
int64_t waitTime; int64_t waitTime;
int64_t endTs; int64_t endTs;
} SSchTaskProfile; } SSchTaskProfile;
typedef struct SSchTask { typedef struct SSchTask {
uint64_t taskId; // task id uint64_t taskId; // task id
SRWLatch lock; // task reentrant lock SRWLatch lock; // task reentrant lock
int32_t maxExecTimes; // task max exec times int32_t maxExecTimes; // task max exec times
int32_t maxRetryTimes; // task max retry times int32_t maxRetryTimes; // task max retry times
int32_t retryTimes; // task retry times int32_t retryTimes; // task retry times
bool waitRetry; // wait for retry bool waitRetry; // wait for retry
int32_t execId; // task current execute index int32_t execId; // task current execute index
SSchLevel *level; // level SSchLevel *level; // level
SRWLatch planLock; // task update plan lock SRWLatch planLock; // task update plan lock
SSubplan *plan; // subplan SSubplan *plan; // subplan
char *msg; // operator tree char *msg; // operator tree
int32_t msgLen; // msg length int32_t msgLen; // msg length
int8_t status; // task status int8_t status; // task status
int32_t lastMsgType; // last sent msg type int32_t lastMsgType; // last sent msg type
int64_t timeoutUsec; // task timeout useconds before reschedule int64_t timeoutUsec; // task timeout useconds before reschedule
SQueryNodeAddr succeedAddr; // task executed success node address SQueryNodeAddr succeedAddr; // task executed success node address
int8_t candidateIdx; // current try condidation index int8_t candidateIdx; // current try condidation index
SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr
SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo
SSchTaskProfile profile; // task execution profile SSchTaskProfile profile; // task execution profile
int32_t childReady; // child task ready number int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
void* handle; // task send handle void *handle; // task send handle
bool registerdHb; // registered in hb bool registerdHb; // registered in hb
} SSchTask; } SSchTask;
typedef struct SSchJobAttr { typedef struct SSchJobAttr {
@ -243,46 +240,46 @@ typedef struct SSchJobAttr {
} SSchJobAttr; } SSchJobAttr;
typedef struct { typedef struct {
int32_t op; int32_t op;
SRWLatch lock; SRWLatch lock;
bool syncReq; bool syncReq;
} SSchOpStatus; } SSchOpStatus;
typedef struct SSchJob { typedef struct SSchJob {
int64_t refId; int64_t refId;
uint64_t queryId; uint64_t queryId;
SSchJobAttr attr; SSchJobAttr attr;
int32_t levelNum; int32_t levelNum;
int32_t taskNum; int32_t taskNum;
SRequestConnInfo conn; SRequestConnInfo conn;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad> SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel> SArray *levels; // starting from 0. SArray<SSchLevel>
SQueryPlan *pDag; SQueryPlan *pDag;
int64_t allocatorRefId; int64_t allocatorRefId;
SArray *dataSrcTasks; // SArray<SQueryTask*> SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx; int32_t levelIdx;
SEpSet dataSrcEps; SEpSet dataSrcEps;
SHashObj *taskList; SHashObj *taskList;
SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask* SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask*
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SExplainCtx *explainCtx; SExplainCtx *explainCtx;
int8_t status; int8_t status;
SQueryNodeAddr resNode; SQueryNodeAddr resNode;
tsem_t rspSem; tsem_t rspSem;
SSchOpStatus opStatus; SSchOpStatus opStatus;
schedulerChkKillFp chkKillFp; schedulerChkKillFp chkKillFp;
void* chkKillParam; void *chkKillParam;
SSchTask *fetchTask; SSchTask *fetchTask;
int32_t errCode; int32_t errCode;
SRWLatch resLock; SRWLatch resLock;
SExecResult execRes; SExecResult execRes;
void *fetchRes; //TODO free it or not void *fetchRes; // TODO free it or not
bool fetched; bool fetched;
int32_t resNumOfRows; int32_t resNumOfRows;
SSchResInfo userRes; SSchResInfo userRes;
char *sql; char *sql;
SQueryProfileSummary summary; SQueryProfileSummary summary;
} SSchJob; } SSchJob;
@ -294,226 +291,282 @@ typedef struct SSchTaskCtx {
extern SSchedulerMgmt schMgmt; extern SSchedulerMgmt schMgmt;
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec) #define SCH_TASK_TIMEOUT(_task) \
((taosGetTimestampUs() - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock) #define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock)
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock) #define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_DATA_BIND_TASK(task) \
(((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) #define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && (!SCH_IS_DATA_BIND_QRY_TASK(_task))) #define SCH_IS_LOCAL_EXEC_TASK(_job, _task) \
((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
(!SCH_IS_DATA_BIND_QRY_TASK(_task)))
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task)) #define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL) #define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle)) #define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) #define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job)) #define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.syncReq) #define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.syncReq)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && (!(job)->opStatus.syncReq)) #define SCH_JOB_IN_ASYNC_EXEC_OP(job) \
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) ((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && (!(job)->opStatus.syncReq)) ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && \
(!(job)->opStatus.syncReq))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) \
((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && \
(!(job)->opStatus.syncReq))
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) \
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } else { (_job)->attr.insertJob = true; } } while (0) #define SCH_SET_JOB_TYPE(_job, type) \
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) do { \
#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob) if ((type) != SUBPLAN_TYPE_MODIFY) { \
(_job)->attr.queryJob = true; \
} else { \
(_job)->attr.insertJob = true; \
} \
} while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob)
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch) #define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)))) #define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))))
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen)))) #define SCH_REDIRECT_MSGTYPE(_msgType) \
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) \
((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps) #define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps) #define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)
#define SCH_LOG_TASK_START_TS(_task) \ #define SCH_LOG_TASK_START_TS(_task) \
do { \ do { \
int64_t us = taosGetTimestampUs(); \ int64_t us = taosGetTimestampUs(); \
taosArrayPush((_task)->profile.execTime, &us); \ taosArrayPush((_task)->profile.execTime, &us); \
if (0 == (_task)->execId) { \ if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \ (_task)->profile.startTs = us; \
} \ } \
} while (0) } while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \ #define SCH_LOG_TASK_WAIT_TS(_task) \
do { \ do { \
int64_t us = taosGetTimestampUs(); \ int64_t us = taosGetTimestampUs(); \
(_task)->profile.waitTime += us - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId); \ (_task)->profile.waitTime += us - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
} while (0) } while (0)
#define SCH_LOG_TASK_END_TS(_task) \
#define SCH_LOG_TASK_END_TS(_task) \ do { \
do { \ int64_t us = taosGetTimestampUs(); \
int64_t us = taosGetTimestampUs(); \ int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \ int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
*startts = us - *startts; \ *startts = us - *startts; \
(_task)->profile.endTs = us; \ (_task)->profile.endTs = us; \
} while (0) } while (0)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \ #define SCH_TASK_ELOG(param, ...) \
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
#define SCH_TASK_DLOG(param, ...) \ __VA_ARGS__)
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_DLOG(param, ...) \
#define SCH_TASK_TLOG(param, ...) \ qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \ #define SCH_TASK_TLOG(param, ...) \
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
#define SCH_TASK_WLOG(param, ...) \ __VA_ARGS__)
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_DLOGL(param, ...) \
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
__VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), \
__VA_ARGS__)
#define SCH_SET_ERRNO(_err) do { if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { terrno = (_err); } } while (0) #define SCH_SET_ERRNO(_err) \
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); return _code; } } while (0) do { \
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0) if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { \
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0) terrno = (_err); \
} \
} while (0)
#define SCH_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
SCH_SET_ERRNO(_code); \
return _code; \
} \
} while (0)
#define SCH_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
SCH_SET_ERRNO(_code); \
} \
return _code; \
} while (0)
#define SCH_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
SCH_SET_ERRNO(code); \
goto _return; \
} \
} while (0)
#define SCH_LOCK_DEBUG(...) do { if (gSCHDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0) #define SCH_LOCK_DEBUG(...) \
do { \
if (gSCHDebug.lockEnable) { \
qDebug(__VA_ARGS__); \
} \
} while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define SCH_LOCK(type, _lock) do { \ #define SCH_LOCK(type, _lock) \
if (SCH_READ == (type)) { \ do { \
assert(atomic_load_32(_lock) >= 0); \ if (SCH_READ == (type)) { \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32(_lock) >= 0); \
taosRLockLatch(_lock); \ SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \
assert(atomic_load_32(_lock) > 0); \ SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
} else { \ assert(atomic_load_32(_lock) > 0); \
assert(atomic_load_32(_lock) >= 0); \ } else { \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32(_lock) >= 0); \
taosWLockLatch(_lock); \ SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \
assert(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY); \ SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
} \ assert(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY); \
} while (0) } \
} while (0)
#define SCH_UNLOCK(type, _lock) do { \ #define SCH_UNLOCK(type, _lock) \
if (SCH_READ == (type)) { \ do { \
assert(atomic_load_32((_lock)) > 0); \ if (SCH_READ == (type)) { \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32((_lock)) > 0); \
taosRUnLockLatch(_lock); \ SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \
assert(atomic_load_32((_lock)) >= 0); \ SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
} else { \ assert(atomic_load_32((_lock)) >= 0); \
assert(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \ } else { \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY); \
taosWUnLockLatch(_lock); \ SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \
assert(atomic_load_32((_lock)) >= 0); \ SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
} \ assert(atomic_load_32((_lock)) >= 0); \
} while (0) } \
} while (0)
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); void schCleanClusterHb(void *pTrans);
void schCleanClusterHb(void* pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schLaunchTask(SSchJob *job, SSchTask *task); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId); SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId); int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchJob *pJob); void schFreeFlowCtrl(SSchJob *pJob);
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask); int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchFetchTask(SSchJob *pJob); int32_t schLaunchFetchTask(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction); int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction);
int32_t schCloneSMsgSendInfo(void *src, void **dst); int32_t schCloneSMsgSendInfo(void *src, void **dst);
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob); int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void schFreeJobImpl(void *job); void schFreeJobImpl(void *job);
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx); int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx);
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask); int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask);
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans); int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans);
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code); int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code);
void schFreeRpcCtx(SRpcCtx *pCtx); void schFreeRpcCtx(SRpcCtx *pCtx);
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp); int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp);
bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus); bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus);
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask); int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask);
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp); int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp);
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp);
void schProcessOnDataFetched(SSchJob *job); void schProcessOnDataFetched(SSchJob *job);
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask); int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask);
void schFreeRpcCtxVal(const void *arg); void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId); int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync); int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus); int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t schCancelJob(SSchJob *pJob); int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode); int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void); uint64_t schGenTaskId(void);
void schCloseJobRef(void); void schCloseJobRef(void);
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob); int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schJobFetchRows(SSchJob *pJob); int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob); int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
char* schGetOpStr(SCH_OP_TYPE type); char *schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq); int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes); int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet); int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode); int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode); void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq); int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq);
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode); void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId); int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId);
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask); void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask);
bool schJobDone(SSchJob *pJob); bool schJobDone(SSchJob *pJob);
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask); int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask);
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param); int32_t schSwitchJobStatus(SSchJob *pJob, int32_t status, void *param);
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq); int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob **job, SCH_OP_TYPE type, SSchedulerReq *pReq);
int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode); int32_t schHandleOpEndEvent(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask); int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask);
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode); void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode);
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry); int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry);
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode); int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob); int32_t schProcessOnJobPartialSuccess(SSchJob *pJob);
void schFreeTask(SSchJob *pJob, SSchTask *pTask); void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp); int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);
extern SSchDebug gSCHDebug; extern SSchDebug gSCHDebug;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -16,7 +16,7 @@
#include "query.h" #include "query.h"
#include "schInt.h" #include "schInt.h"
tsem_t schdRspSem; tsem_t schdRspSem;
SSchDebug gSCHDebug = {0}; SSchDebug gSCHDebug = {0};
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) { void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
@ -30,5 +30,3 @@ void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
tsem_post(&schdRspSem); tsem_post(&schdRspSem);
} }

View File

@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "catalog.h"
#include "query.h"
#include "schInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h" #include "tref.h"
void schFreeFlowCtrl(SSchJob *pJob) { void schFreeFlowCtrl(SSchJob *pJob) {
@ -25,7 +25,7 @@ void schFreeFlowCtrl(SSchJob *pJob) {
} }
SSchFlowControl *ctrl = NULL; SSchFlowControl *ctrl = NULL;
void *pIter = taosHashIterate(pJob->flowCtrl, NULL); void *pIter = taosHashIterate(pJob->flowCtrl, NULL);
while (pIter) { while (pIter) {
ctrl = (SSchFlowControl *)pIter; ctrl = (SSchFlowControl *)pIter;
@ -59,7 +59,8 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pJob->flowCtrl = taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pJob->flowCtrl =
taosHashInit(pJob->taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pJob->flowCtrl) { if (NULL == pJob->flowCtrl) {
SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum); SCH_JOB_ELOG("taosHashInit %d flowCtrl failed", pJob->taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -73,10 +74,10 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
} }
int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) { int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
SSchLevel *pLevel = pTask->level; SSchLevel *pLevel = pTask->level;
SSchFlowControl *ctrl = NULL; SSchFlowControl *ctrl = NULL;
int32_t code = 0; int32_t code = 0;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp)); ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
if (NULL == ctrl) { if (NULL == ctrl) {
@ -93,8 +94,8 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
--ctrl->execTaskNum; --ctrl->execTaskNum;
ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum; ctrl->tableNumSum -= pTask->plan->execNodeStat.tableNum;
SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", SCH_TASK_DLOG("task quota removed, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
_return: _return:
@ -104,10 +105,10 @@ _return:
} }
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) { int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SSchLevel *pLevel = pTask->level; SSchLevel *pLevel = pTask->level;
int32_t code = 0; int32_t code = 0;
SSchFlowControl *ctrl = NULL; SSchFlowControl *ctrl = NULL;
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
do { do {
ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp)); ctrl = (SSchFlowControl *)taosHashGet(pJob->flowCtrl, ep, sizeof(SEp));
@ -124,8 +125,8 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", SCH_TASK_DLOG("task quota added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum); ep->port, pTask->plan->execNodeStat.tableNum, nctrl.tableNumSum, nctrl.execTaskNum);
*enough = true; *enough = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -173,14 +174,15 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
_return: _return:
SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", SCH_TASK_DLOG("task quota %s added, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d",
((*enough)?"":"NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); ((*enough) ? "" : "NOT"), ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum,
ctrl->execTaskNum);
SCH_UNLOCK(SCH_WRITE, &ctrl->lock); SCH_UNLOCK(SCH_WRITE, &ctrl->lock);
SCH_RET(code); SCH_RET(code);
} }
int32_t schTaskTableNumCompare(const void* key1, const void* key2) { int32_t schTaskTableNumCompare(const void *key1, const void *key2) {
SSchTask *pTask1 = *(SSchTask **)key1; SSchTask *pTask1 = *(SSchTask **)key1;
SSchTask *pTask2 = *(SSchTask **)key2; SSchTask *pTask2 = *(SSchTask **)key2;
@ -193,7 +195,6 @@ int32_t schTaskTableNumCompare(const void* key1, const void* key2) {
} }
} }
int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SCH_LOCK(SCH_WRITE, &ctrl->lock); SCH_LOCK(SCH_WRITE, &ctrl->lock);
@ -202,13 +203,13 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum; int32_t remainNum = schMgmt.cfg.maxNodeTableNum - ctrl->tableNumSum;
int32_t taskNum = taosArrayGetSize(ctrl->taskList); int32_t taskNum = taosArrayGetSize(ctrl->taskList);
int32_t code = 0; int32_t code = 0;
SSchTask *pTask = NULL; SSchTask *pTask = NULL;
if (taskNum > 1 && !ctrl->sorted) { if (taskNum > 1 && !ctrl->sorted) {
taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order taosArraySort(ctrl->taskList, schTaskTableNumCompare); // desc order
} }
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
@ -216,8 +217,8 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) { if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", SCH_TASK_DLOG("task NOT to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
continue; continue;
} }
@ -227,15 +228,15 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
taosArrayRemove(ctrl->taskList, i); taosArrayRemove(ctrl->taskList, i);
SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", SCH_TASK_DLOG("task to launch, fqdn:%s, port:%d, tableNum:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn,
ep->fqdn, ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum); ep->port, pTask->plan->execNodeStat.tableNum, ctrl->tableNumSum, ctrl->execTaskNum);
SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask)); SCH_ERR_JRET(schAsyncLaunchTaskImpl(pJob, pTask));
remainNum -= pTask->plan->execNodeStat.tableNum; remainNum -= pTask->plan->execNodeStat.tableNum;
if (remainNum <= 0) { if (remainNum <= 0) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d", ep->fqdn, ep->port,
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum); ctrl->tableNumSum, ctrl->execTaskNum);
break; break;
} }
@ -244,7 +245,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList); SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
if (remainNum < pLastTask->plan->execNodeStat.tableNum) { if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d", SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%d, remainExecTaskNum:%d, smallestInList:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum); ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);
break; break;
} }
@ -265,7 +266,6 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { if (!SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -281,10 +281,9 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl);; int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl);
;
SCH_ERR_RET(code); SCH_ERR_RET(code);
return code; // to avoid compiler error return code; // to avoid compiler error
} }

View File

@ -53,8 +53,7 @@ _return:
bool schJobDone(SSchJob *pJob) { bool schJobDone(SSchJob *pJob) {
int8_t status = SCH_GET_JOB_STATUS(pJob); int8_t status = SCH_GET_JOB_STATUS(pJob);
return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP || return (status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_DROP || status == JOB_TASK_STATUS_SUCC);
status == JOB_TASK_STATUS_SUCC);
} }
FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
@ -235,7 +234,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
SSchTask* pTask = taosArrayGet(pLevel->subTasks, 0); SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) { if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) {
pJob->attr.needFetch = true; pJob->attr.needFetch = true;
} }
@ -244,7 +243,6 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) { int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) { if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -255,7 +253,6 @@ int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
pJob->queryId = pDag->queryId; pJob->queryId = pDag->queryId;
@ -373,8 +370,7 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
pRes->code = atomic_load_32(&pJob->errCode); pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows; pRes->numOfRows = pJob->resNumOfRows;
pRes->res = pJob->execRes.res; pRes->res = pJob->execRes.res;
@ -387,7 +383,7 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) { int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
int32_t code = 0; int32_t code = 0;
SCH_LOCK(SCH_WRITE, &pJob->resLock); SCH_LOCK(SCH_WRITE, &pJob->resLock);
@ -426,8 +422,8 @@ _return:
return code; return code;
} }
int32_t schNotifyUserExecRes(SSchJob* pJob) { int32_t schNotifyUserExecRes(SSchJob *pJob) {
SExecResult* pRes = taosMemoryCalloc(1, sizeof(SExecResult)); SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
if (pRes) { if (pRes) {
schDumpJobExecRes(pJob, pRes); schDumpJobExecRes(pJob, pRes);
} }
@ -439,8 +435,8 @@ int32_t schNotifyUserExecRes(SSchJob* pJob) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schNotifyUserFetchRes(SSchJob* pJob) { int32_t schNotifyUserFetchRes(SSchJob *pJob) {
void* pRes = NULL; void *pRes = NULL;
schDumpJobFetchRes(pJob, &pRes); schDumpJobFetchRes(pJob, &pRes);
@ -507,9 +503,7 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailure(pJob, errCode)); }
SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) { int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
@ -520,7 +514,6 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
if (schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) { if (schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) {
SCH_ERR_RET(schLaunchFetchTask(pJob)); SCH_ERR_RET(schLaunchFetchTask(pJob));
@ -531,9 +524,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schProcessOnDataFetched(SSchJob *pJob) { void schProcessOnDataFetched(SSchJob *pJob) { schPostJobRes(pJob, SCH_OP_FETCH); }
schPostJobRes(pJob, SCH_OP_FETCH);
}
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
@ -548,14 +539,13 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_IS_QUERY_JOB(pJob)) { if (!SCH_IS_QUERY_JOB(pJob)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSchLevel *pLevel = pTask->level; SSchLevel *pLevel = pTask->level;
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1); int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
if (doneNum == pLevel->taskNum) { if (doneNum == pLevel->taskNum) {
pJob->levelIdx--; pJob->levelIdx--;
@ -610,7 +600,6 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schLaunchJob(SSchJob *pJob) { int32_t schLaunchJob(SSchJob *pJob) {
if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) { if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) {
SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes)); SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes));
@ -623,11 +612,10 @@ int32_t schLaunchJob(SSchJob *pJob) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schDropJobAllTasks(SSchJob *pJob) { void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->execTasks); schDropTaskInHashList(pJob, pJob->execTasks);
// schDropTaskInHashList(pJob, pJob->succTasks); // schDropTaskInHashList(pJob, pJob->succTasks);
// schDropTaskInHashList(pJob, pJob->failTasks); // schDropTaskInHashList(pJob, pJob->failTasks);
} }
void schFreeJobImpl(void *job) { void schFreeJobImpl(void *job) {
@ -659,8 +647,8 @@ void schFreeJobImpl(void *job) {
schFreeFlowCtrl(pJob); schFreeFlowCtrl(pJob);
taosHashCleanup(pJob->execTasks); taosHashCleanup(pJob->execTasks);
// taosHashCleanup(pJob->failTasks); // taosHashCleanup(pJob->failTasks);
// taosHashCleanup(pJob->succTasks); // taosHashCleanup(pJob->succTasks);
taosHashCleanup(pJob->taskList); taosHashCleanup(pJob->taskList);
taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->levels);
@ -688,7 +676,7 @@ void schFreeJobImpl(void *job) {
} }
int32_t schJobFetchRows(SSchJob *pJob) { int32_t schJobFetchRows(SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_RET(schLaunchFetchTask(pJob)); SCH_ERR_RET(schLaunchFetchTask(pJob));
@ -737,8 +725,8 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->nodeList = taosArrayDup(pReq->pNodeList); pJob->nodeList = taosArrayDup(pReq->pNodeList);
} }
pJob->taskList = pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) { if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans); SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -750,8 +738,8 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs)); SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
} }
pJob->execTasks = pJob->execTasks = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) { if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans); SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -769,7 +757,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
*pJobId = pJob->refId; *pJobId = pJob->refId;
SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId); SCH_JOB_DLOG("job refId:0x%" PRIx64 " created", pJob->refId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -788,7 +776,7 @@ _return:
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
int32_t code = 0; int32_t code = 0;
qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId); qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);
SCH_ERR_RET(schLaunchJob(pJob)); SCH_ERR_RET(schLaunchJob(pJob));
@ -802,7 +790,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) { void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
if (NULL == pReq || pReq->syncReq) { if (NULL == pReq || pReq->syncReq) {
return; return;
} }
@ -827,7 +815,7 @@ bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
return r; return r;
} }
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode) {
int32_t op = 0; int32_t op = 0;
switch (type) { switch (type) {
@ -836,7 +824,8 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) { if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op),
jobTaskStatusStr(pJob->status));
} }
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schDumpJobExecRes(pJob, pReq->pExecRes); schDumpJobExecRes(pJob, pReq->pExecRes);
@ -847,7 +836,8 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) { if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op),
jobTaskStatusStr(pJob->status));
} }
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
} }
@ -866,9 +856,9 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode)); SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
} }
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq) { int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq) {
int32_t code = 0; int32_t code = 0;
int8_t status = SCH_GET_JOB_STATUS(pJob); int8_t status = SCH_GET_JOB_STATUS(pJob);
switch (type) { switch (type) {
case SCH_OP_EXEC: case SCH_OP_EXEC:
@ -947,12 +937,12 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
} }
} }
int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId) { int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_t rId, uint64_t tId) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0; int8_t status = 0;
SSchTask *pTask = NULL; SSchTask *pTask = NULL;
SSchJob *pJob = schAcquireJob(rId); SSchJob *pJob = schAcquireJob(rId);
if (NULL == pJob) { if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId); qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST); SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
@ -983,6 +973,3 @@ _return:
SCH_RET(code); SCH_RET(code);
} }

View File

@ -62,7 +62,7 @@ _return:
} }
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) { int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
SSchJob *pJob = schAcquireJob(jobId); SSchJob* pJob = schAcquireJob(jobId);
if (NULL == pJob) { if (NULL == pJob) {
qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId); qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
@ -91,5 +91,3 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
return code; return code;
} }

View File

@ -16,12 +16,12 @@
#include "catalog.h" #include "catalog.h"
#include "command.h" #include "command.h"
#include "query.h" #include "query.h"
#include "qworker.h"
#include "schInt.h" #include "schInt.h"
#include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "qworker.h"
#include "tglobal.h"
void schFreeTask(SSchJob *pJob, SSchTask *pTask) { void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask); schDeregisterTaskHb(pJob, pTask);
@ -166,7 +166,8 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
schUpdateTaskExecNode(pJob, pTask, handle, execId); schUpdateTaskExecNode(pJob, pTask, handle, execId);
if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
pTask->execId, pTask->waitRetry);
SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR);
} }
@ -837,17 +838,18 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
goto _return; goto _return;
} }
SSchTask *pTask = NULL; SSchTask *pTask = NULL;
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
for (int32_t i = 0; i < resNum; ++i) { for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId); qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);
pJob = schAcquireJob(localRsp->rId); pJob = schAcquireJob(localRsp->rId);
if (NULL == pJob) { if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, localRsp->tId, localRsp->rId); qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
localRsp->tId, localRsp->rId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
} }
@ -866,7 +868,8 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
schReleaseJob(pJob->refId); schReleaseJob(pJob->refId);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, localRsp->tId, code); qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
localRsp->tId, code);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
@ -879,7 +882,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
_return: _return:
for (int32_t i = 0; i < resNum; ++i) { for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
tFreeSExplainRsp(&localRsp->rsp); tFreeSExplainRsp(&localRsp->rsp);
} }
@ -890,7 +893,7 @@ _return:
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SSubplan *plan = pTask->plan; SSubplan *plan = pTask->plan;
int32_t code = 0; int32_t code = 0;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen); code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
@ -899,7 +902,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
pTask->msgLen); pTask->msgLen);
SCH_ERR_RET(code); SCH_ERR_RET(code);
} else if (tsQueryPlannerTrace) { } else if (tsQueryPlannerTrace) {
char *msg = NULL; char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen); qSubPlanToString(plan, &msg, &msgLen);
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg); SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
@ -917,13 +920,13 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
} }
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
//SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); // SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (NULL == schMgmt.queryMgmt) { if (NULL == schMgmt.queryMgmt) {
SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL)); SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
} }
SArray *explainRes = NULL; SArray *explainRes = NULL;
SQWMsg qwMsg = {0}; SQWMsg qwMsg = {0};
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP; qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob); qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask); qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
@ -935,7 +938,8 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
} }
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes)); SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
pTask->execId, &qwMsg, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) { if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes)); SCH_ERR_RET(schHandleExplainRes(explainRes));
@ -959,16 +963,16 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_LOCK_TASK(pTask); SCH_LOCK_TASK(pTask);
} }
int8_t status = 0; int8_t status = 0;
int32_t code = 0; int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++; pTask->execId++;
pTask->retryTimes++; pTask->retryTimes++;
pTask->waitRetry = false; pTask->waitRetry = false;
SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d",
pTask->execId, pTask->retryTimes); SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", pTask->execId, pTask->retryTimes);
SCH_LOG_TASK_START_TS(pTask); SCH_LOG_TASK_START_TS(pTask);
@ -1086,14 +1090,15 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
} }
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
void *pRsp = NULL; void *pRsp = NULL;
SArray *explainRes = NULL; SArray *explainRes = NULL;
if (SCH_IS_EXPLAIN_JOB(pJob)) { if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
} }
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes)); SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
pTask->execId, &pRsp, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) { if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes)); SCH_ERR_RET(schHandleExplainRes(explainRes));

View File

@ -22,7 +22,7 @@
#include "trpc.h" #include "trpc.h"
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
qDebug("sch acquire jobId:0x%"PRIx64, refId); qDebug("sch acquire jobId:0x%" PRIx64, refId);
return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
} }
@ -31,11 +31,11 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
qDebug("sch release jobId:0x%"PRIx64, refId); qDebug("sch release jobId:0x%" PRIx64, refId);
return taosReleaseRef(schMgmt.jobRef, refId); return taosReleaseRef(schMgmt.jobRef, refId);
} }
char* schGetOpStr(SCH_OP_TYPE type) { char *schGetOpStr(SCH_OP_TYPE type) {
switch (type) { switch (type) {
case SCH_OP_NULL: case SCH_OP_NULL:
return "NULL"; return "NULL";
@ -56,13 +56,13 @@ void schFreeHbTrans(SSchHbTrans *pTrans) {
schFreeRpcCtx(&pTrans->rpcCtx); schFreeRpcCtx(&pTrans->rpcCtx);
} }
void schCleanClusterHb(void* pTrans) { void schCleanClusterHb(void *pTrans) {
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL); SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
while (hb) { while (hb) {
if (hb->trans.pTrans == pTrans) { if (hb->trans.pTrans == pTrans) {
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL); SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
schFreeHbTrans(hb); schFreeHbTrans(hb);
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
} }
@ -74,7 +74,7 @@ void schCleanClusterHb(void* pTrans) {
} }
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) { int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
int32_t code = 0; int32_t code = 0;
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock); SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
@ -94,7 +94,6 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) { int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
int32_t code = 0; int32_t code = 0;
SSchHbTrans hb = {0}; SSchHbTrans hb = {0};
@ -161,7 +160,7 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
epId.nodeId = addr->nodeId; epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr); SEp *pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn); strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port; epId.ep.port = pEp->port;
@ -184,15 +183,13 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
pTask->registerdHb = false; pTask->registerdHb = false;
} }
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0}; SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId; epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr); SEp *pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn); strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port; epId.ep.port = pEp->port;
@ -226,7 +223,6 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schCloseJobRef(void) { void schCloseJobRef(void) {
if (!atomic_load_8((int8_t *)&schMgmt.exit)) { if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
return; return;
@ -242,7 +238,7 @@ uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
uint64_t schGenUUID(void) { uint64_t schGenUUID(void) {
static uint64_t hashId = 0; static uint64_t hashId = 0;
static int32_t requestSerialId = 0; static int32_t requestSerialId = 0;
if (hashId == 0) { if (hashId == 0) {
char uid[64] = {0}; char uid[64] = {0};
@ -254,15 +250,14 @@ uint64_t schGenUUID(void) {
} }
} }
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId(); uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1); int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id; return id;
} }
void schFreeRpcCtxVal(const void *arg) { void schFreeRpcCtxVal(const void *arg) {
if (NULL == arg) { if (NULL == arg) {
return; return;
@ -307,5 +302,3 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -14,10 +14,10 @@
*/ */
#include "query.h" #include "query.h"
#include "qworker.h"
#include "schInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "qworker.h"
SSchedulerMgmt schMgmt = { SSchedulerMgmt schMgmt = {
.jobRef = -1, .jobRef = -1,
@ -61,7 +61,7 @@ int32_t schedulerInit() {
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) { int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
qDebug("scheduler %s exec job start", pReq->syncReq ? "SYNC" : "ASYNC"); qDebug("scheduler %s exec job start", pReq->syncReq ? "SYNC" : "ASYNC");
int32_t code = 0; int32_t code = 0;
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pJobId, pReq)); SCH_ERR_JRET(schInitJob(pJobId, pReq));
@ -144,7 +144,7 @@ int32_t schedulerEnableReSchedule(bool enableResche) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schedulerFreeJob(int64_t* jobId, int32_t errCode) { void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
if (0 == *jobId) { if (0 == *jobId) {
return; return;
} }

View File

@ -30,15 +30,12 @@
#endif #endif
#include "os.h" #include "os.h"
#include "tglobal.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "catalog.h" #include "catalog.h"
#include "scheduler.h" #include "scheduler.h"
#include "taos.h" #include "taos.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdef.h" #include "tdef.h"
#include "tglobal.h"
#include "trpc.h" #include "trpc.h"
#include "tvariant.h" #include "tvariant.h"
@ -56,8 +53,9 @@
namespace { namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize,
extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode); int32_t rspCode);
extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode);
int64_t insertJobRefId = 0; int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0; int64_t queryJobRefId = 0;
@ -66,16 +64,15 @@ uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0; uint64_t schtFetchTaskId = 0;
uint64_t schtQueryId = 1; uint64_t schtQueryId = 1;
bool schtTestStop = false; bool schtTestStop = false;
bool schtTestDeadLoop = false; bool schtTestDeadLoop = false;
int32_t schtTestMTRunSec = 10; int32_t schtTestMTRunSec = 10;
int32_t schtTestPrintNum = 1000; int32_t schtTestPrintNum = 1000;
int32_t schtStartFetch = 0; int32_t schtStartFetch = 0;
void schtInitLogFile() { void schtInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog"; const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10; const int32_t maxLogFileNum = 10;
tsAsyncLog = 0; tsAsyncLog = 0;
qDebugFlag = 159; qDebugFlag = 159;
@ -84,23 +81,21 @@ void schtInitLogFile() {
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir); printf("failed to open log file in directory:%s\n", tsLogDir);
} }
} }
void schtQueryCb(SExecResult* pResult, void* param, int32_t code) { void schtQueryCb(SExecResult *pResult, void *param, int32_t code) {
assert(TSDB_CODE_SUCCESS == code); assert(TSDB_CODE_SUCCESS == code);
*(int32_t*)param = 1; *(int32_t *)param = 1;
} }
void schtBuildQueryDag(SQueryPlan *dag) { void schtBuildQueryDag(SQueryPlan *dag) {
uint64_t qId = schtQueryId; uint64_t qId = schtQueryId;
dag->queryId = qId; dag->queryId = qId;
dag->numOfSubplans = 2; dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList(); dag->pSubplans = nodesMakeList();
SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan)); SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan)); SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
@ -117,7 +112,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->pChildren = NULL; scanPlan->pChildren = NULL;
scanPlan->level = 1; scanPlan->level = 1;
scanPlan->pParents = nodesMakeList(); scanPlan->pParents = nodesMakeList();
scanPlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode)); scanPlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
scanPlan->msgType = TDMT_SCH_QUERY; scanPlan->msgType = TDMT_SCH_QUERY;
mergePlan->id.queryId = qId; mergePlan->id.queryId = qId;
@ -129,31 +124,31 @@ void schtBuildQueryDag(SQueryPlan *dag) {
mergePlan->pChildren = nodesMakeList(); mergePlan->pChildren = nodesMakeList();
mergePlan->pParents = NULL; mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode)); mergePlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_SCH_QUERY; mergePlan->msgType = TDMT_SCH_QUERY;
merge->pNodeList = nodesMakeList(); merge->pNodeList = nodesMakeList();
scan->pNodeList = nodesMakeList(); scan->pNodeList = nodesMakeList();
nodesListAppend(merge->pNodeList, (SNode*)mergePlan); nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
nodesListAppend(scan->pNodeList, (SNode*)scanPlan); nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
nodesListAppend(mergePlan->pChildren, (SNode*)scanPlan); nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
nodesListAppend(scanPlan->pParents, (SNode*)mergePlan); nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
nodesListAppend(dag->pSubplans, (SNode*)merge); nodesListAppend(dag->pSubplans, (SNode *)merge);
nodesListAppend(dag->pSubplans, (SNode*)scan); nodesListAppend(dag->pSubplans, (SNode *)scan);
} }
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
uint64_t qId = schtQueryId; uint64_t qId = schtQueryId;
int32_t scanPlanNum = 20; int32_t scanPlanNum = 20;
dag->queryId = qId; dag->queryId = qId;
dag->numOfSubplans = 2; dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList(); dag->pSubplans = nodesMakeList();
SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(scanPlanNum, sizeof(SSubplan)); SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(scanPlanNum, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan)); SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan));
@ -180,13 +175,13 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
scanPlan[i].pChildren = NULL; scanPlan[i].pChildren = NULL;
scanPlan[i].level = 1; scanPlan[i].level = 1;
scanPlan[i].pParents = nodesMakeList(); scanPlan[i].pParents = nodesMakeList();
scanPlan[i].pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode)); scanPlan[i].pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
scanPlan[i].msgType = TDMT_SCH_QUERY; scanPlan[i].msgType = TDMT_SCH_QUERY;
nodesListAppend(scanPlan[i].pParents, (SNode*)mergePlan); nodesListAppend(scanPlan[i].pParents, (SNode *)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode*)(scanPlan + i)); nodesListAppend(mergePlan->pChildren, (SNode *)(scanPlan + i));
nodesListAppend(scan->pNodeList, (SNode*)(scanPlan + i)); nodesListAppend(scan->pNodeList, (SNode *)(scanPlan + i));
} }
mergePlan->id.queryId = qId; mergePlan->id.queryId = qId;
@ -197,20 +192,16 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
mergePlan->execNode.epSet.numOfEps = 0; mergePlan->execNode.epSet.numOfEps = 0;
mergePlan->pParents = NULL; mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)taosMemoryCalloc(1, sizeof(SPhysiNode)); mergePlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_SCH_QUERY; mergePlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(merge->pNodeList, (SNode*)mergePlan); nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
nodesListAppend(dag->pSubplans, (SNode*)merge);
nodesListAppend(dag->pSubplans, (SNode*)scan);
}
void schtFreeQueryDag(SQueryPlan *dag) {
nodesListAppend(dag->pSubplans, (SNode *)merge);
nodesListAppend(dag->pSubplans, (SNode *)scan);
} }
void schtFreeQueryDag(SQueryPlan *dag) {}
void schtBuildInsertDag(SQueryPlan *dag) { void schtBuildInsertDag(SQueryPlan *dag) {
uint64_t qId = 0x0000000000000002; uint64_t qId = 0x0000000000000002;
@ -218,7 +209,7 @@ void schtBuildInsertDag(SQueryPlan *dag) {
dag->queryId = qId; dag->queryId = qId;
dag->numOfSubplans = 2; dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList(); dag->pSubplans = nodesMakeList();
SNodeListNode *inserta = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *insertPlan = (SSubplan *)taosMemoryCalloc(2, sizeof(SSubplan)); SSubplan *insertPlan = (SSubplan *)taosMemoryCalloc(2, sizeof(SSubplan));
@ -235,7 +226,7 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan[0].pChildren = NULL; insertPlan[0].pChildren = NULL;
insertPlan[0].pParents = NULL; insertPlan[0].pParents = NULL;
insertPlan[0].pNode = NULL; insertPlan[0].pNode = NULL;
insertPlan[0].pDataSink = (SDataSinkNode*)taosMemoryCalloc(1, sizeof(SDataSinkNode)); insertPlan[0].pDataSink = (SDataSinkNode *)taosMemoryCalloc(1, sizeof(SDataSinkNode));
insertPlan[0].msgType = TDMT_VND_SUBMIT; insertPlan[0].msgType = TDMT_VND_SUBMIT;
insertPlan[1].id.queryId = qId; insertPlan[1].id.queryId = qId;
@ -251,48 +242,43 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan[1].pChildren = NULL; insertPlan[1].pChildren = NULL;
insertPlan[1].pParents = NULL; insertPlan[1].pParents = NULL;
insertPlan[1].pNode = NULL; insertPlan[1].pNode = NULL;
insertPlan[1].pDataSink = (SDataSinkNode*)taosMemoryCalloc(1, sizeof(SDataSinkNode)); insertPlan[1].pDataSink = (SDataSinkNode *)taosMemoryCalloc(1, sizeof(SDataSinkNode));
insertPlan[1].msgType = TDMT_VND_SUBMIT; insertPlan[1].msgType = TDMT_VND_SUBMIT;
inserta->pNodeList = nodesMakeList(); inserta->pNodeList = nodesMakeList();
nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
insertPlan += 1; insertPlan += 1;
nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
nodesListAppend(dag->pSubplans, (SNode*)inserta); nodesListAppend(dag->pSubplans, (SNode *)inserta);
} }
int32_t schtPlanToString(const SSubplan *subplan, char **str, int32_t *len) {
int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
*str = (char *)taosMemoryCalloc(1, 20); *str = (char *)taosMemoryCalloc(1, 20);
*len = 20; *len = 20;
return 0; return 0;
} }
void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) { void schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) {}
} void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {}
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
}
void schtSetPlanToString() { void schtSetPlanToString() {
static Stub stub; static Stub stub;
stub.set(qSubPlanToString, schtPlanToString); stub.set(qSubPlanToString, schtPlanToString);
{ {
#ifdef WINDOWS #ifdef WINDOWS
AddrAny any; AddrAny any;
std::map<std::string,void*> result; std::map<std::string, void *> result;
any.get_func_addr("qSubPlanToString", result); any.get_func_addr("qSubPlanToString", result);
#endif #endif
#ifdef LINUX #ifdef LINUX
AddrAny any("libplanner.so"); AddrAny any("libplanner.so");
std::map<std::string,void*> result; std::map<std::string, void *> result;
any.get_global_func_addr_dynsym("^qSubPlanToString$", result); any.get_global_func_addr_dynsym("^qSubPlanToString$", result);
#endif #endif
for (const auto& f : result) { for (const auto &f : result) {
stub.set(f.second, schtPlanToString); stub.set(f.second, schtPlanToString);
} }
} }
@ -303,16 +289,16 @@ void schtSetExecNode() {
stub.set(qSetSubplanExecutionNode, schtExecNode); stub.set(qSetSubplanExecutionNode, schtExecNode);
{ {
#ifdef WINDOWS #ifdef WINDOWS
AddrAny any; AddrAny any;
std::map<std::string,void*> result; std::map<std::string, void *> result;
any.get_func_addr("qSetSubplanExecutionNode", result); any.get_func_addr("qSetSubplanExecutionNode", result);
#endif #endif
#ifdef LINUX #ifdef LINUX
AddrAny any("libplanner.so"); AddrAny any("libplanner.so");
std::map<std::string,void*> result; std::map<std::string, void *> result;
any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result); any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result);
#endif #endif
for (const auto& f : result) { for (const auto &f : result) {
stub.set(f.second, schtExecNode); stub.set(f.second, schtExecNode);
} }
} }
@ -323,22 +309,22 @@ void schtSetRpcSendRequest() {
stub.set(rpcSendRequest, schtRpcSendRequest); stub.set(rpcSendRequest, schtRpcSendRequest);
{ {
#ifdef WINDOWS #ifdef WINDOWS
AddrAny any; AddrAny any;
std::map<std::string,void*> result; std::map<std::string, void *> result;
any.get_func_addr("rpcSendRequest", result); any.get_func_addr("rpcSendRequest", result);
#endif #endif
#ifdef LINUX #ifdef LINUX
AddrAny any("libtransport.so"); AddrAny any("libtransport.so");
std::map<std::string,void*> result; std::map<std::string, void *> result;
any.get_global_func_addr_dynsym("^rpcSendRequest$", result); any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
#endif #endif
for (const auto& f : result) { for (const auto &f : result) {
stub.set(f.second, schtRpcSendRequest); stub.set(f.second, schtRpcSendRequest);
} }
} }
} }
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) { int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo) {
if (pInfo) { if (pInfo) {
taosMemoryFreeClear(pInfo->param); taosMemoryFreeClear(pInfo->param);
taosMemoryFreeClear(pInfo->msgInfo.pData); taosMemoryFreeClear(pInfo->msgInfo.pData);
@ -347,32 +333,30 @@ int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTr
return 0; return 0;
} }
void schtSetAsyncSendMsgToServer() { void schtSetAsyncSendMsgToServer() {
static Stub stub; static Stub stub;
stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer); stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
{ {
#ifdef WINDOWS #ifdef WINDOWS
AddrAny any; AddrAny any;
std::map<std::string,void*> result; std::map<std::string, void *> result;
any.get_func_addr("asyncSendMsgToServer", result); any.get_func_addr("asyncSendMsgToServer", result);
#endif #endif
#ifdef LINUX #ifdef LINUX
AddrAny any("libtransport.so"); AddrAny any("libtransport.so");
std::map<std::string,void*> result; std::map<std::string, void *> result;
any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result); any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
#endif #endif
for (const auto& f : result) { for (const auto &f : result) {
stub.set(f.second, schtAsyncSendMsgToServer); stub.set(f.second, schtAsyncSendMsgToServer);
} }
} }
} }
void *schtSendRsp(void *param) { void *schtSendRsp(void *param) {
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
int64_t job = 0; int64_t job = 0;
int32_t code = 0; int32_t code = 0;
while (true) { while (true) {
job = *(int64_t *)param; job = *(int64_t *)param;
@ -402,12 +386,12 @@ void *schtSendRsp(void *param) {
} }
void *schtCreateFetchRspThread(void *param) { void *schtCreateFetchRspThread(void *param) {
int64_t job = *(int64_t *)param; int64_t job = *(int64_t *)param;
SSchJob* pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
taosSsleep(1); taosSsleep(1);
int32_t code = 0; int32_t code = 0;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = 10; rsp->numOfRows = 10;
@ -420,10 +404,9 @@ void *schtCreateFetchRspThread(void *param) {
return NULL; return NULL;
} }
void *schtFetchRspThread(void *aa) { void *schtFetchRspThread(void *aa) {
SDataBuf dataBuf = {0}; SDataBuf dataBuf = {0};
SSchTaskCallbackParam* param = NULL; SSchTaskCallbackParam *param = NULL;
while (!schtTestStop) { while (!schtTestStop) {
if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) { if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) {
@ -437,7 +420,7 @@ void *schtFetchRspThread(void *aa) {
param->queryId = schtQueryId; param->queryId = schtQueryId;
param->taskId = schtFetchTaskId; param->taskId = schtFetchTaskId;
int32_t code = 0; int32_t code = 0;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = 10; rsp->numOfRows = 10;
@ -454,7 +437,7 @@ void *schtFetchRspThread(void *aa) {
void schtFreeQueryJob(int32_t freeThread) { void schtFreeQueryJob(int32_t freeThread) {
static uint32_t freeNum = 0; static uint32_t freeNum = 0;
int64_t job = queryJobRefId; int64_t job = queryJobRefId;
if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) { if (job && atomic_val_compare_exchange_64(&queryJobRefId, job, 0)) {
schedulerFreeJob(&job, 0); schedulerFreeJob(&job, 0);
@ -466,31 +449,29 @@ void schtFreeQueryJob(int32_t freeThread) {
} }
} }
void* schtRunJobThread(void *aa) { void *schtRunJobThread(void *aa) {
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
char *clusterId = "cluster1"; char *clusterId = "cluster1";
char *dbname = "1.db1"; char *dbname = "1.db1";
char *tablename = "table1"; char *tablename = "table1";
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
SQueryPlan dag; SQueryPlan dag;
schtInitLogFile(); schtInitLogFile();
int32_t code = schedulerInit(); int32_t code = schedulerInit();
assert(code == 0); assert(code == 0);
schtSetPlanToString(); schtSetPlanToString();
schtSetExecNode(); schtSetExecNode();
schtSetAsyncSendMsgToServer(); schtSetAsyncSendMsgToServer();
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
SSchTaskCallbackParam *param = NULL; SSchTaskCallbackParam *param = NULL;
SHashObj *execTasks = NULL; SHashObj *execTasks = NULL;
SDataBuf dataBuf = {0}; SDataBuf dataBuf = {0};
uint32_t jobFinished = 0; uint32_t jobFinished = 0;
int32_t queryDone = 0; int32_t queryDone = 0;
while (!schtTestStop) { while (!schtTestStop) {
schtBuildQueryDag(&dag); schtBuildQueryDag(&dag);
@ -573,7 +554,6 @@ void* schtRunJobThread(void *aa) {
pIter = taosHashIterate(execTasks, pIter); pIter = taosHashIterate(execTasks, pIter);
} }
while (true) { while (true) {
if (queryDone) { if (queryDone) {
break; break;
@ -620,7 +600,7 @@ void* schtRunJobThread(void *aa) {
return NULL; return NULL;
} }
void* schtFreeJobThread(void *aa) { void *schtFreeJobThread(void *aa) {
while (!schtTestStop) { while (!schtTestStop) {
taosUsleep(taosRand() % 100); taosUsleep(taosRand() % 100);
schtFreeQueryJob(1); schtFreeQueryJob(1);
@ -628,17 +608,16 @@ void* schtFreeJobThread(void *aa) {
return NULL; return NULL;
} }
} // namespace
}
TEST(queryTest, normalCase) { TEST(queryTest, normalCase) {
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
char *clusterId = "cluster1"; char *clusterId = "cluster1";
char *dbname = "1.db1"; char *dbname = "1.db1";
char *tablename = "table1"; char *tablename = "table1";
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
int64_t job = 0; int64_t job = 0;
SQueryPlan dag; SQueryPlan dag;
memset(&dag, 0, sizeof(dag)); memset(&dag, 0, sizeof(dag));
@ -673,7 +652,6 @@ TEST(queryTest, normalCase) {
code = schedulerExecJob(&req, &job); code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
void *pIter = taosHashIterate(pJob->execTasks, NULL); void *pIter = taosHashIterate(pJob->execTasks, NULL);
@ -739,13 +717,13 @@ TEST(queryTest, normalCase) {
} }
TEST(queryTest, readyFirstCase) { TEST(queryTest, readyFirstCase) {
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
char *clusterId = "cluster1"; char *clusterId = "cluster1";
char *dbname = "1.db1"; char *dbname = "1.db1";
char *tablename = "table1"; char *tablename = "table1";
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
int64_t job = 0; int64_t job = 0;
SQueryPlan dag; SQueryPlan dag;
memset(&dag, 0, sizeof(dag)); memset(&dag, 0, sizeof(dag));
@ -779,7 +757,6 @@ TEST(queryTest, readyFirstCase) {
code = schedulerExecJob(&req, &job); code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
void *pIter = taosHashIterate(pJob->execTasks, NULL); void *pIter = taosHashIterate(pJob->execTasks, NULL);
@ -812,7 +789,6 @@ TEST(queryTest, readyFirstCase) {
taosUsleep(10000); taosUsleep(10000);
} }
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); taosThreadAttrInit(&thattr);
@ -844,16 +820,14 @@ TEST(queryTest, readyFirstCase) {
schedulerDestroy(); schedulerDestroy();
} }
TEST(queryTest, flowCtrlCase) { TEST(queryTest, flowCtrlCase) {
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
char *clusterId = "cluster1"; char *clusterId = "cluster1";
char *dbname = "1.db1"; char *dbname = "1.db1";
char *tablename = "table1"; char *tablename = "table1";
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
int64_t job = 0; int64_t job = 0;
SQueryPlan dag; SQueryPlan dag;
schtInitLogFile(); schtInitLogFile();
@ -875,7 +849,7 @@ TEST(queryTest, flowCtrlCase) {
schtSetExecNode(); schtSetExecNode();
schtSetAsyncSendMsgToServer(); schtSetAsyncSendMsgToServer();
int32_t queryDone = 0; int32_t queryDone = 0;
SRequestConnInfo conn = {0}; SRequestConnInfo conn = {0};
conn.pTrans = mockPointer; conn.pTrans = mockPointer;
SSchedulerReq req = {0}; SSchedulerReq req = {0};
@ -889,7 +863,6 @@ TEST(queryTest, flowCtrlCase) {
code = schedulerExecJob(&req, &job); code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
bool qDone = false; bool qDone = false;
@ -958,15 +931,14 @@ TEST(queryTest, flowCtrlCase) {
schedulerDestroy(); schedulerDestroy();
} }
TEST(insertTest, normalCase) { TEST(insertTest, normalCase) {
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
char *clusterId = "cluster1"; char *clusterId = "cluster1";
char *dbname = "1.db1"; char *dbname = "1.db1";
char *tablename = "table1"; char *tablename = "table1";
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
SQueryPlan dag; SQueryPlan dag;
uint64_t numOfRows = 0; uint64_t numOfRows = 0;
SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
@ -1032,7 +1004,7 @@ TEST(multiThread, forceFree) {
taosSsleep(3); taosSsleep(3);
} }
int main(int argc, char** argv) { int main(int argc, char **argv) {
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();

View File

@ -12,25 +12,7 @@ FORMAT_DIR_LIST=(
"${PRJ_ROOT_DIR}/source/os" "${PRJ_ROOT_DIR}/source/os"
"${PRJ_ROOT_DIR}/source/util" "${PRJ_ROOT_DIR}/source/util"
"${PRJ_ROOT_DIR}/source/common" "${PRJ_ROOT_DIR}/source/common"
"${PRJ_ROOT_DIR}/source/libs/cache" "${PRJ_ROOT_DIR}/source/libs"
"${PRJ_ROOT_DIR}/source/libs/catalog"
"${PRJ_ROOT_DIR}/source/libs/command"
"${PRJ_ROOT_DIR}/source/libs/executor"
"${PRJ_ROOT_DIR}/source/libs/function"
"${PRJ_ROOT_DIR}/source/libs/index"
"${PRJ_ROOT_DIR}/source/libs/monitor"
"${PRJ_ROOT_DIR}/source/libs/nodes"
# "${PRJ_ROOT_DIR}/source/libs/parser"
"${PRJ_ROOT_DIR}/source/libs/planner"
"${PRJ_ROOT_DIR}/source/libs/qcom"
"${PRJ_ROOT_DIR}/source/libs/qworker"
"${PRJ_ROOT_DIR}/source/libs/scalar"
"${PRJ_ROOT_DIR}/source/libs/stream"
"${PRJ_ROOT_DIR}/source/libs/sync"
"${PRJ_ROOT_DIR}/source/libs/tdb"
"${PRJ_ROOT_DIR}/source/libs/tfs"
"${PRJ_ROOT_DIR}/source/libs/transport"
"${PRJ_ROOT_DIR}/source/libs/wal"
"${PRJ_ROOT_DIR}/source/client/inc" "${PRJ_ROOT_DIR}/source/client/inc"
"${PRJ_ROOT_DIR}/source/client/src" "${PRJ_ROOT_DIR}/source/client/src"
"${PRJ_ROOT_DIR}/source/client/test" "${PRJ_ROOT_DIR}/source/client/test"
@ -45,7 +27,7 @@ EXCLUDE_FILE_LIST=(
) )
for d in ${FORMAT_DIR_LIST[@]}; do for d in ${FORMAT_DIR_LIST[@]}; do
for f in $(find $d -type f -regex '.*\.\(cpp\|hpp\|c\|h\)'); do for f in $(find $d -type f -not -name "*/sql.c" -regex '.*\.\(cpp\|hpp\|c\|h\)'); do
${FORMAT_BIN} -i $f ${FORMAT_BIN} -i $f
done done
done done