Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-30987-3

This commit is contained in:
Hongze Cheng 2024-07-16 20:19:31 +08:00
commit 7cce77c260
19 changed files with 208 additions and 86 deletions

View File

@ -50,7 +50,7 @@ typedef struct {
int32_t auditInit(const SAuditCfg *pCfg); int32_t auditInit(const SAuditCfg *pCfg);
void auditCleanup(); void auditCleanup();
void auditSend(SJson *pJson); int32_t auditSend(SJson *pJson);
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len); char *detail, int32_t len);
void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,

View File

@ -23,8 +23,6 @@ extern "C" {
#include "catalog.h" #include "catalog.h"
#include "planner.h" #include "planner.h"
extern tsem_t schdRspSem;
typedef struct SQueryProfileSummary { typedef struct SQueryProfileSummary {
int64_t startTs; // Object created and added into the message queue int64_t startTs; // Object created and added into the message queue
int64_t endTs; // the timestamp when the task is completed int64_t endTs; // the timestamp when the task is completed
@ -101,8 +99,6 @@ void schedulerFreeJob(int64_t* job, int32_t errCode);
void schedulerDestroy(void); void schedulerDestroy(void);
void schdExecCallback(SExecResult* pResult, void* param, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -53,6 +53,8 @@ void taosPrintBackTrace();
void taosMemoryTrim(int32_t size); void taosMemoryTrim(int32_t size);
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size); void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
#define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n))
#define taosMemoryFreeClear(ptr) \ #define taosMemoryFreeClear(ptr) \
do { \ do { \
if (ptr) { \ if (ptr) { \

View File

@ -59,6 +59,8 @@ typedef enum { M2C = 0, C2M } ConvType;
(dst)[(size)-1] = 0; \ (dst)[(size)-1] = 0; \
} while (0) } while (0)
#define TAOS_STRCPY(_dst, _src) ((void)strcpy(_dst, _src))
char *tstrdup(const char *src); char *tstrdup(const char *src);
int32_t taosUcs4len(TdUcs4 *ucs4); int32_t taosUcs4len(TdUcs4 *ucs4);
int64_t taosStr2int64(const char *str); int64_t taosStr2int64(const char *str);

View File

@ -942,6 +942,11 @@ int32_t taosGetErrSize();
// UTIL // UTIL
#define TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x6000) #define TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x6000)
// AUDIT
#define TSDB_CODE_AUDIT_NOT_FORMAT_TO_JSON TAOS_DEF_ERROR_CODE(0, 0x6100)
#define TSDB_CODE_AUDIT_FAIL_SEND_AUDIT_RECORD TAOS_DEF_ERROR_CODE(0, 0x6101)
#define TSDB_CODE_AUDIT_FAIL_GENERATE_JSON TAOS_DEF_ERROR_CODE(0, 0x6102)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -32,8 +32,8 @@ char* tsAuditBatchUri = "/audit-batch";
int32_t auditInit(const SAuditCfg *pCfg) { int32_t auditInit(const SAuditCfg *pCfg) {
tsAudit.cfg = *pCfg; tsAudit.cfg = *pCfg;
tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *)); tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *));
taosThreadMutexInit(&tsAudit.lock, NULL); if(tsAudit.records == NULL) return TSDB_CODE_OUT_OF_MEMORY;
return 0; return taosThreadMutexInit(&tsAudit.lock, NULL);
} }
static FORCE_INLINE void auditDeleteRecord(SAuditRecord * record) { static FORCE_INLINE void auditDeleteRecord(SAuditRecord * record) {

View File

@ -218,8 +218,8 @@ typedef struct SQWorkerMgmt {
#define QW_IDS() sId, qId, tId, rId, eId #define QW_IDS() sId, qId, tId, rId, eId
#define QW_FPARAMS() mgmt, QW_IDS() #define QW_FPARAMS() mgmt, QW_IDS()
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n) #define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n)
#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n) #define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n)
#define QW_STAT_GET(_item) atomic_load_64(&(_item)) #define QW_STAT_GET(_item) atomic_load_64(&(_item))
#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event]) #define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event])
@ -250,7 +250,7 @@ typedef struct SQWorkerMgmt {
} while (0) } while (0)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) #define QW_UPDATE_RSP_CODE(ctx, code) (void)atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) #define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch) #define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)

View File

@ -221,11 +221,11 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
SEpSet epSet = {0}; SEpSet epSet = {0};
epSet.inUse = 1; epSet.inUse = 1;
epSet.numOfEps = 3; epSet.numOfEps = 3;
strcpy(epSet.eps[0].fqdn, "localhost"); TAOS_STRCPY(epSet.eps[0].fqdn, "localhost");
epSet.eps[0].port = 7100; epSet.eps[0].port = 7100;
strcpy(epSet.eps[1].fqdn, "localhost"); TAOS_STRCPY(epSet.eps[1].fqdn, "localhost");
epSet.eps[1].port = 7200; epSet.eps[1].port = 7200;
strcpy(epSet.eps[2].fqdn, "localhost"); TAOS_STRCPY(epSet.eps[2].fqdn, "localhost");
epSet.eps[2].port = 7300; epSet.eps[2].port = 7300;
ctx->phase = QW_PHASE_POST_QUERY; ctx->phase = QW_PHASE_POST_QUERY;

View File

@ -21,7 +21,7 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **r
} }
if (NULL == *rsp) { if (NULL == *rsp) {
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); TAOS_MEMSET(pRsp, 0, sizeof(SRetrieveTableRsp));
} }
*rsp = pRsp; *rsp = pRsp;
@ -169,7 +169,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
if (NULL == pRsp) { if (NULL == pRsp) {
QW_RET(terrno); QW_RET(terrno);
} }
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); TAOS_MEMSET(pRsp, 0, sizeof(SRetrieveTableRsp));
dataLength = 0; dataLength = 0;
} }

View File

@ -342,7 +342,7 @@ int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
if (NULL == pExec) { if (NULL == pExec) {
QW_ERR_JRET(terrno); QW_ERR_JRET(terrno);
} }
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); (void)memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
localRsp.rsp.subplanInfo = pExec; localRsp.rsp.subplanInfo = pExec;
localRsp.qId = qId; localRsp.qId = qId;
localRsp.tId = tId; localRsp.tId = tId;
@ -537,7 +537,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
} }
if (dbFName[0] && tbName[0]) { if (dbFName[0] && tbName[0]) {
sprintf(tbInfo.tbFName, "%s.%s", dbFName, tbName); (void)sprintf(tbInfo.tbFName, "%s.%s", dbFName, tbName);
} else { } else {
tbInfo.tbFName[0] = 0; tbInfo.tbFName[0] = 0;
} }
@ -613,7 +613,7 @@ void qwDestroyImpl(void *pMgmt) {
taosMemoryFree(mgmt); taosMemoryFree(mgmt);
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
qwCloseRef(); qwCloseRef();

View File

@ -452,8 +452,8 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
pRes->skey = pDelRes->skey; pRes->skey = pDelRes->skey;
pRes->ekey = pDelRes->ekey; pRes->ekey = pDelRes->ekey;
pRes->affectedRows = pDelRes->affectedRows; pRes->affectedRows = pDelRes->affectedRows;
strcpy(pRes->tableFName, pDelRes->tableName); TAOS_STRCPY(pRes->tableFName, pDelRes->tableName);
strcpy(pRes->tsColName, pDelRes->tsColName); TAOS_STRCPY(pRes->tsColName, pDelRes->tsColName);
taosMemoryFree(output.pData); taosMemoryFree(output.pData);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1126,8 +1126,8 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
sch->hbConnInfo.handle = NULL; sch->hbConnInfo.handle = NULL;
} }
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); (void)memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId)); (void)memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
QW_UNLOCK(QW_WRITE, &sch->hbConnLock); QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
@ -1138,7 +1138,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return: _return:
memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); (void)memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) { if (code) {
@ -1299,19 +1299,19 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1); int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
if (1 == qwNum) { if (1 == qwNum) {
memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param)); TAOS_MEMSET(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
} }
int32_t code = qwOpenRef(); int32_t code = qwOpenRef();
if (code) { if (code) {
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
QW_RET(code); QW_RET(code);
} }
SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker)); SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
if (NULL == mgmt) { if (NULL == mgmt) {
qError("calloc %d failed", (int32_t)sizeof(SQWorker)); qError("calloc %d failed", (int32_t)sizeof(SQWorker));
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
QW_RET(TSDB_CODE_OUT_OF_MEMORY); QW_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
@ -1345,7 +1345,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
if (pMsgCb) { if (pMsgCb) {
mgmt->msgCb = *pMsgCb; mgmt->msgCb = *pMsgCb;
} else { } else {
memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb)); TAOS_MEMSET(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
} }
mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
@ -1379,7 +1379,7 @@ _return:
taosTmrCleanUp(mgmt->timer); taosTmrCleanUp(mgmt->timer);
taosMemoryFreeClear(mgmt); taosMemoryFreeClear(mgmt);
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
} }
QW_RET(code); QW_RET(code);

