From 5a202e89e4a0e0252d369cb5afd15cc39f0c5166 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 22 Mar 2022 14:06:30 +0800 Subject: [PATCH] feature/scheduler --- source/libs/qworker/inc/qworkerInt.h | 11 ++++++----- source/libs/qworker/src/qworker.c | 14 +++++++++----- source/libs/scheduler/src/scheduler.c | 16 ++++++++++++---- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index ab55b4b76d..785e809fe4 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -121,11 +121,12 @@ typedef struct SQWTaskCtx { } SQWTaskCtx; typedef struct SQWSchStatus { - int32_t lastAccessTs; // timestamp in second - SRWLatch connLock; - SQWConnInfo connInfo; - SRWLatch tasksLock; - SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus + int32_t lastAccessTs; // timestamp in second + SRWLatch connLock; + SQWConnInfo hbConnInfo; + SQueryNodeEpId epId; + SRWLatch tasksLock; + SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus } SQWSchStatus; // Qnode/Vnode level task management diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3b01c2c29e..33e859d354 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -580,6 +580,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; + hbInfo->connInfo = sch->hbConnInfo; + hbInfo->rsp.epId = sch->epId; + QW_LOCK(QW_READ, &sch->tasksLock); taskNum = taosHashGetSize(sch->tasksHash); @@ -591,8 +594,6 @@ int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbI return TSDB_CODE_QRY_OUT_OF_MEMORY; } - hbInfo->connInfo = sch->connInfo; - void *key = NULL; size_t keyLen = 0; int32_t i = 0; @@ -1228,10 +1229,13 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); QW_LOCK(QW_WRITE, &sch->connLock); + + if (sch->hbConnInfo.handle) { + rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); + } - origHandle = sch->connInfo.handle; - - memcpy(&sch->connInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); + memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); + memcpy(&sch->epId, &req->epId, sizeof(req->epId)); QW_UNLOCK(QW_WRITE, &sch->connLock); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 23233fc4c3..01cf0d8d8a 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -672,15 +672,14 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); if (NULL == hb) { qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); - SCH_ERR_RET(code); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } SCH_LOCK(SCH_WRITE, &hb->lock); memcpy(&hb->trans, trans, sizeof(*trans)); SCH_UNLOCK(SCH_WRITE, &hb->lock); - qDebug("hb connection updated, sId:%" PRIx64 - ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", + qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId, epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle); @@ -1563,6 +1562,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* pMsgSendInfo->msgInfo.handle = trans->transHandle; pMsgSendInfo->msgType = msgType; pMsgSendInfo->fp = fp; + + qDebug("start to send %s msg, refId:%" PRIx64 "instance:%p, handle:%p", + TMSG_INFO(msgType), pJob->refId, trans->transInst, trans->transHandle); int64_t transporterId = 0; code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); @@ -1646,13 +1648,17 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { int64_t transporterId = 0; SEpSet epSet = {.inUse = 0, .numOfEps = 1}; memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep)); + + qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port); code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx); if (code) { + qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", + trans.transInst, trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code)); SCH_ERR_JRET(code); } - qDebug("req msg sent, type:%d, %s", msgType, TMSG_INFO(msgType)); + qDebug("hb msg sent"); return TSDB_CODE_SUCCESS; _return: @@ -1833,6 +1839,7 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { epId.nodeId = addr->nodeId; memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); +#if 0 SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); if (NULL == hb) { bool exist = false; @@ -1841,6 +1848,7 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schBuildAndSendHbMsg(&epId)); } } +#endif return TSDB_CODE_SUCCESS; }