merge from 3.0

This commit is contained in:
Liu Jicong 2022-02-21 10:52:25 +08:00
commit 925a9fbd51
41 changed files with 2766 additions and 651 deletions

View File

@ -62,6 +62,7 @@ typedef enum ENodeType {
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
QUERY_NODE_COLUMN_REF,
QUERY_NODE_TARGET,
// Only be used in parser module.
QUERY_NODE_RAW_EXPR,
@ -93,7 +94,7 @@ typedef struct SListCell {
} SListCell;
typedef struct SNodeList {
int16_t length;
int32_t length;
SListCell* pHead;
SListCell* pTail;
} SNodeList;
@ -103,6 +104,7 @@ void nodesDestroyNode(SNode* pNode);
SNodeList* nodesMakeList();
int32_t nodesListAppend(SNodeList* pList, SNode* pNode);
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
SNode* nodesListGetNode(SNodeList* pList, int32_t index);
void nodesDestroyList(SNodeList* pList);

View File

@ -0,0 +1,65 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PLANN_NODES_H_
#define _TD_PLANN_NODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "querynodes.h"
typedef struct SLogicNode {
ENodeType type;
int32_t id;
SNodeList* pTargets; // SColumnNode
SNode* pConditions;
SNodeList* pChildren;
struct SLogicNode* pParent;
} SLogicNode;
typedef struct SScanLogicNode {
SLogicNode node;
SNodeList* pScanCols;
struct STableMeta* pMeta;
} SScanLogicNode;
typedef struct SJoinLogicNode {
SLogicNode node;
EJoinType joinType;
SNode* pOnConditions;
} SJoinLogicNode;
typedef struct SFilterLogicNode {
SLogicNode node;
} SFilterLogicNode;
typedef struct SAggLogicNode {
SLogicNode node;
SNodeList* pGroupKeys;
SNodeList* pAggFuncs;
} SAggLogicNode;
typedef struct SProjectLogicNode {
SLogicNode node;
SNodeList* pProjections;
} SProjectLogicNode;
#ifdef __cplusplus
}
#endif
#endif /*_TD_PLANN_NODES_H_*/

View File

@ -62,8 +62,10 @@ typedef struct SColumnNode {
typedef struct SColumnRefNode {
ENodeType type;
int32_t tupleId;
int32_t slotId;
SDataType dataType;
int16_t tupleId;
int16_t slotId;
int16_t columnId;
} SColumnRefNode;
typedef struct SValueNode {
@ -106,6 +108,12 @@ typedef enum EOperatorType {
OP_TYPE_NMATCH,
OP_TYPE_IS_NULL,
OP_TYPE_IS_NOT_NULL,
OP_TYPE_IS_TRUE,
OP_TYPE_IS_FALSE,
OP_TYPE_IS_UNKNOWN,
OP_TYPE_IS_NOT_TRUE,
OP_TYPE_IS_NOT_FALSE,
OP_TYPE_IS_NOT_UNKNOWN,
// json operator
OP_TYPE_JSON_GET_VALUE,
@ -285,7 +293,7 @@ typedef enum ESqlClause {
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext);
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, uint64_t tableId, bool realCol, SNodeList** pCols);
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, SNodeList** pCols);
typedef bool (*FFuncClassifier)(int32_t funcId);
int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs);

View File

@ -13,33 +13,31 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TDB_DB_H_
#define _TD_TDB_DB_H_
#include "tdb_mpool.h"
#ifndef _TD_NEW_PARSER_H_
#define _TD_NEW_PARSER_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct TDB TDB;
#include "parser.h"
struct TDB {
char * fname;
char * dbname;
TDB_MPFILE *mpf;
// union {
// TDB_BTREE *btree;
// TDB_HASH * hash;
// TDB_HEAP * heap;
// } dbam; // db access method
};
typedef enum EStmtType {
STMT_TYPE_CMD = 1,
STMT_TYPE_QUERY
} EStmtType;
int tdbOpen(TDB **dbpp, const char *fname, const char *dbname, uint32_t flags);
int tdbClose(TDB *dbp, uint32_t flags);
typedef struct SQuery {
EStmtType stmtType;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
} SQuery;
int32_t parser(SParseContext* pParseCxt, SQuery* pQuery);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TDB_DB_H_*/
#endif /*_TD_NEW_PARSER_H_*/

View File

@ -24,367 +24,371 @@ extern "C" {
#define TSDB__packed
#define TSKEY int64_t
#define TSKEY_MIN INT64_MIN
#define TSKEY_MAX (INT64_MAX - 1)
#define TSKEY int64_t
#define TSKEY_MIN INT64_MIN
#define TSKEY_MAX (INT64_MAX - 1)
#define TSKEY_INITIAL_VAL TSKEY_MIN
// Bytes for each type.
extern const int32_t TYPE_BYTES[15];
// TODO: replace and remove code below
#define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int32_t)
#define LONG_BYTES sizeof(int64_t)
#define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
#define TSDB_KEYSIZE sizeof(TSKEY)
#define TSDB_NCHAR_SIZE sizeof(int32_t)
#define CHAR_BYTES sizeof(char)
#define SHORT_BYTES sizeof(int16_t)
#define INT_BYTES sizeof(int32_t)
#define LONG_BYTES sizeof(int64_t)
#define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
#define TSDB_KEYSIZE sizeof(TSKEY)
#define TSDB_NCHAR_SIZE sizeof(int32_t)
// NULL definition
#define TSDB_DATA_BOOL_NULL 0x02
#define TSDB_DATA_TINYINT_NULL 0x80
#define TSDB_DATA_SMALLINT_NULL 0x8000
#define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define TSDB_DATA_BOOL_NULL 0x02
#define TSDB_DATA_TINYINT_NULL 0x80
#define TSDB_DATA_SMALLINT_NULL 0x8000
#define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define TSDB_DATA_NCHAR_NULL 0xFFFFFFFF
#define TSDB_DATA_BINARY_NULL 0xFF
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define TSDB_DATA_NCHAR_NULL 0xFFFFFFFF
#define TSDB_DATA_BINARY_NULL 0xFF
#define TSDB_DATA_UTINYINT_NULL 0xFF
#define TSDB_DATA_USMALLINT_NULL 0xFFFF
#define TSDB_DATA_UINT_NULL 0xFFFFFFFF
#define TSDB_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
#define TSDB_DATA_UTINYINT_NULL 0xFF
#define TSDB_DATA_USMALLINT_NULL 0xFFFF
#define TSDB_DATA_UINT_NULL 0xFFFFFFFF
#define TSDB_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
#define TSDB_DATA_NULL_STR "NULL"
#define TSDB_DATA_NULL_STR_L "null"
#define TSDB_DATA_NULL_STR "NULL"
#define TSDB_DATA_NULL_STR_L "null"
#define TSDB_NETTEST_USER "nettestinternal"
#define TSDB_DEFAULT_USER "root"
#define TSDB_NETTEST_USER "nettestinternal"
#define TSDB_DEFAULT_USER "root"
#ifdef _TD_POWER_
#define TSDB_DEFAULT_PASS "powerdb"
#define TSDB_DEFAULT_PASS "powerdb"
#elif (_TD_TQ_ == true)
#define TSDB_DEFAULT_PASS "tqueue"
#define TSDB_DEFAULT_PASS "tqueue"
#elif (_TD_PRO_ == true)
#define TSDB_DEFAULT_PASS "prodb"
#define TSDB_DEFAULT_PASS "prodb"
#else
#define TSDB_DEFAULT_PASS "taosdata"
#define TSDB_DEFAULT_PASS "taosdata"
#endif
#define SHELL_MAX_PASSWORD_LEN 20
#define SHELL_MAX_PASSWORD_LEN 20
#define TSDB_TRUE 1
#define TSDB_FALSE 0
#define TSDB_OK 0
#define TSDB_TRUE 1
#define TSDB_FALSE 0
#define TSDB_OK 0
#define TSDB_ERR -1
#define TS_PATH_DELIMITER "."
#define TS_ESCAPE_CHAR '`'
#define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TIME_PRECISION_MILLI 0
#define TSDB_TIME_PRECISION_MICRO 1
#define TSDB_TIME_PRECISION_NANO 2
#define TSDB_TIME_PRECISION_MILLI_STR "ms"
#define TSDB_TIME_PRECISION_MICRO_STR "us"
#define TSDB_TIME_PRECISION_NANO_STR "ns"
#define TSDB_TICK_PER_SECOND(precision) \
((int64_t)((precision) == TSDB_TIME_PRECISION_MILLI ? 1e3L \
: ((precision) == TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
#define TSDB_TICK_PER_SECOND(precision) ((int64_t)((precision)==TSDB_TIME_PRECISION_MILLI ? 1e3L : ((precision)==TSDB_TIME_PRECISION_MICRO ? 1e6L : 1e9L)))
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
#define T_APPEND_MEMBER(dst, ptr, type, member) \
do { \
memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member)); \
dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member)); \
} while (0)
#define T_READ_MEMBER(src, type, target) \
do { \
(target) = *(type *)(src); \
(src) = (void *)((char *)src + sizeof(type)); \
} while (0)
#define T_APPEND_MEMBER(dst, ptr, type, member) \
do {\
memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member));\
dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member));\
} while(0)
#define T_READ_MEMBER(src, type, target) \
do { \
(target) = *(type *)(src); \
(src) = (void *)((char *)src + sizeof(type));\
} while(0)
// TODO: check if below is necessary
#define TSDB_RELATION_INVALID 0
#define TSDB_RELATION_LESS 1
#define TSDB_RELATION_GREATER 2
#define TSDB_RELATION_EQUAL 3
#define TSDB_RELATION_LESS_EQUAL 4
#define TSDB_RELATION_INVALID 0
#define TSDB_RELATION_LESS 1
#define TSDB_RELATION_GREATER 2
#define TSDB_RELATION_EQUAL 3
#define TSDB_RELATION_LESS_EQUAL 4
#define TSDB_RELATION_GREATER_EQUAL 5
#define TSDB_RELATION_NOT_EQUAL 6
#define TSDB_RELATION_LIKE 7
#define TSDB_RELATION_ISNULL 8
#define TSDB_RELATION_NOTNULL 9
#define TSDB_RELATION_IN 10
#define TSDB_RELATION_NOT_EQUAL 6
#define TSDB_RELATION_LIKE 7
#define TSDB_RELATION_ISNULL 8
#define TSDB_RELATION_NOTNULL 9
#define TSDB_RELATION_IN 10
#define TSDB_RELATION_AND 11
#define TSDB_RELATION_OR 12
#define TSDB_RELATION_NOT 13
#define TSDB_RELATION_AND 11
#define TSDB_RELATION_OR 12
#define TSDB_RELATION_NOT 13
#define TSDB_RELATION_MATCH 14
#define TSDB_RELATION_NMATCH 15
#define TSDB_RELATION_MATCH 14
#define TSDB_RELATION_NMATCH 15
#define TSDB_BINARY_OP_ADD 4000
#define TSDB_BINARY_OP_SUBTRACT 4001
#define TSDB_BINARY_OP_MULTIPLY 4002
#define TSDB_BINARY_OP_DIVIDE 4003
#define TSDB_BINARY_OP_REMAINDER 4004
#define TSDB_BINARY_OP_CONCAT 4005
#define TSDB_BINARY_OP_ADD 4000
#define TSDB_BINARY_OP_SUBTRACT 4001
#define TSDB_BINARY_OP_MULTIPLY 4002
#define TSDB_BINARY_OP_DIVIDE 4003
#define TSDB_BINARY_OP_REMAINDER 4004
#define TSDB_BINARY_OP_CONCAT 4005
#define FUNCTION_CEIL 4500
#define FUNCTION_FLOOR 4501
#define FUNCTION_ABS 4502
#define FUNCTION_ROUND 4503
#define FUNCTION_CEIL 4500
#define FUNCTION_FLOOR 4501
#define FUNCTION_ABS 4502
#define FUNCTION_ROUND 4503
#define FUNCTION_LENGTH 4800
#define FUNCTION_CONCAT 4801
#define FUNCTION_LTRIM 4802
#define FUNCTION_RTRIM 4803
#define FUNCTION_LENGTH 4800
#define FUNCTION_CONCAT 4801
#define FUNCTION_LTRIM 4802
#define FUNCTION_RTRIM 4803
#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN))
#define IS_RELATION_OPTR(op) (((op) >= TSDB_RELATION_LESS) && ((op) < TSDB_RELATION_IN))
#define IS_ARITHMETIC_OPTR(op) (((op) >= TSDB_BINARY_OP_ADD) && ((op) <= TSDB_BINARY_OP_REMAINDER))
#define TSDB_NAME_DELIMITER_LEN 1
#define TSDB_NAME_DELIMITER_LEN 1
#define TSDB_UNI_LEN 24
#define TSDB_USER_LEN TSDB_UNI_LEN
#define TSDB_UNI_LEN 24
#define TSDB_USER_LEN TSDB_UNI_LEN
// ACCOUNT is a 32 bit positive integer
// this is the length of its string representation, including the terminator zero
#define TSDB_ACCT_ID_LEN 11
#define TSDB_ACCT_ID_LEN 11
#define TSDB_MAX_COLUMNS 4096
#define TSDB_MIN_COLUMNS 2 // PRIMARY COLUMN(timestamp) + other columns
#define TSDB_MAX_COLUMNS 4096
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 4096
#define TSDB_FUNC_CODE_LEN (65535 - 512)
#define TSDB_FUNC_BUF_SIZE 512
#define TSDB_FUNC_TYPE_SCALAR 1
#define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 4096
#define TSDB_FUNC_CODE_LEN (65535 - 512)
#define TSDB_FUNC_BUF_SIZE 512
#define TSDB_FUNC_TYPE_SCALAR 1
#define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_MAX_RETRIEVE 1024
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 1024
#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 1024
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_STB_COMMENT_LEN 1024
/**
* In some scenarios uint16_t (0~65535) is used to store the row len.
* - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header.
* - Secondly, if all cols are VarDataT type except primary key, we need 4 bits to store the offset, thus
* the final value is 65531-(4096-1)*4 = 49151.
*/
#define TSDB_MAX_BYTES_PER_ROW 49151
#define TSDB_MAX_TAGS_LEN 16384
#define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_STB_COMMENT_LEN 1024
/**
* In some scenarios uint16_t (0~65535) is used to store the row len.
* - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header.
* - Secondly, if all cols are VarDataT type except primary key, we need 4 bits to store the offset, thus
* the final value is 65531-(4096-1)*4 = 49151.
*/
#define TSDB_MAX_BYTES_PER_ROW 49151
#define TSDB_MAX_TAGS_LEN 16384
#define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_AUTH_LEN 16
#define TSDB_PASSWORD_LEN 32
#define TSDB_USET_PASSWORD_LEN 129
#define TSDB_VERSION_LEN 12
#define TSDB_LABEL_LEN 8
#define TSDB_AUTH_LEN 16
#define TSDB_PASSWORD_LEN 32
#define TSDB_USET_PASSWORD_LEN 129
#define TSDB_VERSION_LEN 12
#define TSDB_LABEL_LEN 8
#define TSDB_CLUSTER_ID_LEN 40
#define TSDB_FQDN_LEN 128
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
#define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_CLUSTER_ID_LEN 40
#define TSDB_FQDN_LEN 128
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
#define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_DESC_LEN 16
#define TSDB_TRANS_ERROR_LEN 128
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
#define TSDB_ERROR_MSG_LEN 1024
#define TSDB_DNODE_CONFIG_LEN 128
#define TSDB_DNODE_VALUE_LEN 256
#define TSDB_ERROR_MSG_LEN 1024
#define TSDB_DNODE_CONFIG_LEN 128
#define TSDB_DNODE_VALUE_LEN 256
#define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_PORT_LEN 8
#define TSDB_MQTT_USER_LEN 24
#define TSDB_MQTT_PASS_LEN 24
#define TSDB_MQTT_TOPIC_LEN 64
#define TSDB_MQTT_CLIENT_ID_LEN 32
#define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_PORT_LEN 8
#define TSDB_MQTT_USER_LEN 24
#define TSDB_MQTT_PASS_LEN 24
#define TSDB_MQTT_TOPIC_LEN 64
#define TSDB_MQTT_CLIENT_ID_LEN 32
#define TSDB_DB_TYPE_DEFAULT 0
#define TSDB_DB_TYPE_TOPIC 1
#define TSDB_DB_TYPE_DEFAULT 0
#define TSDB_DB_TYPE_TOPIC 1
#define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES 16
#define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 1
#define TSDB_MAX_VNODES_PER_DB 4096
#define TSDB_DEFAULT_VN_PER_DB 2
#define TSDB_DEFAULT_VN_PER_DB 2
#define TSDB_DNODE_ROLE_ANY 0
#define TSDB_DNODE_ROLE_MGMT 1
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_DNODE_ROLE_ANY 0
#define TSDB_DNODE_ROLE_MGMT 1
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
#define TSDB_MIN_CACHE_BLOCK_SIZE 1
#define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16
#define TSDB_MIN_CACHE_BLOCK_SIZE 1
#define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16
#define TSDB_MIN_TOTAL_BLOCKS 3
#define TSDB_MAX_TOTAL_BLOCKS 10000
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
#define TSDB_MIN_TOTAL_BLOCKS 3
#define TSDB_MAX_TOTAL_BLOCKS 10000
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
#define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_DAYS_PER_FILE 3650
#define TSDB_DEFAULT_DAYS_PER_FILE 10
#define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_DAYS_PER_FILE 3650
#define TSDB_DEFAULT_DAYS_PER_FILE 10
#define TSDB_MIN_KEEP 1 // data in db to be reserved.
#define TSDB_MAX_KEEP 365000 // data in db to be reserved.
#define TSDB_DEFAULT_KEEP 3650 // ten years
#define TSDB_MIN_KEEP 1 // data in db to be reserved.
#define TSDB_MAX_KEEP 365000 // data in db to be reserved.
#define TSDB_DEFAULT_KEEP 3650 // ten years
#define TSDB_MIN_MIN_ROW_FBLOCK 10
#define TSDB_MAX_MIN_ROW_FBLOCK 1000
#define TSDB_DEFAULT_MIN_ROW_FBLOCK 100
#define TSDB_MIN_MIN_ROW_FBLOCK 10
#define TSDB_MAX_MIN_ROW_FBLOCK 1000
#define TSDB_DEFAULT_MIN_ROW_FBLOCK 100
#define TSDB_MIN_MAX_ROW_FBLOCK 200
#define TSDB_MAX_MAX_ROW_FBLOCK 10000
#define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096
#define TSDB_MIN_MAX_ROW_FBLOCK 200
#define TSDB_MAX_MAX_ROW_FBLOCK 10000
#define TSDB_DEFAULT_MAX_ROW_FBLOCK 4096
#define TSDB_MIN_COMMIT_TIME 30
#define TSDB_MAX_COMMIT_TIME 40960
#define TSDB_DEFAULT_COMMIT_TIME 3600
#define TSDB_MIN_COMMIT_TIME 30
#define TSDB_MAX_COMMIT_TIME 40960
#define TSDB_DEFAULT_COMMIT_TIME 3600
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_WAL_LEVEL 0
#define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_WAL_LEVEL 0
#define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI
#define TSDB_MAX_PRECISION TSDB_TIME_PRECISION_NANO
#define TSDB_DEFAULT_PRECISION TSDB_TIME_PRECISION_MILLI
#define TSDB_MIN_PRECISION TSDB_TIME_PRECISION_MILLI
#define TSDB_MAX_PRECISION TSDB_TIME_PRECISION_NANO
#define TSDB_DEFAULT_PRECISION TSDB_TIME_PRECISION_MILLI
#define TSDB_MIN_COMP_LEVEL 0
#define TSDB_MAX_COMP_LEVEL 2
#define TSDB_DEFAULT_COMP_LEVEL 2
#define TSDB_MIN_COMP_LEVEL 0
#define TSDB_MAX_COMP_LEVEL 2
#define TSDB_DEFAULT_COMP_LEVEL 2
#define TSDB_MIN_DB_REPLICA_OPTION 1
#define TSDB_MAX_DB_REPLICA_OPTION 3
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1
#define TSDB_MIN_DB_REPLICA_OPTION 1
#define TSDB_MAX_DB_REPLICA_OPTION 3
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1
#define TSDB_MIN_DB_QUORUM_OPTION 1
#define TSDB_MAX_DB_QUORUM_OPTION 2
#define TSDB_DEFAULT_DB_QUORUM_OPTION 1
#define TSDB_MIN_DB_QUORUM_OPTION 1
#define TSDB_MAX_DB_QUORUM_OPTION 2
#define TSDB_DEFAULT_DB_QUORUM_OPTION 1
#define TSDB_MIN_DB_UPDATE 0
#define TSDB_MAX_DB_UPDATE 2
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_DB_UPDATE 0
#define TSDB_MAX_DB_UPDATE 2
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 3
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 3
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_UNION_CLAUSE 5
#define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_UNION_CLAUSE 5
#define TSDB_MAX_FIELD_LEN 16384
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
#define TSDB_MAX_FIELD_LEN 16384
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN - TSDB_KEYSIZE) // keep 16384
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN - TSDB_KEYSIZE) // keep 16384
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
#define TSDB_MAX_RPC_THREADS 5
#define TSDB_MAX_RPC_THREADS 5
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
/*
* 1. ordinary sub query for select * from super_table
* 2. all sqlobj generated by createSubqueryObj with this flag
*/
#define TSDB_QUERY_TYPE_SUBQUERY 0x02u
#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table
#define TSDB_QUERY_TYPE_SUBQUERY 0x02u
#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side
#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table
#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query
#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side
#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table
#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query
#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_TYPE_NEST_SUBQUERY 0x1000u // nested sub query
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_TYPE_NEST_SUBQUERY 0x1000u // nested sub query
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type))
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type))
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
#define TSDB_ORDER_ASC 1
#define TSDB_ORDER_DESC 2
#define TSDB_ORDER_ASC 1
#define TSDB_ORDER_DESC 2
#define TSDB_DEFAULT_CLUSTER_HASH_SIZE 1
#define TSDB_DEFAULT_MNODES_HASH_SIZE 5
#define TSDB_DEFAULT_DNODES_HASH_SIZE 10
#define TSDB_DEFAULT_ACCOUNTS_HASH_SIZE 10
#define TSDB_DEFAULT_USERS_HASH_SIZE 20
#define TSDB_DEFAULT_DBS_HASH_SIZE 100
#define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_DEFAULT_CLUSTER_HASH_SIZE 1
#define TSDB_DEFAULT_MNODES_HASH_SIZE 5
#define TSDB_DEFAULT_DNODES_HASH_SIZE 10
#define TSDB_DEFAULT_ACCOUNTS_HASH_SIZE 10
#define TSDB_DEFAULT_USERS_HASH_SIZE 20
#define TSDB_DEFAULT_DBS_HASH_SIZE 100
#define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
#define TSDB_MAX_WAL_SIZE (1024*1024*3)
#define TSDB_MAX_WAL_SIZE (1024 * 1024 * 3)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
#define TFS_MAX_TIERS 3
#define TFS_MAX_TIERS 3
#define TFS_MAX_DISKS_PER_TIER 16
#define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER)
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1)
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER)
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1)
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };

