From fad5a80bab03ec67008e4cf3874c210493a2ce11 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 25 Mar 2022 13:27:14 +0800 Subject: [PATCH] feature/scheduler --- include/libs/qworker/qworker.h | 1 + source/dnode/mgmt/mnode/src/mmMsg.c | 2 +- source/dnode/mnode/impl/CMakeLists.txt | 2 +- source/dnode/mnode/impl/inc/mndInt.h | 5 +- source/dnode/mnode/impl/inc/mndQuery.h | 33 ++++++++++ source/dnode/mnode/impl/src/mndQuery.c | 70 ++++++++++++++++++++++ source/dnode/mnode/impl/src/mnode.c | 2 + source/libs/nodes/src/nodesCodeFuncs.c | 3 +- source/libs/parser/src/parTranslater.c | 17 +++--- source/libs/planner/src/planPhysiCreater.c | 13 ++-- 10 files changed, 131 insertions(+), 17 deletions(-) create mode 100644 source/dnode/mnode/impl/inc/mndQuery.h create mode 100644 source/dnode/mnode/impl/src/mndQuery.c diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 944ac97ddb..0846841cef 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -28,6 +28,7 @@ enum { NODE_TYPE_VNODE = 1, NODE_TYPE_QNODE, NODE_TYPE_SNODE, + NODE_TYPE_MNODE, }; diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index 1cae2220ad..f38bf3a65f 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -159,6 +159,6 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessReadMsg, MND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)mmProcessReadMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessReadMsg, MND_VGID); } diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 514bba19f4..60bf366504 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -6,7 +6,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - mnode scheduler sdb wal transport cjson sync monitor + mnode scheduler sdb wal transport cjson sync monitor executor qworker ) if(${BUILD_TEST}) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 20e85973be..f89e9d8fe0 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -59,6 +59,8 @@ typedef struct SMnodeLoad { int64_t compStorage; } SMnodeLoad; +typedef struct SQWorkerMgmt SQHandle; + typedef struct { const char *name; MndInitFp initFp; @@ -112,6 +114,7 @@ typedef struct SMnode { SSdb *pSdb; SMgmtWrapper *pWrapper; SArray *pSteps; + SQHandle *pQuery; SShowMgmt showMgmt; SProfileMgmt profileMgmt; STelemMgmt telemMgmt; @@ -119,7 +122,7 @@ typedef struct SMnode { SHashObj *infosMeta; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; - SMsgCb msgCb; + SMsgCb msgCb; } SMnode; void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); diff --git a/source/dnode/mnode/impl/inc/mndQuery.h b/source/dnode/mnode/impl/inc/mndQuery.h new file mode 100644 index 0000000000..7fab80de77 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndQuery.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_QUERY_H_ +#define _TD_MND_QUERY_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitQuery(SMnode *pMnode); +void mndCleanupQuery(SMnode *pMnode); + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_QUERY_H_*/ diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c new file mode 100644 index 0000000000..e93a0d9b17 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndQuery.h" +#include "mndMnode.h" +#include "executor.h" +#include "qworker.h" + +int32_t mndProcessQueryMsg(SNodeMsg *pReq) { + mTrace("message in query queue is processing"); + SMnode *pMnode = pReq->pNode; + SReadHandle handle = {0}; + + switch (pReq->rpcMsg.msgType) { + case TDMT_VND_QUERY: + return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg); + case TDMT_VND_QUERY_CONTINUE: + return qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg); + default: + mError("unknown msg type:%d in query queue", pReq->rpcMsg.msgType); + return TSDB_CODE_VND_APP_ERROR; + } +} + +int32_t mndProcessFetchMsg(SNodeMsg *pReq) { + mTrace("message in fetch queue is processing"); + SMnode *pMnode = pReq->pNode; + + switch (pReq->rpcMsg.msgType) { + case TDMT_VND_FETCH: + return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg); + case TDMT_VND_DROP_TASK: + return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg); + case TDMT_VND_QUERY_HEARTBEAT: + return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg); + default: + mError("unknown msg type:%d in fetch queue", pReq->rpcMsg.msgType); + return TSDB_CODE_VND_APP_ERROR; + } +} + +int32_t mndInitQuery(SMnode *pMnode) { + int32_t code = qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb); + if (code) { + return code; + } + + mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessFetchMsg); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessFetchMsg); + mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessFetchMsg); + + return 0; +} + +void mndCleanupQuery(SMnode *pMnode) { qWorkerDestroy((void **)&pMnode->pQuery); } + diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index c4d389a379..754bed030b 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -39,6 +39,7 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "mndQuery.h" #define MQ_TIMER_MS 3000 #define TRNAS_TIMER_MS 6000 @@ -217,6 +218,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { // if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1; if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1; if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1; if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 9859d4a9b9..9b453d17bc 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1480,13 +1480,14 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: { - pNode->datum.p = calloc(1, pNode->node.resType.bytes); + pNode->datum.p = calloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1 + 100); if (NULL == pNode->datum.p) { code = TSDB_CODE_OUT_OF_MEMORY; break; } varDataSetLen(pNode->datum.p, pNode->node.resType.bytes); code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p)); + nodesDebug("varchar len:%d,string:%s", pNode->node.resType.bytes, varDataVal(pNode->datum.p)); break; } case TSDB_DATA_TYPE_JSON: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f82ce2c1b4..2761f3ef36 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -383,12 +383,13 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: { - pVal->datum.p = calloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE); + pVal->datum.p = calloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1); if (NULL == pVal->datum.p) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); } varDataSetLen(pVal->datum.p, pVal->node.resType.bytes); - strcpy(varDataVal(pVal->datum.p), pVal->literal); + strncpy(varDataVal(pVal->datum.p), pVal->literal, pVal->node.resType.bytes); + parserDebug("varchar value:%s,len:%d", pVal->literal, pVal->node.resType.bytes); break; } case TSDB_DATA_TYPE_TIMESTAMP: { @@ -599,9 +600,9 @@ static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) { static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) { // todo release - // if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) { - // return TSDB_CODE_SUCCESS; - // } + if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) { + return TSDB_CODE_SUCCESS; + } int32_t code = TSDB_CODE_SUCCESS; SArray* vgroupList = NULL; @@ -613,9 +614,9 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea if (TSDB_CODE_SUCCESS == code) { // todo remove - if (NULL != vgroupList && taosArrayGetSize(vgroupList) > 0 && 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) { - taosArrayPopTailBatch(vgroupList, taosArrayGetSize(vgroupList) - 1); - } + //if (NULL != vgroupList && taosArrayGetSize(vgroupList) > 0 && 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) { + // taosArrayPopTailBatch(vgroupList, taosArrayGetSize(vgroupList) - 1); + //} code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 4077c57f2c..c8ea98687b 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -307,11 +307,14 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); } else { - for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) { - SQueryNodeAddr addr; - vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr); - taosArrayPush(pCxt->pExecNodeList, &addr); - } + SQueryNodeAddr addr = { .nodeId = MND_VGID, .epSet = pCxt->pPlanCxt->mgmtEpSet }; + taosArrayPush(pCxt->pExecNodeList, &addr); + + //for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) { + // SQueryNodeAddr addr; + // vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr); + // taosArrayPush(pCxt->pExecNodeList, &addr); + //} } pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);