View File

@ -170,7 +170,7 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
printf("malloc failed"); printf("malloc failed");
assert(0); assert(0);
} }
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); (void)memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
qwtTestFetchQueueWIdx = 0; qwtTestFetchQueueWIdx = 0;
@ -199,7 +199,7 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
printf("malloc failed"); printf("malloc failed");
assert(0); assert(0);
} }
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); (void)memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg; qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
qwtTestQueryQueueWIdx = 0; qwtTestQueryQueueWIdx = 0;

View File

@ -583,7 +583,7 @@ char *schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq); int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes); void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet); int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode); int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode); void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
@ -606,7 +606,7 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask); int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode); void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);

View File

@ -16,17 +16,5 @@
#include "query.h" #include "query.h"
#include "schInt.h" #include "schInt.h"
tsem_t schdRspSem;
SSchDebug gSCHDebug = {0}; SSchDebug gSCHDebug = {0};
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
if (code) {
pResult->code = code;
}
*(SExecResult*)param = *pResult;
taosMemoryFree(pResult);
tsem_post(&schdRspSem);
}

View File

@ -50,6 +50,10 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks); int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i); SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
if (NULL == pTask) {
SCH_JOB_DLOG("fail to get the %dth task", i);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
sum += pTask->plan->execNodeStat.tableNum; sum += pTask->plan->execNodeStat.tableNum;
} }
@ -214,6 +218,10 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i); pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i);
if (NULL == pTask) {
SCH_JOB_ELOG("fail to get the %dth task", i);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) { if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
@ -243,6 +251,11 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
if (i < (taskNum - 1)) { if (i < (taskNum - 1)) {
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList); SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
if (NULL == pLastTask) {
SCH_JOB_ELOG("fail to get the last task, num:%d", (int32_t)taosArrayGetSize(ctrl->taskList));
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (remainNum < pLastTask->plan->execNodeStat.tableNum) { if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d, smallestInList:%d", SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d, smallestInList:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum); ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);

View File

@ -47,6 +47,7 @@ void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
return; return;
_return: _return:
SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode)); SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
} }
@ -166,9 +167,18 @@ _return:
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t i = 0; i < pJob->levelNum; ++i) { for (int32_t i = 0; i < pJob->levelNum; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i); SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, pJob->levelNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
for (int32_t m = 0; m < pLevel->taskNum; ++m) { for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
if (NULL == pTask) {
SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum: %d", m, pLevel->level, pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSubplan *pPlan = pTask->plan; SSubplan *pPlan = pTask->plan;
int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0; int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
int32_t parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0; int32_t parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;
@ -188,6 +198,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t n = 0; n < childNum; ++n) { for (int32_t n = 0; n < childNum; ++n) {
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n); SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
if (NULL == child) {
SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES); SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) { if (NULL == childTask || NULL == *childTask) {
SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
@ -222,6 +237,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t n = 0; n < parentNum; ++n) { for (int32_t n = 0; n < parentNum; ++n) {
SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n); SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
if (NULL == parent) {
SCH_JOB_ELOG("fail to get the %dth parent subplan, parentNum: %d", n, parentNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) { if (NULL == parentTask || NULL == *parentTask) {
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
@ -241,6 +261,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
} }
SSchLevel *pLevel = taosArrayGet(pJob->levels, 0); SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", (int32_t)taosArrayGetSize(pJob->levels));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (SCH_IS_QUERY_JOB(pJob)) { if (SCH_IS_QUERY_JOB(pJob)) {
if (pLevel->taskNum > 1) { if (pLevel->taskNum > 1) {
SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
@ -248,6 +273,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
} }
SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0); SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", (int32_t)taosArrayGetSize(pLevel->subTasks));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) { if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
pJob->attr.needFetch = true; pJob->attr.needFetch = true;
} }
@ -261,7 +291,9 @@ int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosArrayPush(pJob->dataSrcTasks, &pTask); if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
SCH_ERR_RET(terrno);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -318,6 +350,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
} }
pLevel = taosArrayGet(pJob->levels, i); pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pLevel->level = i; pLevel->level = i;
plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i); plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
@ -342,6 +379,10 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
for (int32_t n = 0; n < taskNum; ++n) { for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
if (NULL == plan) {
SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d", n, taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_SET_JOB_TYPE(pJob, plan->subplanType); SCH_SET_JOB_TYPE(pJob, plan->subplanType);
@ -383,7 +424,7 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) { void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
pRes->code = atomic_load_32(&pJob->errCode); pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows; pRes->numOfRows = pJob->resNumOfRows;
@ -395,8 +436,6 @@ int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock); SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code)); SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));
return TSDB_CODE_SUCCESS;
} }
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) { int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
@ -421,10 +460,13 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
if (NULL == *pData) { if (NULL == *pData) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
if (rsp) { if (NULL == rsp) {
rsp->completed = 1; SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", (int32_t)sizeof(SRetrieveTableRsp), terrno);
SCH_ERR_JRET(terrno);
} }
rsp->completed = 1;
*pData = rsp; *pData = rsp;
SCH_JOB_DLOG("empty res and set query complete, code:%x", code); SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
} }
@ -440,10 +482,13 @@ _return:
int32_t schNotifyUserExecRes(SSchJob *pJob) { int32_t schNotifyUserExecRes(SSchJob *pJob) {
SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult)); SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
if (pRes) { if (NULL == pRes) {
schDumpJobExecRes(pJob, pRes); qError("malloc execResult %d failed, error: %x", (int32_t)sizeof(SExecResult), terrno);
SCH_RET(terrno);
} }
schDumpJobExecRes(pJob, pRes);
SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode)); SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
(*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode)); (*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode)); SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode));
@ -454,7 +499,10 @@ int32_t schNotifyUserExecRes(SSchJob *pJob) {
int32_t schNotifyUserFetchRes(SSchJob *pJob) { int32_t schNotifyUserFetchRes(SSchJob *pJob) {
void *pRes = NULL; void *pRes = NULL;
schDumpJobFetchRes(pJob, &pRes); int32_t code = schDumpJobFetchRes(pJob, &pRes);
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(&pJob->errCode)) {
atomic_store_32(&pJob->errCode, code);
}
SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode)); SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
(*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode)); (*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
@ -478,13 +526,13 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
if (SCH_JOB_IN_SYNC_OP(pJob)) { if (SCH_JOB_IN_SYNC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
tsem_post(&pJob->rspSem); (void)tsem_post(&pJob->rspSem); // ignore error
} else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) { } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schNotifyUserExecRes(pJob); (void)schNotifyUserExecRes(pJob); // ignore error
} else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) { } else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schNotifyUserFetchRes(pJob); (void)schNotifyUserFetchRes(pJob); // ignore error
} else { } else {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status)); SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
@ -519,7 +567,8 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode); (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode); // ignore error
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
@ -530,7 +579,8 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode); (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode); // ignore error
return TSDB_CODE_SCH_IGNORE_ERROR; return TSDB_CODE_SCH_IGNORE_ERROR;
} }
@ -569,12 +619,21 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
SSchLevel *pLevel = pTask->level; SSchLevel *pLevel = pTask->level;
int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum); int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum);
if (doneNum == pLevel->taskNum) { if (doneNum == pLevel->taskNum) {
atomic_sub_fetch_32(&pJob->levelIdx, 1); (void)atomic_sub_fetch_32(&pJob->levelIdx, 1);
pLevel = taosArrayGet(pJob->levels, pJob->levelIdx); pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
for (int32_t i = 0; i < pLevel->taskNum; ++i) { for (int32_t i = 0; i < pLevel->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
if (NULL == pTask) {
SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", i, pLevel->level, pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (pTask->children && taosArrayGetSize(pTask->children) > 0) { if (pTask->children && taosArrayGetSize(pTask->children) > 0) {
continue; continue;
} }
@ -602,7 +661,11 @@ int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
} }
} }
taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo)); if (NULL == taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo))) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_ERR_RET(terrno);
}
taosArrayDestroy(rsp->tbVerInfo); taosArrayDestroy(rsp->tbVerInfo);
pJob->execRes.msgType = TDMT_SCH_QUERY; pJob->execRes.msgType = TDMT_SCH_QUERY;
@ -629,6 +692,11 @@ int32_t schLaunchJob(SSchJob *pJob) {
SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
} else { } else {
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
if (NULL == level) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_RET(schLaunchLevelTasks(pJob, level)); SCH_ERR_RET(schLaunchLevelTasks(pJob, level));
} }
@ -661,10 +729,19 @@ void schFreeJobImpl(void *job) {
int32_t numOfLevels = taosArrayGetSize(pJob->levels); int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for (int32_t i = 0; i < numOfLevels; ++i) { for (int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i); SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
continue;
}
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for (int32_t j = 0; j < numOfTasks; ++j) { for (int32_t j = 0; j < numOfTasks; ++j) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth task, taskNum:%d", j, numOfTasks);
continue;
}
schFreeTask(pJob, pTask); schFreeTask(pJob, pTask);
} }
@ -687,12 +764,12 @@ void schFreeJobImpl(void *job) {
destroyQueryExecRes(&pJob->execRes); destroyQueryExecRes(&pJob->execRes);
qDestroyQueryPlan(pJob->pDag); qDestroyQueryPlan(pJob->pDag);
nodesReleaseAllocatorWeakRef(pJob->allocatorRefId); (void)nodesReleaseAllocatorWeakRef(pJob->allocatorRefId); // ignore error
taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes); taosMemoryFreeClear(pJob->fetchRes);
taosMemoryFreeClear(pJob->sql); taosMemoryFreeClear(pJob->sql);
tsem_destroy(&pJob->rspSem); (void)tsem_destroy(&pJob->rspSem); // ignore error
taosMemoryFree(pJob); taosMemoryFree(pJob);
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
@ -711,7 +788,7 @@ int32_t schJobFetchRows(SSchJob *pJob) {
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) { if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem); (void)tsem_wait(&pJob->rspSem); // ignore error
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
} }
} else { } else {
@ -739,9 +816,19 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->conn = *pReq->pConn; pJob->conn = *pReq->pConn;
if (pReq->sql) { if (pReq->sql) {
pJob->sql = taosStrdup(pReq->sql); pJob->sql = taosStrdup(pReq->sql);
if (NULL == pJob->sql) {
qError("QID:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql);
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
} }
pJob->pDag = pReq->pDag; pJob->pDag = pReq->pDag;
pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId); if (pReq->allocatorRefId > 0) {
pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
if (pJob->allocatorRefId <= 0) {
qError("QID:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId);
SCH_ERR_JRET(terrno);
}
}
pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam; pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp; pJob->userRes.execFp = pReq->execFp;
@ -752,6 +839,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
} else { } else {
pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL); pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
if (NULL == pJob->nodeList) {
qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, (int32_t)taosArrayGetSize(pReq->pNodeList));
SCH_ERR_JRET(terrno);
}
} }
pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
@ -785,7 +876,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
SCH_ERR_JRET(terrno); SCH_ERR_JRET(terrno);
} }
atomic_add_fetch_32(&schMgmt.jobNum, 1); (void)atomic_add_fetch_32(&schMgmt.jobNum, 1);
*pJobId = pJob->refId; *pJobId = pJob->refId;
@ -800,7 +891,7 @@ _return:
} else if (pJob->refId < 0) { } else if (pJob->refId < 0) {
schFreeJobImpl(pJob); schFreeJobImpl(pJob);
} else { } else {
taosRemoveRef(schMgmt.jobRef, pJob->refId); (void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error
} }
SCH_RET(code); SCH_RET(code);
@ -814,7 +905,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
if (pReq->syncReq) { if (pReq->syncReq) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem); (void)tsem_wait(&pJob->rspSem); // ignore error
} }
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId); SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
@ -845,7 +936,7 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
} }
SCH_UNLOCK(SCH_WRITE, &pJob->resLock); SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -866,6 +957,10 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
int32_t numOfLevels = taosArrayGetSize(pJob->levels); int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for (int32_t i = 0; i < numOfLevels; ++i) { for (int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i); SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pLevel->taskExecDoneNum = 0; pLevel->taskExecDoneNum = 0;
pLevel->taskLaunchedNum = 0; pLevel->taskLaunchedNum = 0;
@ -873,6 +968,11 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for (int32_t j = 0; j < numOfTasks; ++j) { for (int32_t j = 0; j < numOfTasks; ++j) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
if (NULL == pTask) {
SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_LOCK_TASK(pTask); SCH_LOCK_TASK(pTask);
code = schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode); code = schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -973,7 +1073,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int
} }
if (errCode) { if (errCode) {
schHandleJobFailure(pJob, errCode); (void)schHandleJobFailure(pJob, errCode); // handle internal
} }
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode)); SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
@ -1052,11 +1152,11 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
} }
if (errCode) { if (errCode) {
schHandleJobFailure(pJob, errCode); (void)schHandleJobFailure(pJob, errCode); // ignore error
} }
if (pJob) { if (pJob) {
schReleaseJob(pJob->refId); (void)schReleaseJob(pJob->refId); // ignore error
} }
} }
@ -1091,7 +1191,7 @@ _return:
SCH_UNLOCK_TASK(pTask); SCH_UNLOCK_TASK(pTask);
} }
if (pJob) { if (pJob) {
schReleaseJob(rId); (void)schReleaseJob(rId); // ignore error
} }
SCH_RET(code); SCH_RET(code);