50
include/util/tjson.h Normal file
View File

@ -0,0 +1,50 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_JSON_H_
#define _TD_UTIL_JSON_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
typedef void SJson;
SJson* tjsonCreateObject();
void tjsonDelete(SJson* pJson);
SJson* tjsonAddArrayToObject(SJson* pJson, const char* pName);
int32_t tjsonAddIntegerToObject(SJson* pJson, const char* pName, const uint64_t number);
int32_t tjsonAddStringToObject(SJson* pJson, const char* pName, const char* pVal);
int32_t tjsonAddItemToObject(SJson* pJson, const char* pName, SJson* pItem);
int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem);
typedef int32_t (*FToJson)(const void* pObj, SJson* pJson);
int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj);
int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj);
typedef int32_t (*FFromJson)(const SJson* pJson, void* pObj);
char* tjsonToString(const SJson* pJson);
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_JSON_H_*/

View File

@ -104,6 +104,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;

View File

@ -85,6 +85,8 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
mndTransPullup(pMnode);
sdbVer = sdbUpdateVer(pSdb, 0);
mDebug("pullup trans finished, sdb ver:%" PRId64, sdbVer);
if (sdbVer != lastSdbVer) {
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);

View File

@ -221,7 +221,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) goto TRANS_DECODE_OVER;
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
int16_t type = 0;
@ -353,8 +352,60 @@ static const char *mndTransStr(ETrnStage stage) {
static const char *mndTransType(ETrnType type) {
switch (type) {
case TRN_TYPE_CREATE_USER:
return "create-user";
case TRN_TYPE_ALTER_USER:
return "alter-user";
case TRN_TYPE_DROP_USER:
return "drop-user";
case TRN_TYPE_CREATE_FUNC:
return "create-func";
case TRN_TYPE_DROP_FUNC:
return "drop-func";
case TRN_TYPE_CREATE_SNODE:
return "create-snode";
case TRN_TYPE_DROP_SNODE:
return "drop-snode";
case TRN_TYPE_CREATE_QNODE:
return "create-qnode";
case TRN_TYPE_DROP_QNODE:
return "drop-qnode";
case TRN_TYPE_CREATE_BNODE:
return "create-bnode";
case TRN_TYPE_DROP_BNODE:
return "drop-bnode";
case TRN_TYPE_CREATE_MNODE:
return "create-mnode";
case TRN_TYPE_DROP_MNODE:
return "drop-mnode";
case TRN_TYPE_CREATE_TOPIC:
return "create-topic";
case TRN_TYPE_DROP_TOPIC:
return "drop-topic";
case TRN_TYPE_SUBSCRIBE:
return "subscribe";
case TRN_TYPE_REBALANCE:
return "rebalance";
case TRN_TYPE_CREATE_DNODE:
return "create-qnode";
case TRN_TYPE_DROP_DNODE:
return "drop-qnode";
case TRN_TYPE_CREATE_DB:
return "create-db";
case TRN_TYPE_ALTER_DB:
return "alter-db";
case TRN_TYPE_DROP_DB:
return "drop-db";
case TRN_TYPE_SPLIT_VGROUP:
return "split-vgroup";
case TRN_TYPE_MERGE_VGROUP:
return "merge-vgroup";
case TRN_TYPE_CREATE_STB:
return "create-stb";
case TRN_TYPE_ALTER_STB:
return "alter-stb";
case TRN_TYPE_DROP_STB:
return "drop-stb";
default:
return "invalid";
}
@ -391,6 +442,11 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
}
if (pNew->stage == TRN_STAGE_ROLLBACK) {
pNew->stage = TRN_STAGE_FINISHED;
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED));
}
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
pOld->stage = pNew->stage;
@ -423,6 +479,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy;
pTrans->transType = type;
pTrans->createdTime = taosGetTimestampMs();
pTrans->rpcHandle = pReq->handle;
pTrans->rpcAHandle = pReq->ahandle;
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
@ -754,6 +811,9 @@ void mndTransProcessRsp(SMnodeMsg *pRsp) {
if (pAction != NULL) {
pAction->msgReceived = 1;
pAction->errCode = pRsp->rpcMsg.code;
if (pAction->errCode != 0) {
tstrncpy(pTrans->lastError, tstrerror(pAction->errCode), TSDB_TRANS_ERROR_LEN);
}
}
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code,
@ -1059,6 +1119,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
while (continueExec) {
pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) {
case TRN_STAGE_PREPARE:
continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
@ -1170,10 +1231,9 @@ static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) {
}
code = mndKillTrans(pMnode, pTrans);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
KILL_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
return -1;
}
@ -1228,7 +1288,7 @@ static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = (TSDB_TRANS_DESC_LEN - 1) + VARSTR_HEADER_SIZE;
pShow->bytes[cols] = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "type");
pSchema[cols].bytes = pShow->bytes[cols];
@ -1288,11 +1348,7 @@ static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char *name = mnGetDbStr(pTrans->dbname);
if (name != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
} else {
STR_TO_VARSTR(pWrite, "-");
}
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;

