Merge pull request #26610 from taosdata/fix/TD-30966.sch.0

fix: scheduler return code
This commit is contained in:
dapan1121 2024-07-17 11:21:51 +08:00 committed by GitHub
commit aeb21a536e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 402 additions and 120 deletions

View File

@ -32,9 +32,9 @@ extern "C" {
#define TSWAP(a, b) \
do { \
char *__tmp = (char*)alloca(sizeof(a)); \
memcpy(__tmp, &(a), sizeof(a)); \
memcpy(&(a), &(b), sizeof(a)); \
memcpy(&(b), __tmp, sizeof(a)); \
(void)memcpy(__tmp, &(a), sizeof(a)); \
(void)memcpy(&(a), &(b), sizeof(a)); \
(void)memcpy(&(b), __tmp, sizeof(a)); \
} while (0)
#ifdef WINDOWS

View File

@ -54,6 +54,7 @@ void taosMemoryTrim(int32_t size);
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
#define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n))
#define TAOS_MEMCPY(_d, _s, _n) ((void)memcpy(_d, _s, _n))
#define taosMemoryFreeClear(ptr) \
do { \

View File

@ -336,7 +336,7 @@ extern SSchedulerMgmt schMgmt;
((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
(!SCH_IS_DATA_BIND_QRY_TASK(_task)))
#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_UPDATE_REDIRECT_CODE(job, _code) (void)atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDIRECT_CODE(job, _code) \
(((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
@ -413,7 +413,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
taosArrayPush((_task)->profile.execTime, &us); \
(void)taosArrayPush((_task)->profile.execTime, &us); \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
@ -422,7 +422,10 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
(_task)->profile.waitTime += us - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
int64_t* startus = (int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
if (NULL != startus) { \
(_task)->profile.waitTime += us - *startus; \
} \
} while (0)
#define SCH_LOG_TASK_END_TS(_task) \
@ -430,7 +433,9 @@ extern SSchedulerMgmt schMgmt;
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
if (NULL != startts) { \
*startts = us - *startts; \
} \
(_task)->profile.endTs = us; \
} while (0)
@ -538,7 +543,7 @@ void schCleanClusterHb(void *pTrans);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
SSchJob *schAcquireJob(int64_t refId);
int32_t schAcquireJob(int64_t refId, SSchJob **ppJob);
int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchJob *pJob);
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
@ -578,7 +583,7 @@ int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
char *schDumpEpSet(SEpSet *pEpSet);
int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes);
char *schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);

View File

@ -1165,7 +1165,9 @@ int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_
int8_t status = 0;
SSchTask *pTask = NULL;
SSchJob *pJob = schAcquireJob(rId);
SSchJob *pJob = NULL;
(void)schAcquireJob(rId, &pJob);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);

View File

@ -113,7 +113,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs
}
atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
(void)atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
@ -166,13 +166,27 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (NULL == pJob->execRes.res) {
pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
if (NULL == pJob->execRes.res) {
code = terrno;
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
}
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVCreateTbRsp *rsp = batchRsp.pRsps + i;
if (rsp->pMeta) {
taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta);
if (NULL == taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta)) {
code = terrno;
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
}
if (TSDB_CODE_SUCCESS != rsp->code) {
@ -264,7 +278,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
SCH_ERR_JRET(code);
}
atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
(void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp);
SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum);
@ -275,7 +289,12 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
SSubmitRsp2 *sum = pJob->execRes.res;
sum->affectedRows += rsp->affectedRows;
if (sum->aCreateTbRsp) {
taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp);
if (NULL == taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp)) {
code = terrno;
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_ERR_JRET(code);
}
taosArrayDestroy(rsp->aCreateTbRsp);
} else {
TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
@ -313,10 +332,14 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
SDecoder coder = {0};
SVDeleteRsp rsp = {0};
tDecoderInit(&coder, msg, msgSize);
tDecodeSVDeleteRsp(&coder, &rsp);
if (tDecodeSVDeleteRsp(&coder, &rsp) < 0) {
code = terrno;
tDecoderClear(&coder);
SCH_ERR_JRET(code);
}
tDecoderClear(&coder);
atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
(void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
}
@ -351,7 +374,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
(void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
taosMemoryFreeClear(msg);
@ -479,7 +502,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
code);
// called if drop task rsp received code
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
@ -501,7 +524,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
qDebug("handle %p is broken", pMsg->handle);
@ -531,7 +554,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
if (code) {
qError("hb rsp error:%s", tstrerror(code));
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
SCH_ERR_JRET(code);
}
@ -585,9 +608,14 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo
param->head.isHbParam = true;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
taosMemoryFree(param);
SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
param->nodeEpId.nodeId = addr->nodeId;
SEp *pEp = SCH_GET_CUR_EP(addr);
strcpy(param->nodeEpId.ep.fqdn, pEp->fqdn);
TAOS_STRCPY(param->nodeEpId.ep.fqdn, pEp->fqdn);
param->nodeEpId.ep.port = pEp->port;
param->pTrans = trans->pTrans;
*pParam = param;
@ -712,7 +740,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
int32_t code = 0;
memcpy(pDst, pSrc, sizeof(SRpcCtx));
TAOS_MEMCPY(pDst, pSrc, sizeof(SRpcCtx));
pDst->brokenVal.val = NULL;
pDst->args = NULL;
@ -760,7 +788,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
TAOS_MEMCPY(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
if (NULL == pCtx->args) {
@ -877,7 +905,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(dst, pSrc, sizeof(*dst));
TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
*pDst = (SSchCallbackParamHeader *)dst;
return TSDB_CODE_SUCCESS;
@ -889,7 +917,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(dst, pSrc, sizeof(*dst));
TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
*pDst = (SSchCallbackParamHeader *)dst;
return TSDB_CODE_SUCCESS;
@ -904,7 +932,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pDst, pSrc, sizeof(*pSrc));
TAOS_MEMCPY(pDst, pSrc, sizeof(*pSrc));
pDst->param = NULL;
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
@ -948,6 +976,10 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
if (isHb && persistHandle && trans->pHandle == 0) {
trans->pHandle = rpcAllocHandle();
if (NULL == trans->pHandle) {
SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", terrno);
SCH_ERR_JRET(terrno);
}
}
if (pJob && pTask) {
@ -1000,7 +1032,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
req.header.vgId = nodeEpId->nodeId;
req.sId = schMgmt.sId;
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
TAOS_MEMCPY(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
@ -1013,7 +1045,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
SCH_LOCK(SCH_WRITE, &hb->lock);
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
memcpy(&trans, &hb->trans, sizeof(trans));
TAOS_MEMCPY(&trans, &hb->trans, sizeof(trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
@ -1039,7 +1071,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
SQueryNodeAddr addr = {.nodeId = nodeEpId->nodeId};
addr.epSet.inUse = 0;
addr.epSet.numOfEps = 1;
memcpy(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
TAOS_MEMCPY(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx);
msg = NULL;
@ -1064,6 +1096,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("fail to get condidateAddr, candidateIdx %d, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_JRET(terrno);
}
isCandidateAddr = true;
SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse,
addr->epSet.numOfEps);
@ -1082,7 +1119,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(msg, pTask->msg, msgSize);
TAOS_MEMCPY(msg, pTask->msg, msgSize);
break;
}
@ -1098,13 +1135,21 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
req.msg = pTask->msg;
req.source = pJob->source;
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
if (msgSize < 0) {
SCH_TASK_ELOG("tSerializeSVDeleteReq failed, code:%x", terrno);
SCH_ERR_JRET(terrno);
}
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
tSerializeSVDeleteReq(msg, msgSize, &req);
msgSize = tSerializeSVDeleteReq(msg, msgSize, &req);
if (msgSize < 0) {
SCH_TASK_ELOG("tSerializeSVDeleteReq second failed, code:%x", terrno);
SCH_ERR_JRET(terrno);
}
break;
}
case TDMT_SCH_QUERY:
@ -1221,7 +1266,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
req.sId = schMgmt.sId;
req.header.vgId = addr->nodeId;
req.epId.nodeId = addr->nodeId;
memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
TAOS_MEMCPY(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
if (msgSize < 0) {

View File

@ -43,7 +43,7 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0)));
break;
case JOB_TASK_STATUS_DROP:
schProcessOnJobDropped(pJob, *(int32_t*)param);
(void)schProcessOnJobDropped(pJob, *(int32_t*)param); // ignore error
if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
@ -65,7 +65,8 @@ _return:
}
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
SSchJob* pJob = schAcquireJob(jobId);
SSchJob* pJob = NULL;
(void)schAcquireJob(jobId, &pJob);
if (NULL == pJob) {
qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
SCH_ERR_RET(TSDB_CODE_SCH_JOB_NOT_EXISTS);
@ -90,7 +91,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
code = pJob->errCode;
}
schReleaseJob(pJob->refId);
(void)schReleaseJob(pJob->refId); // ignore error
return code;
}

View File

@ -165,7 +165,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
}
schUpdateTaskExecNode(pJob, pTask, handle, execId);
SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));
if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
@ -297,6 +297,10 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
if (NULL == parent) {
SCH_TASK_ELOG("fail to get task %d parent, parentNum: %d", i, parentNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_LOCK(SCH_WRITE, &parent->planLock);
SDownstreamSourceNode source = {
@ -308,9 +312,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.fetchMsgType = SCH_FETCH_TYPE(pTask),
.localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
code = qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
}
SCH_UNLOCK(SCH_WRITE, &parent->planLock);
SCH_ERR_RET(code);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
@ -413,16 +422,16 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
schDropTaskOnExecNode(pJob, pTask);
if (pTask->delayTimer) {
taosTmrStopA(&pTask->delayTimer);
(void)taosTmrStopA(&pTask->delayTimer); // ignore error
}
taosHashClear(pTask->execNodes);
schRemoveTaskFromExecList(pJob, pTask);
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
schDeregisterTaskHb(pJob, pTask);
taosMemoryFreeClear(pTask->msg);
pTask->msgLen = 0;
pTask->lastMsgType = 0;
pTask->childReady = 0;
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
@ -443,11 +452,21 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
} else {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_SWITCH_EPSET(addr);
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
}
@ -476,7 +495,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
for (int32_t i = 0; i < childrenNum; ++i) {
SSchTask *pChild = taosArrayGetP(pTask->children, i);
SCH_LOCK_TASK(pChild);
schDoTaskRedirect(pJob, pChild, NULL, rspCode);
(void)schDoTaskRedirect(pJob, pChild, NULL, rspCode); // error handled internal
SCH_UNLOCK_TASK(pChild);
}
@ -494,18 +513,23 @@ int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
(void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
}
atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
(void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
int32_t childrenNum = taosArrayGetSize(pTask->children);
for (int32_t i = 0; i < childrenNum; ++i) {
SSchTask *pChild = taosArrayGetP(pTask->children, i);
if (NULL == pChild) {
SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_LOCK_TASK(pChild);
pLevel = pChild->level;
atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
(void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
(void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
SCH_UNLOCK_TASK(pChild);
}
@ -711,9 +735,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
}
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
(void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
schRemoveTaskFromExecList(pJob, pTask);
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
@ -724,6 +748,11 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_BIND_TASK(pTask)) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("fail to the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_SWITCH_EPSET(addr);
} else {
SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
@ -743,6 +772,11 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
for (int32_t i = 0; i < nodeNum; ++i) {
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
if (NULL == nload) {
SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", i, nodeNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SQueryNodeAddr *naddr = &nload->addr;
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
@ -810,6 +844,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
(int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
@ -817,18 +852,27 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe
}
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
if (NULL == pAddr) {
SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
char *origEpset = schDumpEpSet(&pAddr->epSet);
char *newEpset = schDumpEpSet(pEpSet);
char *origEpset = NULL;
char *newEpset = NULL;
SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &origEpset));
SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpset));
SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
_return:
taosMemoryFree(origEpset);
taosMemoryFree(newEpset);
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
return TSDB_CODE_SUCCESS;
return code;
}
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
@ -877,7 +921,6 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
}
int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
if (size <= 0) {
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
@ -889,7 +932,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
if (nodeInfo->handle) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
void *pExecId = taosHashGetKey(nodeInfo, NULL);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId);
(void)schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId); // ignore error and continue
SCH_TASK_DLOG("start to drop task's %dth execNode", i);
} else {
@ -939,6 +982,11 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *pStatus = taosArrayGet(pStatusList, i);
if (NULL == pStatus) {
qError("fail to get the %dth task status in hb rsp, taskNum:%d", i, taskNum);
continue;
}
int32_t code = 0;
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
@ -983,10 +1031,15 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
if (NULL == localRsp) {
qError("fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
continue;
}
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);
pJob = schAcquireJob(localRsp->rId);
pJob = NULL;
(void)schAcquireJob(localRsp->rId, &pJob);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
localRsp->tId, localRsp->rId);
@ -996,7 +1049,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
schReleaseJob(pJob->refId);
(void)schReleaseJob(pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
@ -1006,7 +1059,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
}
schReleaseJob(pJob->refId);
(void)schReleaseJob(pJob->refId);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
localRsp->tId, code);
@ -1022,6 +1075,11 @@ _return:
for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
if (NULL == localRsp) {
qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
continue;
}
tFreeSExplainRsp(&localRsp->rsp);
}
@ -1076,6 +1134,9 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
if (NULL == explainRes) {
SCH_ERR_RET(terrno);
}
}
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
@ -1097,7 +1158,9 @@ _return:
int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = schAcquireJob(pCtx->jobRid);
SSchJob *pJob = NULL;
(void)schAcquireJob(pCtx->jobRid, &pJob);
if (NULL == pJob) {
qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
taosMemoryFree(param);
@ -1113,7 +1176,7 @@ int32_t schLaunchTaskImpl(void *param) {
int8_t status = 0;
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
(void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++;
pTask->retryTimes++;
pTask->waitRetry = false;
@ -1160,7 +1223,7 @@ _return:
SCH_UNLOCK_TASK(pTask);
}
schReleaseJob(pJob->refId);
(void)schReleaseJob(pJob->refId);
taosMemoryFree(param);
@ -1178,7 +1241,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
param->asyncLaunch = true;
taosAsyncExec(schLaunchTaskImpl, param, NULL);
SCH_ERR_RET(taosAsyncExec(schLaunchTaskImpl, param, NULL));
} else {
SCH_ERR_RET(schLaunchTaskImpl(param));
}
@ -1252,7 +1315,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);
(void)taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);
return TSDB_CODE_SUCCESS;
}
@ -1283,7 +1346,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
SCH_LOCK_TASK(pTask);
if (pTask->delayTimer) {
taosTmrStopA(&pTask->delayTimer);
(void)taosTmrStopA(&pTask->delayTimer);
}
schDropTaskOnExecNode(pJob, pTask);
SCH_UNLOCK_TASK(pTask);
@ -1327,6 +1390,9 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
if (NULL == explainRes) {
SCH_ERR_RET(terrno);
}
}
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,

View File

@ -22,9 +22,14 @@
#include "tref.h"
#include "trpc.h"
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
FORCE_INLINE int32_t schAcquireJob(int64_t refId, SSchJob** ppJob) {
qDebug("sch acquire jobId:0x%" PRIx64, refId);
return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
*ppJob = (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
if (NULL == *ppJob) {
return terrno;
}
return TSDB_CODE_SUCCESS;
}
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
@ -36,15 +41,16 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
return taosReleaseRef(schMgmt.jobRef, refId);
}
char *schDumpEpSet(SEpSet *pEpSet) {
int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) {
*ppRes = NULL;
if (NULL == pEpSet) {
return NULL;
return TSDB_CODE_SUCCESS;
}
int32_t maxSize = 1024;
char *str = taosMemoryMalloc(maxSize);
if (NULL == str) {
return NULL;
return terrno;
}
int32_t n = 0;
@ -54,7 +60,8 @@ char *schDumpEpSet(SEpSet *pEpSet) {
n += snprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port);
}
return str;
*ppRes = str;
return TSDB_CODE_SUCCESS;
}
char *schGetOpStr(SCH_OP_TYPE type) {
@ -73,7 +80,7 @@ char *schGetOpStr(SCH_OP_TYPE type) {
}
void schFreeHbTrans(SSchHbTrans *pTrans) {
rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT);
(void)rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT);
schFreeRpcCtx(&pTrans->rpcCtx);
}
@ -86,7 +93,7 @@ void schCleanClusterHb(void *pTrans) {
if (hb->trans.pTrans == pTrans) {
SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
schFreeHbTrans(hb);
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
(void)taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
}
hb = taosHashIterate(schMgmt.hbConnections, hb);
@ -109,7 +116,7 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
int64_t taskNum = atomic_load_64(&hb->taskNum);
if (taskNum <= 0) {
schFreeHbTrans(hb);
taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
(void)taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
@ -165,7 +172,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
break;
}
atomic_add_fetch_64(&hb->taskNum, 1);
(void)atomic_add_fetch_64(&hb->taskNum, 1);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
@ -178,12 +185,17 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
return;
}
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp *pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
TAOS_STRCPY(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
@ -197,7 +209,7 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1);
if (0 == taskNum) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
schRemoveHbConnection(pJob, pTask, &epId);
(void)schRemoveHbConnection(pJob, pTask, &epId);
} else {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
}
@ -211,12 +223,17 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
return TSDB_CODE_SCH_INTERNAL_ERROR;
}
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp *pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
TAOS_STRCPY(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));
@ -240,7 +257,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
}
SCH_LOCK(SCH_WRITE, &hb->lock);
memcpy(&hb->trans, trans, sizeof(*trans));
TAOS_MEMCPY(&hb->trans, trans, sizeof(*trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
@ -256,7 +273,7 @@ void schCloseJobRef(void) {
}
if (schMgmt.jobRef >= 0) {
taosCloseRef(schMgmt.jobRef);
(void)taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = -1;
}
}

View File

@ -169,7 +169,8 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
return;
}
SSchJob *pJob = schAcquireJob(*jobId);
SSchJob *pJob = NULL;
(void)schAcquireJob(*jobId, &pJob);
if (NULL == pJob) {
qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
return;

View File

@ -75,9 +75,7 @@ int32_t schtStartFetch = 0;
void schtInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
rpcInit();
tsAsyncLog = 0;
rpcInit();
qDebugFlag = 159;
strcpy(tsLogDir, TD_LOG_DIR_PATH);
@ -136,8 +134,13 @@ int32_t schtBuildSubmitRspMsg(uint32_t *msize, void **rspMsg) {
tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret);
void *msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
return terrno;
}
tEncoderInit(&ec, (uint8_t *)msg, msgSize);
tEncodeSSubmitRsp2(&ec, &submitRsp);
if (tEncodeSSubmitRsp2(&ec, &submitRsp) < 0) {
return -1;
}
tEncoderClear(&ec);
*rspMsg = msg;
@ -152,11 +155,26 @@ void schtBuildQueryDag(SQueryPlan *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
if (NULL == dag->pSubplans) {
return;
}
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == scan) {
return;
}
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == merge) {
return;
}
SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == scanPlan) {
return;
}
SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == mergePlan) {
return;
}
scanPlan->id.queryId = qId;
scanPlan->id.groupId = 0x0000000000000002;
@ -170,7 +188,13 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->pChildren = NULL;
scanPlan->level = 1;
scanPlan->pParents = nodesMakeList();
if (NULL == scanPlan->pParents) {
return;
}
scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
if (NULL == scanPlan->pNode) {
return;
}
scanPlan->msgType = TDMT_SCH_QUERY;
mergePlan->id.queryId = qId;
@ -181,21 +205,33 @@ void schtBuildQueryDag(SQueryPlan *dag) {
mergePlan->execNode.epSet.numOfEps = 0;
mergePlan->pChildren = nodesMakeList();
if (NULL == mergePlan->pChildren) {
return;
}
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE);
if (NULL == mergePlan->pNode) {
return;
}
mergePlan->msgType = TDMT_SCH_QUERY;
merge->pNodeList = nodesMakeList();
if (NULL == merge->pNodeList) {
return;
}
scan->pNodeList = nodesMakeList();
if (NULL == scan->pNodeList) {
return;
}
nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
(void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
(void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
(void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
(void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
nodesListAppend(dag->pSubplans, (SNode *)merge);
nodesListAppend(dag->pSubplans, (SNode *)scan);
(void)nodesListAppend(dag->pSubplans, (SNode *)merge);
(void)nodesListAppend(dag->pSubplans, (SNode *)scan);
}
void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
@ -205,18 +241,42 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
if (NULL == dag->pSubplans) {
return;
}
SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == scan) {
return;
}
SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == merge) {
return;
}
SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == mergePlan) {
return;
}
merge->pNodeList = nodesMakeList();
if (NULL == merge->pNodeList) {
return;
}
scan->pNodeList = nodesMakeList();
if (NULL == scan->pNodeList) {
return;
}
mergePlan->pChildren = nodesMakeList();
if (NULL == mergePlan->pChildren) {
return;
}
for (int32_t i = 0; i < scanPlanNum; ++i) {
SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == scanPlan) {
return;
}
scanPlan->id.queryId = qId;
scanPlan->id.groupId = 0x0000000000000002;
scanPlan->id.subplanId = 0x0000000000000003 + i;
@ -233,13 +293,19 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
scanPlan->pChildren = NULL;
scanPlan->level = 1;
scanPlan->pParents = nodesMakeList();
if (NULL == scanPlan->pParents) {
return;
}
scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
if (NULL == scanPlan->pNode) {
return;
}
scanPlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
(void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
(void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
(void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
}
mergePlan->id.queryId = qId;
@ -251,12 +317,15 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE);
if (NULL == mergePlan->pNode) {
return;
}
mergePlan->msgType = TDMT_SCH_QUERY;
nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
(void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
nodesListAppend(dag->pSubplans, (SNode *)merge);
nodesListAppend(dag->pSubplans, (SNode *)scan);
(void)nodesListAppend(dag->pSubplans, (SNode *)merge);
(void)nodesListAppend(dag->pSubplans, (SNode *)scan);
}
void schtFreeQueryDag(SQueryPlan *dag) {}
@ -267,10 +336,22 @@ void schtBuildInsertDag(SQueryPlan *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = nodesMakeList();
if (NULL == dag->pSubplans) {
return;
}
SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST);
if (NULL == inserta) {
return;
}
inserta->pNodeList = nodesMakeList();
if (NULL == inserta->pNodeList) {
return;
}
SSubplan *insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == insertPlan) {
return;
}
insertPlan->id.queryId = qId;
insertPlan->id.groupId = 0x0000000000000003;
@ -286,13 +367,22 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan->pParents = NULL;
insertPlan->pNode = NULL;
insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
if (NULL == insertPlan->pDataSink) {
return;
}
((SDataInserterNode *)insertPlan->pDataSink)->size = 1;
((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
if (NULL == ((SDataInserterNode *)insertPlan->pDataSink)->pData) {
return;
}
insertPlan->msgType = TDMT_VND_SUBMIT;
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
(void)nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
if (NULL == insertPlan) {
return;
}
insertPlan->id.queryId = qId;
insertPlan->id.groupId = 0x0000000000000003;
@ -308,22 +398,31 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan->pParents = NULL;
insertPlan->pNode = NULL;
insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT);
if (NULL == insertPlan->pDataSink) {
return;
}
((SDataInserterNode *)insertPlan->pDataSink)->size = 1;
((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1);
if (NULL == ((SDataInserterNode *)insertPlan->pDataSink)->pData) {
return;
}
insertPlan->msgType = TDMT_VND_SUBMIT;
nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
(void)nodesListAppend(inserta->pNodeList, (SNode *)insertPlan);
nodesListAppend(dag->pSubplans, (SNode *)inserta);
(void)nodesListAppend(dag->pSubplans, (SNode *)inserta);
}
int32_t schtPlanToString(const SSubplan *subplan, char **str, int32_t *len) {
*str = (char *)taosMemoryCalloc(1, 20);
if (NULL == *str) {
return -1;
}
*len = 20;
return 0;
}
void schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) {}
int32_t schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) { return 0; }
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {}
@ -431,7 +530,10 @@ void *schtSendRsp(void *param) {
taosMsleep(1);
}
pJob = schAcquireJob(job);
code = schAcquireJob(job, &pJob);
if (code) {
return NULL;
}
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
@ -439,16 +541,16 @@ void *schtSendRsp(void *param) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildSubmitRspMsg(&msg.len, &rmsg);
(void)schtBuildSubmitRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_VND_SUBMIT_RSP;
msg.pData = rmsg;
schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
(void)schHandleResponseMsg(pJob, task, task->execId, &msg, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
schReleaseJob(job);
(void)schReleaseJob(job);
schtJobDone = true;
@ -457,20 +559,25 @@ void *schtSendRsp(void *param) {
void *schtCreateFetchRspThread(void *param) {
int64_t job = *(int64_t *)param;
SSchJob *pJob = schAcquireJob(job);
SSchJob *pJob = NULL;
(void)schAcquireJob(job, &pJob);
if (NULL == pJob) {
return NULL;
}
taosSsleep(1);
int32_t code = 0;
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildFetchRspMsg(&msg.len, &rmsg);
(void)schtBuildFetchRspMsg(&msg.len, &rmsg);
msg.msgType = TDMT_SCH_MERGE_FETCH_RSP;
msg.pData = rmsg;
code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0);
schReleaseJob(job);
(void)schReleaseJob(job);
assert(code == 0);
return NULL;
@ -488,12 +595,17 @@ void *schtFetchRspThread(void *aa) {
taosUsleep(100);
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
if (NULL == param) {
return NULL;
}
param->queryId = schtQueryId;
param->taskId = schtFetchTaskId;
int32_t code = 0;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
if (NULL == rsp) {
return NULL;
}
rsp->completed = 1;
rsp->numOfRows = 10;
@ -549,12 +661,17 @@ void *schtRunJobThread(void *aa) {
schtBuildQueryDag(dag);
SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
if (NULL == qnodeList) {
assert(0);
}
SQueryNodeLoad load = {0};
load.addr.epSet.numOfEps = 1;
strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep");
load.addr.epSet.eps[0].port = 6031;
taosArrayPush(qnodeList, &load);
if (NULL == taosArrayPush(qnodeList, &load)) {
assert(0);
}
queryDone = 0;
@ -572,7 +689,9 @@ void *schtRunJobThread(void *aa) {
code = schedulerExecJob(&req, &queryJobRefId);
assert(code == 0);
pJob = schAcquireJob(queryJobRefId);
pJob = NULL;
code = schAcquireJob(queryJobRefId, &pJob);
if (NULL == pJob) {
taosArrayDestroy(qnodeList);
schtFreeQueryDag(dag);
@ -580,16 +699,24 @@ void *schtRunJobThread(void *aa) {
}
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == execTasks) {
assert(0);
}
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
schtFetchTaskId = task->taskId - 1;
taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task));
if (taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task))) {
assert(0);
}
pIter = taosHashIterate(pJob->execTasks, pIter);
}
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
if (NULL == param) {
assert(0);
}
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
@ -601,7 +728,9 @@ void *schtRunJobThread(void *aa) {
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
if (schtBuildQueryRspMsg(&msg.len, &rmsg)) {
assert(0);
}
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -612,6 +741,9 @@ void *schtRunJobThread(void *aa) {
}
param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param));
if (NULL == param) {
assert(0);
}
param->refId = queryJobRefId;
param->queryId = pJob->queryId;
@ -622,7 +754,9 @@ void *schtRunJobThread(void *aa) {
param->taskId = task->taskId - 1;
SDataBuf msg = {0};
void *rmsg = NULL;
schtBuildQueryRspMsg(&msg.len, &rmsg);
if (schtBuildQueryRspMsg(&msg.len, &rmsg)) {
assert(0);
}
msg.msgType = TDMT_SCH_QUERY_RSP;
msg.pData = rmsg;
@ -728,7 +862,9 @@ TEST(queryTest, normalCase) {
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job);
SSchJob *pJob = NULL;
code = schAcquireJob(job, &pJob);
ASSERT_EQ(code, 0);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
@ -839,7 +975,9 @@ TEST(queryTest, readyFirstCase) {
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job);
SSchJob *pJob = NULL;
code = schAcquireJob(job, &pJob);
ASSERT_EQ(code, 0);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
@ -956,7 +1094,9 @@ TEST(queryTest, flowCtrlCase) {
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job);
SSchJob *pJob = NULL;
code = schAcquireJob(job, &pJob);
ASSERT_EQ(code, 0);
while (!queryDone) {
void *pIter = taosHashIterate(pJob->execTasks, NULL);
@ -1094,13 +1234,17 @@ TEST(otherTest, otherCase) {
schReleaseJob(0);
schFreeRpcCtx(NULL);
ASSERT_EQ(schDumpEpSet(NULL), (char *)NULL);
char* ep = NULL;
ASSERT_EQ(schDumpEpSet(NULL, &ep), TSDB_CODE_SUCCESS);
ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0);
ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0);
}
int main(int argc, char **argv) {
schtInitLogFile();
if (rpcInit()) {
assert(0);
}
taosSeedRand(taosGetTimestampSec());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();