enh: add return code validation
This commit is contained in:
parent
9ed56939df
commit
cb48954ef3
|
@ -53,6 +53,8 @@ void taosPrintBackTrace();
|
||||||
void taosMemoryTrim(int32_t size);
|
void taosMemoryTrim(int32_t size);
|
||||||
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
|
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
|
||||||
|
|
||||||
|
#define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n))
|
||||||
|
|
||||||
#define taosMemoryFreeClear(ptr) \
|
#define taosMemoryFreeClear(ptr) \
|
||||||
do { \
|
do { \
|
||||||
if (ptr) { \
|
if (ptr) { \
|
||||||
|
|
|
@ -59,6 +59,8 @@ typedef enum { M2C = 0, C2M } ConvType;
|
||||||
(dst)[(size)-1] = 0; \
|
(dst)[(size)-1] = 0; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
#define TAOS_STRCPY(_dst, _src) ((void)strcpy(_dst, _src))
|
||||||
|
|
||||||
char *tstrdup(const char *src);
|
char *tstrdup(const char *src);
|
||||||
int32_t taosUcs4len(TdUcs4 *ucs4);
|
int32_t taosUcs4len(TdUcs4 *ucs4);
|
||||||
int64_t taosStr2int64(const char *str);
|
int64_t taosStr2int64(const char *str);
|
||||||
|
|
|
@ -218,8 +218,8 @@ typedef struct SQWorkerMgmt {
|
||||||
#define QW_IDS() sId, qId, tId, rId, eId
|
#define QW_IDS() sId, qId, tId, rId, eId
|
||||||
#define QW_FPARAMS() mgmt, QW_IDS()
|
#define QW_FPARAMS() mgmt, QW_IDS()
|
||||||
|
|
||||||
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
|
#define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n)
|
||||||
#define QW_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
|
#define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n)
|
||||||
#define QW_STAT_GET(_item) atomic_load_64(&(_item))
|
#define QW_STAT_GET(_item) atomic_load_64(&(_item))
|
||||||
|
|
||||||
#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event])
|
#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event])
|
||||||
|
@ -250,7 +250,7 @@ typedef struct SQWorkerMgmt {
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
|
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
|
||||||
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
|
#define QW_UPDATE_RSP_CODE(ctx, code) (void)atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
|
||||||
|
|
||||||
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
|
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
|
||||||
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
|
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
|
||||||
|
|
|
@ -221,11 +221,11 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
epSet.inUse = 1;
|
epSet.inUse = 1;
|
||||||
epSet.numOfEps = 3;
|
epSet.numOfEps = 3;
|
||||||
strcpy(epSet.eps[0].fqdn, "localhost");
|
TAOS_STRCPY(epSet.eps[0].fqdn, "localhost");
|
||||||
epSet.eps[0].port = 7100;
|
epSet.eps[0].port = 7100;
|
||||||
strcpy(epSet.eps[1].fqdn, "localhost");
|
TAOS_STRCPY(epSet.eps[1].fqdn, "localhost");
|
||||||
epSet.eps[1].port = 7200;
|
epSet.eps[1].port = 7200;
|
||||||
strcpy(epSet.eps[2].fqdn, "localhost");
|
TAOS_STRCPY(epSet.eps[2].fqdn, "localhost");
|
||||||
epSet.eps[2].port = 7300;
|
epSet.eps[2].port = 7300;
|
||||||
|
|
||||||
ctx->phase = QW_PHASE_POST_QUERY;
|
ctx->phase = QW_PHASE_POST_QUERY;
|
||||||
|
|
|
@ -21,7 +21,7 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **r
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == *rsp) {
|
if (NULL == *rsp) {
|
||||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
TAOS_MEMSET(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
}
|
}
|
||||||
|
|
||||||
*rsp = pRsp;
|
*rsp = pRsp;
|
||||||
|
@ -169,7 +169,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
|
||||||
if (NULL == pRsp) {
|
if (NULL == pRsp) {
|
||||||
QW_RET(terrno);
|
QW_RET(terrno);
|
||||||
}
|
}
|
||||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
TAOS_MEMSET(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
dataLength = 0;
|
dataLength = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -342,7 +342,7 @@ int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
if (NULL == pExec) {
|
if (NULL == pExec) {
|
||||||
QW_ERR_JRET(terrno);
|
QW_ERR_JRET(terrno);
|
||||||
}
|
}
|
||||||
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
|
(void)memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
|
||||||
localRsp.rsp.subplanInfo = pExec;
|
localRsp.rsp.subplanInfo = pExec;
|
||||||
localRsp.qId = qId;
|
localRsp.qId = qId;
|
||||||
localRsp.tId = tId;
|
localRsp.tId = tId;
|
||||||
|
@ -537,7 +537,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbFName[0] && tbName[0]) {
|
if (dbFName[0] && tbName[0]) {
|
||||||
sprintf(tbInfo.tbFName, "%s.%s", dbFName, tbName);
|
(void)sprintf(tbInfo.tbFName, "%s.%s", dbFName, tbName);
|
||||||
} else {
|
} else {
|
||||||
tbInfo.tbFName[0] = 0;
|
tbInfo.tbFName[0] = 0;
|
||||||
}
|
}
|
||||||
|
@ -613,7 +613,7 @@ void qwDestroyImpl(void *pMgmt) {
|
||||||
|
|
||||||
taosMemoryFree(mgmt);
|
taosMemoryFree(mgmt);
|
||||||
|
|
||||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
(void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||||
|
|
||||||
qwCloseRef();
|
qwCloseRef();
|
||||||
|
|
||||||
|
|
|
@ -452,8 +452,8 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
|
||||||
pRes->skey = pDelRes->skey;
|
pRes->skey = pDelRes->skey;
|
||||||
pRes->ekey = pDelRes->ekey;
|
pRes->ekey = pDelRes->ekey;
|
||||||
pRes->affectedRows = pDelRes->affectedRows;
|
pRes->affectedRows = pDelRes->affectedRows;
|
||||||
strcpy(pRes->tableFName, pDelRes->tableName);
|
TAOS_STRCPY(pRes->tableFName, pDelRes->tableName);
|
||||||
strcpy(pRes->tsColName, pDelRes->tsColName);
|
TAOS_STRCPY(pRes->tsColName, pDelRes->tsColName);
|
||||||
taosMemoryFree(output.pData);
|
taosMemoryFree(output.pData);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1126,8 +1126,8 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||||
sch->hbConnInfo.handle = NULL;
|
sch->hbConnInfo.handle = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
|
(void)memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
|
||||||
memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
|
(void)memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
|
||||||
|
|
||||||
QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
|
QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
|
||||||
|
|
||||||
|
@ -1138,7 +1138,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
|
(void)memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
|
||||||
code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
|
code = qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -1299,19 +1299,19 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
|
||||||
|
|
||||||
int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
|
int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
|
||||||
if (1 == qwNum) {
|
if (1 == qwNum) {
|
||||||
memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
|
TAOS_MEMSET(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = qwOpenRef();
|
int32_t code = qwOpenRef();
|
||||||
if (code) {
|
if (code) {
|
||||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
(void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
|
SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker));
|
||||||
if (NULL == mgmt) {
|
if (NULL == mgmt) {
|
||||||
qError("calloc %d failed", (int32_t)sizeof(SQWorker));
|
qError("calloc %d failed", (int32_t)sizeof(SQWorker));
|
||||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
(void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||||
QW_RET(TSDB_CODE_OUT_OF_MEMORY);
|
QW_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1345,7 +1345,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
|
||||||
if (pMsgCb) {
|
if (pMsgCb) {
|
||||||
mgmt->msgCb = *pMsgCb;
|
mgmt->msgCb = *pMsgCb;
|
||||||
} else {
|
} else {
|
||||||
memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
|
TAOS_MEMSET(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
|
mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
|
||||||
|
@ -1379,7 +1379,7 @@ _return:
|
||||||
taosTmrCleanUp(mgmt->timer);
|
taosTmrCleanUp(mgmt->timer);
|
||||||
taosMemoryFreeClear(mgmt);
|
taosMemoryFreeClear(mgmt);
|
||||||
|
|
||||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
(void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
|
|
|
@ -170,7 +170,7 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
|
||||||
printf("malloc failed");
|
printf("malloc failed");
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
|
(void)memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
|
||||||
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
|
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
|
||||||
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
|
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
|
||||||
qwtTestFetchQueueWIdx = 0;
|
qwtTestFetchQueueWIdx = 0;
|
||||||
|
@ -199,7 +199,7 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
|
||||||
printf("malloc failed");
|
printf("malloc failed");
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
|
(void)memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
|
||||||
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
|
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
|
||||||
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
|
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
|
||||||
qwtTestQueryQueueWIdx = 0;
|
qwtTestQueryQueueWIdx = 0;
|
||||||
|
|
|
@ -583,7 +583,7 @@ char *schGetOpStr(SCH_OP_TYPE type);
|
||||||
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
|
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
|
||||||
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
|
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
|
||||||
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
|
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
|
||||||
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
|
void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
|
||||||
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
|
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
|
||||||
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
|
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
|
||||||
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
|
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
|
||||||
|
|
|
@ -252,7 +252,7 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
|
||||||
if (i < (taskNum - 1)) {
|
if (i < (taskNum - 1)) {
|
||||||
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
|
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
|
||||||
if (NULL == pLastTask) {
|
if (NULL == pLastTask) {
|
||||||
SCH_JOB_ELOG("fail to get the last task, num:%d", taosArrayGetSize(ctrl->taskList));
|
SCH_JOB_ELOG("fail to get the last task, num:%d", (int32_t)taosArrayGetSize(ctrl->taskList));
|
||||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -262,7 +262,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
||||||
|
|
||||||
SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
|
SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
|
||||||
if (NULL == pLevel) {
|
if (NULL == pLevel) {
|
||||||
SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", taosArrayGetSize(pJob->levels));
|
SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", (int32_t)taosArrayGetSize(pJob->levels));
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,7 +274,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
||||||
|
|
||||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
|
SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
|
||||||
if (NULL == pLevel) {
|
if (NULL == pLevel) {
|
||||||
SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", taosArrayGetSize(pLevel->subTasks));
|
SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", (int32_t)taosArrayGetSize(pLevel->subTasks));
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -424,7 +424,7 @@ _return:
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
|
void schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
|
||||||
pRes->code = atomic_load_32(&pJob->errCode);
|
pRes->code = atomic_load_32(&pJob->errCode);
|
||||||
pRes->numOfRows = pJob->resNumOfRows;
|
pRes->numOfRows = pJob->resNumOfRows;
|
||||||
|
|
||||||
|
@ -436,8 +436,6 @@ int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes) {
|
||||||
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
|
||||||
SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));
|
SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
|
int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
|
||||||
|
@ -463,7 +461,7 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
|
||||||
if (NULL == *pData) {
|
if (NULL == *pData) {
|
||||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
|
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
|
||||||
if (NULL == rsp) {
|
if (NULL == rsp) {
|
||||||
SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", sizeof(SRetrieveTableRsp), terrno);
|
SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", (int32_t)sizeof(SRetrieveTableRsp), terrno);
|
||||||
SCH_ERR_JRET(terrno);
|
SCH_ERR_JRET(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -485,14 +483,11 @@ _return:
|
||||||
int32_t schNotifyUserExecRes(SSchJob *pJob) {
|
int32_t schNotifyUserExecRes(SSchJob *pJob) {
|
||||||
SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
|
SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
|
||||||
if (NULL == pRes) {
|
if (NULL == pRes) {
|
||||||
qError("malloc execResult %d failed, error: %x", sizeof(SExecResult), terrno);
|
qError("malloc execResult %d failed, error: %x", (int32_t)sizeof(SExecResult), terrno);
|
||||||
SCH_RET(terrno);
|
SCH_RET(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = schDumpJobExecRes(pJob, pRes);
|
schDumpJobExecRes(pJob, pRes);
|
||||||
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(pJob->errCode)) {
|
|
||||||
atomic_store_32(&pJob->errCode, code);
|
|
||||||
}
|
|
||||||
|
|
||||||
SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
|
SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
|
||||||
(*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
|
(*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
|
||||||
|
@ -505,7 +500,7 @@ int32_t schNotifyUserFetchRes(SSchJob *pJob) {
|
||||||
void *pRes = NULL;
|
void *pRes = NULL;
|
||||||
|
|
||||||
int32_t code = schDumpJobFetchRes(pJob, &pRes);
|
int32_t code = schDumpJobFetchRes(pJob, &pRes);
|
||||||
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(pJob->errCode)) {
|
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(&pJob->errCode)) {
|
||||||
atomic_store_32(&pJob->errCode, code);
|
atomic_store_32(&pJob->errCode, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -624,11 +619,11 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SSchLevel *pLevel = pTask->level;
|
SSchLevel *pLevel = pTask->level;
|
||||||
int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum);
|
int32_t doneNum = atomic_load_32(&pLevel->taskExecDoneNum);
|
||||||
if (doneNum == pLevel->taskNum) {
|
if (doneNum == pLevel->taskNum) {
|
||||||
atomic_sub_fetch_32(&pJob->levelIdx, 1);
|
(void)atomic_sub_fetch_32(&pJob->levelIdx, 1);
|
||||||
|
|
||||||
pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
|
pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
|
||||||
if (NULL == pLevel) {
|
if (NULL == pLevel) {
|
||||||
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, taosArrayGetSize(pJob->levels));
|
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -698,7 +693,7 @@ int32_t schLaunchJob(SSchJob *pJob) {
|
||||||
} else {
|
} else {
|
||||||
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
|
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
|
||||||
if (NULL == level) {
|
if (NULL == level) {
|
||||||
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, taosArrayGetSize(pJob->levels));
|
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, (int32_t)taosArrayGetSize(pJob->levels));
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -844,7 +839,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
|
||||||
} else {
|
} else {
|
||||||
pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
|
pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
|
||||||
if (NULL == pJob->nodeList) {
|
if (NULL == pJob->nodeList) {
|
||||||
qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, taosArrayGetSize(pReq->pNodeList));
|
qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, (int32_t)taosArrayGetSize(pReq->pNodeList));
|
||||||
SCH_ERR_JRET(terrno);
|
SCH_ERR_JRET(terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -880,7 +875,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
|
||||||
SCH_ERR_JRET(terrno);
|
SCH_ERR_JRET(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_32(&schMgmt.jobNum, 1);
|
(void)atomic_add_fetch_32(&schMgmt.jobNum, 1);
|
||||||
|
|
||||||
*pJobId = pJob->refId;
|
*pJobId = pJob->refId;
|
||||||
|
|
||||||
|
@ -963,7 +958,7 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
|
||||||
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
||||||
if (NULL == pLevel) {
|
if (NULL == pLevel) {
|
||||||
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
|
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
|
||||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
pLevel->taskExecDoneNum = 0;
|
pLevel->taskExecDoneNum = 0;
|
||||||
|
@ -974,7 +969,7 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
|
||||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
|
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
|
||||||
if (NULL == pTask) {
|
if (NULL == pTask) {
|
||||||
SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks);
|
SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks);
|
||||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_LOCK_TASK(pTask);
|
SCH_LOCK_TASK(pTask);
|
||||||
|
@ -1077,7 +1072,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int
|
||||||
}
|
}
|
||||||
|
|
||||||
if (errCode) {
|
if (errCode) {
|
||||||
schHandleJobFailure(pJob, errCode);
|
(void)schHandleJobFailure(pJob, errCode); // handle internal
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
|
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
|
||||||
|
@ -1156,11 +1151,11 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (errCode) {
|
if (errCode) {
|
||||||
schHandleJobFailure(pJob, errCode);
|
(void)schHandleJobFailure(pJob, errCode); // ignore error
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pJob) {
|
if (pJob) {
|
||||||
schReleaseJob(pJob->refId);
|
(void)schReleaseJob(pJob->refId); // ignore error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1195,7 +1190,7 @@ _return:
|
||||||
SCH_UNLOCK_TASK(pTask);
|
SCH_UNLOCK_TASK(pTask);
|
||||||
}
|
}
|
||||||
if (pJob) {
|
if (pJob) {
|
||||||
schReleaseJob(rId);
|
(void)schReleaseJob(rId); // ignore error
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
|
|
|
@ -122,7 +122,7 @@ int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) {
|
||||||
|
|
||||||
SQuerySubDesc subDesc = {0};
|
SQuerySubDesc subDesc = {0};
|
||||||
subDesc.tid = pTask->taskId;
|
subDesc.tid = pTask->taskId;
|
||||||
strcpy(subDesc.status, jobTaskStatusStr(pTask->status));
|
TAOS_STRCPY(subDesc.status, jobTaskStatusStr(pTask->status));
|
||||||
|
|
||||||
if (NULL == taosArrayPush(pSub, &subDesc)) {
|
if (NULL == taosArrayPush(pSub, &subDesc)) {
|
||||||
qError("taosArrayPush task %d failed, error: %x, ", m, terrno);
|
qError("taosArrayPush task %d failed, error: %x, ", m, terrno);
|
||||||
|
|
Loading…
Reference in New Issue