From 955679276fa8c4f4b88ace0c682ab34efeb69d33 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 27 May 2022 22:05:53 +0800 Subject: [PATCH 1/9] feat:optimize for group by tag --- include/libs/nodes/plannodes.h | 1 + source/libs/executor/inc/executorInt.h | 4 ++ source/libs/executor/inc/executorimpl.h | 8 +++- source/libs/executor/src/executorimpl.c | 6 +-- source/libs/executor/src/groupoperator.c | 13 ++++-- source/libs/executor/src/scanoperator.c | 51 ++++++++++++++++++++---- 6 files changed, 68 insertions(+), 15 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3ae2d18e5d..b2244c572c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -211,6 +211,7 @@ typedef struct STableScanPhysiNode { double ratio; int32_t dataRequired; SNodeList* pDynamicScanFuncs; + SNodeList* pPartitionKeys; int64_t interval; int64_t offset; int64_t sliding; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index f2f0bc2055..85572a9e17 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -27,6 +27,10 @@ typedef struct { int32_t bytes; } SGroupKeys, SStateKeys; +int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList); +uint64_t calcGroupId(char* pData, int32_t len); +void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols); +int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dc613ddd86..aaa2cccf47 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -334,6 +334,12 @@ typedef struct STableScanInfo { int32_t dataBlockLoadFlag; double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. + + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result } STableScanInfo; typedef struct STagScanInfo { @@ -704,7 +710,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 684c657d17..96b3b9f9e3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4477,9 +4477,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pDataReader == NULL && terrno != 0) { return NULL; } - + SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; @@ -4510,7 +4510,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo); SArray* tableIdList = extractTableIdList(pTableListInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ef770e8afc..18f24d7640 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -24,10 +24,10 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" +#include "executorInt.h" static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); -static uint64_t calcGroupId(char* pData, int32_t len); static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; @@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pGroupColVals); } -static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { +int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); if ((*pGroupColVals) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -110,7 +110,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo return true; } -static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { +void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { SColumnDataAgg* pColAgg = NULL; for (int32_t i = 0; i < numOfGroupCols; ++i) { @@ -137,7 +137,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData } } -static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { +int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { ASSERT(pKey != NULL); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); @@ -607,8 +607,13 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosArrayDestroy(pInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i); + taosMemoryFree(key.pData); + } taosArrayDestroy(pInfo->pGroupColVals); taosMemoryFree(pInfo->keyBuf); + taosHashCleanup(pInfo->pGroupSet); taosMemoryFree(pInfo->columnOffset); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 32187c81a7..3131c27e28 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -33,6 +33,8 @@ #include "ttypes.h" #include "vnode.h" +#include "executorInt.h" + #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -369,6 +371,19 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { longjmp(pOperator->pTaskInfo->env, code); } + int32_t numOfGroupCols = taosArrayGetSize(pTableScanInfo->pGroupCols); + recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0, numOfGroupCols); + int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); + + uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); + if (!groupId) { + pBlock->info.groupId = *groupId; + + }else{ + pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len); + taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t)); + } + // current block is filter out according to filter condition, continue load the next block if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { continue; @@ -479,21 +494,25 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFree(pTableScanInfo->pResBlock); tsdbCleanupReadHandle(pTableScanInfo->dataReader); + taosArrayDestroy(pTableScanInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i); + taosMemoryFree(key.pData); + } + taosArrayDestroy(pTableScanInfo->pGroupColVals); + taosMemoryFree(pTableScanInfo->keyBuf); + taosHashCleanup(pTableScanInfo->pGroupSet); if (pTableScanInfo->pColMatchInfo != NULL) { taosArrayDestroy(pTableScanInfo->pColMatchInfo); } } SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, - SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { + SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; @@ -504,7 +523,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - return NULL; + goto _error; } if (pTableScanNode->scan.pScanPseudoCols != NULL) { @@ -533,6 +552,18 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; + // for table group + pInfo->pGroupCols = groupKyes; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + if (pInfo->pGroupSet == NULL) { + goto _error; + } + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo); @@ -540,6 +571,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->cost.openCost = 0; return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; } SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { From f6c6083aad623e85bc6d3ed146f55f013c5f9f8a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 2 Jun 2022 21:02:02 +0800 Subject: [PATCH 2/9] reschedule timeout task --- include/libs/qworker/qworker.h | 2 + include/util/taoserror.h | 3 +- source/client/src/clientEnv.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 + source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/vnd/vnodeSvr.c | 10 +- source/libs/qworker/inc/qwInt.h | 2 +- source/libs/qworker/inc/qwMsg.h | 1 + source/libs/qworker/src/qwMsg.c | 35 ++++ source/libs/qworker/src/qworker.c | 36 +++- source/libs/scheduler/inc/schedulerInt.h | 50 +++++- source/libs/scheduler/src/schJob.c | 177 ++++++++++++++------ source/libs/scheduler/src/schRemote.c | 50 +++--- source/libs/scheduler/src/scheduler.c | 4 + 14 files changed, 277 insertions(+), 97 deletions(-) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 91cf975a56..aa20082fe0 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -58,6 +58,8 @@ typedef struct { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); +int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); + int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4550bccbed..0e1fd33e83 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -563,7 +563,8 @@ int32_t* taosGetErrno(); //scheduler&qworker #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) #define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) -#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503) +#define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503) +#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504) //parser #define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 4f5f23b68b..a4655f7bcd 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -131,6 +131,7 @@ void destroyTscObj(void *pObj) { hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); closeAllRequests(pTscObj->pRequests); + schedulerStopTransport(pTscObj->pAppInfo->pTransporter); tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index a945358d34..378451c301 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -238,6 +238,8 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType switch (qtype) { case QUERY_QUEUE: + vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); + dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pVnode->pQueryQ, pMsg); break; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 308f89736d..c3016eeff7 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -56,6 +56,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ab879c5dbb..82a174025b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -189,7 +189,15 @@ _err: return -1; } -int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { +int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) { + return 0; + } + + return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg); +} + +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 4fe3c18393..3804c114cb 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -33,7 +33,7 @@ extern "C" { #define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 -#define QW_DEFAULT_HEARTBEAT_MSEC 3000 +#define QW_DEFAULT_HEARTBEAT_MSEC 5000 enum { QW_PHASE_PRE_QUERY = 1, diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index ede085b6f9..5a8a45cd51 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -23,6 +23,7 @@ extern "C" { #include "qwInt.h" #include "dataSinkMgt.h" +int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index f8205a6bb4..79fcbc4079 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -248,6 +248,41 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * return TSDB_CODE_SUCCESS; } +int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == qWorkerMgmt || NULL == pMsg) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t code = 0; + SSubQueryMsg *msg = pMsg->pCont; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + msg->sId = be64toh(msg->sId); + msg->queryId = be64toh(msg->queryId); + msg->taskId = be64toh(msg->taskId); + msg->refId = be64toh(msg->refId); + msg->phyLen = ntohl(msg->phyLen); + msg->sqlLen = ntohl(msg->sqlLen); + + uint64_t sId = msg->sId; + uint64_t qId = msg->queryId; + uint64_t tId = msg->taskId; + int64_t rId = msg->refId; + + SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; + + QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle); + QW_ERR_RET(qwPrerocessQuery(QW_FPARAMS(), &qwMsg)); + QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p", pMsg->info.handle); + + return TSDB_CODE_SUCCESS; +} + int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index fd16fa53b7..9ad36a32c4 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -248,11 +248,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); - if (QW_PHASE_PRE_QUERY == phase) { - QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); - } else { - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); - } + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); @@ -285,7 +281,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } - QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); break; } case QW_PHASE_PRE_FETCH: { @@ -437,7 +433,7 @@ _return: QW_RET(code); } -int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { +int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; bool queryRsped = false; SSubplan *plan = NULL; @@ -448,6 +444,30 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); + QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); + + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START)); + +_return: + + if (ctx) { + QW_UPDATE_RSP_CODE(ctx, code); + qwReleaseTaskCtx(mgmt, ctx); + } + + QW_RET(TSDB_CODE_SUCCESS); +} + + +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { + int32_t code = 0; + bool queryRsped = false; + SSubplan *plan = NULL; + SQWPhaseInput input = {0}; + qTaskInfo_t pTaskInfo = NULL; + DataSinkHandle sinkHandle = NULL; + SQWTaskCtx *ctx = NULL; + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -663,7 +683,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED - QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 44b3e6d396..709f3a4e34 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -32,6 +32,10 @@ extern "C" { #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 #define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT +#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 +#define SCH_MAX_TASK_TIMEOUT_USEC 60000000 + +#define SCH_TASK_MAX_EXEC_TIMES 5 #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA enum { @@ -51,6 +55,7 @@ typedef struct SSchTrans { typedef struct SSchHbTrans { SRWLatch lock; + int64_t taskNum; SRpcCtx rpcCtx; SSchTrans trans; } SSchHbTrans; @@ -114,7 +119,8 @@ typedef struct SSchTaskCallbackParam { uint64_t queryId; int64_t refId; uint64_t taskId; - void *transport; + int32_t execIdx; + void *pTrans; } SSchTaskCallbackParam; typedef struct SSchHbCallbackParam { @@ -148,8 +154,16 @@ typedef struct SSchLevel { SArray *subTasks; // Element is SQueryTask } SSchLevel; +typedef struct SSchTaskProfile { + int64_t startTs; + int64_t execUseTime[SCH_TASK_MAX_EXEC_TIMES]; + int64_t waitTime; + int64_t endTs; +} SSchTaskProfile; + typedef struct SSchTask { uint64_t taskId; // task id + int32_t execIdx; // task current execute try index SRWLatch lock; // task lock SSchLevel *level; // level SSubplan *plan; // subplan @@ -157,16 +171,17 @@ typedef struct SSchTask { int32_t msgLen; // msg length int8_t status; // task status int32_t lastMsgType; // last sent msg type - int32_t tryTimes; // task already tried times + int64_t timeoutUsec; // taks timeout useconds before reschedule SQueryNodeAddr succeedAddr; // task executed success node address int8_t candidateIdx; // current try condidation index SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr - SArray *execNodes; // all tried node for current task, element is SSchNodeInfo - SQueryProfileSummary summary; // task execution summary + SHashObj *execNodes; // all tried node for current task, element is SSchNodeInfo + SSchTaskProfile profile; // task execution profile int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* - void* handle; // task send handle + void* handle; // task send handle + bool registerdHb; // registered in hb } SSchTask; typedef struct SSchJobAttr { @@ -215,6 +230,24 @@ typedef struct SSchJob { extern SSchedulerMgmt schMgmt; +#define SCH_LOG_TASK_START_TS(_task) \ + do { \ + int64_t us = taosGetTimestampUs(); \ + (_task)->profile.tryUseTime[(_task)->execIdx] = us; \ + if (0 == (_task)->execIdx) { \ + (_task)->profile.startTs = us; \ + } \ + } while (0) + +#define SCH_LOG_TASK_END_TS(_task) \ + do { \ + int64_t us = taosGetTimestampUs(); \ + (_task)->profile.tryUseTime[(_task)->execIdx] = us - (_task)->profile.tryUseTime[(_task)->execIdx]; \ + (_task)->profile.endTs = us; \ + } while (0) + +#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.tryUseTime[(_task)->execIdx]) > (_taks)->timeoutUsec) + #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) @@ -284,7 +317,7 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schFetchFromRemote(SSchJob *pJob); int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); -int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId); +int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction); int32_t schCloneSMsgSendInfo(void *src, void **dst); int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob); void schFreeJobImpl(void *job); @@ -301,10 +334,9 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp); void schProcessOnDataFetched(SSchJob *job); int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask); -int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode); void schFreeRpcCtxVal(const void *arg); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); -int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle); +int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx); int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, SSchResInfo *pRes, bool sync); int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, @@ -318,7 +350,7 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes); int32_t schFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob); -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode); +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index dbad053c65..7600dafa59 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -28,11 +28,13 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgm int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; + pTask->execIdx = -1; + pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = schGenTaskId(); - pTask->execNodes = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SSchNodeInfo)); + pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (NULL == pTask->execNodes) { - SCH_TASK_ELOG("taosArrayInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM); + SCH_TASK_ELOG("taosHashInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -123,7 +125,33 @@ _return: SCH_RET(code); } -void schFreeTask(SSchTask *pTask) { +void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) { + if (!pTask->registerdHb) { + return; + } + + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SQueryNodeEpId epId = {0}; + + epId.nodeId = addr->nodeId; + + SEp* pEp = SCH_GET_CUR_EP(addr); + strcpy(epId.ep.fqdn, pEp->fqdn); + epId.ep.port = pEp->port; + + SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port); + } + + atomic_sub_fetch_64(&hb->taskNum, 1); + + pTask->registerdHb = false; +} + +void schFreeTask(SSchJob *pJob, SSchTask *pTask) { + schDeregisterTaskHb(pJob, pTask); + if (pTask->candidateAddrs) { taosArrayDestroy(pTask->candidateAddrs); } @@ -139,7 +167,7 @@ void schFreeTask(SSchTask *pTask) { } if (pTask->execNodes) { - taosArrayDestroy(pTask->execNodes); + taosHashCleanup(pTask->execNodes); } } @@ -329,44 +357,51 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle) { - SSchNodeInfo nodeInfo = {.addr = *addr, .handle = handle}; +int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx) { + SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL}; - if (NULL == taosArrayPush(pTask->execNodes, &nodeInfo)) { - SCH_TASK_ELOG("taosArrayPush nodeInfo to execNodes list failed, errno:%d", errno); + if (NULL == taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) { + SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("task execNode recorded, handle:%p", handle); + SCH_TASK_DLOG("task execNode added, execIdx:%d", execIdx); return TSDB_CODE_SUCCESS; } -int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle) { +int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) { if (NULL == pTask->execNodes) { return TSDB_CODE_SUCCESS; } - int32_t num = taosArrayGetSize(pTask->execNodes); - for (int32_t i = 0; i < num; ++i) { - SSchNodeInfo* pNode = taosArrayGet(pTask->execNodes, i); - if (pNode->handle == handle) { - taosArrayRemove(pTask->execNodes, i); - break; - } + taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx)); + if (execIdx != pTask->execIdx) { // ignore it + SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR); } return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t rspCode) { +int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { + if (taosArrayGetSize(pTask->execNodes) <= 0) { + return TSDB_CODE_SUCCESS; + } + + SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx)); + nodeInfo->handle = handle; + + return TSDB_CODE_SUCCESS; +} + +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx) { + if (msgType == TDMT_SCH_LINK_BROKEN) { + SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx)); + } + SCH_SET_TASK_HANDLE(pTask, handle); - schUpdateTaskExecNodeHandle(pTask, handle, rspCode); - - if (msgType == TDMT_SCH_LINK_BROKEN) { - schDropTaskExecNode(pJob, pTask, handle); - } + schUpdateTaskExecNode(pTask, handle, execIdx); return TSDB_CODE_SUCCESS; } @@ -672,7 +707,6 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { int8_t status = 0; - ++pTask->tryTimes; if (schJobNeedToStop(pJob, &status)) { *needRetry = false; @@ -680,9 +714,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } - if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) { + if ((pTask->execIdx + 1) >= SCH_TASK_MAX_EXEC_TIMES) { *needRetry = false; - SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes); + SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx); return TSDB_CODE_SUCCESS; } @@ -694,9 +728,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo // TODO CHECK epList/condidateList if (SCH_IS_DATA_SRC_TASK(pTask)) { - if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { + if ((pTask->execIdx + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { *needRetry = false; - SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes, + SCH_TASK_DLOG("task no more retry since all ep tried, execIdx:%d, epNum:%d", pTask->execIdx, SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); return TSDB_CODE_SUCCESS; } @@ -712,7 +746,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo } *needRetry = true; - SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode)); + SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execIdx + 1, errCode, tstrerror(errCode)); return TSDB_CODE_SUCCESS; } @@ -728,6 +762,8 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } + schDeregisterTaskHb(pJob, pTask); + if (SCH_IS_DATA_SRC_TASK(pTask)) { SCH_SWITCH_EPSET(&pTask->plan->execNode); } else { @@ -906,6 +942,8 @@ void schProcessOnDataFetched(SSchJob *job) { int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { int8_t status = 0; + SCH_LOG_TASK_END_TS(pTask); + if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status)); SCH_RET(atomic_load_32(&pJob->errCode)); @@ -989,6 +1027,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + SCH_LOG_TASK_END_TS(pTask); + SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); @@ -1105,6 +1145,57 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs return TSDB_CODE_SUCCESS; } +int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { + if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + return TSDB_CODE_SUCCESS; + } + + +} + +int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { + int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); + SSchTask *pTask = NULL; + + qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port); + + for (int32_t i = 0; i < taskNum; ++i) { + STaskStatus *taskStatus = taosArrayGet(pStatusList, i); + + SSchJob *pJob = schAcquireJob(taskStatus->refId); + if (NULL == pJob) { + qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId, + taskStatus->queryId, taskStatus->taskId); + // TODO DROP TASK FROM SERVER!!!! + continue; + } + + SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, jobTaskStatusStr(taskStatus->status)); + + pTask = NULL; + schGetTaskInJob(pJob, taskStatus->taskId, &pTask); + if (NULL == pTask) { + // TODO DROP TASK FROM SERVER!!!! + schReleaseJob(taskStatus->refId); + continue; + } + + if (taskStatus->status == JOB_TASK_STATUS_FAILED) { + // RECORD AND HANDLE ERROR!!!! + schReleaseJob(taskStatus->refId); + continue; + } + + if (taskStatus->status == JOB_TASK_STATUS_NOT_START && SCH_TASK_TIMEOUT(pTask)) { + schRescheduleTask(pJob, pTask); + } + + schReleaseJob(taskStatus->refId); + } + +} + + int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { if (rsp->tbFName[0]) { if (NULL == pJob->execRes.res) { @@ -1156,25 +1247,16 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { return TSDB_CODE_SUCCESS; } - -int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) { - if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || - taosArrayGetSize(pTask->execNodes) <= 0) { - return TSDB_CODE_SUCCESS; - } - - SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0); - nodeInfo->handle = handle; - - return TSDB_CODE_SUCCESS; -} - int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int8_t status = 0; int32_t code = 0; atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); + pTask->execIdx++; + + SCH_LOG_TASK_START_TS(pTask); + if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status)); @@ -1263,19 +1345,20 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { return; } - int32_t size = (int32_t)taosArrayGetSize(pTask->execNodes); + int32_t size = (int32_t)taosHashGetSize(pTask->execNodes); if (size <= 0) { SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); return; } - SSchNodeInfo *nodeInfo = NULL; - for (int32_t i = 0; i < size; ++i) { - nodeInfo = (SSchNodeInfo *)taosArrayGet(pTask->execNodes, i); + SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); + while (nodeInfo) { SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK); + + nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); } SCH_TASK_DLOG("task has %d exec address", size); @@ -1332,7 +1415,7 @@ void schFreeJobImpl(void *job) { int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for (int32_t j = 0; j < numOfTasks; ++j) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); - schFreeTask(pTask); + schFreeTask(pJob, pTask); } taosArrayDestroy(pLevel->subTasks); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 0ba91a1c85..60384de6e0 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -366,7 +366,12 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); - SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, rspCode)); + if (pParam->execIdx != pTask->execIdx) { + SCH_TASK_DLOG("execIdx %d mis-match current execIdx %d", pParam->execIdx, pTask->execIdx); + goto _return; + } + + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, pParam->execIdx)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); @@ -426,7 +431,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL}; SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans)); - SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId)); + SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL)); } else { SCH_ERR_RET(schHandleCallback(param, pMsg, TDMT_SCH_LINK_BROKEN, code)); } @@ -454,7 +459,8 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType, param->queryId = pJob->queryId; param->refId = pJob->refId; param->taskId = SCH_TASK_ID(pTask); - param->transport = pJob->pTrans; + param->pTrans = pJob->pTrans; + param->execIdx = pTask->execIdx; msgSendInfo->param = param; msgSendInfo->fp = fp; @@ -717,7 +723,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { __async_send_cb_fn_t fp = NULL; SCH_ERR_JRET(schGetCallbackFp(msgType, &fp)); - param->transport = trans.pTrans; + param->pTrans = trans.pTrans; pMsgSendInfo->param = param; pMsgSendInfo->msgInfo.pData = msg; @@ -768,10 +774,14 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { bool exist = false; SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist)); if (!exist) { - SCH_ERR_RET(schBuildAndSendHbMsg(&epId)); + SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL)); } } + atomic_add_fetch_64(&hb->taskNum, 1); + + pTask->registerdHb = true; + return TSDB_CODE_SUCCESS; } @@ -810,33 +820,12 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { } SSchTrans trans = {0}; - trans.pTrans = pParam->transport; + trans.pTrans = pParam->pTrans; trans.pHandle = pMsg->handle; SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); - int32_t taskNum = (int32_t)taosArrayGetSize(rsp.taskStatus); - qDebug("%d task status in hb rsp, nodeId:%d, fqdn:%s, port:%d", taskNum, rsp.epId.nodeId, rsp.epId.ep.fqdn, - rsp.epId.ep.port); - - for (int32_t i = 0; i < taskNum; ++i) { - STaskStatus *taskStatus = taosArrayGet(rsp.taskStatus, i); - - SSchJob *pJob = schAcquireJob(taskStatus->refId); - if (NULL == pJob) { - qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId, - taskStatus->queryId, taskStatus->taskId); - // TODO DROP TASK FROM SERVER!!!! - continue; - } - - // TODO - - SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, - jobTaskStatusStr(taskStatus->status)); - - schReleaseJob(taskStatus->refId); - } + SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus)); _return: @@ -856,7 +845,8 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { param->queryId = pJob->queryId; param->refId = pJob->refId; param->taskId = SCH_TASK_ID(pTask); - param->transport = pJob->pTrans; + param->pTrans = pJob->pTrans; + param->taskId = pTask->taskId; *pParam = param; @@ -1158,7 +1148,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, (rpcCtx.args ? &rpcCtx : NULL))); if (msgType == TDMT_VND_QUERY) { - SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.pHandle)); + SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx)); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 522bd8044d..25d07aa35d 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -176,6 +176,10 @@ int32_t scheduleCancelJob(int64_t job) { SCH_RET(code); } +void schedulerStopTransport(void *pTrans) { + // CLOSE && REMOVE RELATED HB CONNECTIONS +} + void schedulerFreeJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { From b6e60082ae89eaf2f1212d4c03b0709ab9e03b47 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 3 Jun 2022 20:44:32 +0800 Subject: [PATCH 3/9] rescheduler timeout task --- include/util/taoserror.h | 3 +- source/client/src/clientEnv.c | 1 - source/client/src/clientMain.c | 3 +- source/libs/qworker/src/qwMsg.c | 2 +- source/libs/scheduler/inc/schedulerInt.h | 32 ++++++-- source/libs/scheduler/src/schJob.c | 94 ++++++++++++++++-------- source/libs/scheduler/src/schRemote.c | 13 +++- source/libs/scheduler/src/scheduler.c | 4 - source/util/src/terror.c | 1 + 9 files changed, 101 insertions(+), 52 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0e1fd33e83..703f189562 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -564,7 +564,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) #define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) #define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503) -#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504) +#define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504) +#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2550) //parser #define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index a4655f7bcd..4f5f23b68b 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -131,7 +131,6 @@ void destroyTscObj(void *pObj) { hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); closeAllRequests(pTscObj->pRequests); - schedulerStopTransport(pTscObj->pAppInfo->pTransporter); tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e144885e9e..7d9de8da95 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -66,10 +66,11 @@ void taos_cleanup(void) { hbMgrCleanUp(); - rpcCleanup(); catalogDestroy(); schedulerDestroy(); + rpcCleanup(); + tscInfo("all local resources released"); taosCleanupCfg(); taosCloseLog(); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 79fcbc4079..317877c253 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -274,7 +274,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t tId = msg->taskId; int64_t rId = msg->refId; - SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; + SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle); QW_ERR_RET(qwPrerocessQuery(QW_FPARAMS(), &qwMsg)); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 709f3a4e34..592eec3592 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -163,9 +163,11 @@ typedef struct SSchTaskProfile { typedef struct SSchTask { uint64_t taskId; // task id - int32_t execIdx; // task current execute try index SRWLatch lock; // task lock + int32_t maxExecTimes; // task may exec times + int32_t execIdx; // task current execute try index SSchLevel *level; // level + SRWLatch planLock; // task update plan lock SSubplan *plan; // subplan char *msg; // operator tree int32_t msgLen; // msg length @@ -230,26 +232,39 @@ typedef struct SSchJob { extern SSchedulerMgmt schMgmt; -#define SCH_LOG_TASK_START_TS(_task) \ +#define SCH_LOG_TASK_START_TS(_task) \ + do { \ + int64_t us = taosGetTimestampUs(); \ + int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \ + (_task)->profile.execUseTime[idx] = us; \ + if (0 == (_task)->execIdx) { \ + (_task)->profile.startTs = us; \ + } \ + } while (0) + +#define SCH_LOG_TASK_WAIT_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - (_task)->profile.tryUseTime[(_task)->execIdx] = us; \ - if (0 == (_task)->execIdx) { \ - (_task)->profile.startTs = us; \ - } \ + int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \ + (_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \ } while (0) + #define SCH_LOG_TASK_END_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - (_task)->profile.tryUseTime[(_task)->execIdx] = us - (_task)->profile.tryUseTime[(_task)->execIdx]; \ + int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \ + (_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \ (_task)->profile.endTs = us; \ } while (0) -#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.tryUseTime[(_task)->execIdx]) > (_taks)->timeoutUsec) +#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec) #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) +#define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock) +#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock) + #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) #define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1) @@ -351,6 +366,7 @@ int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64 int32_t schFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx); +int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 7600dafa59..cceadcc034 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -29,6 +29,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * pTask->plan = pPlan; pTask->level = pLevel; pTask->execIdx = -1; + pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = schGenTaskId(); @@ -142,6 +143,7 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) { SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); if (NULL == hb) { SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port); + return; } atomic_sub_fetch_64(&hb->taskNum, 1); @@ -360,7 +362,7 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx) { SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL}; - if (NULL == taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) { + if (taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) { SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -384,7 +386,7 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ } int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { - if (taosArrayGetSize(pTask->execNodes) <= 0) { + if (taosHashGetSize(pTask->execNodes) <= 0) { return TSDB_CODE_SUCCESS; } @@ -714,7 +716,17 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } - if ((pTask->execIdx + 1) >= SCH_TASK_MAX_EXEC_TIMES) { + if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { + pTask->maxExecTimes++; + if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) { + pTask->timeoutUsec *= 2; + if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) { + pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC; + } + } + } + + if ((pTask->execIdx + 1) >= pTask->maxExecTimes) { *needRetry = false; SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx); return TSDB_CODE_SUCCESS; @@ -737,7 +749,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo } else { int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); - if ((pTask->candidateIdx + 1) >= candidateNum) { + if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) { *needRetry = false; SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", pTask->candidateIdx, candidateNum); @@ -767,7 +779,10 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { if (SCH_IS_DATA_SRC_TASK(pTask)) { SCH_SWITCH_EPSET(&pTask->plan->execNode); } else { - ++pTask->candidateIdx; + int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); + if (++pTask->candidateIdx >= candidateNum) { + pTask->candidateIdx = 0; + } } SCH_ERR_RET(schLaunchTask(pJob, pTask)); @@ -942,8 +957,12 @@ void schProcessOnDataFetched(SSchJob *job) { int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { int8_t status = 0; - SCH_LOG_TASK_END_TS(pTask); - + if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) { + SCH_LOG_TASK_WAIT_TS(pTask); + } else { + SCH_LOG_TASK_END_TS(pTask); + } + if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status)); SCH_RET(atomic_load_32(&pJob->errCode)); @@ -1145,12 +1164,46 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs return TSDB_CODE_SUCCESS; } +void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { + if (NULL == pTask->execNodes) { + SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + return; + } + + int32_t size = (int32_t)taosHashGetSize(pTask->execNodes); + + if (size <= 0) { + SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + return; + } + + SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); + while (nodeInfo) { + SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); + + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK); + + nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); + } + + SCH_TASK_DLOG("task has %d exec address", size); +} + + int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { return TSDB_CODE_SUCCESS; } - + SCH_LOCK_TASK(pTask); + if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) { + schDropTaskOnExecNode(pJob, pTask); + taosHashClear(pTask->execNodes); + schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR); + } + SCH_UNLOCK_TASK(pTask); + + return TSDB_CODE_SUCCESS; } int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { @@ -1193,6 +1246,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { schReleaseJob(taskStatus->refId); } + return TSDB_CODE_SUCCESS; } @@ -1339,30 +1393,6 @@ int32_t schLaunchJob(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } -void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { - if (NULL == pTask->execNodes) { - SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); - return; - } - - int32_t size = (int32_t)taosHashGetSize(pTask->execNodes); - - if (size <= 0) { - SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); - return; - } - - SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); - while (nodeInfo) { - SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK); - - nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); - } - - SCH_TASK_DLOG("task has %d exec address", size); -} void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { if (!SCH_IS_NEED_DROP_JOB(pJob)) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 60384de6e0..a940e45b43 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -92,8 +92,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), - rspCode); + SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode); taosMemoryFreeClear(msg); SCH_RET(atomic_load_32(&pJob->errCode)); } @@ -344,7 +343,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: - taosMemoryFreeClear(msg); + taosMemoryFreeClear(msg); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } @@ -364,6 +363,8 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask)); + SCH_LOCK_TASK(pTask); + SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); if (pParam->execIdx != pTask->execIdx) { @@ -376,6 +377,10 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: + + if (pTask) { + SCH_UNLOCK_TASK(pTask); + } if (pJob) { schReleaseJob(pParam->refId); @@ -667,7 +672,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId * } -int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { +int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) { SSchedulerHbReq req = {0}; int32_t code = 0; SRpcCtx rpcCtx = {0}; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 25d07aa35d..522bd8044d 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -176,10 +176,6 @@ int32_t scheduleCancelJob(int64_t job) { SCH_RET(code); } -void schedulerStopTransport(void *pTrans) { - // CLOSE && REMOVE RELATED HB CONNECTIONS -} - void schedulerFreeJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 74fc14ecdd..d179887b2f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -444,6 +444,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error") TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error") +TAOS_DEFINE_ERROR(TSDB_CODE_SCH_TIMEOUT_ERROR, "Task timeout") TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order") // parser From fc04aa847c122bedbf8fc1abcf073e72699a41d2 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 4 Jun 2022 09:57:16 +0800 Subject: [PATCH 4/9] fix qnode mnode query issue --- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 1 + source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 2 ++ source/dnode/mgmt/mgmt_qnode/inc/qmInt.h | 2 ++ source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 2 ++ source/dnode/mnode/impl/src/mndQuery.c | 8 ++++++++ source/dnode/qnode/src/qnode.c | 8 ++++++++ source/libs/qworker/src/qwDbg.c | 3 ++- source/libs/qworker/src/qwMsg.c | 12 ++++++------ source/libs/qworker/src/qworker.c | 4 ++-- source/libs/scheduler/src/schRemote.c | 19 +++++++++++++------ 10 files changed, 46 insertions(+), 15 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 9a0cfdfc93..22691300bc 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -56,6 +56,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 460f2242f2..42fa7b718e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -89,6 +89,8 @@ int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + mndPreprocessQueryMsg(pMgmt->pMnode, pMsg); + return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); } diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index 54e9da24a4..acc101386b 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -51,6 +51,8 @@ int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index e36efa83db..6814643b59 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -57,6 +57,8 @@ static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { } int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { + qndPreprocessQueryMsg(pMgmt->pQnode, pMsg); + return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); } diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 97594f2b91..12b39e5b78 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -18,6 +18,14 @@ #include "mndMnode.h" #include "qworker.h" +int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) { + return 0; + } + + return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); +} + int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 438982ac6a..45b88318c4 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -63,6 +63,14 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } +int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg * pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) { + return 0; + } + + return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg); +} + int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { int32_t code = -1; SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 27fe22295d..3914541157 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -36,7 +36,8 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, break; case JOB_TASK_STATUS_NOT_START: - if (newStatus != JOB_TASK_STATUS_CANCELLED) { + if (newStatus != JOB_TASK_STATUS_DROPPING && newStatus != JOB_TASK_STATUS_EXECUTING + && newStatus != JOB_TASK_STATUS_FAILED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 317877c253..2f1ecc5172 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -300,12 +300,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = be64toh(msg->sId); - msg->queryId = be64toh(msg->queryId); - msg->taskId = be64toh(msg->taskId); - msg->refId = be64toh(msg->refId); - msg->phyLen = ntohl(msg->phyLen); - msg->sqlLen = ntohl(msg->sqlLen); + msg->sId = msg->sId; + msg->queryId = msg->queryId; + msg->taskId = msg->taskId; + msg->refId = msg->refId; + msg->phyLen = msg->phyLen; + msg->sqlLen = msg->sqlLen; uint64_t sId = msg->sId; uint64_t qId = msg->queryId; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 9ad36a32c4..8c0366fa0b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -446,6 +446,8 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); + ctx->ctrlConnInfo = qwMsg->connInfo; + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START)); _return: @@ -475,8 +477,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex atomic_store_8(&ctx->taskType, taskType); atomic_store_8(&ctx->explain, explain); - ctx->ctrlConnInfo = qwMsg->connInfo; - /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/ code = qStringToSubplan(qwMsg->msg, &plan); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index a940e45b43..88caf7438e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -774,13 +774,20 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { strcpy(epId.ep.fqdn, pEp->fqdn); epId.ep.port = pEp->port; - SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); - if (NULL == hb) { - bool exist = false; - SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist)); - if (!exist) { - SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL)); + SSchHbTrans *hb = NULL; + while (true) { + hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); + if (NULL == hb) { + bool exist = false; + SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist)); + if (!exist) { + SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL)); + } + + continue; } + + break; } atomic_add_fetch_64(&hb->taskNum, 1); From f1860b7918629f6e9d246eb3366393ebb06ef674 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 4 Jun 2022 10:25:55 +0800 Subject: [PATCH 5/9] fix:fix error in table scan by group --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b9507f9f06..7eb8e9fea4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -379,7 +379,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); - if (!groupId) { + if (groupId) { pBlock->info.groupId = *groupId; }else{ From a3033b59e6c8db0b1737a4a27bb69158e5605806 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 4 Jun 2022 11:19:38 +0800 Subject: [PATCH 6/9] fix:fix error in table scan by group --- source/libs/executor/src/scanoperator.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7eb8e9fea4..0774ccec1c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -381,8 +381,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); if (groupId) { pBlock->info.groupId = *groupId; - - }else{ + }else if(len != 0){ pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len); taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t)); } From 6c1966a28e707086b09574e09bb85eaff7346d70 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 4 Jun 2022 11:30:57 +0800 Subject: [PATCH 7/9] refactor: adjust msg type --- include/common/tmsgdef.h | 50 ++++++++++++++------------- source/dnode/mnode/impl/src/mndMain.c | 8 ++--- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index fe9ffda69c..c8a809c175 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -103,6 +103,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL) @@ -115,26 +116,26 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "alter-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "sync-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "compact-db", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "create-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_FUNC, "retrieve-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_FUNC, "drop-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "drop-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "drop-sma", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp) TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "systable-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) @@ -153,8 +154,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "drop-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "confirm-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "apply-msg", NULL, NULL) // Requests handled by VNODE @@ -182,8 +183,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) -// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp) -// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) @@ -203,26 +202,29 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST, "vnode-sync-client-request", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, "vnode-sync-client-request-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE, "vnode-sync-request-vote", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE_REPLY, "vnode-sync-request-vote-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES, "vnode-sync-append-entries", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, "vnode-sync-append-entries-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_NOOP, "vnode-sync-noop", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "sync-timeout", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "sync-ping", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_NOOP, "sync-noop", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "sync-unknown", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "vnode-alter-config", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "vnode-alter-replica", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "vnode-delete-data", SVDeleteReq, SVDeleteRsp) + TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 3a3fd7ebdb..48a470c215 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -625,7 +625,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) { code = -1; } else { int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1); - mTrace("mnode rpc is acquired, ref:%d", ref); + // mTrace("mnode rpc is acquired, ref:%d", ref); } taosThreadRwlockUnlock(&pMnode->lock); return code; @@ -634,7 +634,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) { void mndReleaseRpcRef(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1); - mTrace("mnode rpc is released, ref:%d", ref); + // mTrace("mnode rpc is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); } @@ -675,7 +675,7 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) { code = -1; } else { int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1); - mTrace("mnode sync is acquired, ref:%d", ref); + // mTrace("mnode sync is acquired, ref:%d", ref); } taosThreadRwlockUnlock(&pMnode->lock); return code; @@ -684,6 +684,6 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) { void mndReleaseSyncRef(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1); - mTrace("mnode sync is released, ref:%d", ref); + // mTrace("mnode sync is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); } From 7f61dbc8cc8c250929d0f3462ebb27accf7ffa6f Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 4 Jun 2022 11:45:55 +0800 Subject: [PATCH 8/9] fix rpc connection fail issue --- source/libs/scheduler/inc/schedulerInt.h | 2 +- source/libs/scheduler/src/schJob.c | 4 ++-- source/libs/scheduler/src/schRemote.c | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 592eec3592..c8fee1e532 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -365,7 +365,7 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes); int32_t schFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob); -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx); +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 6e1319d5f4..3e23c395c9 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -396,8 +396,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx) { - if (msgType == TDMT_SCH_LINK_BROKEN) { +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx) { + if (dropExecNode) { SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx)); } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 88caf7438e..b336ce8c76 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -372,7 +372,8 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in goto _return; } - SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, pParam->execIdx)); + bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL); + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); From 1f419e0a3642fa217dc4169729cfb6dd86996634 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 4 Jun 2022 12:31:53 +0800 Subject: [PATCH 9/9] refactor: adjust msg type --- include/common/tmsgdef.h | 5 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 58 ++++++++++----------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a4bbd48434..2cf188866b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -93,7 +93,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_DNODE, "create-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DNODE, "alter-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_DNODE, "drop-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL) @@ -159,6 +158,8 @@ enum { TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "query", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "query-continue", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "fetch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "create-table", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "alter-table", NULL, NULL) @@ -180,8 +181,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index bbdafbdfe1..4a79513736 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -129,12 +129,7 @@ SArray *mmGetMsgHandles() { SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - - // Requests handled by DNODE if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -146,7 +141,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - // Requests handled by MNODE if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -159,6 +153,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -172,53 +168,50 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIRM_WRITE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - // Requests handled by VNODE + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -233,6 +226,9 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + code = 0; _OVER: