From 7ea13edca89f23c1b56b4208f81e4ccd086afa56 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 24 Mar 2022 11:46:32 +0800 Subject: [PATCH] feature/scheduler --- source/libs/qworker/src/qworker.c | 7 ++-- source/libs/scheduler/inc/schedulerInt.h | 9 ++++- source/libs/scheduler/src/scheduler.c | 45 ++++++++++++++---------- source/libs/transport/src/transCli.c | 13 ++++--- 4 files changed, 45 insertions(+), 29 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 876807d17c..07e31d549e 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -520,7 +520,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { while (true) { QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); - taosSsleep(20); code = qExecTask(*taskHandle, &pRes, &useconds); if (code) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -1196,8 +1195,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); } else if (ctx->phase > 0) { - qwBuildAndSendDropRsp(&ctx->connInfo, code); - QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); + qwBuildAndSendDropRsp(&qwMsg->connInfo, code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_ERR_JRET(qwDropTask(QW_FPARAMS())); rsped = true; @@ -1206,7 +1205,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (!rsped) { - ctx->connInfo.handle == qwMsg->connInfo.handle; + ctx->connInfo.handle = qwMsg->connInfo.handle; ctx->connInfo.ahandle = qwMsg->connInfo.ahandle; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 9d973e4437..c1c4359607 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -103,6 +103,11 @@ typedef struct SSchFlowControl { SArray *taskList; // Element is SSchTask* } SSchFlowControl; +typedef struct SSchNodeInfo { + SQueryNodeAddr addr; + void *handle; +} SSchNodeInfo; + typedef struct SSchLevel { int32_t level; int8_t status; @@ -128,7 +133,7 @@ typedef struct SSchTask { SQueryNodeAddr succeedAddr; // task executed success node address int8_t candidateIdx; // current try condidation index SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr - SArray *execAddrs; // all tried node for current task, element is SQueryNodeAddr + SArray *execNodes; // all tried node for current task, element is SSchNodeInfo SQueryProfileSummary summary; // task execution summary int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* @@ -190,6 +195,8 @@ extern SSchedulerMgmt schMgmt; #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task)) +#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL) +#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle)) #define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 77f8ccf8cc..e612f3ae59 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -57,9 +57,9 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * pTask->level = pLevel; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = schGenTaskId(); - pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); - if (NULL == pTask->execAddrs) { - SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM); + pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo)); + if (NULL == pTask->execNodes) { + SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -101,8 +101,8 @@ void schFreeTask(SSchTask* pTask) { taosArrayDestroy(pTask->parents); } - if (pTask->execAddrs) { - taosArrayDestroy(pTask->execAddrs); + if (pTask->execNodes) { + taosArrayDestroy(pTask->execNodes); } } @@ -355,12 +355,16 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr) { - if (NULL == taosArrayPush(pTask->execAddrs, addr)) { - SCH_TASK_ELOG("taosArrayPush addr to execAddr list failed, errno:%d", errno); +int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) { + SSchNodeInfo nodeInfo = {.addr = *addr, .handle = handle}; + + if (NULL == taosArrayPush(pTask->execNodes, &nodeInfo)) { + SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("task execNode recorded, handle:%p", handle); + return TSDB_CODE_SUCCESS; } @@ -1090,7 +1094,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SSchJob *pJob = schAcquireJob(pParam->refId); if (NULL == pJob) { - qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, + qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:%" PRIx64, pParam->queryId, pParam->taskId, pParam->refId); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } @@ -1110,7 +1114,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in pTask = *task; SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); - pTask->handle = pMsg->handle; + SCH_SET_TASK_HANDLE(pTask, pMsg->handle); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: @@ -1849,11 +1853,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType); - SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask ? pTask->handle : NULL}; + SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)}; SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL))); - if (isCandidateAddr) { - SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); + if (msgType == TDMT_VND_QUERY) { + SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle)); } return TSDB_CODE_SUCCESS; @@ -1935,6 +1939,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { bool enough = false; int32_t code = 0; + SCH_SET_TASK_HANDLE(pTask, NULL); + if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough)); @@ -1975,23 +1981,24 @@ int32_t schLaunchJob(SSchJob *pJob) { } void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { - if (NULL == pTask->execAddrs) { + if (NULL == pTask->execNodes) { SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); return; } - int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs); + int32_t size = (int32_t)taosArrayGetSize(pTask->execNodes); if (size <= 0) { - SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); return; } - SQueryNodeAddr *addr = NULL; + SSchNodeInfo *nodeInfo = NULL; for (int32_t i = 0; i < size; ++i) { - addr = (SQueryNodeAddr *)taosArrayGet(pTask->execAddrs, i); + nodeInfo = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, i); + SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - schBuildAndSendMsg(pJob, pTask, addr, TDMT_VND_DROP_TASK); + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK); } SCH_TASK_DLOG("task has %d exec address", size); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4269865993..23bc43693e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -261,7 +261,7 @@ void cliHandleResp(SCliConn* conn) { if (pMsg == NULL) { transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); tDebug("cli conn %p construct ahandle %p by %d, persist: 1", conn, transMsg.ahandle, transMsg.msgType); - if (!CONN_RELEASE_BY_SERVER(conn)&& transMsg.ahandle = NULL) { + if (!CONN_RELEASE_BY_SERVER(conn)&& transMsg.ahandle == NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); tDebug("cli conn %p construct ahandle %p due brokenlink, persist: 1", conn, transMsg.ahandle); } @@ -330,10 +330,12 @@ void cliHandleExcept(SCliConn* pConn) { } SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - - while (!transQueueEmpty(&pConn->cliMsgs)) { + bool once = false; + do { SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); - + if (pMsg == NULL && once) { + break; + } STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransMsg transMsg = {0}; @@ -356,6 +358,7 @@ void cliHandleExcept(SCliConn* pConn) { if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle except", pTransInst->label, pConn); if (transMsg.ahandle == NULL) { + once = true; continue; } (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); @@ -366,7 +369,7 @@ void cliHandleExcept(SCliConn* pConn) { } destroyCmsg(pMsg); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); - }; + } while (!transQueueEmpty(&pConn->cliMsgs)); transUnrefCliHandle(pConn); }