View File

@ -28,7 +28,7 @@ class MndTestTrans : public ::testing::Test {
static void KillThenRestartServer() {
char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data";
FileFd fd = taosOpenFileRead(file);
int32_t size = 1024 * 1024;
int32_t size = 3 * 1024 * 1024;
void* buffer = malloc(size);
int32_t readLen = taosReadFile(fd, buffer, size);
if (readLen < 0 || readLen == size) {
@ -61,6 +61,37 @@ class MndTestTrans : public ::testing::Test {
Testbase MndTestTrans::test;
TestServer MndTestTrans::server2;
TEST_F(MndTestTrans, 00_Create_User_Crash) {
{
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
CHECK_META("show trans", 7);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, "stage");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "db");
CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, "type");
CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "last_exec_time");
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_ERROR_LEN - 1 + VARSTR_HEADER_SIZE, "last_error");
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 0);
}
{
SKillTransReq killReq = {0};
killReq.transId = 3;
int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSKillTransReq(pReq, contLen, &killReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TRANS_NOT_EXIST);
}
}
TEST_F(MndTestTrans, 01_Create_User_Crash) {
{
SCreateUserReq createReq = {0};
@ -171,6 +202,57 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
{
// show trans
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
CHECK_META("show trans", 7);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt32(4);
CheckTimestamp();
CheckBinary("undoAction", TSDB_TRANS_STAGE_LEN);
CheckBinary("", TSDB_DB_NAME_LEN - 1);
CheckBinary("create-qnode", TSDB_TRANS_TYPE_LEN);
CheckTimestamp();
CheckBinary("Unable to establish connection", TSDB_TRANS_ERROR_LEN - 1);
}
// kill trans
{
SKillTransReq killReq = {0};
killReq.transId = 4;
int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSKillTransReq(pReq, contLen, &killReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
// show trans
{
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 0);
}
// re-create trans
{
SMCreateQnodeReq createReq = {0};
createReq.dnodeId = 2;
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
KillThenRestartServer();
server2.DoStart();
@ -200,4 +282,12 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 2);
}
}
}
// create db
// partial create stb
// drop db failed
// create stb failed
// start
// create stb success
// drop db success

View File

@ -1,7 +1,24 @@
set(META_DB_IMPL_LIST "BDB" "TDB")
set(META_DB_IMPL "BDB" CACHE STRING "Use BDB as the default META implementation")
set_property(CACHE META_DB_IMPL PROPERTY STRINGS ${META_DB_IMPL_LIST})
if(META_DB_IMPL IN_LIST META_DB_IMPL_LIST)
message(STATUS "META DB Impl: ${META_DB_IMPL}==============")
else()
message(FATAL_ERROR "Invalid META DB IMPL: ${META_DB_IMPL}==============")
endif()
aux_source_directory(src/meta META_SRC)
if(${META_DB_IMPL} STREQUAL "BDB")
list(REMOVE_ITEM META_SRC "src/meta/metaTDBImpl.c")
elseif(${META_DB_IMPL} STREQUAL "TDB")
list(REMOVE_ITEM META_SRC "src/meta/metaBDBImpl.c")
endif()
aux_source_directory(src/tq TQ_SRC)
aux_source_directory(src/tsdb TSDB_SRC)
aux_source_directory(src/vnd VND_SRC)
list(APPEND
VNODE_SRC
${META_SRC}
@ -22,7 +39,6 @@ target_link_libraries(
PUBLIC util
PUBLIC common
PUBLIC transport
PUBLIC bdb
PUBLIC tfs
PUBLIC wal
PUBLIC scheduler
@ -31,6 +47,12 @@ target_link_libraries(
PUBLIC sync
)
if(${META_DB_IMPL} STREQUAL "BDB")
target_link_libraries(vnode PUBLIC bdb)
elseif(${META_DB_IMPL} STREQUAL "TDB")
target_link_libraries(vnode PUBLIC tdb)
endif()
if(${BUILD_TEST})
# add_subdirectory(test)
endif(${BUILD_TEST})

View File

@ -23,8 +23,8 @@
#include "tlist.h"
#include "tlockfree.h"
#include "tmacro.h"
#include "wal.h"
#include "tq.h"
#include "wal.h"
#include "vnode.h"
@ -175,7 +175,6 @@ void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
void vmaFree(SVMemAllocator* pVMA, void* ptr);
bool vmaIsFull(SVMemAllocator* pVMA);
#ifdef __cplusplus
}
#endif

View File

@ -0,0 +1,145 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "metaDef.h"
#include "tdb.h"
struct SMetaDB {
TENV *pEnv;
TDB * pTbDB;
TDB * pSchemaDB;
TDB * pNameIdx;
TDB * pStbIdx;
TDB * pNtbIdx;
TDB * pCtbIdx;
// tag index hash table
// suid+colid --> TDB *
struct {
} tagIdxHt;
};
#define A(op, flag) \
do { \
if ((ret = op) != 0) goto flag; \
} while (0)
int metaOpenDB(SMeta *pMeta) {
SMetaDB *pDb;
TENV * pEnv;
TDB * pTbDB;
TDB * pSchemaDB;
TDB * pNameIdx;
TDB * pStbIdx;
TDB * pNtbIdx;
TDB * pCtbIdx;
int ret;
pDb = (SMetaDB *)calloc(1, sizeof(*pDb));
if (pDb == NULL) {
return -1;
}
// Create and open the ENV
A((tdbEnvCreate(&pEnv)), _err);
#if 0
// Set options of the environment
A(tdbEnvSetPageSize(pEnv, 8192), _err);
A(tdbEnvSetCacheSize(pEnv, 16 * 1024 * 1024), _err);
#endif
A((tdbEnvOpen(&pEnv)), _err);
// Create and open each DB
A(tdbCreate(&pTbDB), _err);
A(tdbOpen(&pTbDB, "table.db", NULL, pEnv), _err);
A(tdbCreate(&pSchemaDB), _err);
A(tdbOpen(&pSchemaDB, "schema.db", NULL, pEnv), _err);
A(tdbCreate(&pNameIdx), _err);
A(tdbOpen(&pNameIdx, "name.db", NULL, pEnv), _err);
// tdbAssociate();
pDb->pEnv = pEnv;
pDb->pTbDB = pTbDB;
pDb->pSchemaDB = pSchemaDB;
pMeta->pDB = pDb;
return 0;
_err:
return -1;
}
void metaCloseDB(SMeta *pMeta) {
// TODO
}
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
// TODO
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
// TODO
return 0;
}
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
// TODO
return NULL;
}
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
// TODO
return NULL;
}
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
// TODO
return NULL;
}
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
// TODO
return NULL;
}
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
// TODO
return NULL;
}
void metaCloseTbCursor(SMTbCursor *pTbCur) {
// TODO
}
char *metaTbCursorNext(SMTbCursor *pTbCur) {
// TODO
return NULL;
}
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
// TODO
return NULL;
}
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
// TODO
}
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
// TODO
return 0;
}

View File

@ -7,7 +7,7 @@ target_include_directories(
)
target_link_libraries(
nodes
PRIVATE os util
PRIVATE os util common qcom
)
if(${BUILD_TEST})

View File

@ -13,10 +13,179 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
#include "plannodes.h"
#include "querynodes.h"
#include "query.h"
#include "taoserror.h"
#include "tjson.h"
int32_t nodesNodeToString(const SNode* pNode, char** pStr, int32_t* pLen) {
switch (nodeType(pNode)) {
static int32_t nodeToJson(const void* pObj, SJson* pJson);
static char* nodeName(ENodeType type) {
switch (type) {
case QUERY_NODE_COLUMN:
return "Column";
case QUERY_NODE_VALUE:
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION:
case QUERY_NODE_REAL_TABLE:
case QUERY_NODE_TEMP_TABLE:
case QUERY_NODE_JOIN_TABLE:
case QUERY_NODE_GROUPING_SET:
case QUERY_NODE_ORDER_BY_EXPR:
case QUERY_NODE_LIMIT:
case QUERY_NODE_STATE_WINDOW:
case QUERY_NODE_SESSION_WINDOW:
case QUERY_NODE_INTERVAL_WINDOW:
case QUERY_NODE_NODE_LIST:
case QUERY_NODE_FILL:
case QUERY_NODE_COLUMN_REF:
case QUERY_NODE_TARGET:
case QUERY_NODE_RAW_EXPR:
case QUERY_NODE_SET_OPERATOR:
case QUERY_NODE_SELECT_STMT:
case QUERY_NODE_SHOW_STMT:
break;
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
return "LogicJoin";
case QUERY_NODE_LOGIC_PLAN_FILTER:
return "LogicFilter";
case QUERY_NODE_LOGIC_PLAN_AGG:
return "LogicAgg";
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return "LogicProject";
default:
break;
}
return "Unknown";
}
static int32_t addNodeList(SJson* pJson, const char* pName, FToJson func, const SNodeList* pList) {
if (LIST_LENGTH(pList) > 0) {
SJson* jList = tjsonAddArrayToObject(pJson, pName);
if (NULL == jList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pNode;
FOREACH(pNode, pList) {
int32_t code = tjsonAddItem(jList, func, pNode);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
static const char* jkTableMetaUid = "TableMetaUid";
static const char* jkTableMetaSuid = "TableMetaSuid";
static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
const STableMeta* pNode = (const STableMeta*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkTableMetaUid, pNode->uid);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableMetaSuid, pNode->suid);
}
return code;
}
static const char* jkLogicPlanId = "Id";
static const char* jkLogicPlanTargets = "Targets";
static const char* jkLogicPlanConditions = "Conditions";
static const char* jkLogicPlanChildren = "Children";
static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) {
const SLogicNode* pNode = (const SLogicNode*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkLogicPlanId, pNode->id);
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkLogicPlanTargets, nodeToJson, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkLogicPlanConditions, nodeToJson, pNode->pConditions);
}
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkLogicPlanChildren, nodeToJson, pNode->pChildren);
}
return code;
}
static const char* jkScanLogicPlanScanCols = "ScanCols";
static const char* jkScanLogicPlanTableMeta = "TableMeta";
static int32_t logicScanToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkScanLogicPlanScanCols, nodeToJson, pNode->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta);
}
return code;
}
static const char* jkProjectLogicPlanProjections = "Projections";
static int32_t logicProjectToJson(const void* pObj, SJson* pJson) {
const SProjectLogicNode* pNode = (const SProjectLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkProjectLogicPlanProjections, nodeToJson, pNode->pProjections);
}
return code;
}
static const char* jkJoinLogicPlanJoinType = "JoinType";
static const char* jkJoinLogicPlanOnConditions = "OnConditions";
static int32_t logicJoinToJson(const void* pObj, SJson* pJson) {
const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinType, pNode->joinType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions);
}
return code;
}
static int32_t logicFilterToJson(const void* pObj, SJson* pJson) {
return logicPlanNodeToJson(pObj, pJson);
}
static const char* jkAggLogicPlanGroupKeys = "GroupKeys";
static const char* jkAggLogicPlanAggFuncs = "AggFuncs";
static int32_t logicAggToJson(const void* pObj, SJson* pJson) {
const SAggLogicNode* pNode = (const SAggLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkAggLogicPlanGroupKeys, nodeToJson, pNode->pGroupKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = addNodeList(pJson, jkAggLogicPlanAggFuncs, nodeToJson, pNode->pAggFuncs);
}
return code;
}
static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
switch (nodeType(pObj)) {
case QUERY_NODE_COLUMN:
case QUERY_NODE_VALUE:
case QUERY_NODE_OPERATOR:
@ -31,14 +200,68 @@ int32_t nodesNodeToString(const SNode* pNode, char** pStr, int32_t* pLen) {
case QUERY_NODE_STATE_WINDOW:
case QUERY_NODE_SESSION_WINDOW:
case QUERY_NODE_INTERVAL_WINDOW:
case QUERY_NODE_NODE_LIST:
case QUERY_NODE_FILL:
case QUERY_NODE_COLUMN_REF:
case QUERY_NODE_TARGET:
case QUERY_NODE_RAW_EXPR:
case QUERY_NODE_SET_OPERATOR:
case QUERY_NODE_SELECT_STMT:
case QUERY_NODE_SHOW_STMT:
break;
case QUERY_NODE_LOGIC_PLAN_SCAN:
return logicScanToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_JOIN:
return logicJoinToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_FILTER:
return logicFilterToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_AGG:
return logicAggToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return logicProjectToJson(pObj, pJson);
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static const char* jkNodeType = "Type";
static int32_t nodeToJson(const void* pObj, SJson* pJson) {
const SNode* pNode = (const SNode*)pObj;
char* pNodeName = nodeName(nodeType(pNode));
int32_t code = tjsonAddStringToObject(pJson, jkNodeType, pNodeName);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, pNodeName, specificNodeToJson, pNode);
}
return code;
}
int32_t nodesNodeToString(const SNode* pNode, char** pStr, int32_t* pLen) {
if (NULL == pNode || NULL == pStr || NULL == pLen) {
return TSDB_CODE_SUCCESS;
}
SJson* pJson = tjsonCreateObject();
if (NULL == pJson) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = nodeToJson(pNode, pJson);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
return code;
}
*pStr = tjsonToString(pJson);
tjsonDelete(pJson);
*pLen = strlen(*pStr) + 1;
return TSDB_CODE_SUCCESS;
}
int32_t nodesStringToNode(const char* pStr, SNode** pNode) {
return TSDB_CODE_SUCCESS;
}

View File

