enh: add return code processing
This commit is contained in:
parent
a82b0b805d
commit
a8c39cdd11
|
@ -173,8 +173,21 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
|
|||
|
||||
if (pEpSet) {
|
||||
contLen = tSerializeSEpSet(NULL, 0, pEpSet);
|
||||
if (contLen < 0) {
|
||||
qError("tSerializeSEpSet failed, code:%x", terrno);
|
||||
return terrno;
|
||||
}
|
||||
rsp = rpcMallocCont(contLen);
|
||||
tSerializeSEpSet(rsp, contLen, pEpSet);
|
||||
if (NULL == rsp) {
|
||||
qError("rpcMallocCont %d failed, code:%x", contLen, terrno);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
contLen = tSerializeSEpSet(rsp, contLen, pEpSet);
|
||||
if (contLen < 0) {
|
||||
qError("tSerializeSEpSet second failed, code:%x", terrno);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -216,20 +229,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) {
|
|||
epSet.eps[2].port = 7300;
|
||||
|
||||
ctx->phase = QW_PHASE_POST_QUERY;
|
||||
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet);
|
||||
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); // ignore error
|
||||
*rsped = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
|
||||
QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY);
|
||||
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
|
||||
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error
|
||||
*rsped = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) {
|
||||
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL);
|
||||
(void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error
|
||||
*rsped = true;
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -166,6 +166,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
|
|||
int32_t code) {
|
||||
if (NULL == pRsp) {
|
||||
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
if (NULL == pRsp) {
|
||||
QW_RET(terrno);
|
||||
}
|
||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||
dataLength = 0;
|
||||
}
|
||||
|
@ -187,6 +190,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve
|
|||
#if 0
|
||||
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
|
||||
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
||||
if (NULL == pRsp) {
|
||||
QW_RET(terrno);
|
||||
}
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -203,6 +209,9 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
|
|||
|
||||
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
|
||||
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
|
||||
if (NULL == pRsp) {
|
||||
QW_RET(terrno);
|
||||
}
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
@ -428,6 +437,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
SSubQueryMsg msg = {0};
|
||||
if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
|
||||
|
@ -442,8 +452,8 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
int32_t eId = msg.execId;
|
||||
|
||||
QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
|
||||
qwAbortPrerocessQuery(QW_FPARAMS());
|
||||
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle);
|
||||
code = qwAbortPrerocessQuery(QW_FPARAMS());
|
||||
QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);
|
||||
|
||||
tFreeSSubQueryMsg(&msg);
|
||||
|
||||
|
@ -458,7 +468,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
int32_t code = 0;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
||||
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
|
||||
QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);
|
||||
|
||||
SSubQueryMsg msg = {0};
|
||||
|
@ -500,7 +510,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
SQWTaskCtx *handles = NULL;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE);
|
||||
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
|
||||
QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
|
@ -533,7 +543,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
SResFetchReq req = {0};
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||
QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);
|
||||
|
||||
if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
|
@ -551,9 +561,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
|
||||
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||
|
||||
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
|
||||
int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg);
|
||||
|
||||
QW_SCH_TASK_DLOG("processFetch end, node:%p", node);
|
||||
QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -561,7 +571,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
if (mgmt) {
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||
QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
|
||||
}
|
||||
|
||||
|
@ -580,7 +590,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
int32_t code = 0;
|
||||
STaskCancelReq *msg = pMsg->pCont;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||
QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
|
||||
|
||||
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
|
||||
|
@ -621,7 +631,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
|
|||
int32_t code = 0;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||
QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);
|
||||
|
||||
STaskDropReq msg = {0};
|
||||
|
@ -644,9 +654,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
|
|||
|
||||
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||
|
||||
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
|
||||
code = qwProcessDrop(QW_FPARAMS(), &qwMsg);
|
||||
|
||||
QW_SCH_TASK_DLOG("processDrop end, node:%p", node);
|
||||
QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -659,7 +669,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
int32_t code = 0;
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||
QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1);
|
||||
|
||||
STaskNotifyReq msg = {0};
|
||||
|
@ -678,9 +688,9 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
|
|||
|
||||
QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||
|
||||
QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg));
|
||||
code = qwProcessNotify(QW_FPARAMS(), &qwMsg);
|
||||
|
||||
QW_SCH_TASK_DLOG("processNotify end, node:%p", node);
|
||||
QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -695,7 +705,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
|
|||
SSchedulerHbReq req = {0};
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
|
||||
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
|
||||
QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
|
||||
|
||||
if (NULL == pMsg->pCont) {
|
||||
|
@ -717,9 +727,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
|
|||
|
||||
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||
|
||||
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
|
||||
code = qwProcessHb(mgmt, &qwMsg, &req);
|
||||
|
||||
QW_SCH_DLOG("processHb end, node:%p", node);
|
||||
QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -735,7 +745,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD
|
|||
|
||||
QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);
|
||||
|
||||
tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req);
|
||||
QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req));
|
||||
|
||||
uint64_t sId = req.sId;
|
||||
uint64_t qId = req.queryId;
|
||||
|
|
|
@ -323,34 +323,52 @@ static void freeExplainExecItem(void *param) {
|
|||
|
||||
|
||||
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
qTaskInfo_t taskHandle = ctx->taskHandle;
|
||||
|
||||
ctx->explainRsped = true;
|
||||
|
||||
SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
|
||||
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
|
||||
if (NULL == execInfoList) {
|
||||
QW_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList));
|
||||
|
||||
if (ctx->localExec) {
|
||||
SExplainLocalRsp localRsp = {0};
|
||||
localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
|
||||
SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
|
||||
if (NULL == pExec) {
|
||||
QW_ERR_JRET(terrno);
|
||||
}
|
||||
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
|
||||
localRsp.rsp.subplanInfo = pExec;
|
||||
localRsp.qId = qId;
|
||||
localRsp.tId = tId;
|
||||
localRsp.rId = rId;
|
||||
localRsp.eId = eId;
|
||||
taosArrayPush(ctx->explainRes, &localRsp);
|
||||
if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) {
|
||||
QW_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
taosArrayDestroy(execInfoList);
|
||||
execInfoList = NULL;
|
||||
} else {
|
||||
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
|
||||
connInfo.ahandle = NULL;
|
||||
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
|
||||
taosArrayDestroyEx(execInfoList, freeExplainExecItem);
|
||||
execInfoList = NULL;
|
||||
|
||||
QW_ERR_RET(code);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_return:
|
||||
|
||||
taosArrayDestroyEx(execInfoList, freeExplainExecItem);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
@ -544,7 +562,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
|
|||
void qwCloseRef(void) {
|
||||
taosWLockLatch(&gQwMgmt.lock);
|
||||
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
|
||||
taosCloseRef(gQwMgmt.qwRef);
|
||||
(void)taosCloseRef(gQwMgmt.qwRef); // ignore error
|
||||
gQwMgmt.qwRef = -1;
|
||||
}
|
||||
taosWUnLockLatch(&gQwMgmt.lock);
|
||||
|
@ -561,7 +579,7 @@ void qwDestroyImpl(void *pMgmt) {
|
|||
int32_t schStatusCount = 0;
|
||||
qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
|
||||
|
||||
taosTmrStop(mgmt->hbTimer);
|
||||
(void)taosTmrStop(mgmt->hbTimer); //ignore error
|
||||
mgmt->hbTimer = NULL;
|
||||
taosTmrCleanUp(mgmt->timer);
|
||||
|
||||
|
@ -652,24 +670,33 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
|||
return pStat->num ? (pStat->total / pStat->num) : 0;
|
||||
default:
|
||||
qError("unsupported queue type %d", type);
|
||||
break;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t num = taosArrayGetSize(pExpiredSch);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
uint64_t *sId = taosArrayGet(pExpiredSch, i);
|
||||
SQWSchStatus *pSch = NULL;
|
||||
if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) {
|
||||
if (NULL == sId) {
|
||||
qError("get the %dth sch failed, code:%x", i, terrno);
|
||||
break;
|
||||
}
|
||||
|
||||
code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (taosHashGetSize(pSch->tasksHash) <= 0) {
|
||||
qwDestroySchStatus(pSch);
|
||||
taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
|
||||
qDebug("sch %" PRIx64 " destroyed", *sId);
|
||||
code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
|
||||
qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code);
|
||||
}
|
||||
|
||||
qwReleaseScheduler(QW_WRITE, mgmt);
|
||||
|
|
|
@ -1372,7 +1372,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
|
|||
_return:
|
||||
|
||||
if (mgmt->refId >= 0) {
|
||||
qwRelease(mgmt->refId); // ignore error
|
||||
(void)qwRelease(mgmt->refId); // ignore error
|
||||
} else {
|
||||
taosHashCleanup(mgmt->schHash);
|
||||
taosHashCleanup(mgmt->ctxHash);
|
||||
|
|
Loading…
Reference in New Issue