From e24498b0fdf48b4ce69291bf136659c639423761 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 25 Aug 2023 14:59:34 +0800 Subject: [PATCH 1/3] enh: add task notify --- include/common/tmsg.h | 21 ++++- include/common/tmsgdef.h | 1 + include/libs/qworker/qworker.h | 3 + source/client/src/clientEnv.c | 3 +- source/common/src/tmsg.c | 60 ++++++++++++++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/mgmt_qnode/src/qmHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/dnode/mnode/impl/src/mndMain.c | 3 +- source/dnode/mnode/impl/src/mndQuery.c | 4 + source/dnode/qnode/src/qnode.c | 4 + source/dnode/vnode/src/vnd/vnodeSvr.c | 2 + source/libs/function/src/udfd.c | 2 +- source/libs/qworker/inc/qwInt.h | 3 + source/libs/qworker/inc/qwMsg.h | 1 + source/libs/qworker/src/qwMsg.c | 35 ++++++++ source/libs/qworker/src/qwUtil.c | 39 +++++++++ source/libs/qworker/src/qworker.c | 80 ++++++++++++------- source/libs/qworker/test/qworkerTests.cpp | 3 + source/libs/scheduler/inc/schInt.h | 3 +- source/libs/scheduler/src/schJob.c | 4 + source/libs/scheduler/src/schRemote.c | 48 ++++++++++- source/libs/scheduler/src/schTask.c | 54 ++++++++++++- 24 files changed, 339 insertions(+), 39 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b6684cdd0e..8deec53470 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1418,6 +1418,7 @@ typedef struct { int64_t numOfProcessedCQuery; int64_t numOfProcessedFetch; int64_t numOfProcessedDrop; + int64_t numOfProcessedNotify; int64_t numOfProcessedHb; int64_t numOfProcessedDelete; int64_t cacheDataSize; @@ -2159,8 +2160,24 @@ typedef struct { int32_t tSerializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq); int32_t tDeserializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq); -int32_t tSerializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq); -int32_t tDeserializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq); + + +typedef enum { + TASK_NOTIFY_FINISHED = 1, +} ETaskNotifyType; + +typedef struct { + SMsgHead header; + uint64_t sId; + uint64_t queryId; + uint64_t taskId; + int64_t refId; + int32_t execId; + ETaskNotifyType type; +} STaskNotifyReq; + +int32_t tSerializeSTaskNotifyReq(void* buf, int32_t bufLen, STaskNotifyReq* pReq); +int32_t tDeserializeSTaskNotifyReq(void* buf, int32_t bufLen, STaskNotifyReq* pReq); int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 323b2de6ba..29f0667dac 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -244,6 +244,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_TASK_NOTIFY, "task-notify", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 60ed94d4de..bbd2d76b59 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -45,6 +45,7 @@ typedef struct { uint64_t cqueryProcessed; uint64_t fetchProcessed; uint64_t dropProcessed; + uint64_t notifyProcessed; uint64_t hbProcessed; uint64_t deleteProcessed; @@ -90,6 +91,8 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); +int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); + int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 40c27bf164..b555f4e683 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -133,7 +133,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (NEED_REDIRECT_ERROR(code)) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || - msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK) { + msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK || + msgType == TDMT_SCH_TASK_NOTIFY) { return false; } return true; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1431cd049a..9a21563abe 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1092,6 +1092,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pReq->qload.numOfProcessedCQuery) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfProcessedNotify) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDelete) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1; @@ -1189,6 +1190,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedCQuery) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedNotify) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDelete) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1; @@ -5874,6 +5876,64 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) return 0; } +int32_t tSerializeSTaskNotifyReq(void *buf, int32_t bufLen, STaskNotifyReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->refId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->execId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->type) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSTaskNotifyReq(void *buf, int32_t bufLen, STaskNotifyReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; + if (tDecodeI32(&decoder, (int32_t*)&pReq->type) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + + int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index ae1b46a21d..eaa80ba775 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -187,6 +187,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 5017ad7b74..86bc11c616 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -89,6 +89,7 @@ SArray *qmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index e50a75d33a..f43e1f5537 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -755,6 +755,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SCH_TASK_NOTIFY, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index e0f7da3ac4..665f86034d 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -272,7 +272,7 @@ static bool rpcRfp(int32_t code, tmsg_t msgType) { code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || - msgType == TDMT_SCH_MERGE_FETCH) { + msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY) { return false; } return true; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index fd4ebf549f..12e28969c9 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -629,7 +629,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY || pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT || - pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) { + pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK || + pMsg->msgType == TDMT_SCH_TASK_NOTIFY) { return 0; } diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 8e95fa3d6d..c03b02c17f 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -55,6 +55,9 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { case TDMT_SCH_QUERY_HEARTBEAT: code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg, 0); break; + case TDMT_SCH_TASK_NOTIFY: + code = qWorkerProcessNotifyMsg(pMnode, pMnode->pQuery, pMsg, 0); + break; default: terrno = TSDB_CODE_APP_ERROR; mError("unknown msg type:%d in query queue", pMsg->msgType); @@ -175,6 +178,7 @@ int32_t mndInitQuery(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_SCH_TASK_NOTIFY, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_MND_BATCH_META, mndProcessBatchMetaMsg); diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 3482355512..9937debb13 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -57,6 +57,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { pLoad->numOfProcessedCQuery = stat.cqueryProcessed; pLoad->numOfProcessedFetch = stat.fetchProcessed; pLoad->numOfProcessedDrop = stat.dropProcessed; + pLoad->numOfProcessedNotify = stat.notifyProcessed; pLoad->numOfProcessedHb = stat.hbProcessed; pLoad->numOfProcessedDelete = stat.deleteProcessed; @@ -100,6 +101,9 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { case TDMT_SCH_QUERY_HEARTBEAT: code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg, ts); break; + case TDMT_SCH_TASK_NOTIFY: + code = qWorkerProcessNotifyMsg(pQnode, pQnode->pQuery, pMsg, ts); + break; default: qError("unknown msg type:%d in qnode queue", pMsg->msgType); terrno = TSDB_CODE_APP_ERROR; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a9399e4db1..737fd03d6f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -702,6 +702,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { // return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_SCH_DROP_TASK: return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0); + case TDMT_SCH_TASK_NOTIFY: + return qWorkerProcessNotifyMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0); case TDMT_VND_TABLE_META: diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 575bce09bb..bd459af9f5 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -971,7 +971,7 @@ static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || - msgType == TDMT_SCH_MERGE_FETCH) { + msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY) { return false; } return true; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 8797a8cf6b..b4bd1943c5 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -133,6 +133,7 @@ typedef struct SQWTaskCtx { bool queryContinue; bool queryExecDone; bool queryInQueue; + bool explainRsped; int32_t rspCode; int64_t affectedRows; // for insert ...select stmt @@ -169,6 +170,7 @@ typedef struct SQWMsgStat { uint64_t rspProcessed; uint64_t cancelProcessed; uint64_t dropProcessed; + uint64_t notifyProcessed; uint64_t hbProcessed; uint64_t deleteProcessed; } SQWMsgStat; @@ -406,6 +408,7 @@ int32_t qwAddTaskCtx(QW_FPARAMS_DEF); void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); void qwDbgSimulateSleep(void); void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); +int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); #ifdef __cplusplus } diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index f226b223f7..ae68f69802 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -30,6 +30,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); +int32_t qwProcessNotify(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 0cbcf44ed4..9a1c309ab0 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -610,6 +610,41 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 return TSDB_CODE_SUCCESS; } +int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + int32_t code = 0; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + + qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1); + + STaskNotifyReq msg = {0}; + if (tDeserializeSTaskNotifyReq(pMsg->pCont, pMsg->contLen, &msg) < 0) { + QW_ELOG("tDeserializeSTaskNotifyReq failed, contLen:%d", pMsg->contLen); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + uint64_t sId = msg.sId; + uint64_t qId = msg.queryId; + uint64_t tId = msg.taskId; + int64_t rId = msg.refId; + int32_t eId = msg.execId; + + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info, .msgType = msg.type}; + + QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle); + + QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg)); + + QW_SCH_TASK_DLOG("processNotify end, node:%p", node); + + return TSDB_CODE_SUCCESS; +} + + int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 3b127ee780..f00c4aef30 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -316,6 +316,45 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { taosArrayDestroy(ctx->tbInfo); } +static void freeExplainExecItem(void *param) { + SExplainExecInfo *pInfo = param; + taosMemoryFree(pInfo->verboseInfo); +} + + +int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + qTaskInfo_t taskHandle = ctx->taskHandle; + + ctx->explainRsped = true; + + SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); + QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); + + if (ctx->localExec) { + SExplainLocalRsp localRsp = {0}; + localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); + SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); + memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); + localRsp.rsp.subplanInfo = pExec; + localRsp.qId = qId; + localRsp.tId = tId; + localRsp.rId = rId; + localRsp.eId = eId; + taosArrayPush(ctx->explainRes, &localRsp); + taosArrayDestroy(execInfoList); + } else { + SRpcHandleInfo connInfo = ctx->ctrlConnInfo; + connInfo.ahandle = NULL; + int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); + taosArrayDestroyEx(execInfoList, freeExplainExecItem); + QW_ERR_RET(code); + } + + return TSDB_CODE_SUCCESS; +} + + + int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, tId, eId); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 1759cc89f5..afce4a496a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -90,11 +90,6 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re QW_RET(TSDB_CODE_SUCCESS); } -static void freeItem(void *param) { - SExplainExecInfo *pInfo = param; - taosMemoryFree(pInfo->verboseInfo); -} - int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { qTaskInfo_t taskHandle = ctx->taskHandle; @@ -102,29 +97,8 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ctx->queryExecDone = true; if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) { - if (ctx->explain) { - SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); - QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); - - if (ctx->localExec) { - SExplainLocalRsp localRsp = {0}; - localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); - SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); - memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); - localRsp.rsp.subplanInfo = pExec; - localRsp.qId = qId; - localRsp.tId = tId; - localRsp.rId = rId; - localRsp.eId = eId; - taosArrayPush(ctx->explainRes, &localRsp); - taosArrayDestroy(execInfoList); - } else { - SRpcHandleInfo connInfo = ctx->ctrlConnInfo; - connInfo.ahandle = NULL; - int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); - taosArrayDestroyEx(execInfoList, freeItem); - QW_ERR_RET(code); - } + if (ctx->explain && !ctx->explainRsped) { + QW_ERR_RET(qwSendExplainResponse(QW_FPARAMS(), ctx)); } if (!ctx->needFetch) { @@ -1030,6 +1004,55 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } +int32_t qwProcessNotify(QW_FPARAMS_DEF, SQWMsg *qwMsg) { + int32_t code = 0; + SQWTaskCtx *ctx = NULL; + bool locked = false; + + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + + QW_LOCK(QW_WRITE, &ctx->lock); + + locked = true; + + if (QW_QUERY_RUNNING(ctx)) { + QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); + } + + switch (qwMsg->msgType) { + case TASK_NOTIFY_FINISHED: + if (ctx->explain && !ctx->explainRsped) { + QW_ERR_RET(qwSendExplainResponse(QW_FPARAMS(), ctx)); + } + break; + default: + QW_ELOG("Invalid task notify type %d", qwMsg->msgType); + QW_ERR_JRET(TSDB_CODE_INVALID_MSG); + break; + } + +_return: + + if (code) { + if (ctx) { + QW_UPDATE_RSP_CODE(ctx, code); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); + } + } + + if (locked) { + QW_UNLOCK(QW_WRITE, &ctx->lock); + } + + if (ctx) { + qwReleaseTaskCtx(mgmt, ctx); + } + + QW_RET(TSDB_CODE_SUCCESS); +} + + int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; @@ -1354,6 +1377,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt pStat->cqueryProcessed = QW_STAT_GET(mgmt->stat.msgStat.cqueryProcessed); pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed); pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed); + pStat->notifyProcessed = QW_STAT_GET(mgmt->stat.msgStat.notifyProcessed); pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed); pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed); diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 02b341e28c..4a0d74a6e3 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -818,6 +818,9 @@ void *fetchQueueThread(void *param) { case TDMT_SCH_DROP_TASK: qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); break; + case TDMT_SCH_TASK_NOTIFY: + qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); + break; default: printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); assert(0); diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index aecf3d5d91..63efa6bba4 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -531,7 +531,7 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); void schCleanClusterHb(void *pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask); -int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void* param); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); void schFreeFlowCtrl(SSchJob *pJob); @@ -598,6 +598,7 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode); int32_t schProcessOnJobPartialSuccess(SSchJob *pJob); void schFreeTask(SSchJob *pJob, SSchTask *pTask); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); +int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index d2ed26d405..87370e8993 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -638,6 +638,10 @@ void schDropJobAllTasks(SSchJob *pJob) { // schDropTaskInHashList(pJob, pJob->failTasks); } +int32_t schNotifyJobAllTasks(SSchJob *pJob, ETaskNotifyType type) { + SCH_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type)); +} + void schFreeJobImpl(void *job) { if (NULL == job) { return; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 01b4e7e9e6..10e25cae36 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -481,6 +481,18 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { return TSDB_CODE_SUCCESS; } +int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { + SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, + code); + if (pMsg) { + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } + return TSDB_CODE_SUCCESS; +} + + int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param; rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); @@ -646,6 +658,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { case TDMT_SCH_DROP_TASK: *fp = schHandleDropCallback; break; + case TDMT_SCH_TASK_NOTIFY: + *fp = schHandleNotifyCallback; + break; case TDMT_SCH_QUERY_HEARTBEAT: *fp = schHandleHbCallback; break; @@ -1027,7 +1042,7 @@ _return: SCH_RET(code); } -int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) { +int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType, void* param) { int32_t msgSize = 0; void *msg = NULL; int32_t code = 0; @@ -1205,6 +1220,37 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, persistHandle = true; break; } + case TDMT_SCH_TASK_NOTIFY: { + ETaskNotifyType* pType = param; + STaskNotifyReq qMsg; + qMsg.header.vgId = addr->nodeId; + qMsg.header.contLen = 0; + qMsg.sId = schMgmt.sId; + qMsg.queryId = pJob->queryId; + qMsg.taskId = pTask->taskId; + qMsg.refId = pJob->refId; + qMsg.execId = pTask->execId; + qMsg.type = *pType; + + msgSize = tSerializeSTaskNotifyReq(NULL, 0, &qMsg); + if (msgSize < 0) { + SCH_TASK_ELOG("tSerializeSTaskNotifyReq get size, msgSize:%d", msgSize); + SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + msg = taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + SCH_TASK_ELOG("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + if (tSerializeSTaskNotifyReq(msg, msgSize, &qMsg) < 0) { + SCH_TASK_ELOG("tSerializeSTaskNotifyReq failed, msgSize:%d", msgSize); + taosMemoryFree(msg); + SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + break; + } default: SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9985e7d6a1..b284edf39a 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -862,7 +862,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { while (nodeInfo) { if (nodeInfo->handle) { SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK); + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, NULL); SCH_TASK_DLOG("start to drop task's %dth execNode", i); } else { SCH_TASK_DLOG("no need to drop task %dth execNode", i); @@ -875,6 +875,33 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_DLOG("task has been dropped on %d exec nodes", size); } +int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) { + int32_t size = (int32_t)taosHashGetSize(pTask->execNodes); + if (size <= 0) { + SCH_TASK_DLOG("task no exec address to notify, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + return TSDB_CODE_SUCCESS; + } + + int32_t i = 0; + SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); + while (nodeInfo) { + if (nodeInfo->handle) { + SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); + SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_TASK_NOTIFY, &type)); + SCH_TASK_DLOG("start to notify %d to task's %dth execNode", type, i); + } else { + SCH_TASK_DLOG("no need to notify %d to task %dth execNode", type, i); + } + + ++i; + nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); + } + + SCH_TASK_DLOG("task has been notified %d on %d exec nodes", type, size); + return TSDB_CODE_SUCCESS; +} + + int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); SSchTask *pTask = NULL; @@ -1001,7 +1028,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); } - SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); + SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType, NULL)); } int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { @@ -1238,8 +1265,29 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { } } +int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type) { + int32_t code = TSDB_CODE_SUCCESS; + void *pIter = taosHashIterate(list, NULL); + while (pIter) { + SSchTask *pTask = *(SSchTask **)pIter; + + SCH_LOCK_TASK(pTask); + code = schNotifyTaskOnExecNode(pJob, pTask, type); + SCH_UNLOCK_TASK(pTask); + + if (TSDB_CODE_SUCCESS != code) { + break; + } + + pIter = taosHashIterate(list, pIter); + } + + SCH_RET(code); +} + + int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { - SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask))); + SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL)); } int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { From e0cc4e7ed7fccc325ca8dce43055b04c1501e6a7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 25 Aug 2023 15:12:28 +0800 Subject: [PATCH 2/3] fix: notify task --- source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schRemote.c | 2 ++ 2 files changed, 3 insertions(+) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 63efa6bba4..c649e645a0 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -613,6 +613,7 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32 int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode); void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask); int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode); +int32_t schNotifyJobAllTasks(SSchJob *pJob, ETaskNotifyType type); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 10e25cae36..291a383393 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -88,6 +88,8 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp)); if (pRsp) { SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); + } else { + SCH_ERR_JRET(schNotifyJobAllTasks(pJob, TASK_NOTIFY_FINISHED)); } taosMemoryFreeClear(msg); From cf50665c209ede9d33f02887544dbcc313a00728 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 25 Aug 2023 15:20:12 +0800 Subject: [PATCH 3/3] fix: dead lock issue --- source/libs/scheduler/inc/schInt.h | 4 ++-- source/libs/scheduler/src/schJob.c | 4 ++-- source/libs/scheduler/src/schRemote.c | 2 +- source/libs/scheduler/src/schTask.c | 20 ++++++++++++-------- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index c649e645a0..3b7a76bfc7 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -598,7 +598,7 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode); int32_t schProcessOnJobPartialSuccess(SSchJob *pJob); void schFreeTask(SSchJob *pJob, SSchTask *pTask); void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); -int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type); +int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); @@ -613,7 +613,7 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32 int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode); void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask); int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode); -int32_t schNotifyJobAllTasks(SSchJob *pJob, ETaskNotifyType type); +int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 87370e8993..b565619e75 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -638,8 +638,8 @@ void schDropJobAllTasks(SSchJob *pJob) { // schDropTaskInHashList(pJob, pJob->failTasks); } -int32_t schNotifyJobAllTasks(SSchJob *pJob, ETaskNotifyType type) { - SCH_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type)); +int32_t schNotifyJobAllTasks(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType type) { + SCH_RET(schNotifyTaskInHashList(pJob, pJob->execTasks, type, pTask)); } void schFreeJobImpl(void *job) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 291a383393..7b8decc007 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -89,7 +89,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs if (pRsp) { SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); } else { - SCH_ERR_JRET(schNotifyJobAllTasks(pJob, TASK_NOTIFY_FINISHED)); + SCH_ERR_JRET(schNotifyJobAllTasks(pJob, pTask, TASK_NOTIFY_FINISHED)); } taosMemoryFreeClear(msg); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index b284edf39a..d96c01fc76 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -1265,18 +1265,22 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { } } -int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type) { +int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pCurrTask) { int32_t code = TSDB_CODE_SUCCESS; + + SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type)); + void *pIter = taosHashIterate(list, NULL); while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; - - SCH_LOCK_TASK(pTask); - code = schNotifyTaskOnExecNode(pJob, pTask, type); - SCH_UNLOCK_TASK(pTask); - - if (TSDB_CODE_SUCCESS != code) { - break; + if (pTask != pCurrTask) { + SCH_LOCK_TASK(pTask); + code = schNotifyTaskOnExecNode(pJob, pTask, type); + SCH_UNLOCK_TASK(pTask); + + if (TSDB_CODE_SUCCESS != code) { + break; + } } pIter = taosHashIterate(list, pIter);