@ -14,7 +14,7 @@
*/
#include "querynodes.h"
#include "nodesShowStmts.h"
#include "plannodes.h"
#include "taos.h"
#include "taoserror.h"
#include "thash.h"
@ -68,8 +68,18 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SSetOperator));
case QUERY_NODE_SELECT_STMT:
return makeNode(type, sizeof(SSelectStmt));
case QUERY_NODE_SHOW_STMT:
return makeNode(type, sizeof(SShowStmt));
// case QUERY_NODE_SHOW_STMT:
// return makeNode(type, sizeof(SShowStmt));
case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode));
case QUERY_NODE_LOGIC_PLAN_JOIN:
return makeNode(type, sizeof(SJoinLogicNode));
case QUERY_NODE_LOGIC_PLAN_FILTER:
return makeNode(type, sizeof(SFilterLogicNode));
case QUERY_NODE_LOGIC_PLAN_AGG:
return makeNode(type, sizeof(SAggLogicNode));
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectLogicNode));
default:
break;
}
@ -121,6 +131,15 @@ int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
return TSDB_CODE_SUCCESS;
}
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
pTarget->pTail->pNext = pSrc->pHead;
pSrc->pHead->pPrev = pTarget->pTail;
pTarget->pTail = pSrc->pTail;
pTarget->length += pSrc->length;
tfree(pSrc);
return TSDB_CODE_SUCCESS;
}
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
if (NULL == pCell->pPrev) {
pList->pHead = pCell->pNext;
@ -129,6 +148,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
pCell->pNext->pPrev = pCell->pPrev;
}
SListCell* pNext = pCell->pNext;
nodesDestroyNode(pCell->pNode);
tfree(pCell);
--(pList->length);
return pNext;
@ -185,6 +205,14 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp) {
case OP_TYPE_NOT_LIKE:
case OP_TYPE_MATCH:
case OP_TYPE_NMATCH:
case OP_TYPE_IS_NULL:
case OP_TYPE_IS_NOT_NULL:
case OP_TYPE_IS_TRUE:
case OP_TYPE_IS_FALSE:
case OP_TYPE_IS_UNKNOWN:
case OP_TYPE_IS_NOT_TRUE:
case OP_TYPE_IS_NOT_FALSE:
case OP_TYPE_IS_NOT_UNKNOWN:
return true;
default:
break;
@ -213,8 +241,7 @@ bool nodesIsTimelineQuery(const SNode* pQuery) {
typedef struct SCollectColumnsCxt {
int32_t errCode;
uint64_t tableId;
bool realCol;
const char* pTableAlias;
SNodeList* pCols;
SHashObj* pColIdHash;
} SCollectColumnsCxt;
@ -232,27 +259,24 @@ static EDealRes doCollect(SCollectColumnsCxt* pCxt, int32_t id, SNode* pNode) {
static EDealRes collectColumns(SNode* pNode, void* pContext) {
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
if (pCxt->realCol && QUERY_NODE_COLUMN == nodeType(pNode)) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode;
int32_t colId = pCol->colId;
if (pCxt->tableId == pCol->tableId && colId > 0) {
if (0 == strcmp(pCxt->pTableAlias, pCol->tableAlias)) {
return doCollect(pCxt, colId, pNode);
}
} else if (!pCxt->realCol && QUERY_NODE_COLUMN_REF == nodeType(pNode)) {
return doCollect(pCxt, ((SColumnRefNode*)pNode)->slotId, pNode);
}
return DEAL_RES_CONTINUE;
}
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, uint64_t tableId, bool realCol, SNodeList** pCols) {
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, SNodeList** pCols) {
if (NULL == pSelect || NULL == pCols) {
return TSDB_CODE_SUCCESS;
}
SCollectColumnsCxt cxt = {
.errCode = TSDB_CODE_SUCCESS,
.realCol = realCol,
.pTableAlias = pTableAlias,
.pCols = nodesMakeList(),
.pColIdHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK)
};
@ -303,6 +327,12 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNod
nodesDestroyList(cxt.pFuncs);
return cxt.errCode;
}
*pFuncs = cxt.pFuncs;
if (LIST_LENGTH(cxt.pFuncs) > 0) {
*pFuncs = cxt.pFuncs;
} else {
nodesDestroyList(cxt.pFuncs);
*pFuncs = NULL;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -13,27 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_AST_CREATE_FUNCS_H_
#define _TD_AST_CREATE_FUNCS_H_
#ifndef _TD_PARSER_IMPL_H_
#define _TD_PARSER_IMPL_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "querynodes.h"
#include "parser.h"
typedef enum EStmtType {
STMT_TYPE_CMD = 1,
STMT_TYPE_QUERY
} EStmtType;
typedef struct SQuery {
EStmtType stmtType;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
} SQuery;
#include "newParser.h"
int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery);
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery);
@ -42,4 +30,4 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery);
}
#endif
#endif /*_TD_AST_CREATE_FUNCS_H_*/
#endif /*_TD_PARSER_IMPL_H_*/

View File

@ -880,7 +880,6 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
int32_t pos = getPositionValue(pVal);
if (pos < 0) {
ERASE_NODE(pOrderByList);
nodesDestroyNode(pNode);
continue;
} else if (0 == pos || pos > LIST_LENGTH(pProjectionList)) {
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_WRONG_NUMBER_OF_SELECT);
@ -1058,3 +1057,11 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
}
return code;
}
int32_t parser(SParseContext* pParseCxt, SQuery* pQuery) {
int32_t code = doParse(pParseCxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = doTranslate(pParseCxt, pQuery);
}
return code;
}

View File

@ -20,43 +20,10 @@
extern "C" {
#endif
#include "querynodes.h"
#include "plannodes.h"
#include "planner.h"
typedef struct SLogicNode {
ENodeType type;
int32_t id;
SNodeList* pTargets;
SNode* pConditions;
SNodeList* pChildren;
struct SLogicNode* pParent;
} SLogicNode;
typedef struct SScanLogicNode {
SLogicNode node;
SNodeList* pScanCols;
struct STableMeta* pMeta;
} SScanLogicNode;
typedef struct SJoinLogicNode {
SLogicNode node;
EJoinType joinType;
SNode* pOnConditions;
} SJoinLogicNode;
typedef struct SFilterLogicNode {
SLogicNode node;
} SFilterLogicNode;
typedef struct SAggLogicNode {
SLogicNode node;
SNodeList* pGroupKeys;
SNodeList* pAggFuncs;
} SAggLogicNode;
typedef struct SProjectLogicNode {
SLogicNode node;
} SProjectLogicNode;
int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode);
#ifdef __cplusplus
}

View File

@ -19,6 +19,7 @@
#define CHECK_ALLOC(p, res) \
do { \
if (NULL == p) { \
printf("%s : %d\n", __FUNCTION__, __LINE__); \
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
return res; \
} \
@ -28,6 +29,7 @@
do { \
int32_t code = exec; \
if (TSDB_CODE_SUCCESS != code) { \
printf("%s : %d\n", __FUNCTION__, __LINE__); \
pCxt->errCode = code; \
return res; \
} \
@ -44,8 +46,7 @@ static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSele
typedef struct SRewriteExprCxt {
int32_t errCode;
int32_t planNodeId;
SNodeList* pTargets;
SNodeList* pExprs;
} SRewriteExprCxt;
static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
@ -53,34 +54,44 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
case QUERY_NODE_OPERATOR:
case QUERY_NODE_LOGIC_CONDITION:
case QUERY_NODE_FUNCTION: {
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext;
SNode* pExpr;
int32_t index = 0;
FOREACH(pExpr, pCxt->pExprs) {
if (nodesEqualNode(pExpr, *pNode)) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode);
pCol->node.resType = pToBeRewrittenExpr->resType;
strcpy(pCol->node.aliasName, pToBeRewrittenExpr->aliasName);
strcpy(pCol->colName, ((SExprNode*)pExpr)->aliasName);
nodesDestroyNode(*pNode);
*pNode = (SNode*)pCol;
return DEAL_RES_IGNORE_CHILD;
}
++index;
}
break;
}
default:
break;
}
SRewriteExprCxt* pCxt = (SRewriteExprCxt*)pContext;
SNode* pTarget;
int32_t index = 0;
FOREACH(pTarget, pCxt->pTargets) {
if (nodesEqualNode(pTarget, *pNode)) {
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pCol) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
pCol->tupleId = pCxt->planNodeId;
pCol->slotId = index;
nodesDestroyNode(*pNode);
*pNode = (SNode*)pCol;
return DEAL_RES_IGNORE_CHILD;
}
++index;
}
return DEAL_RES_CONTINUE;
}
static int32_t rewriteExpr(int32_t planNodeId, SNodeList* pTargets, SSelectStmt* pSelect, ESqlClause clause) {
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .planNodeId = planNodeId, .pTargets = pTargets };
static int32_t rewriteExpr(int32_t planNodeId, int32_t rewriteId, SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
SNode* pNode;
FOREACH(pNode, pExprs) {
if (QUERY_NODE_COLUMN == nodeType(pNode) || QUERY_NODE_VALUE == nodeType(pNode)) {
continue;
}
sprintf(((SExprNode*)pNode)->aliasName, "#expr_%d_%d", planNodeId, rewriteId);
}
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
return cxt.errCode;
}
@ -114,23 +125,6 @@ error:
return pRoot;
}
static SNodeList* createScanTargets(int32_t planNodeId, int32_t numOfScanCols) {
SNodeList* pTargets = nodesMakeList();
if (NULL == pTargets) {
return NULL;
}
for (int32_t i = 0; i < numOfScanCols; ++i) {
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pCol || TSDB_CODE_SUCCESS != nodesListAppend(pTargets, (SNode*)pCol)) {
nodesDestroyList(pTargets);
return NULL;
}
pCol->tupleId = planNodeId;
pCol->slotId = i;
}
return pTargets;
}
static SLogicNode* createScanLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) {
SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
CHECK_ALLOC(pScan, NULL);
@ -140,22 +134,25 @@ static SLogicNode* createScanLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect,
// set columns to scan
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pScan->pMeta->uid, true, &pCols), (SLogicNode*)pScan);
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, &pCols), (SLogicNode*)pScan);
pScan->pScanCols = nodesCloneList(pCols);
CHECK_ALLOC(pScan->pScanCols, (SLogicNode*)pScan);
// pScanCols of SScanLogicNode is equivalent to pTargets of other logic nodes
CHECK_CODE(rewriteExpr(pScan->node.id, pScan->pScanCols, pSelect, SQL_CLAUSE_FROM), (SLogicNode*)pScan);
// set output
pScan->node.pTargets = createScanTargets(pScan->node.id, LIST_LENGTH(pScan->pScanCols));
pScan->node.pTargets = nodesCloneList(pCols);
CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan);
return (SLogicNode*)pScan;
}
static SLogicNode* createSubqueryLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable) {
return createQueryLogicNode(pCxt, pTable->pSubquery);
SLogicNode* pRoot = createQueryLogicNode(pCxt, pTable->pSubquery);
CHECK_ALLOC(pRoot, NULL);
SNode* pNode;
FOREACH(pNode, pRoot->pTargets) {
strcpy(((SColumnNode*)pNode)->tableAlias, pTable->table.tableAlias);
}
return pRoot;
}
static SLogicNode* createJoinLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SJoinTableNode* pJoinTable) {
@ -179,12 +176,12 @@ static SLogicNode* createJoinLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect,
pJoin->pOnConditions = nodesCloneNode(pJoinTable->pOnCond);
CHECK_ALLOC(pJoin->pOnConditions, (SLogicNode*)pJoin);
// set the output and rewrite the expression in subsequent clauses with the output
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, 0, false, &pCols), (SLogicNode*)pJoin);
pJoin->node.pTargets = nodesCloneList(pCols);
// set the output
pJoin->node.pTargets = nodesCloneList(pLeft->pTargets);
CHECK_ALLOC(pJoin->node.pTargets, (SLogicNode*)pJoin);
CHECK_CODE(rewriteExpr(pJoin->node.id, pJoin->node.pTargets, pSelect, SQL_CLAUSE_FROM), (SLogicNode*)pJoin);
SNodeList* pTargets = nodesCloneList(pRight->pTargets);
CHECK_ALLOC(pTargets, (SLogicNode*)pJoin);
nodesListAppendList(pJoin->node.pTargets, pTargets);
return (SLogicNode*)pJoin;
}
@ -203,7 +200,7 @@ static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSele
return NULL;
}
static SLogicNode* createWhereFilterLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
static SLogicNode* createWhereFilterLogicNode(SPlanContext* pCxt, SLogicNode* pChild, SSelectStmt* pSelect) {
if (NULL == pSelect->pWhere) {
return NULL;
}
@ -216,16 +213,44 @@ static SLogicNode* createWhereFilterLogicNode(SPlanContext* pCxt, SSelectStmt* p
pFilter->node.pConditions = nodesCloneNode(pSelect->pWhere);
CHECK_ALLOC(pFilter->node.pConditions, (SLogicNode*)pFilter);
// set the output and rewrite the expression in subsequent clauses with the output
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, 0, false, &pCols), (SLogicNode*)pFilter);
pFilter->node.pTargets = nodesCloneList(pCols);
// set the output
pFilter->node.pTargets = nodesCloneList(pChild->pTargets);
CHECK_ALLOC(pFilter->node.pTargets, (SLogicNode*)pFilter);
CHECK_CODE(rewriteExpr(pFilter->node.id, pFilter->node.pTargets, pSelect, SQL_CLAUSE_WHERE), (SLogicNode*)pFilter);
return (SLogicNode*)pFilter;
}
static SNodeList* createColumnByRewriteExps(SPlanContext* pCxt, SNodeList* pExprs) {
SNodeList* pList = nodesMakeList();
CHECK_ALLOC(pList, NULL);
SNode* pNode;
FOREACH(pNode, pExprs) {
if (QUERY_NODE_VALUE == nodeType(pNode)) {
continue;
} else if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SNode* pCol = nodesCloneNode(pNode);
if (NULL == pCol) {
goto error;
}
if (TSDB_CODE_SUCCESS != nodesListAppend(pList, pCol)) {
goto error;
}
} else {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
goto error;
}
pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName);
}
}
return pList;
error:
nodesDestroyList(pList);
return NULL;
}
static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SNodeList* pAggFuncs = NULL;
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs), NULL);
@ -242,25 +267,54 @@ static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect)
CHECK_ALLOC(pAgg->pGroupKeys, (SLogicNode*)pAgg);
pAgg->pAggFuncs = nodesCloneList(pAggFuncs);
CHECK_ALLOC(pAgg->pAggFuncs, (SLogicNode*)pAgg);
// rewrite the expression in subsequent clauses
CHECK_CODE(rewriteExpr(pAgg->node.id, 1, pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY), (SLogicNode*)pAgg);
CHECK_CODE(rewriteExpr(pAgg->node.id, 1 + LIST_LENGTH(pAgg->pGroupKeys), pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY), (SLogicNode*)pAgg);
pAgg->node.pConditions = nodesCloneNode(pSelect->pHaving);
CHECK_ALLOC(pAgg->node.pConditions, (SLogicNode*)pAgg);
// set the output and rewrite the expression in subsequent clauses with the output
SNodeList* pCols = NULL;
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_HAVING, 0, false, &pCols), (SLogicNode*)pAgg);
pAgg->node.pTargets = nodesCloneList(pCols);
// set the output
pAgg->node.pTargets = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys);
CHECK_ALLOC(pAgg->node.pTargets, (SLogicNode*)pAgg);
CHECK_CODE(rewriteExpr(pAgg->node.id, pAgg->node.pTargets, pSelect, SQL_CLAUSE_HAVING), (SLogicNode*)pAgg);
SNodeList* pTargets = createColumnByRewriteExps(pCxt, pAgg->pAggFuncs);
CHECK_ALLOC(pTargets, (SLogicNode*)pAgg);
nodesListAppendList(pAgg->node.pTargets, pTargets);
return (SLogicNode*)pAgg;
}
static SNodeList* createColumnByProjections(SPlanContext* pCxt, SNodeList* pExprs) {
SNodeList* pList = nodesMakeList();
CHECK_ALLOC(pList, NULL);
SNode* pNode;
FOREACH(pNode, pExprs) {
SExprNode* pExpr = (SExprNode*)pNode;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
goto error;
}
pCol->node.resType = pExpr->resType;
strcpy(pCol->colName, pExpr->aliasName);
if (TSDB_CODE_SUCCESS != nodesListAppend(pList, (SNode*)pCol)) {
goto error;
}
}
return pList;
error:
nodesDestroyList(pList);
return NULL;
}
static SLogicNode* createProjectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SProjectLogicNode* pProject = (SProjectLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PROJECT);
CHECK_ALLOC(pProject, NULL);
pProject->node.id = pCxt->planNodeId++;
pProject->node.pTargets = nodesCloneList(pSelect->pProjectionList);
pProject->pProjections = nodesCloneList(pSelect->pProjectionList);
pProject->node.pTargets = createColumnByProjections(pCxt,pSelect->pProjectionList);
CHECK_ALLOC(pProject->node.pTargets, (SLogicNode*)pProject);
return (SLogicNode*)pProject;
@ -269,7 +323,7 @@ static SLogicNode* createProjectLogicNode(SPlanContext* pCxt, SSelectStmt* pSele
static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createWhereFilterLogicNode(pCxt, pSelect));
pRoot = pushLogicNode(pCxt, pRoot, createWhereFilterLogicNode(pCxt, pRoot, pSelect));
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect));

