diff --git a/include/libs/audit/audit.h b/include/libs/audit/audit.h index dd3df27866..4fa69f1b4f 100644 --- a/include/libs/audit/audit.h +++ b/include/libs/audit/audit.h @@ -50,7 +50,7 @@ typedef struct { int32_t auditInit(const SAuditCfg *pCfg); void auditCleanup(); -void auditSend(SJson *pJson); +int32_t auditSend(SJson *pJson); void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len); void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 1e2ccf8705..8ab3b898ca 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -23,8 +23,6 @@ extern "C" { #include "catalog.h" #include "planner.h" -extern tsem_t schdRspSem; - typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed @@ -101,8 +99,6 @@ void schedulerFreeJob(int64_t* job, int32_t errCode); void schedulerDestroy(void); -void schdExecCallback(SExecResult* pResult, void* param, int32_t code); - #ifdef __cplusplus } #endif diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 683d10e926..6166f1dc07 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -53,6 +53,8 @@ void taosPrintBackTrace(); void taosMemoryTrim(int32_t size); void *taosMemoryMallocAlign(uint32_t alignment, int64_t size); +#define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n)) + #define taosMemoryFreeClear(ptr) \ do { \ if (ptr) { \ diff --git a/include/os/osString.h b/include/os/osString.h index ac7dd7eda8..5fa5a25c6d 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -59,6 +59,8 @@ typedef enum { M2C = 0, C2M } ConvType; (dst)[(size)-1] = 0; \ } while (0) +#define TAOS_STRCPY(_dst, _src) ((void)strcpy(_dst, _src)) + char *tstrdup(const char *src); int32_t 
taosUcs4len(TdUcs4 *ucs4); int64_t taosStr2int64(const char *str); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 900d3eedad..053bb20e2a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -942,6 +942,11 @@ int32_t taosGetErrSize(); // UTIL #define TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x6000) +// AUDIT +#define TSDB_CODE_AUDIT_NOT_FORMAT_TO_JSON TAOS_DEF_ERROR_CODE(0, 0x6100) +#define TSDB_CODE_AUDIT_FAIL_SEND_AUDIT_RECORD TAOS_DEF_ERROR_CODE(0, 0x6101) +#define TSDB_CODE_AUDIT_FAIL_GENERATE_JSON TAOS_DEF_ERROR_CODE(0, 0x6102) + #ifdef __cplusplus } #endif diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index aa3b669c1b..7a8de49abe 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -32,8 +32,8 @@ char* tsAuditBatchUri = "/audit-batch"; int32_t auditInit(const SAuditCfg *pCfg) { tsAudit.cfg = *pCfg; tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *)); - taosThreadMutexInit(&tsAudit.lock, NULL); - return 0; + if(tsAudit.records == NULL) return TSDB_CODE_OUT_OF_MEMORY; + return taosThreadMutexInit(&tsAudit.lock, NULL); } static FORCE_INLINE void auditDeleteRecord(SAuditRecord * record) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index dfd00dc5a7..93bbc44a25 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -218,8 +218,8 @@ typedef struct SQWorkerMgmt { #define QW_IDS() sId, qId, tId, rId, eId #define QW_FPARAMS() mgmt, QW_IDS() -#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n) -#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n) +#define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n) +#define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n) #define QW_STAT_GET(_item) atomic_load_64(&(_item)) #define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event]) @@ -250,7 +250,7 @@ typedef struct 
SQWorkerMgmt { } while (0) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) -#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) +#define QW_UPDATE_RSP_CODE(ctx, code) (void)atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) #define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) #define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch) diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index b7a4b718e2..d3b8d36b25 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -221,11 +221,11 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { SEpSet epSet = {0}; epSet.inUse = 1; epSet.numOfEps = 3; - strcpy(epSet.eps[0].fqdn, "localhost"); + TAOS_STRCPY(epSet.eps[0].fqdn, "localhost"); epSet.eps[0].port = 7100; - strcpy(epSet.eps[1].fqdn, "localhost"); + TAOS_STRCPY(epSet.eps[1].fqdn, "localhost"); epSet.eps[1].port = 7200; - strcpy(epSet.eps[2].fqdn, "localhost"); + TAOS_STRCPY(epSet.eps[2].fqdn, "localhost"); epSet.eps[2].port = 7300; ctx->phase = QW_PHASE_POST_QUERY; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 9f4a540f3c..f84e5e463e 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -21,7 +21,7 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **r } if (NULL == *rsp) { - memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + TAOS_MEMSET(pRsp, 0, sizeof(SRetrieveTableRsp)); } *rsp = pRsp; @@ -169,7 +169,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve if (NULL == pRsp) { QW_RET(terrno); } - memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + TAOS_MEMSET(pRsp, 0, sizeof(SRetrieveTableRsp)); dataLength = 0; } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 0451532cbe..0b404ec176 100644 --- 
a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -342,7 +342,7 @@ int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { if (NULL == pExec) { QW_ERR_JRET(terrno); } - memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); + (void)memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); localRsp.rsp.subplanInfo = pExec; localRsp.qId = qId; localRsp.tId = tId; @@ -537,7 +537,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { } if (dbFName[0] && tbName[0]) { - sprintf(tbInfo.tbFName, "%s.%s", dbFName, tbName); + (void)sprintf(tbInfo.tbFName, "%s.%s", dbFName, tbName); } else { tbInfo.tbFName[0] = 0; } @@ -613,7 +613,7 @@ void qwDestroyImpl(void *pMgmt) { taosMemoryFree(mgmt); - atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); qwCloseRef(); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5840cc0245..6e684545f7 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -452,8 +452,8 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes pRes->skey = pDelRes->skey; pRes->ekey = pDelRes->ekey; pRes->affectedRows = pDelRes->affectedRows; - strcpy(pRes->tableFName, pDelRes->tableName); - strcpy(pRes->tsColName, pDelRes->tsColName); + TAOS_STRCPY(pRes->tableFName, pDelRes->tableName); + TAOS_STRCPY(pRes->tsColName, pDelRes->tsColName); taosMemoryFree(output.pData); return TSDB_CODE_SUCCESS; @@ -1126,8 +1126,8 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { sch->hbConnInfo.handle = NULL; } - memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); - memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId)); + (void)memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); + (void)memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId)); 
QW_UNLOCK(QW_WRITE, &sch->hbConnLock); @@ -1138,7 +1138,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: - memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); + (void)memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); if (code) { @@ -1299,19 +1299,19 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1); if (1 == qwNum) { - memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param)); + TAOS_MEMSET(gQwMgmt.param, 0, sizeof(gQwMgmt.param)); } int32_t code = qwOpenRef(); if (code) { - atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); QW_RET(code); } SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker)); if (NULL == mgmt) { qError("calloc %d failed", (int32_t)sizeof(SQWorker)); - atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); QW_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -1345,7 +1345,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S if (pMsgCb) { mgmt->msgCb = *pMsgCb; } else { - memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb)); + TAOS_MEMSET(&mgmt->msgCb, 0, sizeof(mgmt->msgCb)); } mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); @@ -1379,7 +1379,7 @@ _return: taosTmrCleanUp(mgmt->timer); taosMemoryFreeClear(mgmt); - atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); } QW_RET(code); diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index d292b271c5..91e1dda2ac 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -170,7 +170,7 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { printf("malloc failed"); assert(0); } - memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); + (void)memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); 
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { qwtTestFetchQueueWIdx = 0; @@ -199,7 +199,7 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { printf("malloc failed"); assert(0); } - memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); + (void)memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg; if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { qwtTestQueryQueueWIdx = 0; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index c7f6c20b7d..17476072bb 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -583,7 +583,7 @@ char *schGetOpStr(SCH_OP_TYPE type); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq); -int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes); +void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes); int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet); int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode); void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode); @@ -606,7 +606,7 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); -int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); +void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); void 
schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode); diff --git a/source/libs/scheduler/src/schDbg.c b/source/libs/scheduler/src/schDbg.c index d6c2b638b8..deaaaec7c1 100644 --- a/source/libs/scheduler/src/schDbg.c +++ b/source/libs/scheduler/src/schDbg.c @@ -16,17 +16,5 @@ #include "query.h" #include "schInt.h" -tsem_t schdRspSem; SSchDebug gSCHDebug = {0}; -void schdExecCallback(SExecResult* pResult, void* param, int32_t code) { - if (code) { - pResult->code = code; - } - - *(SExecResult*)param = *pResult; - - taosMemoryFree(pResult); - - tsem_post(&schdRspSem); -} diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 8c2b65e125..20e1600737 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -50,6 +50,10 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks); for (int32_t i = 0; i < taskNum; ++i) { SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i); + if (NULL == pTask) { + SCH_JOB_DLOG("fail to get the %dth task", i); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } sum += pTask->plan->execNodeStat.tableNum; } @@ -214,6 +218,10 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { for (int32_t i = 0; i < taskNum; ++i) { pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i); + if (NULL == pTask) { + SCH_JOB_ELOG("fail to get the %dth task", i); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) { @@ -243,6 +251,11 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { if (i < (taskNum - 1)) { SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList); + if (NULL == pLastTask) { + SCH_JOB_ELOG("fail to get the last task, num:%d", (int32_t)taosArrayGetSize(ctrl->taskList)); + 
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + if (remainNum < pLastTask->plan->execNodeStat.tableNum) { SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d, smallestInList:%d", ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 278768981a..a8e2e79aee 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -47,6 +47,7 @@ void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) { return; _return: + SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode)); } @@ -166,9 +167,18 @@ _return: int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { for (int32_t i = 0; i < pJob->levelNum; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - + if (NULL == pLevel) { + SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, pJob->levelNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + for (int32_t m = 0; m < pLevel->taskNum; ++m) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); + if (NULL == pTask) { + SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum: %d", m, pLevel->level, pLevel->taskNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SSubplan *pPlan = pTask->plan; int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0; int32_t parentNum = pPlan->pParents ? 
(int32_t)LIST_LENGTH(pPlan->pParents) : 0; @@ -188,6 +198,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { for (int32_t n = 0; n < childNum; ++n) { SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n); + if (NULL == child) { + SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES); if (NULL == childTask || NULL == *childTask) { SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); @@ -222,6 +237,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { for (int32_t n = 0; n < parentNum; ++n) { SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n); + if (NULL == parent) { + SCH_JOB_ELOG("fail to get the %dth parent subplan, parentNum: %d", n, parentNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); if (NULL == parentTask || NULL == *parentTask) { SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); @@ -241,6 +261,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } SSchLevel *pLevel = taosArrayGet(pJob->levels, 0); + if (NULL == pLevel) { + SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", (int32_t)taosArrayGetSize(pJob->levels)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + if (SCH_IS_QUERY_JOB(pJob)) { if (pLevel->taskNum > 1) { SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); @@ -248,6 +273,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0); + if (NULL == pTask) { + SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", (int32_t)taosArrayGetSize(pLevel->subTasks)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + if (SUBPLAN_TYPE_MODIFY != 
pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) { pJob->attr.needFetch = true; } @@ -261,7 +291,9 @@ int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - taosArrayPush(pJob->dataSrcTasks, &pTask); + if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) { + SCH_ERR_RET(terrno); + } return TSDB_CODE_SUCCESS; } @@ -318,6 +350,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { } pLevel = taosArrayGet(pJob->levels, i); + if (NULL == pLevel) { + SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + pLevel->level = i; plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i); @@ -342,6 +379,10 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); + if (NULL == plan) { + SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d", n, taskNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } SCH_SET_JOB_TYPE(pJob, plan->subplanType); @@ -383,7 +424,7 @@ _return: SCH_RET(code); } -int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) { +void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) { pRes->code = atomic_load_32(&pJob->errCode); pRes->numOfRows = pJob->resNumOfRows; @@ -395,8 +436,6 @@ int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) { SCH_UNLOCK(SCH_WRITE, &pJob->resLock); SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code)); - - return TSDB_CODE_SUCCESS; } int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) { @@ -421,10 +460,13 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) { if (NULL == *pData) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); - if (rsp) { - rsp->completed = 1; + if (NULL == rsp) { + SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", (int32_t)sizeof(SRetrieveTableRsp), 
terrno); + SCH_ERR_JRET(terrno); } + rsp->completed = 1; + *pData = rsp; SCH_JOB_DLOG("empty res and set query complete, code:%x", code); } @@ -440,10 +482,13 @@ _return: int32_t schNotifyUserExecRes(SSchJob *pJob) { SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult)); - if (pRes) { - schDumpJobExecRes(pJob, pRes); + if (NULL == pRes) { + qError("malloc execResult %d failed, error: %x", (int32_t)sizeof(SExecResult), terrno); + SCH_RET(terrno); } + schDumpJobExecRes(pJob, pRes); + SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode)); (*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode)); SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode)); @@ -454,7 +499,10 @@ int32_t schNotifyUserExecRes(SSchJob *pJob) { int32_t schNotifyUserFetchRes(SSchJob *pJob) { void *pRes = NULL; - schDumpJobFetchRes(pJob, &pRes); + int32_t code = schDumpJobFetchRes(pJob, &pRes); + if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(&pJob->errCode)) { + atomic_store_32(&pJob->errCode, code); + } SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode)); (*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode)); @@ -478,13 +526,13 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) { if (SCH_JOB_IN_SYNC_OP(pJob)) { SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); - tsem_post(&pJob->rspSem); + (void)tsem_post(&pJob->rspSem); // ignore error } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) { SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); - schNotifyUserExecRes(pJob); + (void)schNotifyUserExecRes(pJob); // ignore error } else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) { SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); - schNotifyUserFetchRes(pJob); + (void)schNotifyUserFetchRes(pJob); // ignore error } else { SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock); SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status)); @@ -519,7 +567,8 @@ int32_t 
schHandleJobFailure(SSchJob *pJob, int32_t errCode) { return TSDB_CODE_SCH_IGNORE_ERROR; } - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode); + (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode); // ignore error + return TSDB_CODE_SCH_IGNORE_ERROR; } @@ -530,7 +579,8 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) { return TSDB_CODE_SCH_IGNORE_ERROR; } - schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode); + (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode); // ignore error + return TSDB_CODE_SCH_IGNORE_ERROR; } @@ -569,12 +619,21 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { SSchLevel *pLevel = pTask->level; int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum); if (doneNum == pLevel->taskNum) { - atomic_sub_fetch_32(&pJob->levelIdx, 1); + (void)atomic_sub_fetch_32(&pJob->levelIdx, 1); pLevel = taosArrayGet(pJob->levels, pJob->levelIdx); + if (NULL == pLevel) { + SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + for (int32_t i = 0; i < pLevel->taskNum; ++i) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); - + if (NULL == pTask) { + SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", i, pLevel->level, pLevel->taskNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + if (pTask->children && taosArrayGetSize(pTask->children) > 0) { continue; } @@ -602,7 +661,11 @@ int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) { } } - taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo)); + if (NULL == taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo))) { + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + SCH_ERR_RET(terrno); + } + taosArrayDestroy(rsp->tbVerInfo); pJob->execRes.msgType = TDMT_SCH_QUERY; @@ -629,6 +692,11 @@ int32_t 
schLaunchJob(SSchJob *pJob) { SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } else { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); + if (NULL == level) { + SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SCH_ERR_RET(schLaunchLevelTasks(pJob, level)); } @@ -661,10 +729,19 @@ void schFreeJobImpl(void *job) { int32_t numOfLevels = taosArrayGetSize(pJob->levels); for (int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + if (NULL == pLevel) { + SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels); + continue; + } int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for (int32_t j = 0; j < numOfTasks; ++j) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); + if (NULL == pTask) { + SCH_JOB_ELOG("fail to get the %dth task, taskNum:%d", j, numOfTasks); + continue; + } + schFreeTask(pJob, pTask); } @@ -687,12 +764,12 @@ void schFreeJobImpl(void *job) { destroyQueryExecRes(&pJob->execRes); qDestroyQueryPlan(pJob->pDag); - nodesReleaseAllocatorWeakRef(pJob->allocatorRefId); + (void)nodesReleaseAllocatorWeakRef(pJob->allocatorRefId); // ignore error taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->fetchRes); taosMemoryFreeClear(pJob->sql); - tsem_destroy(&pJob->rspSem); + (void)tsem_destroy(&pJob->rspSem); // ignore error taosMemoryFree(pJob); int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); @@ -711,7 +788,7 @@ int32_t schJobFetchRows(SSchJob *pJob) { if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - tsem_wait(&pJob->rspSem); + (void)tsem_wait(&pJob->rspSem); // ignore error SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); } } else { @@ -739,9 +816,19 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { pJob->conn = 
*pReq->pConn; if (pReq->sql) { pJob->sql = taosStrdup(pReq->sql); + if (NULL == pJob->sql) { + qError("QID:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql); + SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } } pJob->pDag = pReq->pDag; - pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId); + if (pReq->allocatorRefId > 0) { + pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId); + if (pJob->allocatorRefId <= 0) { + qError("QID:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId); + SCH_ERR_JRET(terrno); + } + } pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillParam = pReq->chkKillParam; pJob->userRes.execFp = pReq->execFp; @@ -752,6 +839,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); } else { pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL); + if (NULL == pJob->nodeList) { + qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, (int32_t)taosArrayGetSize(pReq->pNodeList)); + SCH_ERR_JRET(terrno); + } } pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, @@ -785,7 +876,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { SCH_ERR_JRET(terrno); } - atomic_add_fetch_32(&schMgmt.jobNum, 1); + (void)atomic_add_fetch_32(&schMgmt.jobNum, 1); *pJobId = pJob->refId; @@ -800,7 +891,7 @@ _return: } else if (pJob->refId < 0) { schFreeJobImpl(pJob); } else { - taosRemoveRef(schMgmt.jobRef, pJob->refId); + (void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error } SCH_RET(code); @@ -814,7 +905,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { if (pReq->syncReq) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); - tsem_wait(&pJob->rspSem); + (void)tsem_wait(&pJob->rspSem); // ignore error } SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, 
SCH_GET_JOB_STATUS_STR(pJob), pJob->refId); @@ -845,7 +936,7 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { } SCH_UNLOCK(SCH_WRITE, &pJob->resLock); - schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); + SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC)); } return TSDB_CODE_SUCCESS; @@ -866,6 +957,10 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) { int32_t numOfLevels = taosArrayGetSize(pJob->levels); for (int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + if (NULL == pLevel) { + SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } pLevel->taskExecDoneNum = 0; pLevel->taskLaunchedNum = 0; @@ -873,6 +968,11 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) { int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for (int32_t j = 0; j < numOfTasks; ++j) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); + if (NULL == pTask) { + SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SCH_LOCK_TASK(pTask); code = schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode); if (TSDB_CODE_SUCCESS != code) { @@ -973,7 +1073,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int } if (errCode) { - schHandleJobFailure(pJob, errCode); + (void)schHandleJobFailure(pJob, errCode); // handle internal } SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode)); @@ -1052,11 +1152,11 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { } if (errCode) { - schHandleJobFailure(pJob, errCode); + (void)schHandleJobFailure(pJob, errCode); // ignore error } if (pJob) { - schReleaseJob(pJob->refId); + (void)schReleaseJob(pJob->refId); // ignore error } } @@ -1091,7 +1191,7 @@ _return: SCH_UNLOCK_TASK(pTask); } if (pJob) { - 
schReleaseJob(rId); + (void)schReleaseJob(rId); // ignore error } SCH_RET(code); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 82b2e021af..689c98d395 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -316,18 +316,18 @@ void schFreeRpcCtx(SRpcCtx *pCtx) { } } -int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { +void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { + *pTask = NULL; + int32_t s = taosHashGetSize(pTaskList); if (s <= 0) { - return TSDB_CODE_SUCCESS; + return; } SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId)); if (NULL == task || NULL == (*task)) { - return TSDB_CODE_SUCCESS; + return; } *pTask = *task; - - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index fc92be8214..8f85e066cd 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -108,14 +108,26 @@ int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) { for (int32_t i = pJob->levelNum - 1; i >= 0; --i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + if (NULL == pLevel) { + qError("failed to get level %d", i); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } for (int32_t m = 0; m < pLevel->taskNum; ++m) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); + if (NULL == pTask) { + qError("failed to get task %d, total: %d", m, pLevel->taskNum); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SQuerySubDesc subDesc = {0}; subDesc.tid = pTask->taskId; - strcpy(subDesc.status, jobTaskStatusStr(pTask->status)); + TAOS_STRCPY(subDesc.status, jobTaskStatusStr(pTask->status)); - taosArrayPush(pSub, &subDesc); + if (NULL == taosArrayPush(pSub, &subDesc)) { + qError("taosArrayPush task %d failed, error: %x, ", m, terrno); + SCH_ERR_JRET(terrno); + } } } @@ -141,7 +153,7 @@ int32_t 
schedulerUpdatePolicy(int32_t policy) { qDebug("schedule policy updated to %d", schMgmt.cfg.schPolicy); break; default: - return TSDB_CODE_TSC_INVALID_INPUT; + SCH_RET(TSDB_CODE_TSC_INVALID_INPUT); } return TSDB_CODE_SUCCESS; @@ -159,14 +171,14 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) { SSchJob *pJob = schAcquireJob(*jobId); if (NULL == pJob) { - qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); + qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); return; } SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode)); - schHandleJobDrop(pJob, errCode); + (void)schHandleJobDrop(pJob, errCode); // ignore any error - schReleaseJob(*jobId); + (void)schReleaseJob(*jobId); // ignore error *jobId = 0; } @@ -182,7 +194,7 @@ void schedulerDestroy(void) { if (refId == 0) { break; } - taosRemoveRef(schMgmt.jobRef, pJob->refId); + (void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error pJob = taosIterateRef(schMgmt.jobRef, refId); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 347f0be4ff..ab50b83937 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -789,6 +789,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory") +//AUDIT +TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_NOT_FORMAT_TO_JSON, "can't format to json") +TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_FAIL_SEND_AUDIT_RECORD, "Failed to send out audit record") +TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_FAIL_GENERATE_JSON, "Failed to generate json") #ifdef TAOS_ERROR_C }; #endif