View File

@ -316,18 +316,18 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
} }
} }
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
*pTask = NULL;
int32_t s = taosHashGetSize(pTaskList); int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) { if (s <= 0) {
return TSDB_CODE_SUCCESS; return;
} }
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId)); SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) { if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS; return;
} }
*pTask = *task; *pTask = *task;
return TSDB_CODE_SUCCESS;
} }

View File

@ -108,14 +108,26 @@ int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) {
for (int32_t i = pJob->levelNum - 1; i >= 0; --i) { for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i); SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
qError("failed to get level %d", i);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
for (int32_t m = 0; m < pLevel->taskNum; ++m) { for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
if (NULL == pTask) {
qError("failed to get task %d, total: %d", m, pLevel->taskNum);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SQuerySubDesc subDesc = {0}; SQuerySubDesc subDesc = {0};
subDesc.tid = pTask->taskId; subDesc.tid = pTask->taskId;
strcpy(subDesc.status, jobTaskStatusStr(pTask->status)); TAOS_STRCPY(subDesc.status, jobTaskStatusStr(pTask->status));
taosArrayPush(pSub, &subDesc); if (NULL == taosArrayPush(pSub, &subDesc)) {
qError("taosArrayPush task %d failed, error: %x, ", m, terrno);
SCH_ERR_JRET(terrno);
}
} }
} }
@ -141,7 +153,7 @@ int32_t schedulerUpdatePolicy(int32_t policy) {
qDebug("schedule policy updated to %d", schMgmt.cfg.schPolicy); qDebug("schedule policy updated to %d", schMgmt.cfg.schPolicy);
break; break;
default: default:
return TSDB_CODE_TSC_INVALID_INPUT; SCH_RET(TSDB_CODE_TSC_INVALID_INPUT);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -159,14 +171,14 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
SSchJob *pJob = schAcquireJob(*jobId); SSchJob *pJob = schAcquireJob(*jobId);
if (NULL == pJob) { if (NULL == pJob) {
qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
return; return;
} }
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode)); SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode));
schHandleJobDrop(pJob, errCode); (void)schHandleJobDrop(pJob, errCode); // ignore any error
schReleaseJob(*jobId); (void)schReleaseJob(*jobId); // ignore error
*jobId = 0; *jobId = 0;
} }
@ -182,7 +194,7 @@ void schedulerDestroy(void) {
if (refId == 0) { if (refId == 0) {
break; break;
} }
taosRemoveRef(schMgmt.jobRef, pJob->refId); (void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error
pJob = taosIterateRef(schMgmt.jobRef, refId); pJob = taosIterateRef(schMgmt.jobRef, refId);
} }

View File

@ -789,6 +789,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open
TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory")
//AUDIT
TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_NOT_FORMAT_TO_JSON, "can't format to json")
TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_FAIL_SEND_AUDIT_RECORD, "Failed to send out audit record")
TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_FAIL_GENERATE_JSON, "Failed to generate json")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
#endif #endif