View File

@ -13,7 +13,7 @@ ADD_EXECUTABLE(plannerTest
TARGET_LINK_LIBRARIES(
plannerTest
PUBLIC os util common planner parser catalog transport gtest function qcom
PUBLIC os util common nodes planner parser catalog transport gtest function qcom
)
TARGET_INCLUDE_DIRECTORIES(

View File

@ -0,0 +1,91 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include <gtest/gtest.h>
#include "plannerImpl.h"
#include "newParser.h"
using namespace std;
using namespace testing;
class NewPlannerTest : public Test {
protected:
void setDatabase(const string& acctId, const string& db) {
acctId_ = acctId;
db_ = db;
}
void bind(const char* sql) {
reset();
cxt_.acctId = atoi(acctId_.c_str());
cxt_.db = db_.c_str();
sqlBuf_ = string(sql);
transform(sqlBuf_.begin(), sqlBuf_.end(), sqlBuf_.begin(), ::tolower);
cxt_.sqlLen = strlen(sql);
cxt_.pSql = sqlBuf_.c_str();
}
bool run() {
int32_t code = parser(&cxt_, &query_);
// cout << "parser return " << code << endl;
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] parser code:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl;
return false;
}
SLogicNode* pLogicPlan = nullptr;
code = createLogicPlan(query_.pRoot, &pLogicPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] plan code:" << tstrerror(code) << endl;
return false;
}
char* pStr = NULL;
int32_t len = 0;
code = nodesNodeToString((const SNode*)pLogicPlan, &pStr, &len);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] toString code:" << tstrerror(code) << endl;
return false;
}
cout << "logic plan : " << endl;
cout << pStr << endl;
return true;
}
private:
static const int max_err_len = 1024;
void reset() {
memset(&cxt_, 0, sizeof(cxt_));
memset(errMagBuf_, 0, max_err_len);
cxt_.pMsg = errMagBuf_;
cxt_.msgLen = max_err_len;
}
string acctId_;
string db_;
char errMagBuf_[max_err_len];
string sqlBuf_;
SParseContext cxt_;
SQuery query_;
};
TEST_F(NewPlannerTest, simple) {
setDatabase("root", "test");
bind("SELECT * FROM t1");
ASSERT_TRUE(run());
}

View File

@ -22,41 +22,44 @@
extern "C" {
#endif
// #define TDB_EXTERN
// #define TDB_PUBLIC
// #define TDB_STATIC static
typedef struct STDb TDB;
typedef struct STDbEnv TENV;
typedef struct STDbCurosr TDBC;
// typedef enum { TDB_BTREE_T = 0, TDB_HASH_T = 1, TDB_HEAP_T = 2 } tdb_db_t;
typedef int32_t pgsz_t;
typedef int32_t cachesz_t;
// // Forward declarations
// typedef struct TDB TDB;
// // typedef struct TDB_MPOOL TDB_MPOOL;
// // typedef struct TDB_MPFILE TDB_MPFILE;
// // typedef struct TDB_CURSOR TDB_CURSOR;
typedef int (*TdbKeyCmprFn)(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2);
// typedef struct {
// void* bdata;
// uint32_t size;
// } TDB_KEY, TDB_VALUE;
// TEVN
int tdbEnvCreate(TENV **ppEnv, const char *rootDir);
int tdbEnvOpen(TENV *ppEnv);
int tdbEnvClose(TENV *pEnv);
// // TDB Operations
// int tdbCreateDB(TDB** dbpp, tdb_db_t type);
// int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags);
// int tdbCloseDB(TDB* dbp, uint32_t flags);
// int tdbPut(TDB* dbp, const TDB_KEY* key, const TDB_VALUE* value, uint32_t flags);
// int tdbGet(TDB* dbp, const TDB_KEY* key, TDB_VALUE* value, uint32_t flags);
int tdbEnvSetCache(TENV *pEnv, pgsz_t pgSize, cachesz_t cacheSize);
pgsz_t tdbEnvGetPageSize(TENV *pEnv);
cachesz_t tdbEnvGetCacheSize(TENV *pEnv);
// // TDB_MPOOL
// int tdbOpenMPool(TDB_MPOOL** mp);
// int tdbCloseMPool(TDB_MPOOL* mp);
int tdbEnvBeginTxn(TENV *pEnv);
int tdbEnvCommit(TENV *pEnv);
// // TDB_MPFILE
// int tdbOpenMPFile(TDB_MPFILE** mpf, TDB_MPOOL* mp);
// int tdbCloseMPFile(TDB_MPFILE** mpf);
// TDB
int tdbCreate(TDB **ppDb);
int tdbOpen(TDB *pDb, const char *fname, const char *dbname, TENV *pEnv);
int tdbClose(TDB *pDb);
int tdbDrop(TDB *pDb);
// // TDB_CURSOR
// int tdbOpenCursor(TDB* dbp, TDB_CURSOR** tdbcpp);
// int tdbCloseCurosr(TDB_CURSOR* tdbcp);
int tdbSetKeyLen(TDB *pDb, int klen);
int tdbSetValLen(TDB *pDb, int vlen);
int tdbSetDup(TDB *pDb, int dup);
int tdbSetCmprFunc(TDB *pDb, TdbKeyCmprFn fn);
int tdbGetKeyLen(TDB *pDb);
int tdbGetValLen(TDB *pDb);
int tdbGetDup(TDB *pDb);
int tdbInsert(TDB *pDb, const void *pKey, int nKey, const void *pData, int nData);
// TDBC
#ifdef __cplusplus
}

View File

@ -0,0 +1,205 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
struct STDb {
char dbname[TDB_MAX_DBNAME_LEN];
SBTree * pBt; // current access method (may extend)
SPgFile * pPgFile; // backend page file this DB is using
TENV * pEnv; // TENV containing the DB
int klen; // key length if know
int vlen; // value length if know
bool dup; // dup mode
TdbKeyCmprFn cFn; // compare function
};
struct STDbCurosr {
SBtCursor *pBtCur;
};
static int tdbDefaultKeyCmprFn(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2);
int tdbCreate(TDB **ppDb) {
TDB *pDb;
// create the handle
pDb = (TDB *)calloc(1, sizeof(*pDb));
if (pDb == NULL) {
return -1;
}
pDb->klen = TDB_VARIANT_LEN;
pDb->vlen = TDB_VARIANT_LEN;
pDb->dup = false;
pDb->cFn = tdbDefaultKeyCmprFn;
*ppDb = pDb;
return 0;
}
static int tdbDestroy(TDB *pDb) {
if (pDb) {
free(pDb);
}
return 0;
}
int tdbOpen(TDB *pDb, const char *fname, const char *dbname, TENV *pEnv) {
int ret;
uint8_t fileid[TDB_FILE_ID_LEN];
SPgFile * pPgFile;
SPgCache *pPgCache;
SBTree * pBt;
bool fileExist;
size_t dbNameLen;
pgno_t dbRootPgno;
char dbfname[128]; // TODO: make this as a macro or malloc on the heap
ASSERT(pDb != NULL);
ASSERT(fname != NULL);
// TODO: Here we simply put an assert here. In the future, make `pEnv`
// can be set as NULL.
ASSERT(pEnv != NULL);
// check the DB name
dbNameLen = 0;
if (dbname) {
dbNameLen = strlen(dbname);
if (dbNameLen >= TDB_MAX_DBNAME_LEN) {
return -1;
}
memcpy(pDb->dbname, dbname, dbNameLen);
}
pDb->dbname[dbNameLen] = '\0';
// get page file from the env, if not opened yet, open it
pPgFile = NULL;
snprintf(dbfname, 128, "%s/%s", tdbEnvGetRootDir(pEnv), fname);
fileExist = (tdbCheckFileAccess(fname, TDB_F_OK) == 0);
if (fileExist) {
tdbGnrtFileID(dbfname, fileid, false);
pPgFile = tdbEnvGetPageFile(pEnv, fileid);
}
if (pPgFile == NULL) {
ret = pgFileOpen(&pPgFile, dbfname, pEnv);
if (ret != 0) {
// TODO: handle error
return -1;
}
}
// TODO: get the root page number from the master DB of the page file
// tdbGet(&dbRootPgno);
if (dbRootPgno == 0) {
// DB not exist, create one
ret = pgFileAllocatePage(pPgFile, &dbRootPgno);
if (ret != 0) {
// TODO: handle error
}
// tdbInsert(pPgFile->pMasterDB, dbname, strlen(dbname), &dbRootPgno, sizeof(dbRootPgno));
}
ASSERT(dbRootPgno > 1);
// pDb->pBt->root = dbRootPgno;
// register
pDb->pPgFile = pPgFile;
tdbEnvRgstDB(pEnv, pDb);
pDb->pEnv = pEnv;
return 0;
}
int tdbClose(TDB *pDb) {
if (pDb == NULL) return 0;
return tdbDestroy(pDb);
}
int tdbDrop(TDB *pDb) {
// TODO
return 0;
}
int tdbSetKeyLen(TDB *pDb, int klen) {
// TODO: check `klen`
pDb->klen = klen;
return 0;
}
int tdbSetValLen(TDB *pDb, int vlen) {
// TODO: check `vlen`
pDb->vlen = vlen;
return 0;
}
int tdbSetDup(TDB *pDb, int dup) {
if (dup) {
pDb->dup = true;
} else {
pDb->dup = false;
}
return 0;
}
int tdbSetCmprFunc(TDB *pDb, TdbKeyCmprFn fn) {
if (fn == NULL) {
return -1;
} else {
pDb->cFn = fn;
}
return 0;
}
int tdbGetKeyLen(TDB *pDb) { return pDb->klen; }
int tdbGetValLen(TDB *pDb) { return pDb->vlen; }
int tdbGetDup(TDB *pDb) {
if (pDb->dup) {
return 1;
} else {
return 0;
}
}
int tdbInsert(TDB *pDb, const void *pKey, int nKey, const void *pData, int nData) {
// TODO
return 0;
}
static int tdbDefaultKeyCmprFn(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2) {
int mlen;
int cret;
ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);
mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2;
cret = memcmp(pKey1, pKey2, mlen);
if (cret == 0) {
if (keyLen1 < keyLen2) {
cret = -1;
} else if (keyLen1 > keyLen2) {
cret = 1;
} else {
cret = 0;
}
}
return cret;
}

View File

