From cdd60902d75f13ed2a010ff1fe6913755592bcaa Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 10 Feb 2022 05:43:14 -0500 Subject: [PATCH 1/6] TD-13338 SELECT statement translate code --- include/libs/function/functionMgt.h | 2 +- include/nodes/nodes.h | 274 ++--------------------- include/nodes/querynodes.h | 280 ++++++++++++++++++++++++ source/libs/parser/inc/astCreateFuncs.h | 2 +- source/libs/parser/inc/parserImpl.h | 3 +- source/libs/planner/inc/plannerImpl.h | 27 +++ source/libs/planner/src/plannerImpl.c | 19 ++ source/nodes/src/nodesEqualFuncs.c | 2 +- source/nodes/src/nodesTraverseFuncs.c | 142 +++++++++++- source/nodes/src/nodesUtilFuncs.c | 2 +- source/nodes/test/nodesTest.cpp | 38 +++- 11 files changed, 520 insertions(+), 271 deletions(-) create mode 100644 include/nodes/querynodes.h create mode 100644 source/libs/planner/inc/plannerImpl.h create mode 100644 source/libs/planner/src/plannerImpl.c diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index bf3a45fcca..c3c8b5c4ce 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -20,7 +20,7 @@ extern "C" { #endif -#include "nodes.h" +#include "querynodes.h" typedef enum EFunctionType { // aggregate function diff --git a/include/nodes/nodes.h b/include/nodes/nodes.h index b182063808..321190d254 100644 --- a/include/nodes/nodes.h +++ b/include/nodes/nodes.h @@ -40,7 +40,11 @@ extern "C" { (NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \ cell1 = cell1->pNext, cell2 = cell2->pNext) +#define FOREACH_FOR_REWRITE(node, list) \ + for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = &(cell->pNode), true) : (node = NULL, false)); cell = cell->pNext) + typedef enum ENodeType { + // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, VALUE, OPERATOR, FUNCTION and so on. QUERY_NODE_COLUMN = 1, QUERY_NODE_VALUE, QUERY_NODE_OPERATOR, @@ -59,9 +63,10 @@ typedef enum ENodeType { QUERY_NODE_NODE_LIST, QUERY_NODE_FILL, - // only for parser + // Only be used in parser module. QUERY_NODE_RAW_EXPR, + // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR, QUERY_NODE_SELECT_STMT, QUERY_NODE_SHOW_STMT @@ -87,248 +92,6 @@ typedef struct SNodeList { SListCell* pTail; } SNodeList; -typedef struct SRawExprNode { - ENodeType nodeType; - char* p; - uint32_t n; - SNode* pNode; -} SRawExprNode; - -typedef struct SDataType { - uint8_t type; - uint8_t precision; - uint8_t scale; - int32_t bytes; -} SDataType; - -typedef struct SExprNode { - ENodeType nodeType; - SDataType resType; - char aliasName[TSDB_COL_NAME_LEN]; - SNodeList* pAssociationList; -} SExprNode; - -typedef enum EColumnType { - COLUMN_TYPE_COLUMN = 1, - COLUMN_TYPE_TAG -} EColumnType; - -typedef struct SColumnNode { - SExprNode node; // QUERY_NODE_COLUMN - int16_t colId; - EColumnType colType; // column or tag - char dbName[TSDB_DB_NAME_LEN]; - char tableName[TSDB_TABLE_NAME_LEN]; - char tableAlias[TSDB_TABLE_NAME_LEN]; - char colName[TSDB_COL_NAME_LEN]; - SNode* pProjectRef; -} SColumnNode; - -typedef struct SValueNode { - SExprNode node; // QUERY_NODE_VALUE - char* literal; - bool isDuration; - union { - bool b; - int64_t i; - uint64_t u; - double d; - char* p; - } datum; -} SValueNode; - -typedef enum EOperatorType { - // arithmetic operator - OP_TYPE_ADD = 1, - OP_TYPE_SUB, - OP_TYPE_MULTI, - OP_TYPE_DIV, - OP_TYPE_MOD, - - // comparison operator - OP_TYPE_GREATER_THAN, - OP_TYPE_GREATER_EQUAL, - OP_TYPE_LOWER_THAN, - OP_TYPE_LOWER_EQUAL, - OP_TYPE_EQUAL, - OP_TYPE_NOT_EQUAL, - OP_TYPE_IN, - OP_TYPE_NOT_IN, - OP_TYPE_LIKE, - OP_TYPE_NOT_LIKE, - OP_TYPE_MATCH, - OP_TYPE_NMATCH, - - // json operator - OP_TYPE_JSON_GET_VALUE, - OP_TYPE_JSON_CONTAINS -} EOperatorType; - -typedef struct SOperatorNode { - SExprNode node; // QUERY_NODE_OPERATOR - EOperatorType opType; - SNode* pLeft; - SNode* pRight; -} SOperatorNode; - -typedef enum ELogicConditionType { - LOGIC_COND_TYPE_AND, - LOGIC_COND_TYPE_OR, - LOGIC_COND_TYPE_NOT, -} ELogicConditionType; - -typedef struct SLogicConditionNode { - ENodeType type; // QUERY_NODE_LOGIC_CONDITION - ELogicConditionType condType; - SNodeList* pParameterList; -} SLogicConditionNode; - -typedef struct SIsNullCondNode { - ENodeType type; // QUERY_NODE_IS_NULL_CONDITION - SNode* pExpr; - bool isNull; -} SIsNullCondNode; - -typedef struct SNodeListNode { - ENodeType type; // QUERY_NODE_NODE_LIST - SNodeList* pNodeList; -} SNodeListNode; - -typedef struct SFunctionNode { - SExprNode node; // QUERY_NODE_FUNCTION - char functionName[TSDB_FUNC_NAME_LEN]; - int32_t funcId; - int32_t funcType; - SNodeList* pParameterList; -} SFunctionNode; - -typedef struct STableNode { - ENodeType type; - char dbName[TSDB_DB_NAME_LEN]; - char tableName[TSDB_TABLE_NAME_LEN]; - char tableAlias[TSDB_TABLE_NAME_LEN]; -} STableNode; - -struct STableMeta; - -typedef struct SRealTableNode { - STableNode table; // QUERY_NODE_REAL_TABLE - struct STableMeta* pMeta; -} SRealTableNode; - -typedef struct STempTableNode { - STableNode table; // QUERY_NODE_TEMP_TABLE - SNode* pSubquery; -} STempTableNode; - -typedef enum EJoinType { - JOIN_TYPE_INNER = 1 -} EJoinType; - -typedef struct SJoinTableNode { - STableNode table; // QUERY_NODE_JOIN_TABLE - EJoinType joinType; - SNode* pLeft; - SNode* pRight; - SNode* pOnCond; -} SJoinTableNode; - -typedef enum EGroupingSetType { - GP_TYPE_NORMAL = 1 -} EGroupingSetType; - -typedef struct SGroupingSetNode { - ENodeType type; // QUERY_NODE_GROUPING_SET - EGroupingSetType groupingSetType; - SNodeList* pParameterList; -} SGroupingSetNode; - -typedef enum EOrder { - ORDER_ASC = 1, - ORDER_DESC -} EOrder; - -typedef enum ENullOrder { - NULL_ORDER_DEFAULT = 1, - NULL_ORDER_FIRST, - NULL_ORDER_LAST -} ENullOrder; - -typedef struct SOrderByExprNode { - ENodeType type; // QUERY_NODE_ORDER_BY_EXPR - SNode* pExpr; - EOrder order; - ENullOrder nullOrder; -} SOrderByExprNode; - -typedef struct SLimitNode { - ENodeType type; // QUERY_NODE_LIMIT - uint64_t limit; - uint64_t offset; -} SLimitNode; - -typedef struct SStateWindowNode { - ENodeType type; // QUERY_NODE_STATE_WINDOW - SNode* pCol; -} SStateWindowNode; - -typedef struct SSessionWindowNode { - ENodeType type; // QUERY_NODE_SESSION_WINDOW - int64_t gap; // gap between two session window(in microseconds) - SNode* pCol; -} SSessionWindowNode; - -typedef struct SIntervalWindowNode { - ENodeType type; // QUERY_NODE_INTERVAL_WINDOW - SNode* pInterval; // SValueNode - SNode* pOffset; // SValueNode - SNode* pSliding; // SValueNode - SNode* pFill; -} SIntervalWindowNode; - -typedef enum EFillMode { - FILL_MODE_NONE = 1, - FILL_MODE_VALUE, - FILL_MODE_PREV, - FILL_MODE_NULL, - FILL_MODE_LINEAR, - FILL_MODE_NEXT -} EFillMode; - -typedef struct SFillNode { - ENodeType type; // QUERY_NODE_FILL - EFillMode mode; - SNode* pValues; // SNodeListNode -} SFillNode; - -typedef struct SSelectStmt { - ENodeType type; // QUERY_NODE_SELECT_STMT - bool isDistinct; - SNodeList* pProjectionList; // SNode - SNode* pFromTable; - SNode* pWhere; - SNodeList* pPartitionByList; // SNode - SNode* pWindow; - SNodeList* pGroupByList; // SGroupingSetNode - SNode* pHaving; - SNodeList* pOrderByList; // SOrderByExprNode - SNode* pLimit; - SNode* pSlimit; -} SSelectStmt; - -typedef enum ESetOperatorType { - SET_OP_TYPE_UNION_ALL = 1 -} ESetOperatorType; - -typedef struct SSetOperator { - ENodeType type; // QUERY_NODE_SET_OPERATOR - ESetOperatorType opType; - SNode* pLeft; - SNode* pRight; - SNodeList* pOrderByList; // SOrderByExprNode - SNode* pLimit; -} SSetOperator; - SNode* nodesMakeNode(ENodeType type); void nodesDestroyNode(SNode* pNode); @@ -343,12 +106,18 @@ typedef enum EDealRes { DEAL_RES_IGNORE_CHILD, DEAL_RES_ERROR, } EDealRes; -typedef EDealRes (*FQueryNodeWalker)(SNode* pNode, void* pContext); -void nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext); -void nodesWalkList(SNodeList* pList, FQueryNodeWalker walker, void* pContext); -void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContext); -void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext); +typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext); +void nodesWalkNode(SNode* pNode, FNodeWalker walker, void* pContext); +void nodesWalkList(SNodeList* pList, FNodeWalker walker, void* pContext); +void nodesWalkNodePostOrder(SNode* pNode, FNodeWalker walker, void* pContext); +void nodesWalkListPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext); + +typedef EDealRes (*FNodeRewriter)(SNode** pNode, void* pContext); +void nodesRewriteNode(SNode** pNode, FNodeRewriter rewriter, void* pContext); +void nodesRewriteList(SNodeList* pList, FNodeRewriter rewriter, void* pContext); +void nodesRewriteNodePostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext); +void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext); bool nodesEqualNode(const SNode* a, const SNode* b); @@ -357,15 +126,6 @@ void nodesCloneNode(const SNode* pNode); int32_t nodesNodeToString(const SNode* pNode, char** pStr, int32_t* pLen); int32_t nodesStringToNode(const char* pStr, SNode** pNode); -bool nodesIsExprNode(const SNode* pNode); - -bool nodesIsArithmeticOp(const SOperatorNode* pOp); -bool nodesIsComparisonOp(const SOperatorNode* pOp); -bool nodesIsJsonOp(const SOperatorNode* pOp); - -bool nodesIsTimeorderQuery(const SNode* pQuery); -bool nodesIsTimelineQuery(const SNode* pQuery); - #ifdef __cplusplus } #endif diff --git a/include/nodes/querynodes.h b/include/nodes/querynodes.h new file mode 100644 index 0000000000..646ea63c83 --- /dev/null +++ b/include/nodes/querynodes.h @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_QUERY_NODES_H_ +#define _TD_QUERY_NODES_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "nodes.h" + +typedef struct SRawExprNode { + ENodeType nodeType; + char* p; + uint32_t n; + SNode* pNode; +} SRawExprNode; + +typedef struct SDataType { + uint8_t type; + uint8_t precision; + uint8_t scale; + int32_t bytes; +} SDataType; + +typedef struct SExprNode { + ENodeType nodeType; + SDataType resType; + char aliasName[TSDB_COL_NAME_LEN]; + SNodeList* pAssociationList; +} SExprNode; + +typedef enum EColumnType { + COLUMN_TYPE_COLUMN = 1, + COLUMN_TYPE_TAG +} EColumnType; + +typedef struct SColumnNode { + SExprNode node; // QUERY_NODE_COLUMN + int16_t colId; + EColumnType colType; // column or tag + char dbName[TSDB_DB_NAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; + char tableAlias[TSDB_TABLE_NAME_LEN]; + char colName[TSDB_COL_NAME_LEN]; + SNode* pProjectRef; +} SColumnNode; + +typedef struct SValueNode { + SExprNode node; // QUERY_NODE_VALUE + char* literal; + bool isDuration; + union { + bool b; + int64_t i; + uint64_t u; + double d; + char* p; + } datum; +} SValueNode; + +typedef enum EOperatorType { + // arithmetic operator + OP_TYPE_ADD = 1, + OP_TYPE_SUB, + OP_TYPE_MULTI, + OP_TYPE_DIV, + OP_TYPE_MOD, + + // comparison operator + OP_TYPE_GREATER_THAN, + OP_TYPE_GREATER_EQUAL, + OP_TYPE_LOWER_THAN, + OP_TYPE_LOWER_EQUAL, + OP_TYPE_EQUAL, + OP_TYPE_NOT_EQUAL, + OP_TYPE_IN, + OP_TYPE_NOT_IN, + OP_TYPE_LIKE, + OP_TYPE_NOT_LIKE, + OP_TYPE_MATCH, + OP_TYPE_NMATCH, + + // json operator + OP_TYPE_JSON_GET_VALUE, + OP_TYPE_JSON_CONTAINS +} EOperatorType; + +typedef struct SOperatorNode { + SExprNode node; // QUERY_NODE_OPERATOR + EOperatorType opType; + SNode* pLeft; + SNode* pRight; +} SOperatorNode; + +typedef enum ELogicConditionType { + LOGIC_COND_TYPE_AND, + LOGIC_COND_TYPE_OR, + LOGIC_COND_TYPE_NOT, +} ELogicConditionType; + +typedef struct SLogicConditionNode { + ENodeType type; // QUERY_NODE_LOGIC_CONDITION + ELogicConditionType condType; + SNodeList* pParameterList; +} SLogicConditionNode; + +typedef struct SIsNullCondNode { + ENodeType type; // QUERY_NODE_IS_NULL_CONDITION + SNode* pExpr; + bool isNull; +} SIsNullCondNode; + +typedef struct SNodeListNode { + ENodeType type; // QUERY_NODE_NODE_LIST + SNodeList* pNodeList; +} SNodeListNode; + +typedef struct SFunctionNode { + SExprNode node; // QUERY_NODE_FUNCTION + char functionName[TSDB_FUNC_NAME_LEN]; + int32_t funcId; + int32_t funcType; + SNodeList* pParameterList; +} SFunctionNode; + +typedef struct STableNode { + ENodeType type; + char dbName[TSDB_DB_NAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; + char tableAlias[TSDB_TABLE_NAME_LEN]; +} STableNode; + +struct STableMeta; + +typedef struct SRealTableNode { + STableNode table; // QUERY_NODE_REAL_TABLE + struct STableMeta* pMeta; +} SRealTableNode; + +typedef struct STempTableNode { + STableNode table; // QUERY_NODE_TEMP_TABLE + SNode* pSubquery; +} STempTableNode; + +typedef enum EJoinType { + JOIN_TYPE_INNER = 1 +} EJoinType; + +typedef struct SJoinTableNode { + STableNode table; // QUERY_NODE_JOIN_TABLE + EJoinType joinType; + SNode* pLeft; + SNode* pRight; + SNode* pOnCond; +} SJoinTableNode; + +typedef enum EGroupingSetType { + GP_TYPE_NORMAL = 1 +} EGroupingSetType; + +typedef struct SGroupingSetNode { + ENodeType type; // QUERY_NODE_GROUPING_SET + EGroupingSetType groupingSetType; + SNodeList* pParameterList; +} SGroupingSetNode; + +typedef enum EOrder { + ORDER_ASC = 1, + ORDER_DESC +} EOrder; + +typedef enum ENullOrder { + NULL_ORDER_DEFAULT = 1, + NULL_ORDER_FIRST, + NULL_ORDER_LAST +} ENullOrder; + +typedef struct SOrderByExprNode { + ENodeType type; // QUERY_NODE_ORDER_BY_EXPR + SNode* pExpr; + EOrder order; + ENullOrder nullOrder; +} SOrderByExprNode; + +typedef struct SLimitNode { + ENodeType type; // QUERY_NODE_LIMIT + uint64_t limit; + uint64_t offset; +} SLimitNode; + +typedef struct SStateWindowNode { + ENodeType type; // QUERY_NODE_STATE_WINDOW + SNode* pCol; +} SStateWindowNode; + +typedef struct SSessionWindowNode { + ENodeType type; // QUERY_NODE_SESSION_WINDOW + int64_t gap; // gap between two session window(in microseconds) + SNode* pCol; +} SSessionWindowNode; + +typedef struct SIntervalWindowNode { + ENodeType type; // QUERY_NODE_INTERVAL_WINDOW + SNode* pInterval; // SValueNode + SNode* pOffset; // SValueNode + SNode* pSliding; // SValueNode + SNode* pFill; +} SIntervalWindowNode; + +typedef enum EFillMode { + FILL_MODE_NONE = 1, + FILL_MODE_VALUE, + FILL_MODE_PREV, + FILL_MODE_NULL, + FILL_MODE_LINEAR, + FILL_MODE_NEXT +} EFillMode; + +typedef struct SFillNode { + ENodeType type; // QUERY_NODE_FILL + EFillMode mode; + SNode* pValues; // SNodeListNode +} SFillNode; + +typedef struct SSelectStmt { + ENodeType type; // QUERY_NODE_SELECT_STMT + bool isDistinct; + SNodeList* pProjectionList; // SNode + SNode* pFromTable; + SNode* pWhere; + SNodeList* pPartitionByList; // SNode + SNode* pWindow; + SNodeList* pGroupByList; // SGroupingSetNode + SNode* pHaving; + SNodeList* pOrderByList; // SOrderByExprNode + SNode* pLimit; + SNode* pSlimit; +} SSelectStmt; + +typedef enum ESetOperatorType { + SET_OP_TYPE_UNION_ALL = 1 +} ESetOperatorType; + +typedef struct SSetOperator { + ENodeType type; // QUERY_NODE_SET_OPERATOR + ESetOperatorType opType; + SNode* pLeft; + SNode* pRight; + SNodeList* pOrderByList; // SOrderByExprNode + SNode* pLimit; +} SSetOperator; + +bool nodesIsExprNode(const SNode* pNode); + +bool nodesIsArithmeticOp(const SOperatorNode* pOp); +bool nodesIsComparisonOp(const SOperatorNode* pOp); +bool nodesIsJsonOp(const SOperatorNode* pOp); + +bool nodesIsTimeorderQuery(const SNode* pQuery); +bool nodesIsTimelineQuery(const SNode* pQuery); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_QUERY_NODES_H_*/ \ No newline at end of file diff --git a/source/libs/parser/inc/astCreateFuncs.h b/source/libs/parser/inc/astCreateFuncs.h index 5b97a0e0c6..43bc5349e4 100644 --- a/source/libs/parser/inc/astCreateFuncs.h +++ b/source/libs/parser/inc/astCreateFuncs.h @@ -20,7 +20,7 @@ extern "C" { #endif -#include "nodes.h" +#include "querynodes.h" #include "nodesShowStmts.h" #include "astCreateContext.h" #include "ttoken.h" diff --git a/source/libs/parser/inc/parserImpl.h b/source/libs/parser/inc/parserImpl.h index 183075d465..b55060def7 100644 --- a/source/libs/parser/inc/parserImpl.h +++ b/source/libs/parser/inc/parserImpl.h @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "nodes.h" +#include "querynodes.h" #include "parser.h" #ifndef _TD_AST_CREATE_FUNCS_H_ @@ -25,6 +25,7 @@ extern "C" { typedef struct SQuery { SNode* pRoot; + // todo reslut meta } SQuery; int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery); diff --git a/source/libs/planner/inc/plannerImpl.h b/source/libs/planner/inc/plannerImpl.h new file mode 100644 index 0000000000..050329693c --- /dev/null +++ b/source/libs/planner/inc/plannerImpl.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_PLANNER_IMPL_H_ +#define _TD_PLANNER_IMPL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_PLANNER_IMPL_H_*/ diff --git a/source/libs/planner/src/plannerImpl.c b/source/libs/planner/src/plannerImpl.c new file mode 100644 index 0000000000..804f673fff --- /dev/null +++ b/source/libs/planner/src/plannerImpl.c @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + + +int32_t getPlan(SNode* pRoot, SQueryPlanNode** pQueryPlan) { + +} \ No newline at end of file diff --git a/source/nodes/src/nodesEqualFuncs.c b/source/nodes/src/nodesEqualFuncs.c index 41d1e5b05d..65eb6e9b32 100644 --- a/source/nodes/src/nodesEqualFuncs.c +++ b/source/nodes/src/nodesEqualFuncs.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "nodes.h" +#include "querynodes.h" #define COMPARE_SCALAR_FIELD(fldname) \ do { \ diff --git a/source/nodes/src/nodesTraverseFuncs.c b/source/nodes/src/nodesTraverseFuncs.c index 796aab611b..d1ded390db 100644 --- a/source/nodes/src/nodesTraverseFuncs.c +++ b/source/nodes/src/nodesTraverseFuncs.c @@ -13,16 +13,16 @@ * along with this program. If not, see . */ -#include "nodes.h" +#include "querynodes.h" typedef enum ETraversalOrder { TRAVERSAL_PREORDER = 1, TRAVERSAL_POSTORDER } ETraversalOrder; -static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FQueryNodeWalker walker, void* pContext); +static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext); -static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FQueryNodeWalker walker, void* pContext) { +static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) { if (NULL == pNode) { return DEAL_RES_CONTINUE; } @@ -119,7 +119,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FQueryNodeWalker w return res; } -static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FQueryNodeWalker walker, void* pContext) { +static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) { SNode* node; FOREACH(node, pNodeList) { if (DEAL_RES_ERROR == walkNode(node, order, walker, pContext)) { @@ -129,18 +129,144 @@ static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FQueryNode return DEAL_RES_CONTINUE; } -void nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext) { +void nodesWalkNode(SNode* pNode, FNodeWalker walker, void* pContext) { (void)walkNode(pNode, TRAVERSAL_PREORDER, walker, pContext); } -void nodesWalkList(SNodeList* pNodeList, FQueryNodeWalker walker, void* pContext) { +void nodesWalkList(SNodeList* pNodeList, FNodeWalker walker, void* pContext) { (void)walkList(pNodeList, TRAVERSAL_PREORDER, walker, pContext); } -void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContext) { +void nodesWalkNodePostOrder(SNode* pNode, FNodeWalker walker, void* pContext) { (void)walkNode(pNode, TRAVERSAL_POSTORDER, walker, pContext); } -void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext) { +void nodesWalkListPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext) { (void)walkList(pList, TRAVERSAL_POSTORDER, walker, pContext); } + +static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext); + +static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) { + if (NULL == pRawNode || NULL == *pRawNode) { + return DEAL_RES_CONTINUE; + } + + EDealRes res = DEAL_RES_CONTINUE; + + if (TRAVERSAL_PREORDER == order) { + res = rewriter(pRawNode, pContext); + if (DEAL_RES_CONTINUE != res) { + return res; + } + } + + SNode* pNode = *pRawNode; + switch (nodeType(pNode)) { + case QUERY_NODE_COLUMN: + case QUERY_NODE_VALUE: + case QUERY_NODE_LIMIT: + // these node types with no subnodes + break; + case QUERY_NODE_OPERATOR: { + SOperatorNode* pOpNode = (SOperatorNode*)pNode; + res = rewriteNode(&(pOpNode->pLeft), order, rewriter, pContext); + if (DEAL_RES_ERROR != res) { + res = rewriteNode(&(pOpNode->pRight), order, rewriter, pContext); + } + break; + } + case QUERY_NODE_LOGIC_CONDITION: + res = rewriteList(((SLogicConditionNode*)pNode)->pParameterList, order, rewriter, pContext); + break; + case QUERY_NODE_IS_NULL_CONDITION: + res = rewriteNode(&(((SIsNullCondNode*)pNode)->pExpr), order, rewriter, pContext); + break; + case QUERY_NODE_FUNCTION: + res = rewriteList(((SFunctionNode*)pNode)->pParameterList, order, rewriter, pContext); + break; + case QUERY_NODE_REAL_TABLE: + case QUERY_NODE_TEMP_TABLE: + break; // todo + case QUERY_NODE_JOIN_TABLE: { + SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; + res = rewriteNode(&(pJoinTableNode->pLeft), order, rewriter, pContext); + if (DEAL_RES_ERROR != res) { + res = rewriteNode(&(pJoinTableNode->pRight), order, rewriter, pContext); + } + if (DEAL_RES_ERROR != res) { + res = rewriteNode(&(pJoinTableNode->pOnCond), order, rewriter, pContext); + } + break; + } + case QUERY_NODE_GROUPING_SET: + res = rewriteList(((SGroupingSetNode*)pNode)->pParameterList, order, rewriter, pContext); + break; + case QUERY_NODE_ORDER_BY_EXPR: + res = rewriteNode(&(((SOrderByExprNode*)pNode)->pExpr), order, rewriter, pContext); + break; + case QUERY_NODE_STATE_WINDOW: + res = rewriteNode(&(((SStateWindowNode*)pNode)->pCol), order, rewriter, pContext); + break; + case QUERY_NODE_SESSION_WINDOW: + res = rewriteNode(&(((SSessionWindowNode*)pNode)->pCol), order, rewriter, pContext); + break; + case QUERY_NODE_INTERVAL_WINDOW: { + SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode; + res = rewriteNode(&(pInterval->pInterval), order, rewriter, pContext); + if (DEAL_RES_ERROR != res) { + res = rewriteNode(&(pInterval->pOffset), order, rewriter, pContext); + } + if (DEAL_RES_ERROR != res) { + res = rewriteNode(&(pInterval->pSliding), order, rewriter, pContext); + } + if (DEAL_RES_ERROR != res) { + res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext); + } + break; + } + case QUERY_NODE_NODE_LIST: + res = rewriteList(((SNodeListNode*)pNode)->pNodeList, order, rewriter, pContext); + break; + case QUERY_NODE_FILL: + res = rewriteNode(&(((SFillNode*)pNode)->pValues), order, rewriter, pContext); + break; + case QUERY_NODE_RAW_EXPR: + res = rewriteNode(&(((SRawExprNode*)pNode)->pNode), order, rewriter, pContext); + break; + default: + break; + } + + if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) { + res = rewriter(pRawNode, pContext); + } + + return res; +} + +static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) { + SNode** pNode; + FOREACH_FOR_REWRITE(pNode, pNodeList) { + if (DEAL_RES_ERROR == rewriteNode(pNode, order, rewriter, pContext)) { + return DEAL_RES_ERROR; + } + } + return DEAL_RES_CONTINUE; +} + +void nodesRewriteNode(SNode** pNode, FNodeRewriter rewriter, void* pContext) { + (void)rewriteNode(pNode, TRAVERSAL_PREORDER, rewriter, pContext); +} + +void nodesRewriteList(SNodeList* pList, FNodeRewriter rewriter, void* pContext) { + (void)rewriteList(pList, TRAVERSAL_PREORDER, rewriter, pContext); +} + +void nodesRewriteNodePostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext) { + (void)rewriteNode(pNode, TRAVERSAL_POSTORDER, rewriter, pContext); +} + +void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext) { + (void)rewriteList(pList, TRAVERSAL_POSTORDER, rewriter, pContext); +} diff --git a/source/nodes/src/nodesUtilFuncs.c b/source/nodes/src/nodesUtilFuncs.c index e0e589157c..5d4ddd0e03 100644 --- a/source/nodes/src/nodesUtilFuncs.c +++ b/source/nodes/src/nodesUtilFuncs.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "nodes.h" +#include "querynodes.h" #include "nodesShowStmts.h" #include "taoserror.h" diff --git a/source/nodes/test/nodesTest.cpp b/source/nodes/test/nodesTest.cpp index 7df3cd8b4c..c116faf4ce 100644 --- a/source/nodes/test/nodesTest.cpp +++ b/source/nodes/test/nodesTest.cpp @@ -15,8 +15,44 @@ #include +#include "querynodes.h" + +using namespace std; + +static EDealRes rewriterTest(SNode** pNode, void* pContext) { + EDealRes* pRes = (EDealRes*)pContext; + if (QUERY_NODE_OPERATOR == nodeType(*pNode)) { + SOperatorNode* pOp = (SOperatorNode*)(*pNode); + if (QUERY_NODE_VALUE != nodeType(pOp->pLeft) || QUERY_NODE_VALUE != nodeType(pOp->pRight)) { + *pRes = DEAL_RES_ERROR; + } + SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + string tmp = to_string(stoi(((SValueNode*)(pOp->pLeft))->literal) + stoi(((SValueNode*)(pOp->pRight))->literal)); + pVal->literal = strdup(tmp.c_str()); + nodesDestroyNode(*pNode); + *pNode = (SNode*)pVal; + } + return DEAL_RES_CONTINUE; +} + TEST(NodesTest, traverseTest) { - // todo + SNode* pRoot = nodesMakeNode(QUERY_NODE_OPERATOR); + SOperatorNode* pOp = (SOperatorNode*)pRoot; + SOperatorNode* pLeft = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR); + pLeft->pLeft = nodesMakeNode(QUERY_NODE_VALUE); + ((SValueNode*)(pLeft->pLeft))->literal = strdup("10"); + pLeft->pRight = nodesMakeNode(QUERY_NODE_VALUE); + ((SValueNode*)(pLeft->pRight))->literal = strdup("5"); + pOp->pLeft = (SNode*)pLeft; + pOp->pRight = nodesMakeNode(QUERY_NODE_VALUE); + ((SValueNode*)(pOp->pRight))->literal = strdup("3"); + + EXPECT_EQ(nodeType(pRoot), QUERY_NODE_OPERATOR); + EDealRes res = DEAL_RES_CONTINUE; + nodesRewriteNodePostOrder(&pRoot, rewriterTest, &res); + EXPECT_EQ(res, DEAL_RES_CONTINUE); + EXPECT_EQ(nodeType(pRoot), QUERY_NODE_VALUE); + EXPECT_EQ(string(((SValueNode*)pRoot)->literal), "18"); } int main(int argc, char* argv[]) { From 1984e22587d469cd38d414da67d6a7a6e77717c0 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 10 Feb 2022 05:47:59 -0500 Subject: [PATCH 2/6] TD-13338 SELECT statement translate code --- source/libs/planner/src/plannerImpl.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/planner/src/plannerImpl.c b/source/libs/planner/src/plannerImpl.c index 804f673fff..6acb927db9 100644 --- a/source/libs/planner/src/plannerImpl.c +++ b/source/libs/planner/src/plannerImpl.c @@ -14,6 +14,6 @@ */ -int32_t getPlan(SNode* pRoot, SQueryPlanNode** pQueryPlan) { +// int32_t getPlan(SNode* pRoot, SQueryPlanNode** pQueryPlan) { -} \ No newline at end of file +// } \ No newline at end of file From 7e3e7fd7868ae3ebfbf869b0e30696dbc804bae2 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 10 Feb 2022 05:52:16 -0500 Subject: [PATCH 3/6] TD-13338 SELECT statement translate code --- source/libs/parser/src/parserImpl.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parserImpl.c b/source/libs/parser/src/parserImpl.c index cd42b9ad64..84486079c3 100644 --- a/source/libs/parser/src/parserImpl.c +++ b/source/libs/parser/src/parserImpl.c @@ -533,7 +533,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_BIGINT: { char* endPtr = NULL; - pVal->datum.i = strtoull(pVal->literal, &endPtr, 10); + pVal->datum.i = strtoll(pVal->literal, &endPtr, 10); break; } case TSDB_DATA_TYPE_UTINYINT: @@ -563,7 +563,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { int32_t n = strlen(pVal->literal); char* tmp = calloc(1, n); int32_t len = trimStringCopy(pVal->literal, n, tmp); - if (taosParseTime(tmp, &pVal->datum.u, len, pVal->node.resType.precision, tsDaylight) != TSDB_CODE_SUCCESS) { + if (taosParseTime(tmp, &pVal->datum.i, len, pVal->node.resType.precision, tsDaylight) != TSDB_CODE_SUCCESS) { tfree(tmp); generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); return DEAL_RES_ERROR; From d5228ebd0741666c06850dcf0007a5f8d0232efa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 14 Feb 2022 15:37:13 +0800 Subject: [PATCH 4/6] refactor code --- source/libs/transport/inc/transComm.h | 2 +- source/libs/transport/src/transCli.c | 16 +++++--- source/libs/transport/src/transComm.c | 4 +- source/libs/transport/src/transSrv.c | 54 ++++++++++++++++++------- source/libs/transport/test/pushServer.c | 2 +- 5 files changed, 53 insertions(+), 25 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index bec0375dbe..6f8da57ee7 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -233,7 +233,7 @@ typedef struct { uv_async_t* asyncs; } SAsyncPool; -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb); +SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); void transDestroyAsyncPool(SAsyncPool* pool); int transSendAsync(SAsyncPool* pool, queue* mq); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2a4a1891ed..00d9174e76 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -38,6 +38,7 @@ typedef struct SCliConn { int32_t ref; // debug and log info struct sockaddr_in addr; + struct sockaddr_in locaddr; } SCliConn; typedef struct SCliMsg { @@ -130,8 +131,9 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), - ntohs(conn->addr.sin_port)); + tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), + inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), + ntohs(conn->locaddr.sin_port)); if (conn->push != NULL && conn->notifyCount != 0) { (*conn->push->callback)(conn->push->arg, &rpcMsg); @@ -417,8 +419,9 @@ static void clientWrite(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port)); + tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), + inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port)); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { @@ -433,6 +436,9 @@ static void clientConnCb(uv_connect_t* req, int status) { int addrlen = sizeof(pConn->addr); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen); + addrlen = sizeof(pConn->locaddr); + uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); + tTrace("client conn %p create", pConn); assert(pConn->stream == req->handle); @@ -579,7 +585,7 @@ static SCliThrdObj* createThrdObj() { pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb); pThrd->timer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pThrd->timer); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 05b732b8cb..7aa5aa16f1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -250,9 +250,7 @@ int transDestroyBuffer(SConnBuffer* buf) { transClearBuffer(buf); } -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) { - static int sz = 10; - +SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = calloc(1, sizeof(SAsyncPool)); pool->index = 0; pool->nAsync = sz; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 5f4daef344..15561c184c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -31,7 +31,8 @@ typedef struct SSrvConn { void* pTransInst; // rpc init void* ahandle; // void* hostThrd; - void* pSrvMsg; + SArray* srvMsgs; + // void* pSrvMsg; struct sockaddr_in addr; @@ -94,6 +95,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); +static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); @@ -310,14 +312,19 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnWriteCb(uv_write_t* req, int status) { SSrvConn* conn = req->data; - - SSrvMsg* smsg = conn->pSrvMsg; - destroySmsg(smsg); - conn->pSrvMsg = NULL; - transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("server conn %p data already was written on stream", conn); + assert(taosArrayGetSize(conn->srvMsgs) >= 1); + SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); + taosArrayRemove(conn->srvMsgs, 0); + destroySmsg(msg); + + // send second data, just use for push + if (taosArrayGetSize(conn->srvMsgs) > 0) { + msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); + uvStartSendRespInternal(msg); + } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); // @@ -361,20 +368,29 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { wb->base = msg; wb->len = len; } -static void uvStartSendResp(SSrvMsg* smsg) { - // impl + +static void uvStartSendRespInternal(SSrvMsg* smsg) { uv_buf_t wb; uvPrepareSendData(smsg, &wb); SSrvConn* pConn = smsg->pConn; uv_timer_stop(pConn->pTimer); - pConn->pSrvMsg = smsg; + // pConn->pSrvMsg = smsg; // conn->pWriter->data = smsg; uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); - - // SRpcMsg* rpcMsg = smsg->msg; - +} +static void uvStartSendResp(SSrvMsg* smsg) { + // impl + SSrvConn* pConn = smsg->pConn; + if (taosArrayGetSize(pConn->srvMsgs) > 0) { + tDebug("server conn %p push data to client %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port)); + taosArrayPush(pConn->srvMsgs, &smsg); + return; + } + taosArrayPush(pConn->srvMsgs, &smsg); + uvStartSendRespInternal(smsg); return; } static void destroySmsg(SSrvMsg* smsg) { @@ -531,7 +547,7 @@ static bool addHandleToWorkloop(void* arg) { QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; } @@ -571,6 +587,7 @@ void* workerThread(void* arg) { static SSrvConn* createConn() { SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn)); + pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); ++pConn->ref; return pConn; @@ -585,8 +602,15 @@ static void destroyConn(SSrvConn* conn, bool clear) { return; } transDestroyBuffer(&conn->readBuf); - destroySmsg(conn->pSrvMsg); - conn->pSrvMsg = NULL; + + for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) { + SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i); + destroySmsg(msg); + } + taosArrayDestroy(conn->srvMsgs); + + // destroySmsg(conn->pSrvMsg); + // conn->pSrvMsg = NULL; if (clear) { uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index f9115d3d4f..0bcc47383b 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -77,7 +77,7 @@ void processShellMsg() { taosFreeQitem(pRpcMsg); { - sleep(1); + // sleep(1); SRpcMsg nRpcMsg = {0}; nRpcMsg.pCont = rpcMallocCont(msgSize); nRpcMsg.contLen = msgSize; From 7c4142dfd1b7965c1c482884b308a4e513bb1038 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 14 Feb 2022 16:13:29 +0800 Subject: [PATCH 5/6] refactor code --- source/libs/transport/src/trans.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 707f23113b..c3d3cfa2ab 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -30,8 +30,11 @@ void* rpcOpen(const SRpcInit* pInit) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } pRpc->cfp = pInit->cfp; - // pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - pRpc->numOfThreads = pInit->numOfThreads; + if (pInit->connType == TAOS_CONN_SERVER) { + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + } else { + pRpc->numOfThreads = pInit->numOfThreads; + } pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); From 6ae1bc51dce15df61c1264a2b339b4912ec6eb65 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 14 Feb 2022 16:36:38 +0800 Subject: [PATCH 6/6] add debug info --- source/libs/transport/src/transSrv.c | 29 ++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 15561c184c..7ddeb99c9d 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -35,6 +35,7 @@ typedef struct SSrvConn { // void* pSrvMsg; struct sockaddr_in addr; + struct sockaddr_in locaddr; // SRpcMsg sendMsg; // del later @@ -265,8 +266,9 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->ref++; - tDebug("server conn %p %s received from %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port)); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), + inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port)); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth @@ -361,8 +363,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { if (transCompressMsg(msg, len, NULL)) { // impl later } - tDebug("server conn %p %s is sent to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port)); + tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), + inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port)); pHead->msgLen = htonl(len); wb->base = msg; @@ -384,8 +387,8 @@ static void uvStartSendResp(SSrvMsg* smsg) { // impl SSrvConn* pConn = smsg->pConn; if (taosArrayGetSize(pConn->srvMsgs) > 0) { - tDebug("server conn %p push data to client %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port)); + tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); taosArrayPush(pConn->srvMsgs, &smsg); return; } @@ -512,13 +515,23 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tTrace("server conn %p created, fd: %d", pConn, fd); + int addrlen = sizeof(pConn->addr); if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) { tError("server conn %p failed to get peer info", pConn); destroyConn(pConn, true); - } else { - uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); + return; } + + addrlen = sizeof(pConn->locaddr); + if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) { + tError("server conn %p failed to get local info", pConn); + destroyConn(pConn, true); + return; + } + + uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); + } else { tDebug("failed to create new connection"); destroyConn(pConn, true);