From 3b2a50050ba1a3fb71f783afdfb97ea51fa369ee Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 22 Mar 2022 15:17:09 +0800 Subject: [PATCH] feature/scheduler --- source/libs/qworker/src/qworker.c | 34 ++++++++++++++------------- source/libs/qworker/src/qworkerMsg.c | 19 ++++++++++----- source/libs/scheduler/src/scheduler.c | 17 ++++++++++++-- 3 files changed, 46 insertions(+), 24 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 33e859d354..ca0dd4a965 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -697,8 +697,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; SQWTaskCtx *ctx = NULL; - void *dropConnection = NULL; - void *cancelConnection = NULL; + SQWConnInfo *dropConnection = NULL; + SQWConnInfo *cancelConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); @@ -793,12 +793,12 @@ _return: if (dropConnection) { qwBuildAndSendDropRsp(dropConnection, code); - QW_TASK_DLOG("drop msg rsped, code:%x - %s", code, tstrerror(code)); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code)); } if (cancelConnection) { qwBuildAndSendCancelRsp(cancelConnection, code); - QW_TASK_DLOG("cancel msg rsped, code:%x - %s", code, tstrerror(code)); + QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); } QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code)); @@ -811,7 +811,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp int32_t code = 0; SQWTaskCtx *ctx = NULL; SQWConnInfo connInfo = {0}; - void *readyConnection = NULL; + SQWConnInfo *readyConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); @@ -879,7 +879,7 @@ _return: if (TSDB_CODE_SUCCESS == code && readyConnection) { qwBuildAndSendReadyRsp(readyConnection, code); - QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); + QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); } if (code) { @@ -907,7 +907,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); atomic_store_8(&ctx->taskType, taskType); - + atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle); + atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle); + code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code)); @@ -926,7 +928,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { } QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); - QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); queryRsped = true; @@ -944,7 +946,7 @@ _return: if (!queryRsped) { qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); - QW_TASK_DLOG("query msg rsped, code:%x - %s", code, tstrerror(code)); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } QW_RET(TSDB_CODE_SUCCESS); @@ -1007,7 +1009,7 @@ _return: if (needRsp) { qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); - QW_TASK_DLOG("ready msg rsped, code:%x - %s", code, tstrerror(code)); + QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } QW_RET(TSDB_CODE_SUCCESS); @@ -1050,7 +1052,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); - QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } else { atomic_store_8(&ctx->queryContinue, 1); } @@ -1067,7 +1069,7 @@ _return: qwFreeFetchRsp(rsp); rsp = NULL; qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code); - QW_TASK_DLOG("fetch msg rsped, code:%x - %s", code, tstrerror(code)); + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); } QW_LOCK(QW_WRITE, &ctx->lock); @@ -1147,7 +1149,7 @@ _return: if (code || rsp) { qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); - QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, dataLen); + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } QW_RET(TSDB_CODE_SUCCESS); @@ -1210,8 +1212,7 @@ _return: if (TSDB_CODE_SUCCESS != code || needRsp) { qwBuildAndSendDropRsp(&qwMsg->connInfo, code); - - QW_TASK_DLOG("drop msg rsped, code:%x", code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); } QW_RET(TSDB_CODE_SUCCESS); @@ -1247,6 +1248,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); + QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_RET(TSDB_CODE_SUCCESS); } @@ -1297,8 +1299,8 @@ _return: QW_UNLOCK(QW_READ, &mgmt->schLock); for (int32_t j = 0; j < i; ++j) { - QW_DLOG("hb on connection handle %p, taskNum:%d", rspList[j].connInfo.handle, (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); + QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); tFreeSSchedulerHbRsp(&rspList[j].rsp); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 42d5b94397..b3921368e1 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -324,7 +324,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qwMsg.connInfo.ahandle = pMsg->ahandle; char* sql = strndup(msg->msg, msg->sqlLen); - QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql); + QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql); tfree(sql); QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType)); @@ -357,7 +357,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; - QW_SCH_TASK_DLOG("processCQuery start, node:%p", node); + QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg)); @@ -391,7 +391,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; - QW_SCH_TASK_DLOG("processReady start, node:%p", node); + QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg)); @@ -453,7 +453,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; - QW_SCH_TASK_DLOG("processFetch start, node:%p", node); + QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg)); @@ -472,6 +472,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -484,6 +485,11 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { msg->taskId = be64toh(msg->taskId); msg->refId = be64toh(msg->refId); + uint64_t sId = msg->sId; + uint64_t qId = msg->queryId; + uint64_t tId = msg->taskId; + int64_t rId = msg->refId; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; @@ -493,6 +499,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { _return: QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code)); + QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code)); return TSDB_CODE_SUCCESS; } @@ -525,7 +532,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; - QW_SCH_TASK_DLOG("processDrop start, node:%p", node); + QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); @@ -559,7 +566,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; - QW_SCH_DLOG("processHb start, node:%p", node); + QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 01cf0d8d8a..2bd18277bf 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -137,7 +137,18 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m return TSDB_CODE_SUCCESS; case TDMT_VND_RES_READY_RSP: reqMsgType = TDMT_VND_QUERY; - break; + if (lastMsgType != reqMsgType && -1 != lastMsgType) { + SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", (lastMsgType > 0 ? TMSG_INFO(lastMsgType) : "null"), TMSG_INFO(msgType)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + SCH_SET_TASK_LASTMSG_TYPE(pTask, -1); + return TSDB_CODE_SUCCESS; case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_SUBMIT_RSP: case TDMT_VND_FETCH_RSP: @@ -1085,7 +1096,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in } pTask = *task; - SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode)); + SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); pTask->handle = pMsg->handle; SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); @@ -1174,6 +1185,8 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param; rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); + qDebug("handle %p is broken", pMsg->handle); + if (head->isHbParam) { SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; SSchTrans trans = {.transInst = hbParam->transport, .transHandle = NULL};