@ -0,0 +1,164 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
struct SBtCursor {
SBTree *pBtree;
pgno_t pgno;
SPage * pPage; // current page traversing
};
typedef struct {
pgno_t pgno;
pgsz_t offset;
} SBtIdx;
// Btree page header definition
typedef struct __attribute__((__packed__)) {
uint8_t flag; // page flag
int32_t vlen; // value length of current page, TDB_VARIANT_LEN for variant length
uint16_t nPayloads; // number of total payloads
pgoff_t freeOff; // free payload offset
pgsz_t fragSize; // total fragment size
pgoff_t offPayload; // payload offset
pgno_t rChildPgno; // right most child page number
} SBtPgHdr;
typedef int (*BtreeCmprFn)(const void *, const void *);
#define BTREE_PAGE_HDR(pPage) NULL /* TODO */
#define BTREE_PAGE_PAYLOAD_AT(pPage, idx) NULL /*TODO*/
#define BTREE_PAGE_IS_LEAF(pPage) 0 /* TODO */
static int btreeCreate(SBTree **ppBt);
static int btreeDestroy(SBTree *pBt);
static int btreeCursorMoveToChild(SBtCursor *pBtCur, pgno_t pgno);
int btreeOpen(SBTree **ppBt, SPgFile *pPgFile) {
SBTree *pBt;
int ret;
ret = btreeCreate(&pBt);
if (ret != 0) {
return -1;
}
*ppBt = pBt;
return 0;
}
int btreeClose(SBTree *pBt) {
// TODO
return 0;
}
static int btreeCreate(SBTree **ppBt) {
SBTree *pBt;
pBt = (SBTree *)calloc(1, sizeof(*pBt));
if (pBt == NULL) {
return -1;
}
// TODO
return 0;
}
static int btreeDestroy(SBTree *pBt) {
if (pBt) {
free(pBt);
}
return 0;
}
int btreeCursorOpen(SBtCursor *pBtCur, SBTree *pBt) {
// TODO
return 0;
}
int btreeCursorClose(SBtCursor *pBtCur) {
// TODO
return 0;
}
int btreeCursorMoveTo(SBtCursor *pBtCur, int kLen, const void *pKey) {
SPage * pPage;
SBtPgHdr * pBtPgHdr;
SPgFile * pPgFile;
pgno_t childPgno;
pgno_t rootPgno;
int nPayloads;
void * pPayload;
BtreeCmprFn cmpFn;
// 1. Move the cursor to the root page
if (rootPgno == TDB_IVLD_PGNO) {
// No any data in this btree, just return not found (TODO)
return 0;
} else {
// Load the page from the file by the SPgFile handle
pPage = pgFileFetch(pPgFile, rootPgno);
pBtCur->pPage = pPage;
}
// 2. Loop to search over the whole tree
for (;;) {
int lidx, ridx, midx, cret;
pPage = pBtCur->pPage;
pBtPgHdr = BTREE_PAGE_HDR(pPage);
nPayloads = pBtPgHdr->nPayloads;
// Binary search the page
lidx = 0;
ridx = nPayloads - 1;
midx = (lidx + ridx) >> 1;
for (;;) {
// get the payload ptr at midx
pPayload = BTREE_PAGE_PAYLOAD_AT(pPage, midx);
// the payload and the key
cret = cmpFn(pKey, pPayload);
if (cret < 0) {
/* TODO */
} else if (cret > 0) {
/* TODO */
} else {
/* TODO */
}
if (lidx > ridx) break;
midx = (lidx + ridx) >> 1;
}
if (BTREE_PAGE_IS_LEAF(pPage)) {
/* TODO */
break;
} else {
/* TODO */
btreeCursorMoveToChild(pBtCur, childPgno);
}
}
return 0;
}
static int btreeCursorMoveToChild(SBtCursor *pBtCur, pgno_t pgno) {
SPgFile *pPgFile;
// TODO
return 0;
}

View File

@ -0,0 +1,173 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
struct STDbEnv {
char * rootDir; // root directory of the environment
char * jname; // journal file name
int jfd; // journal file fd
pgsz_t pgSize; // page size
cachesz_t cacheSize; // total cache size
STDbList dbList; // TDB List
SPgFileList pgfList; // SPgFile List
SPgCache * pPgCache; // page cache
struct {
#define TDB_ENV_PGF_HASH_BUCKETS 17
SPgFileList buckets[TDB_ENV_PGF_HASH_BUCKETS];
} pgfht; // page file hash table;
};
#define TDB_ENV_PGF_HASH(fileid) \
({ \
uint8_t *tmp = (uint8_t *)(fileid); \
tmp[0] + tmp[1] + tmp[2]; \
})
static int tdbEnvDestroy(TENV *pEnv);
int tdbEnvCreate(TENV **ppEnv, const char *rootDir) {
TENV * pEnv;
size_t slen;
size_t jlen;
ASSERT(rootDir != NULL);
*ppEnv = NULL;
slen = strlen(rootDir);
jlen = slen + strlen(TDB_JOURNAL_NAME) + 1;
pEnv = (TENV *)calloc(1, sizeof(*pEnv) + slen + 1 + jlen + 1);
if (pEnv == NULL) {
return -1;
}
pEnv->rootDir = (char *)(&pEnv[1]);
pEnv->jname = pEnv->rootDir + slen + 1;
pEnv->jfd = -1;
pEnv->pgSize = TDB_DEFAULT_PGSIZE;
pEnv->cacheSize = TDB_DEFAULT_CACHE_SIZE;
memcpy(pEnv->rootDir, rootDir, slen);
pEnv->rootDir[slen] = '\0';
sprintf(pEnv->jname, "%s/%s", rootDir, TDB_JOURNAL_NAME);
TD_DLIST_INIT(&(pEnv->dbList));
TD_DLIST_INIT(&(pEnv->pgfList));
/* TODO */
*ppEnv = pEnv;
return 0;
}
int tdbEnvOpen(TENV *pEnv) {
SPgCache *pPgCache;
int ret;
ASSERT(pEnv != NULL);
/* TODO: here we do not need to create the root directory, more
* work should be done here
*/
mkdir(pEnv->rootDir, 0755);
ret = pgCacheOpen(&pPgCache, pEnv);
if (ret != 0) {
goto _err;
}
pEnv->pPgCache = pPgCache;
return 0;
_err:
return -1;
}
int tdbEnvClose(TENV *pEnv) {
if (pEnv == NULL) return 0;
pgCacheClose(pEnv->pPgCache);
tdbEnvDestroy(pEnv);
return 0;
}
int tdbEnvSetCache(TENV *pEnv, pgsz_t pgSize, cachesz_t cacheSize) {
if (!TDB_IS_PGSIZE_VLD(pgSize) || cacheSize / pgSize < 10) {
return -1;
}
/* TODO */
pEnv->pgSize = pgSize;
pEnv->cacheSize = cacheSize;
return 0;
}
pgsz_t tdbEnvGetPageSize(TENV *pEnv) { return pEnv->pgSize; }
cachesz_t tdbEnvGetCacheSize(TENV *pEnv) { return pEnv->cacheSize; }
SPgFile *tdbEnvGetPageFile(TENV *pEnv, const uint8_t fileid[]) {
SPgFileList *pBucket;
SPgFile * pPgFile;
pBucket = pEnv->pgfht.buckets + (TDB_ENV_PGF_HASH(fileid) % TDB_ENV_PGF_HASH_BUCKETS); // TODO
for (pPgFile = TD_DLIST_HEAD(pBucket); pPgFile != NULL; pPgFile = TD_DLIST_NODE_NEXT_WITH_FIELD(pPgFile, envHash)) {
if (memcmp(fileid, pPgFile->fileid, TDB_FILE_ID_LEN) == 0) break;
};
return pPgFile;
}
SPgCache *tdbEnvGetPgCache(TENV *pEnv) { return pEnv->pPgCache; }
static int tdbEnvDestroy(TENV *pEnv) {
// TODO
return 0;
}
int tdbEnvBeginTxn(TENV *pEnv) {
pEnv->jfd = open(pEnv->jname, O_CREAT | O_RDWR, 0755);
if (pEnv->jfd < 0) {
return -1;
}
return 0;
}
int tdbEnvCommit(TENV *pEnv) {
/* TODO */
close(pEnv->jfd);
pEnv->jfd = -1;
return 0;
}
const char *tdbEnvGetRootDir(TENV *pEnv) { return pEnv->rootDir; }
int tdbEnvRgstPageFile(TENV *pEnv, SPgFile *pPgFile) {
SPgFileList *pBucket;
TD_DLIST_APPEND_WITH_FIELD(&(pEnv->pgfList), pPgFile, envPgfList);
pBucket = pEnv->pgfht.buckets + (TDB_ENV_PGF_HASH(pPgFile->fileid) % TDB_ENV_PGF_HASH_BUCKETS); // TODO
TD_DLIST_APPEND_WITH_FIELD(pBucket, pPgFile, envHash);
return 0;
}
int tdbEnvRgstDB(TENV *pEnv, TDB *pDb) {
// TODO
return 0;
}

View File

@ -12,10 +12,225 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
#include "tdb_mpool.h"
typedef TD_DLIST(SPage) SPgList;
struct SPgCache {
TENV * pEnv; // TENV containing this page cache
pgsz_t pgsize;
int32_t npage;
SPage **pages;
SPgList freeList;
SPgList lru;
struct {
int32_t nbucket;
SPgList *buckets;
} pght; // page hash table
};
static void pgCachePinPage(SPage *pPage);
static void pgCacheUnpinPage(SPage *pPage);
int pgCacheOpen(SPgCache **ppPgCache, TENV *pEnv) {
SPgCache *pPgCache;
SPage * pPage;
void * pData;
pgsz_t pgSize;
cachesz_t cacheSize;
int32_t npage;
int32_t nbucket;
size_t msize;
*ppPgCache = NULL;
pgSize = tdbEnvGetPageSize(pEnv);
cacheSize = tdbEnvGetCacheSize(pEnv);
npage = cacheSize / pgSize;
nbucket = npage;
msize = sizeof(*pPgCache) + sizeof(SPage *) * npage + sizeof(SPgList) * nbucket;
// Allocate the handle
pPgCache = (SPgCache *)calloc(1, msize);
if (pPgCache == NULL) {
return -1;
}
// Init the handle
pPgCache->pEnv = pEnv;
pPgCache->pgsize = pgSize;
pPgCache->npage = npage;
pPgCache->pages = (SPage **)(&pPgCache[1]);
pPgCache->pght.nbucket = nbucket;
pPgCache->pght.buckets = (SPgList *)(&(pPgCache->pages[npage]));
TD_DLIST_INIT(&(pPgCache->freeList));
for (int32_t i = 0; i < npage; i++) {
pData = malloc(pgSize + sizeof(SPage));
if (pData == NULL) {
return -1;
// TODO: handle error
}
pPage = POINTER_SHIFT(pData, pgSize);
pPage->pgid = TDB_IVLD_PGID;
pPage->frameid = i;
pPage->pData = pData;
// add current page to the page cache
pPgCache->pages[i] = pPage;
TD_DLIST_APPEND_WITH_FIELD(&(pPgCache->freeList), pPage, freeNode);
}
#if 0
for (int32_t i = 0; i < nbucket; i++) {
TD_DLIST_INIT(pPgCache->pght.buckets + i);
}
#endif
*ppPgCache = pPgCache;
return 0;
}
int pgCacheClose(SPgCache *pPgCache) {
SPage *pPage;
if (pPgCache) {
for (int32_t i = 0; i < pPgCache->npage; i++) {
pPage = pPgCache->pages[i];
tfree(pPage->pData);
}
free(pPgCache);
}
return 0;
}
#define PG_CACHE_HASH(fileid, pgno) \
({ \
uint64_t *tmp = (uint64_t *)(fileid); \
(tmp[0] + tmp[1] + tmp[2] + (pgno)); \
})
SPage *pgCacheFetch(SPgCache *pPgCache, pgid_t pgid) {
SPage * pPage;
SPgFile *pPgFile;
SPgList *pBucket;
// 1. Search the page hash table SPgCache.pght
pBucket = pPgCache->pght.buckets + (PG_CACHE_HASH(pgid.fileid, pgid.pgno) % pPgCache->pght.nbucket);
pPage = TD_DLIST_HEAD(pBucket);
while (pPage && tdbCmprPgId(&(pPage->pgid), &pgid)) {
pPage = TD_DLIST_NODE_NEXT_WITH_FIELD(pPage, pghtNode);
}
if (pPage) {
// Page is found, pin the page and return the page
pgCachePinPage(pPage);
return pPage;
}
// 2. Check the free list
pPage = TD_DLIST_HEAD(&(pPgCache->freeList));
if (pPage) {
TD_DLIST_POP_WITH_FIELD(&(pPgCache->freeList), pPage, freeNode);
pgCachePinPage(pPage);
return pPage;
}
// 3. Try to recycle a page from the LRU list
pPage = TD_DLIST_HEAD(&(pPgCache->lru));
if (pPage) {
TD_DLIST_POP_WITH_FIELD(&(pPgCache->lru), pPage, lruNode);
// TODO: remove from the hash table
pgCachePinPage(pPage);
return pPage;
}
// 4. If a memory allocator is set, try to allocate from the allocator (TODO)
return NULL;
}
int pgCacheRelease(SPage *pPage) {
// TODO
return 0;
}
static void pgCachePinPage(SPage *pPage) {
// TODO
}
static void pgCacheUnpinPage(SPage *pPage) {
// TODO
}
#if 0
// Exposed handle
typedef struct TDB_MPOOL TDB_MPOOL;
typedef struct TDB_MPFILE TDB_MPFILE;
typedef TD_DLIST_NODE(pg_t) pg_free_dlist_node_t, pg_hash_dlist_node_t;
typedef struct pg_t {
SRWLatch rwLatch;
frame_id_t frameid;
pgid_t pgid;
uint8_t dirty;
uint8_t rbit;
int32_t pinRef;
pg_free_dlist_node_t free;
pg_hash_dlist_node_t hash;
void * p;
} pg_t;
typedef TD_DLIST(pg_t) pg_list_t;
typedef struct {
SRWLatch latch;
TD_DLIST(TDB_MPFILE);
} mpf_bucket_t;
struct TDB_MPOOL {
int64_t cachesize;
pgsz_t pgsize;
int32_t npages;
pg_t * pages;
pg_list_t freeList;
frame_id_t clockHand;
struct {
int32_t nbucket;
pg_list_t *hashtab;
} pgtab; // page table, hash<pgid_t, pg_t>
struct {
#define MPF_HASH_BUCKETS 16
mpf_bucket_t buckets[MPF_HASH_BUCKETS];
} mpfht; // MPF hash table. MPFs using this MP will be put in this hash table
};
#define MP_PAGE_AT(mp, idx) (mp)->pages[idx]
typedef TD_DLIST_NODE(TDB_MPFILE) td_mpf_dlist_node_t;
struct TDB_MPFILE {
char * fname; // file name
int fd; // fd
uint8_t fileid[TDB_FILE_ID_LEN]; // file ID
TDB_MPOOL * mp; // underlying memory pool
td_mpf_dlist_node_t node;
};
/*=================================================== Exposed apis ==================================================*/
// TDB_MPOOL
int tdbMPoolOpen(TDB_MPOOL **mpp, uint64_t cachesize, pgsz_t pgsize);
int tdbMPoolClose(TDB_MPOOL *mp);
int tdbMPoolSync(TDB_MPOOL *mp);
// TDB_MPFILE
int tdbMPoolFileOpen(TDB_MPFILE **mpfp, const char *fname, TDB_MPOOL *mp);
int tdbMPoolFileClose(TDB_MPFILE *mpf);
int tdbMPoolFileNewPage(TDB_MPFILE *mpf, pgno_t *pgno, void *addr);
int tdbMPoolFileFreePage(TDB_MPOOL *mpf, pgno_t *pgno, void *addr);
int tdbMPoolFileGetPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr);
int tdbMPoolFilePutPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr);
int tdbMPoolFileSync(TDB_MPFILE *mpf);
static int tdbGnrtFileID(const char *fname, uint8_t *fileid);
static void tdbMPoolRegFile(TDB_MPOOL *mp, TDB_MPFILE *mpf);
static void tdbMPoolUnregFile(TDB_MPOOL *mp, TDB_MPFILE *mpf);
static TDB_MPFILE *tdbMPoolGetFile(TDB_MPOOL *mp, uint8_t *fileid);
@ -23,7 +238,7 @@ static int tdbMPoolFileReadPage(TDB_MPFILE *mpf, pgno_t pgno, void *p);
static int tdbMPoolFileWritePage(TDB_MPFILE *mpf, pgno_t pgno, const void *p);
static void tdbMPoolClockEvictPage(TDB_MPOOL *mp, pg_t **pagepp);
int tdbMPoolOpen(TDB_MPOOL **mpp, uint64_t cachesize, pgsize_t pgsize) {
int tdbMPoolOpen(TDB_MPOOL **mpp, uint64_t cachesize, pgsz_t pgsize) {
TDB_MPOOL *mp = NULL;
size_t tsize;
pg_t * pagep;
@ -120,7 +335,7 @@ int tdbMPoolFileOpen(TDB_MPFILE **mpfp, const char *fname, TDB_MPOOL *mp) {
goto _err;
}
if (tdbGnrtFileID(fname, mpf->fileid) < 0) {
if (tdbGnrtFileID(fname, mpf->fileid, false) < 0) {
goto _err;
}
@ -230,22 +445,6 @@ int tdbMPoolFilePutPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr) {
return 0;
}
static int tdbGnrtFileID(const char *fname, uint8_t *fileid) {
struct stat statbuf;
if (stat(fname, &statbuf) < 0) {
return -1;
}
memset(fileid, 0, TDB_FILE_ID_LEN);
((uint64_t *)fileid)[0] = (uint64_t)statbuf.st_ino;
((uint64_t *)fileid)[1] = (uint64_t)statbuf.st_dev;
((uint64_t *)fileid)[2] = rand();
return 0;
}
#define MPF_GET_BUCKETID(fileid) \
({ \
uint64_t *tmp = (uint64_t *)fileid; \
@ -317,7 +516,7 @@ static void tdbMPoolUnregFile(TDB_MPOOL *mp, TDB_MPFILE *mpf) {
}
static int tdbMPoolFileReadPage(TDB_MPFILE *mpf, pgno_t pgno, void *p) {
pgsize_t pgsize;
pgsz_t pgsize;
TDB_MPOOL *mp;
off_t offset;
size_t rsize;
@ -334,7 +533,7 @@ static int tdbMPoolFileReadPage(TDB_MPFILE *mpf, pgno_t pgno, void *p) {
}
static int tdbMPoolFileWritePage(TDB_MPFILE *mpf, pgno_t pgno, const void *p) {
pgsize_t pgsize;
pgsz_t pgsize;
TDB_MPOOL *mp;
off_t offset;
@ -376,4 +575,6 @@ static void tdbMPoolClockEvictPage(TDB_MPOOL *mp, pg_t **pagepp) {
} while (1);
*pagepp = pagep;
}
}
#endif

View File

@ -0,0 +1,221 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
typedef struct SPage1 {
char magic[64];
pgno_t mdbRootPgno; // master DB root page number
pgno_t freePgno; // free list page number
uint32_t nFree; // number of free pages
} SPage1;
typedef struct SFreePage {
/* TODO */
} SFreePage;
TDB_STATIC_ASSERT(sizeof(SPage1) <= TDB_MIN_PGSIZE, "TDB Page1 definition too large");
static int pgFileRead(SPgFile *pPgFile, pgno_t pgno, uint8_t *pData);
int pgFileOpen(SPgFile **ppPgFile, const char *fname, TENV *pEnv) {
SPgFile * pPgFile;
SPgCache *pPgCache;
size_t fnameLen;
pgno_t fsize;
*ppPgFile = NULL;
// create the handle
fnameLen = strlen(fname);
pPgFile = (SPgFile *)calloc(1, sizeof(*pPgFile) + fnameLen + 1);
if (pPgFile == NULL) {
return -1;
}
ASSERT(pEnv != NULL);
// init the handle
pPgFile->fname = (char *)(&(pPgFile[1]));
memcpy(pPgFile->fname, fname, fnameLen);
pPgFile->fname[fnameLen] = '\0';
pPgFile->fd = -1;
pPgFile->fd = open(fname, O_CREAT | O_RDWR, 0755);
if (pPgFile->fd < 0) {
// TODO: handle error
return -1;
}
tdbGnrtFileID(fname, pPgFile->fileid, false);
tdbGetFileSize(fname, tdbEnvGetPageSize(pEnv), &fsize);
pPgFile->fsize = fsize;
pPgFile->lsize = fsize;
if (pPgFile->fsize == 0) {
// A created file
pgno_t pgno;
pgid_t pgid;
pgFileAllocatePage(pPgFile, &pgno);
ASSERT(pgno == 1);
memcpy(pgid.fileid, pPgFile->fileid, TDB_FILE_ID_LEN);
pgid.pgno = pgno;
pgCacheFetch(pPgCache, pgid);
// Need to allocate the first page as a description page
} else {
// An existing file
}
/* TODO: other open operations */
// add the page file to the environment
tdbEnvRgstPageFile(pEnv, pPgFile);
pPgFile->pEnv = pEnv;
*ppPgFile = pPgFile;
return 0;
}
int pgFileClose(SPgFile *pPgFile) {
if (pPgFile) {
if (pPgFile->fd >= 0) {
close(pPgFile->fd);
}
tfree(pPgFile->fname);
free(pPgFile);
}
return 0;
}
SPage *pgFileFetch(SPgFile *pPgFile, pgno_t pgno) {
SPgCache *pPgCache;
SPage * pPage;
pgid_t pgid;
// 1. Fetch from the page cache
// pgCacheFetch(pPgCache, pgid);
// 2. If only get a page frame, no content, maybe
// need to load from the file
if (1 /*page not initialized*/) {
if (pgno < pPgFile->fsize) {
// load the page content from the disk
// ?? How about the freed pages ??
} else {
// zero the page, make the page as a empty
// page with zero records.
}
}
#if 0
pPgCache = pPgFile->pPgCache;
pPage = NULL;
memcpy(pgid.fileid, pPgFile->fileid, TDB_FILE_ID_LEN);
pgid.pgno = pgno;
if (pgno > pPgFile->pgFileSize) {
// TODO
} else {
pPage = pgCacheFetch(pPgCache, pgid);
if (1 /*Page is cached, no need to load from file*/) {
return pPage;
} else {
// TODO: handle error
if (pgFileRead(pPgFile, pgno, (void *)pPage) < 0) {
// todoerr
}
return pPage;
}
}
#endif
return pPage;
}
int pgFileRelease(SPage *pPage) {
pgCacheRelease(pPage);
return 0;
}
int pgFileWrite(SPage *pPage) {
// TODO
return 0;
}
int pgFileAllocatePage(SPgFile *pPgFile, pgno_t *pPgno) {
pgno_t pgno;
SPage1 * pPage1;
SPgCache *pPgCache;
pgid_t pgid;
SPage * pPage;
if (pPgFile->lsize == 0) {
pgno = ++(pPgFile->lsize);
} else {
if (0) {
// TODO: allocate from the free list
pPage = pgCacheFetch(pPgCache, pgid);
if (pPage1->nFree > 0) {
// TODO
} else {
pgno = ++(pPgFile->lsize);
}
} else {
pgno = ++(pPgFile->lsize);
}
}
*pPgno = pgno;
return 0;
}
static int pgFileRead(SPgFile *pPgFile, pgno_t pgno, uint8_t *pData) {
pgsz_t pgSize;
ssize_t rsize;
uint8_t *pTData;
size_t szToRead;
#if 0
// pgSize = ; (TODO)
pTData = pData;
szToRead = pgSize;
for (; szToRead > 0;) {
rsize = pread(pPgFile->fd, pTData, szToRead, pgno * pgSize);
if (rsize < 0) {
if (errno == EINTR) {
continue;
} else {
return -1;
}
} else if (rsize == 0) {
return -1;
}
szToRead -= rsize;
pTData += rsize;
}
#endif
return 0;
}

View File

@ -0,0 +1,67 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique) {
struct stat statbuf;
if (stat(fname, &statbuf) < 0) {
return -1;
}
memset(fileid, 0, TDB_FILE_ID_LEN);
((uint64_t *)fileid)[0] = (uint64_t)statbuf.st_ino;
((uint64_t *)fileid)[1] = (uint64_t)statbuf.st_dev;
if (unique) {
((uint64_t *)fileid)[2] = rand();
}
return 0;
}
int tdbCheckFileAccess(const char *pathname, int mode) {
int flags = 0;
if (mode & TDB_F_OK) {
flags |= F_OK;
}
if (mode & TDB_R_OK) {
flags |= R_OK;
}
if (mode & TDB_W_OK) {
flags |= W_OK;
}
return access(pathname, flags);
}
int tdbGetFileSize(const char *fname, pgsz_t pgSize, pgno_t *pSize) {
struct stat st;
int ret;
ret = stat(fname, &st);
if (ret != 0) {
return -1;
}
ASSERT(st.st_size % pgSize == 0);
*pSize = st.st_size / pgSize;
return 0;
}

View File

@ -0,0 +1,44 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_BTREE_H_
#define _TD_BTREE_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SBTree SBTree;
typedef struct SBtCursor SBtCursor;
// SBTree
int btreeOpen(SBTree **ppBt, SPgFile *pPgFile);
int btreeClose(SBTree *pBt);
// SBtCursor
int btreeCursorOpen(SBtCursor *pBtCur, SBTree *pBt);
int btreeCursorClose(SBtCursor *pBtCur);
int btreeCursorMoveTo(SBtCursor *pBtCur, int kLen, const void *pKey);
int btreeCursorNext(SBtCursor *pBtCur);
struct SBTree {
pgno_t root;
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_BTREE_H_*/

View File

@ -13,14 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdb_db.h"
#ifndef _TDB_ENV_H_
#define _TDB_ENV_H_
int tdbOpen(TDB **dbpp, const char *fname, const char *dbname, uint32_t flags) {
// TODO
return 0;
#ifdef __cplusplus
extern "C" {
#endif
const char* tdbEnvGetRootDir(TENV* pEnv);
SPgFile* tdbEnvGetPageFile(TENV* pEnv, const uint8_t fileid[]);
SPgCache* tdbEnvGetPgCache(TENV* pEnv);
int tdbEnvRgstPageFile(TENV* pEnv, SPgFile* pPgFile);
int tdbEnvRgstDB(TENV* pEnv, TDB* pDb);
#ifdef __cplusplus
}
#endif
int tdbClose(TDB *dbp, uint32_t flags) {
// TODO
return 0;
}
#endif /*_TDB_ENV_H_*/

View File

@ -0,0 +1,134 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TDB_INTERNAL_H_
#define _TD_TDB_INTERNAL_H_
#include "tlist.h"
#include "tlockfree.h"
#include "tdb.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SPgFile SPgFile;
// pgno_t
typedef int32_t pgno_t;
#define TDB_IVLD_PGNO ((pgno_t)0)
// fileid
#define TDB_FILE_ID_LEN 24
// pgid_t
typedef struct {
uint8_t fileid[TDB_FILE_ID_LEN];
pgno_t pgno;
} pgid_t;
#define TDB_IVLD_PGID (pgid_t){0, TDB_IVLD_PGNO};
static FORCE_INLINE int tdbCmprPgId(const void *p1, const void *p2) {
pgid_t *pgid1 = (pgid_t *)p1;
pgid_t *pgid2 = (pgid_t *)p2;
int rcode;
rcode = memcmp(pgid1->fileid, pgid2->fileid, TDB_FILE_ID_LEN);
if (rcode) {
return rcode;
} else {
if (pgid1->pgno > pgid2->pgno) {
return 1;
} else if (pgid1->pgno < pgid2->pgno) {
return -1;
} else {
return 0;
}
}
}
// framd_id_t
typedef int32_t frame_id_t;
// pgsz_t
#define TDB_MIN_PGSIZE 512
#define TDB_MAX_PGSIZE 65536
#define TDB_DEFAULT_PGSIZE 4096
#define TDB_IS_PGSIZE_VLD(s) (((s) >= TDB_MIN_PGSIZE) && ((s) <= TDB_MAX_PGSIZE))
// pgoff_t
typedef pgsz_t pgoff_t;
// cache
#define TDB_DEFAULT_CACHE_SIZE (256 * 4096) // 1M
// dbname
#define TDB_MAX_DBNAME_LEN 24
// tdb_log
#define tdbError(var)
typedef TD_DLIST(STDb) STDbList;
typedef TD_DLIST(SPgFile) SPgFileList;
typedef TD_DLIST_NODE(SPgFile) SPgFileListNode;
#define TERR_A(val, op, flag) \
do { \
if (((val) = (op)) != 0) { \
goto flag; \
} \
} while (0)
#define TERR_B(val, op, flag) \
do { \
if (((val) = (op)) == NULL) { \
goto flag; \
} \
} while (0)
#define TDB_VARIANT_LEN (int)-1
// page payload format
// <keyLen> + <valLen> + [key] + [value]
#define TDB_DECODE_PAYLOAD(pPayload, keyLen, pKey, valLen, pVal) \
do { \
if ((keyLen) == TDB_VARIANT_LEN) { \
/* TODO: decode the keyLen */ \
} \
if ((valLen) == TDB_VARIANT_LEN) { \
/* TODO: decode the valLen */ \
} \
/* TODO */ \
} while (0)
#define TDB_JOURNAL_NAME "tdb.journal"
#include "tdbUtil.h"
#include "tdbBtree.h"
#include "tdbPgCache.h"
#include "tdbPgFile.h"
#include "tdbEnv.h"
#ifdef __cplusplus
}
#endif
#endif /*_TD_TDB_INTERNAL_H_*/

View File

@ -0,0 +1,48 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PAGE_CACHE_H_
#define _TD_PAGE_CACHE_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SPgCache SPgCache;
typedef struct SPage SPage;
// SPgCache
int pgCacheOpen(SPgCache **ppPgCache, TENV *pEnv);
int pgCacheClose(SPgCache *pPgCache);
SPage *pgCacheFetch(SPgCache *pPgCache, pgid_t pgid);
int pgCacheRelease(SPage *pPage);
// SPage
typedef TD_DLIST_NODE(SPage) SPgListNode;
struct SPage {
pgid_t pgid; // page id
frame_id_t frameid; // frame id
uint8_t * pData; // real data
SPgListNode freeNode; // for SPgCache.freeList
SPgListNode pghtNode; // for pght
SPgListNode lruNode; // for LRU
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_PAGE_CACHE_H_*/

View File

@ -0,0 +1,59 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PAGE_FILE_H_
#define _TD_PAGE_FILE_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct __attribute__((__packed__)) {
char hdrInfo[16]; // info string
pgsz_t szPage; // page size of current file
int32_t cno; // commit number counter
pgno_t freePgno; // freelist page number
uint8_t resv[100]; // reserved space
} SPgFileHdr;
#define TDB_PG_FILE_HDR_SIZE 128
TDB_STATIC_ASSERT(sizeof(SPgFileHdr) == TDB_PG_FILE_HDR_SIZE, "Page file header size if not 128");
struct SPgFile {
TENV * pEnv; // env containing this page file
char * fname; // backend file name
uint8_t fileid[TDB_FILE_ID_LEN]; // file id
pgno_t lsize; // page file logical size (for count)
pgno_t fsize; // real file size on disk (for rollback)
int fd;
SPgFileListNode envHash;
SPgFileListNode envPgfList;
};
int pgFileOpen(SPgFile **ppPgFile, const char *fname, TENV *pEnv);
int pgFileClose(SPgFile *pPgFile);
SPage *pgFileFetch(SPgFile *pPgFile, pgno_t pgno);
int pgFileRelease(SPage *pPage);
int pgFileWrite(SPage *pPage);
int pgFileAllocatePage(SPgFile *pPgFile, pgno_t *pPgno);
#ifdef __cplusplus
}
#endif
#endif /*_TD_PAGE_FILE_H_*/

View File

@ -13,46 +13,32 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TDB_INC_H_
#define _TD_TDB_INC_H_
#include "os.h"
#include "tlist.h"
#include "tlockfree.h"
#ifndef _TDB_UTIL_H_
#define _TDB_UTIL_H_
#ifdef __cplusplus
extern "C" {
#endif
// pgno_t
typedef int32_t pgno_t;
#define TDB_IVLD_PGNO ((pgno_t)-1)
#if __STDC_VERSION__ >= 201112L
#define TDB_STATIC_ASSERT(op, info) static_assert(op, info)
#else
#define TDB_STATIC_ASSERT(op, info)
#endif
// fileid
#define TDB_FILE_ID_LEN 24
#define TDB_ROUND8(x) (((x) + 7) & ~7)
// pgid_t
typedef struct {
uint8_t fileid[TDB_FILE_ID_LEN];
pgno_t pgno;
} pgid_t;
#define TDB_IVLD_PGID (pgid_t){0, TDB_IVLD_PGNO};
int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique);
// framd_id_t
typedef int32_t frame_id_t;
#define TDB_F_OK 0x1
#define TDB_R_OK 0x2
#define TDB_W_OK 0x4
int tdbCheckFileAccess(const char *pathname, int mode);
// pgsize_t
typedef int32_t pgsize_t;
#define TDB_MIN_PGSIZE 512
#define TDB_MAX_PGSIZE 16384
#define TDB_DEFAULT_PGSIZE 4096
#define TDB_IS_PGSIZE_VLD(s) (((s) >= TDB_MIN_PGSIZE) && ((s) <= TDB_MAX_PGSIZE))
// tdb_log
#define tdbError(var)
int tdbGetFileSize(const char *fname, pgsz_t pgSize, pgno_t *pSize);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TDB_INC_H_*/
#endif /*_TDB_UTIL_H_*/

View File

@ -1,94 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TDB_MPOOL_H_
#define _TD_TDB_MPOOL_H_
#include "tdb_inc.h"
#ifdef __cplusplus
extern "C" {
#endif
// Exposed handle
typedef struct TDB_MPOOL TDB_MPOOL;
typedef struct TDB_MPFILE TDB_MPFILE;
typedef TD_DLIST_NODE(pg_t) pg_free_dlist_node_t, pg_hash_dlist_node_t;
typedef struct pg_t {
SRWLatch rwLatch;
frame_id_t frameid;
pgid_t pgid;
uint8_t dirty;
uint8_t rbit;
int32_t pinRef;
pg_free_dlist_node_t free;
pg_hash_dlist_node_t hash;
void * p;
} pg_t;
typedef TD_DLIST(pg_t) pg_list_t;
typedef struct {
SRWLatch latch;
TD_DLIST(TDB_MPFILE);
} mpf_bucket_t;
struct TDB_MPOOL {
int64_t cachesize;
pgsize_t pgsize;
int32_t npages;
pg_t * pages;
pg_list_t freeList;
frame_id_t clockHand;
struct {
int32_t nbucket;
pg_list_t *hashtab;
} pgtab; // page table, hash<pgid_t, pg_t>
struct {
#define MPF_HASH_BUCKETS 16
mpf_bucket_t buckets[MPF_HASH_BUCKETS];
} mpfht; // MPF hash table. MPFs using this MP will be put in this hash table
};
#define MP_PAGE_AT(mp, idx) (mp)->pages[idx]
typedef TD_DLIST_NODE(TDB_MPFILE) td_mpf_dlist_node_t;
struct TDB_MPFILE {
char * fname; // file name
int fd; // fd
uint8_t fileid[TDB_FILE_ID_LEN]; // file ID
TDB_MPOOL * mp; // underlying memory pool
td_mpf_dlist_node_t node;
};
/*=================================================== Exposed apis ==================================================*/
// TDB_MPOOL
int tdbMPoolOpen(TDB_MPOOL **mpp, uint64_t cachesize, pgsize_t pgsize);
int tdbMPoolClose(TDB_MPOOL *mp);
int tdbMPoolSync(TDB_MPOOL *mp);
// TDB_MPFILE
int tdbMPoolFileOpen(TDB_MPFILE **mpfp, const char *fname, TDB_MPOOL *mp);
int tdbMPoolFileClose(TDB_MPFILE *mpf);
int tdbMPoolFileNewPage(TDB_MPFILE *mpf, pgno_t *pgno, void *addr);
int tdbMPoolFileFreePage(TDB_MPOOL *mpf, pgno_t *pgno, void *addr);
int tdbMPoolFileGetPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr);
int tdbMPoolFilePutPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr);
int tdbMPoolFileSync(TDB_MPFILE *mpf);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TDB_MPOOL_H_*/

View File

@ -1,7 +1,3 @@
# tdbMPoolTest
add_executable(tdbMPoolTest "tdbMPoolTest.cpp")
target_link_libraries(tdbMPoolTest tdb gtest gtest_main)
# tdbTest
add_executable(tdbTest "tdbTest.cpp")
target_link_libraries(tdbTest tdb gtest gtest_main)

View File

@ -1,31 +0,0 @@
#include "gtest/gtest.h"
#include <iostream>
#include "tdb_mpool.h"
TEST(tdb_mpool_test, test1) {
TDB_MPOOL * mp;
TDB_MPFILE *mpf;
pgno_t pgno;
void * pgdata;
// open mp
tdbMPoolOpen(&mp, 16384, 4096);
// open mpf
tdbMPoolFileOpen(&mpf, "test.db", mp);
#define TEST1_TOTAL_PAGES 100
for (int i = 0; i < TEST1_TOTAL_PAGES; i++) {
tdbMPoolFileNewPage(mpf, &pgno, pgdata);
*(pgno_t *)pgdata = i;
}
// close mpf
tdbMPoolFileClose(mpf);
// close mp
tdbMPoolClose(mp);
}

View File

@ -2,13 +2,67 @@
#include "tdb.h"
TEST(tdb_api_test, tdb_create_open_close_db_test) {
// int ret;
// TDB *dbp;
TEST(tdb_test, simple_test) {
TENV * pEnv;
TDB * pDb1, *pDb2, *pDb3;
pgsz_t pgSize = 1024;
cachesz_t cacheSize = 10240;
// tdbCreateDB(&dbp, TDB_BTREE_T);
// ENV
GTEST_ASSERT_EQ(tdbEnvCreate(&pEnv, "./testtdb"), 0);
// tdbOpenDB(dbp, 0);
GTEST_ASSERT_EQ(tdbEnvSetCache(pEnv, pgSize, cacheSize), 0);
// tdbCloseDB(dbp, 0);
GTEST_ASSERT_EQ(tdbEnvGetCacheSize(pEnv), cacheSize);
GTEST_ASSERT_EQ(tdbEnvGetPageSize(pEnv), pgSize);
GTEST_ASSERT_EQ(tdbEnvOpen(pEnv), 0);
#if 1
// DB
GTEST_ASSERT_EQ(tdbCreate(&pDb1), 0);
// GTEST_ASSERT_EQ(tdbSetKeyLen(pDb1, 8), 0);
// GTEST_ASSERT_EQ(tdbGetKeyLen(pDb1), 8);
// GTEST_ASSERT_EQ(tdbSetValLen(pDb1, 3), 0);
// GTEST_ASSERT_EQ(tdbGetValLen(pDb1), 3);
// GTEST_ASSERT_EQ(tdbSetDup(pDb1, 1), 0);
// GTEST_ASSERT_EQ(tdbGetDup(pDb1), 1);
// GTEST_ASSERT_EQ(tdbSetCmprFunc(pDb1, NULL), 0);
tdbEnvBeginTxn(pEnv);
GTEST_ASSERT_EQ(tdbOpen(pDb1, "db.db", "db1", pEnv), 0);
// char *key = "key1";
// char *val = "value1";
// tdbInsert(pDb1, (void *)key, strlen(key), (void *)val, strlen(val));
tdbEnvCommit(pEnv);
#if 0
// Insert
// Query
// Delete
// Query
#endif
// GTEST_ASSERT_EQ(tdbOpen(&pDb2, "db.db", "db2", pEnv), 0);
// GTEST_ASSERT_EQ(tdbOpen(&pDb3, "index.db", NULL, pEnv), 0);
// tdbClose(pDb3);
// tdbClose(pDb2);
tdbClose(pDb1);
#endif
tdbEnvClose(pEnv);
}

View File

@ -10,7 +10,7 @@ target_link_libraries(
util
PRIVATE os
PUBLIC lz4_static
PUBLIC api
PUBLIC api cjson
)
if(${BUILD_TEST})

76
source/util/src/tjson.c Normal file
View File

@ -0,0 +1,76 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tjson.h"
#include "taoserror.h"
#include "cJSON.h"
SJson* tjsonCreateObject() {
return cJSON_CreateObject();
}
void tjsonDelete(SJson* pJson) {
cJSON_Delete((cJSON*)pJson);
}
int32_t tjsonAddIntegerToObject(SJson* pJson, const char* pName, const uint64_t number) {
char tmp[40] = {0};
snprintf(tmp, tListLen(tmp), "%"PRId64, number);
return tjsonAddStringToObject(pJson, pName, tmp);
}
int32_t tjsonAddStringToObject(SJson* pJson, const char* pName, const char* pVal) {
return (NULL == cJSON_AddStringToObject((cJSON*)pJson, pName, pVal) ? TSDB_CODE_FAILED : TSDB_CODE_SUCCESS);
}
SJson* tjsonAddArrayToObject(SJson* pJson, const char* pName) {
return cJSON_AddArrayToObject((cJSON*)pJson, pName);
}
int32_t tjsonAddItemToObject(SJson *pJson, const char* pName, SJson* pItem) {
return (cJSON_AddItemToObject((cJSON*)pJson, pName, pItem) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED);
}
int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem) {
return (cJSON_AddItemToArray((cJSON*)pJson, pItem) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED);
}
int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj) {
if (NULL == pObj) {
return TSDB_CODE_SUCCESS;
}
SJson* pJobj = tjsonCreateObject();
if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) {
printf("%s:%d code = %d\n", __FUNCTION__, __LINE__, TSDB_CODE_FAILED);
tjsonDelete(pJobj);
return TSDB_CODE_FAILED;
}
return tjsonAddItemToObject(pJson, pName, pJobj);
}
int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) {
SJson* pJobj = tjsonCreateObject();
if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) {
tjsonDelete(pJobj);
return TSDB_CODE_FAILED;
}
return tjsonAddItemToArray(pJson, pJobj);
}
char* tjsonToString(const SJson* pJson) {
return cJSON_Print((cJSON*)pJson);
}