Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
86e7dd3a61
|
@ -38,12 +38,12 @@ typedef enum {
|
|||
} EQType;
|
||||
|
||||
typedef enum {
|
||||
TSDB_SUPER_TABLE = 1, // super table
|
||||
TSDB_CHILD_TABLE = 2, // table created from super table
|
||||
TSDB_NORMAL_TABLE = 3, // ordinary table
|
||||
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
||||
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
||||
TSDB_TABLE_MAX = 6
|
||||
TSDB_SUPER_TABLE = 1, // super table
|
||||
TSDB_CHILD_TABLE = 2, // table created from super table
|
||||
TSDB_NORMAL_TABLE = 3, // ordinary table
|
||||
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
||||
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
||||
TSDB_TABLE_MAX = 6
|
||||
} ETableType;
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -218,26 +218,6 @@ typedef struct {
|
|||
char data[];
|
||||
} SMDCreateTableMsg;
|
||||
|
||||
// typedef struct {
|
||||
// int32_t len; // one create table message
|
||||
// char tableName[TSDB_TABLE_FNAME_LEN];
|
||||
// int16_t numOfColumns;
|
||||
// int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
|
||||
// int8_t igExists;
|
||||
// int8_t rspMeta;
|
||||
// int8_t reserved[16];
|
||||
// char schema[];
|
||||
//} SCreateTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char tableName[TSDB_TABLE_FNAME_LEN];
|
||||
int16_t numOfColumns;
|
||||
int16_t numOfTags;
|
||||
int8_t igExists;
|
||||
int8_t rspMeta;
|
||||
char schema[];
|
||||
} SCreateCTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
|
|
|
@ -13,205 +13,199 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_COMMON_TOKEN_DEF_H_
|
||||
#define _TD_COMMON_TOKEN_DEF_H_
|
||||
#ifndef TDENGINE_TTOKENDEF_H
|
||||
#define TDENGINE_TTOKENDEF_H
|
||||
|
||||
#define TK_ID 1
|
||||
#define TK_BOOL 2
|
||||
#define TK_TINYINT 3
|
||||
#define TK_SMALLINT 4
|
||||
#define TK_INTEGER 5
|
||||
#define TK_BIGINT 6
|
||||
#define TK_FLOAT 7
|
||||
#define TK_DOUBLE 8
|
||||
#define TK_STRING 9
|
||||
#define TK_TIMESTAMP 10
|
||||
#define TK_BINARY 11
|
||||
#define TK_NCHAR 12
|
||||
#define TK_OR 13
|
||||
#define TK_AND 14
|
||||
#define TK_NOT 15
|
||||
#define TK_EQ 16
|
||||
#define TK_NE 17
|
||||
#define TK_ISNULL 18
|
||||
#define TK_NOTNULL 19
|
||||
#define TK_IS 20
|
||||
#define TK_LIKE 21
|
||||
#define TK_MATCH 22
|
||||
#define TK_NMATCH 23
|
||||
#define TK_GLOB 24
|
||||
#define TK_BETWEEN 25
|
||||
#define TK_IN 26
|
||||
#define TK_GT 27
|
||||
#define TK_GE 28
|
||||
#define TK_LT 29
|
||||
#define TK_LE 30
|
||||
#define TK_BITAND 31
|
||||
#define TK_BITOR 32
|
||||
#define TK_LSHIFT 33
|
||||
#define TK_RSHIFT 34
|
||||
#define TK_PLUS 35
|
||||
#define TK_MINUS 36
|
||||
#define TK_DIVIDE 37
|
||||
#define TK_TIMES 38
|
||||
#define TK_STAR 39
|
||||
#define TK_SLASH 40
|
||||
#define TK_REM 41
|
||||
#define TK_CONCAT 42
|
||||
#define TK_UMINUS 43
|
||||
#define TK_UPLUS 44
|
||||
#define TK_BITNOT 45
|
||||
#define TK_SHOW 46
|
||||
#define TK_DATABASES 47
|
||||
#define TK_TOPICS 48
|
||||
#define TK_FUNCTIONS 49
|
||||
#define TK_MNODES 50
|
||||
#define TK_DNODES 51
|
||||
#define TK_ACCOUNTS 52
|
||||
#define TK_USERS 53
|
||||
#define TK_MODULES 54
|
||||
#define TK_QUERIES 55
|
||||
#define TK_CONNECTIONS 56
|
||||
#define TK_STREAMS 57
|
||||
#define TK_VARIABLES 58
|
||||
#define TK_SCORES 59
|
||||
#define TK_GRANTS 60
|
||||
#define TK_VNODES 61
|
||||
#define TK_DOT 62
|
||||
#define TK_CREATE 63
|
||||
#define TK_TABLE 64
|
||||
#define TK_STABLE 65
|
||||
#define TK_DATABASE 66
|
||||
#define TK_TABLES 67
|
||||
#define TK_STABLES 68
|
||||
#define TK_VGROUPS 69
|
||||
#define TK_DROP 70
|
||||
#define TK_TOPIC 71
|
||||
#define TK_FUNCTION 72
|
||||
#define TK_DNODE 73
|
||||
#define TK_USER 74
|
||||
#define TK_ACCOUNT 75
|
||||
#define TK_USE 76
|
||||
#define TK_DESCRIBE 77
|
||||
#define TK_DESC 78
|
||||
#define TK_ALTER 79
|
||||
#define TK_PASS 80
|
||||
#define TK_PRIVILEGE 81
|
||||
#define TK_LOCAL 82
|
||||
#define TK_COMPACT 83
|
||||
#define TK_LP 84
|
||||
#define TK_RP 85
|
||||
#define TK_IF 86
|
||||
#define TK_EXISTS 87
|
||||
#define TK_AS 88
|
||||
#define TK_OUTPUTTYPE 89
|
||||
#define TK_AGGREGATE 90
|
||||
#define TK_BUFSIZE 91
|
||||
#define TK_PPS 92
|
||||
#define TK_TSERIES 93
|
||||
#define TK_DBS 94
|
||||
#define TK_STORAGE 95
|
||||
#define TK_QTIME 96
|
||||
#define TK_CONNS 97
|
||||
#define TK_STATE 98
|
||||
#define TK_COMMA 99
|
||||
#define TK_KEEP 100
|
||||
#define TK_CACHE 101
|
||||
#define TK_REPLICA 102
|
||||
#define TK_QUORUM 103
|
||||
#define TK_DAYS 104
|
||||
#define TK_MINROWS 105
|
||||
#define TK_MAXROWS 106
|
||||
#define TK_BLOCKS 107
|
||||
#define TK_CTIME 108
|
||||
#define TK_WAL 109
|
||||
#define TK_FSYNC 110
|
||||
#define TK_COMP 111
|
||||
#define TK_PRECISION 112
|
||||
#define TK_UPDATE 113
|
||||
#define TK_CACHELAST 114
|
||||
#define TK_PARTITIONS 115
|
||||
#define TK_UNSIGNED 116
|
||||
#define TK_TAGS 117
|
||||
#define TK_USING 118
|
||||
#define TK_NULL 119
|
||||
#define TK_NOW 120
|
||||
#define TK_SELECT 121
|
||||
#define TK_UNION 122
|
||||
#define TK_ALL 123
|
||||
#define TK_DISTINCT 124
|
||||
#define TK_FROM 125
|
||||
#define TK_VARIABLE 126
|
||||
#define TK_INTERVAL 127
|
||||
#define TK_EVERY 128
|
||||
#define TK_SESSION 129
|
||||
#define TK_STATE_WINDOW 130
|
||||
#define TK_FILL 131
|
||||
#define TK_SLIDING 132
|
||||
#define TK_ORDER 133
|
||||
#define TK_BY 134
|
||||
#define TK_ASC 135
|
||||
#define TK_GROUP 136
|
||||
#define TK_HAVING 137
|
||||
#define TK_LIMIT 138
|
||||
#define TK_OFFSET 139
|
||||
#define TK_SLIMIT 140
|
||||
#define TK_SOFFSET 141
|
||||
#define TK_WHERE 142
|
||||
#define TK_RESET 143
|
||||
#define TK_QUERY 144
|
||||
#define TK_SYNCDB 145
|
||||
#define TK_ADD 146
|
||||
#define TK_COLUMN 147
|
||||
#define TK_MODIFY 148
|
||||
#define TK_TAG 149
|
||||
#define TK_CHANGE 150
|
||||
#define TK_SET 151
|
||||
#define TK_KILL 152
|
||||
#define TK_CONNECTION 153
|
||||
#define TK_STREAM 154
|
||||
#define TK_COLON 155
|
||||
#define TK_ABORT 156
|
||||
#define TK_AFTER 157
|
||||
#define TK_ATTACH 158
|
||||
#define TK_BEFORE 159
|
||||
#define TK_BEGIN 160
|
||||
#define TK_CASCADE 161
|
||||
#define TK_CLUSTER 162
|
||||
#define TK_CONFLICT 163
|
||||
#define TK_COPY 164
|
||||
#define TK_DEFERRED 165
|
||||
#define TK_DELIMITERS 166
|
||||
#define TK_DETACH 167
|
||||
#define TK_EACH 168
|
||||
#define TK_END 169
|
||||
#define TK_EXPLAIN 170
|
||||
#define TK_FAIL 171
|
||||
#define TK_FOR 172
|
||||
#define TK_IGNORE 173
|
||||
#define TK_IMMEDIATE 174
|
||||
#define TK_INITIALLY 175
|
||||
#define TK_INSTEAD 176
|
||||
#define TK_KEY 177
|
||||
#define TK_OF 178
|
||||
#define TK_RAISE 179
|
||||
#define TK_REPLACE 180
|
||||
#define TK_RESTRICT 181
|
||||
#define TK_ROW 182
|
||||
#define TK_STATEMENT 183
|
||||
#define TK_TRIGGER 184
|
||||
#define TK_VIEW 185
|
||||
#define TK_IPTOKEN 186
|
||||
#define TK_SEMI 187
|
||||
#define TK_NONE 188
|
||||
#define TK_PREV 189
|
||||
#define TK_LINEAR 190
|
||||
#define TK_IMPORT 191
|
||||
#define TK_TBNAME 192
|
||||
#define TK_JOIN 193
|
||||
#define TK_INSERT 194
|
||||
#define TK_INTO 195
|
||||
#define TK_VALUES 196
|
||||
#define TK_INTEGER 3
|
||||
#define TK_FLOAT 4
|
||||
#define TK_STRING 5
|
||||
#define TK_TIMESTAMP 6
|
||||
#define TK_OR 7
|
||||
#define TK_AND 8
|
||||
#define TK_NOT 9
|
||||
#define TK_EQ 10
|
||||
#define TK_NE 11
|
||||
#define TK_ISNULL 12
|
||||
#define TK_NOTNULL 13
|
||||
#define TK_IS 14
|
||||
#define TK_LIKE 15
|
||||
#define TK_MATCH 16
|
||||
#define TK_NMATCH 17
|
||||
#define TK_GLOB 18
|
||||
#define TK_BETWEEN 19
|
||||
#define TK_IN 20
|
||||
#define TK_GT 21
|
||||
#define TK_GE 22
|
||||
#define TK_LT 23
|
||||
#define TK_LE 24
|
||||
#define TK_BITAND 25
|
||||
#define TK_BITOR 26
|
||||
#define TK_LSHIFT 27
|
||||
#define TK_RSHIFT 28
|
||||
#define TK_PLUS 29
|
||||
#define TK_MINUS 30
|
||||
#define TK_DIVIDE 31
|
||||
#define TK_TIMES 32
|
||||
#define TK_STAR 33
|
||||
#define TK_SLASH 34
|
||||
#define TK_REM 35
|
||||
#define TK_CONCAT 36
|
||||
#define TK_UMINUS 37
|
||||
#define TK_UPLUS 38
|
||||
#define TK_BITNOT 39
|
||||
#define TK_SHOW 40
|
||||
#define TK_DATABASES 41
|
||||
#define TK_TOPICS 42
|
||||
#define TK_FUNCTIONS 43
|
||||
#define TK_MNODES 44
|
||||
#define TK_DNODES 45
|
||||
#define TK_ACCOUNTS 46
|
||||
#define TK_USERS 47
|
||||
#define TK_MODULES 48
|
||||
#define TK_QUERIES 49
|
||||
#define TK_CONNECTIONS 50
|
||||
#define TK_STREAMS 51
|
||||
#define TK_VARIABLES 52
|
||||
#define TK_SCORES 53
|
||||
#define TK_GRANTS 54
|
||||
#define TK_VNODES 55
|
||||
#define TK_DOT 56
|
||||
#define TK_CREATE 57
|
||||
#define TK_TABLE 58
|
||||
#define TK_STABLE 59
|
||||
#define TK_DATABASE 60
|
||||
#define TK_TABLES 61
|
||||
#define TK_STABLES 62
|
||||
#define TK_VGROUPS 63
|
||||
#define TK_DROP 64
|
||||
#define TK_TOPIC 65
|
||||
#define TK_FUNCTION 66
|
||||
#define TK_DNODE 67
|
||||
#define TK_USER 68
|
||||
#define TK_ACCOUNT 69
|
||||
#define TK_USE 70
|
||||
#define TK_DESCRIBE 71
|
||||
#define TK_DESC 72
|
||||
#define TK_ALTER 73
|
||||
#define TK_PASS 74
|
||||
#define TK_PRIVILEGE 75
|
||||
#define TK_LOCAL 76
|
||||
#define TK_COMPACT 77
|
||||
#define TK_LP 78
|
||||
#define TK_RP 79
|
||||
#define TK_IF 80
|
||||
#define TK_EXISTS 81
|
||||
#define TK_PORT 82
|
||||
#define TK_IPTOKEN 83
|
||||
#define TK_AS 84
|
||||
#define TK_OUTPUTTYPE 85
|
||||
#define TK_AGGREGATE 86
|
||||
#define TK_BUFSIZE 87
|
||||
#define TK_PPS 88
|
||||
#define TK_TSERIES 89
|
||||
#define TK_DBS 90
|
||||
#define TK_STORAGE 91
|
||||
#define TK_QTIME 92
|
||||
#define TK_CONNS 93
|
||||
#define TK_STATE 94
|
||||
#define TK_COMMA 95
|
||||
#define TK_KEEP 96
|
||||
#define TK_CACHE 97
|
||||
#define TK_REPLICA 98
|
||||
#define TK_QUORUM 99
|
||||
#define TK_DAYS 100
|
||||
#define TK_MINROWS 101
|
||||
#define TK_MAXROWS 102
|
||||
#define TK_BLOCKS 103
|
||||
#define TK_CTIME 104
|
||||
#define TK_WAL 105
|
||||
#define TK_FSYNC 106
|
||||
#define TK_COMP 107
|
||||
#define TK_PRECISION 108
|
||||
#define TK_UPDATE 109
|
||||
#define TK_CACHELAST 110
|
||||
#define TK_UNSIGNED 111
|
||||
#define TK_TAGS 112
|
||||
#define TK_USING 113
|
||||
#define TK_NULL 114
|
||||
#define TK_NOW 115
|
||||
#define TK_SELECT 116
|
||||
#define TK_UNION 117
|
||||
#define TK_ALL 118
|
||||
#define TK_DISTINCT 119
|
||||
#define TK_FROM 120
|
||||
#define TK_VARIABLE 121
|
||||
#define TK_INTERVAL 122
|
||||
#define TK_EVERY 123
|
||||
#define TK_SESSION 124
|
||||
#define TK_STATE_WINDOW 125
|
||||
#define TK_FILL 126
|
||||
#define TK_SLIDING 127
|
||||
#define TK_ORDER 128
|
||||
#define TK_BY 129
|
||||
#define TK_ASC 130
|
||||
#define TK_GROUP 131
|
||||
#define TK_HAVING 132
|
||||
#define TK_LIMIT 133
|
||||
#define TK_OFFSET 134
|
||||
#define TK_SLIMIT 135
|
||||
#define TK_SOFFSET 136
|
||||
#define TK_WHERE 137
|
||||
#define TK_RESET 138
|
||||
#define TK_QUERY 139
|
||||
#define TK_SYNCDB 140
|
||||
#define TK_ADD 141
|
||||
#define TK_COLUMN 142
|
||||
#define TK_MODIFY 143
|
||||
#define TK_TAG 144
|
||||
#define TK_CHANGE 145
|
||||
#define TK_SET 146
|
||||
#define TK_KILL 147
|
||||
#define TK_CONNECTION 148
|
||||
#define TK_STREAM 149
|
||||
#define TK_COLON 150
|
||||
#define TK_ABORT 151
|
||||
#define TK_AFTER 152
|
||||
#define TK_ATTACH 153
|
||||
#define TK_BEFORE 154
|
||||
#define TK_BEGIN 155
|
||||
#define TK_CASCADE 156
|
||||
#define TK_CLUSTER 157
|
||||
#define TK_CONFLICT 158
|
||||
#define TK_COPY 159
|
||||
#define TK_DEFERRED 160
|
||||
#define TK_DELIMITERS 161
|
||||
#define TK_DETACH 162
|
||||
#define TK_EACH 163
|
||||
#define TK_END 164
|
||||
#define TK_EXPLAIN 165
|
||||
#define TK_FAIL 166
|
||||
#define TK_FOR 167
|
||||
#define TK_IGNORE 168
|
||||
#define TK_IMMEDIATE 169
|
||||
#define TK_INITIALLY 170
|
||||
#define TK_INSTEAD 171
|
||||
#define TK_KEY 172
|
||||
#define TK_OF 173
|
||||
#define TK_RAISE 174
|
||||
#define TK_REPLACE 175
|
||||
#define TK_RESTRICT 176
|
||||
#define TK_ROW 177
|
||||
#define TK_STATEMENT 178
|
||||
#define TK_TRIGGER 179
|
||||
#define TK_VIEW 180
|
||||
#define TK_SEMI 181
|
||||
#define TK_NONE 182
|
||||
#define TK_PREV 183
|
||||
#define TK_LINEAR 184
|
||||
#define TK_IMPORT 185
|
||||
#define TK_TBNAME 186
|
||||
#define TK_JOIN 187
|
||||
#define TK_INSERT 188
|
||||
#define TK_INTO 189
|
||||
#define TK_VALUES 190
|
||||
|
||||
|
||||
#define TK_SPACE 300
|
||||
|
@ -223,6 +217,6 @@
|
|||
#define TK_FILE 306
|
||||
#define TK_QUESTION 307 // denoting the placeholder of "?",when invoking statement bind query
|
||||
|
||||
#endif /*_TD_COMMON_TOKEN_DEF_H_*/
|
||||
#endif
|
||||
|
||||
|
||||
|
|
|
@ -45,7 +45,6 @@ typedef struct SMetaData {
|
|||
} SMetaData;
|
||||
|
||||
typedef struct SCatalogCfg {
|
||||
bool enableVgroupCache;
|
||||
uint32_t maxTblCacheNum;
|
||||
uint32_t maxDBCacheNum;
|
||||
} SCatalogCfg;
|
||||
|
@ -61,8 +60,8 @@ int32_t catalogInit(SCatalogCfg *cfg);
|
|||
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
|
||||
|
||||
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
|
||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
|
||||
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||
|
||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
|
||||
|
||||
/**
|
||||
* Get a table's meta data.
|
||||
|
|
|
@ -166,6 +166,7 @@ typedef struct SInsertStmtInfo {
|
|||
typedef struct SDclStmtInfo {
|
||||
int16_t nodeType;
|
||||
int16_t msgType;
|
||||
SEpSet epSet;
|
||||
char* pMsg;
|
||||
int32_t msgLen;
|
||||
} SDclStmtInfo;
|
||||
|
|
|
@ -24,7 +24,6 @@ extern "C" {
|
|||
#include "catalog.h"
|
||||
|
||||
typedef struct SSchedulerCfg {
|
||||
int32_t clusterType;
|
||||
int32_t maxJobNum;
|
||||
} SSchedulerCfg;
|
||||
|
||||
|
|
|
@ -317,12 +317,12 @@ do { \
|
|||
#define TSDB_MAX_FIELD_LEN 16384
|
||||
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
|
||||
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
|
||||
#define PRIMARYKEY_TIMESTAMP_COL_ID 0
|
||||
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
|
||||
|
||||
#define TSDB_MAX_RPC_THREADS 5
|
||||
|
||||
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
||||
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
||||
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
|
||||
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
|
||||
|
||||
|
||||
#define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default
|
||||
|
|
|
@ -182,40 +182,40 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
|||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
|
||||
SMsgSendInfo* pSendMsg = buildSendMsgInfoImpl(pRequest);
|
||||
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||
|
||||
if (pDcl->msgType == TDMT_VND_CREATE_TABLE) {
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
|
||||
char buf[18] = {0};
|
||||
sprintf(buf, "%" PRId64, pRequest->pTscObj->pAppInfo->clusterId);
|
||||
int32_t code = catalogGetHandle(buf, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SCreateTableMsg* pMsg = pSendMsg->msgInfo.pData;
|
||||
|
||||
SName t = {0};
|
||||
tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
|
||||
|
||||
char db[TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
|
||||
tNameGetFullDbName(&t, db);
|
||||
|
||||
SVgroupInfo info = {0};
|
||||
catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
|
||||
|
||||
// struct SCatalog* pCatalog = NULL;
|
||||
//
|
||||
// char buf[18] = {0};
|
||||
// sprintf(buf, "%" PRId64, pRequest->pTscObj->pAppInfo->clusterId);
|
||||
// int32_t code = catalogGetHandle(buf, &pCatalog);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// return code;
|
||||
// }
|
||||
//
|
||||
// SCreateTableMsg* pMsg = pSendMsg->msgInfo.pData;
|
||||
//
|
||||
// SName t = {0};
|
||||
// tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
|
||||
//
|
||||
// char db[TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
|
||||
// tNameGetFullDbName(&t, db);
|
||||
//
|
||||
// SVgroupInfo info = {0};
|
||||
// catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
|
||||
//
|
||||
int64_t transporterId = 0;
|
||||
SEpSet ep = {0};
|
||||
ep.inUse = info.inUse;
|
||||
ep.numOfEps = info.numOfEps;
|
||||
for(int32_t i = 0; i < ep.numOfEps; ++i) {
|
||||
ep.port[i] = info.epAddr[i].port;
|
||||
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
|
||||
}
|
||||
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, pSendMsg);
|
||||
// SEpSet ep = {0};
|
||||
// ep.inUse = info.inUse;
|
||||
// ep.numOfEps = info.numOfEps;
|
||||
// for(int32_t i = 0; i < ep.numOfEps; ++i) {
|
||||
// ep.port[i] = info.epAddr[i].port;
|
||||
// tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
|
||||
// }
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
|
||||
} else {
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, pSendMsg);
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
#include "taos.h"
|
||||
#include "taosdef.h"
|
||||
#include "thash.h"
|
||||
#include "ttime.h"
|
||||
#include "ttokendef.h"
|
||||
#include "ttypes.h"
|
||||
|
|
|
@ -66,8 +66,6 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
|
|||
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
|
||||
|
||||
#define CTG_CACHE_ENABLED() (ctgMgmt.cfg.maxDBCacheNum > 0 || ctgMgmt.cfg.maxTblCacheNum > 0)
|
||||
|
||||
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
|
||||
|
|
|
@ -370,6 +370,41 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
|
||||
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t exist = 0;
|
||||
|
||||
if (0 == forceUpdate) {
|
||||
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
|
||||
|
||||
if (exist) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
SUseDbOutput DbOut = {0};
|
||||
SBuildUseDBInput input = {0};
|
||||
|
||||
strncpy(input.db, dbName, sizeof(input.db));
|
||||
input.db[sizeof(input.db) - 1] = 0;
|
||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
|
||||
|
||||
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
|
||||
|
||||
if (dbInfo) {
|
||||
*dbInfo = DbOut.dbVgroup;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||
if (ctgMgmt.pCluster) {
|
||||
ctgError("catalog already init");
|
||||
|
@ -378,16 +413,22 @@ int32_t catalogInit(SCatalogCfg *cfg) {
|
|||
|
||||
if (cfg) {
|
||||
memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
|
||||
|
||||
if (ctgMgmt.cfg.maxDBCacheNum == 0) {
|
||||
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||
}
|
||||
|
||||
if (ctgMgmt.cfg.maxTblCacheNum == 0) {
|
||||
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
|
||||
}
|
||||
} else {
|
||||
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
|
||||
}
|
||||
|
||||
if (CTG_CACHE_ENABLED()) {
|
||||
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == ctgMgmt.pCluster) {
|
||||
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
||||
}
|
||||
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == ctgMgmt.pCluster) {
|
||||
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -449,13 +490,19 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
||||
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
||||
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (dbInfo->vgVersion < 0) {
|
||||
if (pCatalog->dbCache.cache) {
|
||||
SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||
if (oldInfo && oldInfo->vgInfo) {
|
||||
taosHashCleanup(oldInfo->vgInfo);
|
||||
oldInfo->vgInfo = NULL;
|
||||
}
|
||||
|
||||
taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||
}
|
||||
|
||||
|
@ -485,42 +532,6 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
|
||||
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t exist = 0;
|
||||
|
||||
if (0 == forceUpdate) {
|
||||
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
|
||||
|
||||
if (exist) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
SUseDbOutput DbOut = {0};
|
||||
SBuildUseDBInput input = {0};
|
||||
|
||||
strncpy(input.db, dbName, sizeof(input.db));
|
||||
input.db[sizeof(input.db) - 1] = 0;
|
||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
|
||||
|
||||
CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &DbOut.dbVgroup));
|
||||
|
||||
if (dbInfo) {
|
||||
*dbInfo = DbOut.dbVgroup;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
||||
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
|
||||
}
|
||||
|
@ -531,6 +542,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
|||
}
|
||||
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
|
||||
|
||||
|
@ -540,11 +552,13 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
|
|||
|
||||
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
|
||||
|
||||
CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output));
|
||||
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
||||
|
||||
_return:
|
||||
|
||||
tfree(output.tbMeta);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
|
||||
|
@ -563,7 +577,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
|
|||
|
||||
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
|
||||
|
||||
CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
|
||||
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
|
||||
|
||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
|
||||
|
@ -594,6 +608,7 @@ _return:
|
|||
tfree(tbMeta);
|
||||
|
||||
taosArrayDestroy(*pVgroupList);
|
||||
*pVgroupList = NULL;
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
@ -604,7 +619,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter,
|
|||
int32_t code = 0;
|
||||
int32_t vgId = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
|
||||
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
|
||||
|
||||
if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
|
||||
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
|
||||
|
@ -627,12 +642,15 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p
|
|||
if (pReq->pTableName) {
|
||||
char dbName[TSDB_DB_FNAME_LEN];
|
||||
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
|
||||
if (tbNum > 0) {
|
||||
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
|
||||
if (NULL == pRsp->pTableMeta) {
|
||||
ctgError("taosArrayInit num[%d] failed", tbNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
if (tbNum <= 0) {
|
||||
ctgError("empty table name list");
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
|
||||
if (NULL == pRsp->pTableMeta) {
|
||||
ctgError("taosArrayInit num[%d] failed", tbNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbNum; ++i) {
|
||||
|
@ -663,6 +681,7 @@ _return:
|
|||
}
|
||||
|
||||
taosArrayDestroy(pRsp->pTableMeta);
|
||||
pRsp->pTableMeta = NULL;
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
|
|
|
@ -30,14 +30,18 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct MemTable {
|
||||
T_REF_DECLARE()
|
||||
SSkipList* mem;
|
||||
} MemTable;
|
||||
typedef struct IndexCache {
|
||||
T_REF_DECLARE()
|
||||
SSkipList *mem, *imm;
|
||||
SIndex* index;
|
||||
char* colName;
|
||||
int32_t version;
|
||||
int32_t nTerm;
|
||||
int8_t type;
|
||||
MemTable *mem, *imm;
|
||||
SIndex* index;
|
||||
char* colName;
|
||||
int32_t version;
|
||||
int32_t nTerm;
|
||||
int8_t type;
|
||||
|
||||
pthread_mutex_t mtx;
|
||||
} IndexCache;
|
||||
|
@ -45,7 +49,6 @@ typedef struct IndexCache {
|
|||
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
||||
typedef struct CacheTerm {
|
||||
// key
|
||||
int32_t nColVal;
|
||||
char* colVal;
|
||||
int32_t version;
|
||||
// value
|
||||
|
|
|
@ -34,9 +34,7 @@ int32_t indexInit() {
|
|||
return indexQhandle == NULL ? -1 : 0;
|
||||
// do nothing
|
||||
}
|
||||
void indexCleanUp() {
|
||||
taosCleanUpScheduler(indexQhandle);
|
||||
}
|
||||
void indexCleanUp() { taosCleanUpScheduler(indexQhandle); }
|
||||
|
||||
static int uidCompare(const void* a, const void* b) {
|
||||
uint64_t u1 = *(uint64_t*)a;
|
||||
|
@ -63,7 +61,9 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
|
|||
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||
// pthread_once(&isInit, indexInit);
|
||||
SIndex* sIdx = calloc(1, sizeof(SIndex));
|
||||
if (sIdx == NULL) { return -1; }
|
||||
if (sIdx == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
#ifdef USE_LUCENE
|
||||
index_t* index = index_open(path);
|
||||
|
@ -99,7 +99,9 @@ void indexClose(SIndex* sIdx) {
|
|||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||
while (iter) {
|
||||
IndexCache** pCache = iter;
|
||||
if (*pCache) { indexCacheUnRef(*pCache); }
|
||||
if (*pCache) {
|
||||
indexCacheUnRef(*pCache);
|
||||
}
|
||||
iter = taosHashIterate(sIdx->colObj, iter);
|
||||
}
|
||||
taosHashCleanup(sIdx->colObj);
|
||||
|
@ -133,7 +135,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
|||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
||||
if (*cache == NULL) {
|
||||
if (cache == NULL) {
|
||||
IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
|
||||
taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
|
||||
}
|
||||
|
@ -143,10 +145,11 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
|||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
||||
|
||||
assert(*cache != NULL);
|
||||
int ret = indexCachePut(*cache, p, uid);
|
||||
if (ret != 0) { return ret; }
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -224,17 +227,20 @@ SIndexOpts* indexOptsCreate() {
|
|||
#endif
|
||||
return NULL;
|
||||
}
|
||||
void indexOptsDestroy(SIndexOpts* opts){
|
||||
void indexOptsDestroy(SIndexOpts* opts) {
|
||||
#ifdef USE_LUCENE
|
||||
#endif
|
||||
} /*
|
||||
* @param: oper
|
||||
*
|
||||
*/
|
||||
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* @param: oper
|
||||
*
|
||||
*/
|
||||
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
||||
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
|
||||
if (p == NULL) { return NULL; }
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
p->opera = opera;
|
||||
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
||||
return p;
|
||||
|
@ -253,15 +259,12 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
|
|||
return 0;
|
||||
}
|
||||
|
||||
SIndexTerm* indexTermCreate(int64_t suid,
|
||||
SIndexOperOnColumn oper,
|
||||
uint8_t colType,
|
||||
const char* colName,
|
||||
int32_t nColName,
|
||||
const char* colVal,
|
||||
int32_t nColVal) {
|
||||
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
|
||||
int32_t nColName, const char* colVal, int32_t nColVal) {
|
||||
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
|
||||
if (t == NULL) { return NULL; }
|
||||
if (t == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
t->suid = suid;
|
||||
t->operType = oper;
|
||||
|
@ -282,9 +285,7 @@ void indexTermDestroy(SIndexTerm* p) {
|
|||
free(p);
|
||||
}
|
||||
|
||||
SIndexMultiTerm* indexMultiTermCreate() {
|
||||
return taosArrayInit(4, sizeof(SIndexTerm*));
|
||||
}
|
||||
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
|
||||
|
||||
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
|
||||
taosArrayPush(terms, &term);
|
||||
|
@ -307,7 +308,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
|||
IndexCache* cache = NULL;
|
||||
pthread_mutex_lock(&sIdx->mtx);
|
||||
IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
|
||||
if (*pCache == NULL) {
|
||||
if (pCache == NULL) {
|
||||
pthread_mutex_unlock(&sIdx->mtx);
|
||||
return -1;
|
||||
}
|
||||
|
@ -335,7 +336,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
|||
return 0;
|
||||
}
|
||||
static void indexInterResultsDestroy(SArray* results) {
|
||||
if (results == NULL) { return; }
|
||||
if (results == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t sz = taosArrayGetSize(results);
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
|
@ -366,7 +369,9 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
|
|||
}
|
||||
|
||||
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||
if (sIdx == NULL) { return -1; }
|
||||
if (sIdx == NULL) {
|
||||
return -1;
|
||||
}
|
||||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
|
@ -399,7 +404,6 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
|||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, cv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
|
||||
// copy to final Result;
|
||||
cn = cacheIter->next(cacheIter);
|
||||
} else {
|
||||
|
@ -433,7 +437,9 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
|||
indexError("faile to open file to write");
|
||||
} else {
|
||||
int ret = tfileWriterPut(tw, result);
|
||||
if (ret != 0) { indexError("faile to write into tindex "); }
|
||||
if (ret != 0) {
|
||||
indexError("faile to write into tindex ");
|
||||
}
|
||||
}
|
||||
// not free later, just put int table cache
|
||||
indexCacheDestroyImm(pCache);
|
||||
|
|
|
@ -23,46 +23,22 @@
|
|||
#define MEM_TERM_LIMIT 1000000
|
||||
// ref index_cache.h:22
|
||||
//#define CACHE_KEY_LEN(p) \
|
||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
|
||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
||||
// sizeof(p->operType))
|
||||
|
||||
static void cacheTermDestroy(CacheTerm* ct) {
|
||||
if (ct == NULL) { return; }
|
||||
static void indexMemRef(MemTable* tbl);
|
||||
static void indexMemUnRef(MemTable* tbl);
|
||||
|
||||
free(ct->colVal);
|
||||
free(ct);
|
||||
}
|
||||
static char* getIndexKey(const void* pData) {
|
||||
CacheTerm* p = (CacheTerm*)pData;
|
||||
return (char*)p;
|
||||
}
|
||||
static void cacheTermDestroy(CacheTerm* ct);
|
||||
static char* getIndexKey(const void* pData);
|
||||
static int32_t compareKey(const void* l, const void* r);
|
||||
|
||||
static int32_t compareKey(const void* l, const void* r) {
|
||||
CacheTerm* lt = (CacheTerm*)l;
|
||||
CacheTerm* rt = (CacheTerm*)r;
|
||||
static MemTable* indexInternalCacheCreate(int8_t type);
|
||||
|
||||
// compare colVal
|
||||
int i, j;
|
||||
for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) {
|
||||
if (lt->colVal[i] == rt->colVal[j]) {
|
||||
continue;
|
||||
} else {
|
||||
return lt->colVal[i] < rt->colVal[j] ? -1 : 1;
|
||||
}
|
||||
}
|
||||
if (i < lt->nColVal) {
|
||||
return 1;
|
||||
} else if (j < rt->nColVal) {
|
||||
return -1;
|
||||
}
|
||||
// compare version
|
||||
return rt->version - lt->version;
|
||||
}
|
||||
static void doMergeWork(SSchedMsg* msg);
|
||||
static bool indexCacheIteratorNext(Iterate* itera);
|
||||
|
||||
static SSkipList* indexInternalCacheCreate(int8_t type) {
|
||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||
return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
|
||||
}
|
||||
}
|
||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
||||
|
||||
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
||||
IndexCache* cache = calloc(1, sizeof(IndexCache));
|
||||
|
@ -83,7 +59,15 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
|||
return cache;
|
||||
}
|
||||
void indexCacheDebug(IndexCache* cache) {
|
||||
SSkipListIterator* iter = tSkipListCreateIter(cache->mem);
|
||||
MemTable* tbl = NULL;
|
||||
|
||||
pthread_mutex_lock(&cache->mtx);
|
||||
tbl = cache->mem;
|
||||
indexMemRef(tbl);
|
||||
pthread_mutex_unlock(&cache->mtx);
|
||||
|
||||
SSkipList* slt = tbl->mem;
|
||||
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
|
@ -93,6 +77,8 @@ void indexCacheDebug(IndexCache* cache) {
|
|||
}
|
||||
}
|
||||
tSkipListDestroyIter(iter);
|
||||
|
||||
indexMemUnRef(tbl);
|
||||
}
|
||||
|
||||
void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||
|
@ -100,71 +86,50 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
|
|||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
if (ct != NULL) {}
|
||||
if (ct != NULL) {
|
||||
}
|
||||
}
|
||||
tSkipListDestroyIter(iter);
|
||||
tSkipListDestroy(slt);
|
||||
}
|
||||
void indexCacheDestroyImm(IndexCache* cache) {
|
||||
MemTable* tbl = NULL;
|
||||
pthread_mutex_lock(&cache->mtx);
|
||||
SSkipList* timm = (SSkipList*)cache->imm;
|
||||
tbl = cache->imm;
|
||||
cache->imm = NULL; // or throw int bg thread
|
||||
pthread_mutex_unlock(&cache->mtx);
|
||||
|
||||
indexCacheDestroySkiplist(timm);
|
||||
indexMemUnRef(tbl);
|
||||
}
|
||||
void indexCacheDestroy(void* cache) {
|
||||
IndexCache* pCache = cache;
|
||||
if (pCache == NULL) { return; }
|
||||
tSkipListDestroy(pCache->mem);
|
||||
tSkipListDestroy(pCache->imm);
|
||||
if (pCache == NULL) {
|
||||
return;
|
||||
}
|
||||
indexMemUnRef(pCache->mem);
|
||||
indexMemUnRef(pCache->imm);
|
||||
free(pCache->colName);
|
||||
|
||||
free(pCache);
|
||||
}
|
||||
|
||||
static void doMergeWork(SSchedMsg* msg) {
|
||||
IndexCache* pCache = msg->ahandle;
|
||||
SIndex* sidx = (SIndex*)pCache->index;
|
||||
indexFlushCacheTFile(sidx, pCache);
|
||||
}
|
||||
static bool indexCacheIteratorNext(Iterate* itera) {
|
||||
SSkipListIterator* iter = itera->iter;
|
||||
if (iter == NULL) { return false; }
|
||||
|
||||
IterateValue* iv = &itera->val;
|
||||
iterateValueDestroy(iv, false);
|
||||
|
||||
bool next = tSkipListIterNext(iter);
|
||||
if (next) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
|
||||
iv->type = ct->operaType;
|
||||
iv->colVal = ct->colVal;
|
||||
|
||||
taosArrayPush(iv->val, &ct->uid);
|
||||
}
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
|
||||
return &iter->val;
|
||||
}
|
||||
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
||||
Iterate* iiter = calloc(1, sizeof(Iterate));
|
||||
if (iiter == NULL) { return NULL; }
|
||||
if (iiter == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
MemTable* tbl = cache->imm;
|
||||
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
||||
iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL;
|
||||
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
|
||||
iiter->next = indexCacheIteratorNext;
|
||||
iiter->getValue = indexCacheIteratorGetValue;
|
||||
|
||||
return iiter;
|
||||
}
|
||||
void indexCacheIteratorDestroy(Iterate* iter) {
|
||||
if (iter == NULL) { return; }
|
||||
|
||||
if (iter == NULL) {
|
||||
return;
|
||||
}
|
||||
tSkipListDestroyIter(iter->iter);
|
||||
iterateValueDestroy(&iter->val, true);
|
||||
free(iter);
|
||||
|
@ -201,18 +166,21 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
|||
}
|
||||
|
||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||
if (cache == NULL) { return -1; }
|
||||
if (cache == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
IndexCache* pCache = cache;
|
||||
indexCacheRef(pCache);
|
||||
// encode data
|
||||
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
||||
if (cache == NULL) { return -1; }
|
||||
if (cache == NULL) {
|
||||
return -1;
|
||||
}
|
||||
// set up key
|
||||
ct->colType = term->colType;
|
||||
ct->nColVal = term->nColVal;
|
||||
ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1));
|
||||
memcpy(ct->colVal, term->colVal, ct->nColVal);
|
||||
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
|
||||
memcpy(ct->colVal, term->colVal, term->nColVal);
|
||||
ct->version = atomic_add_fetch_32(&pCache->version, 1);
|
||||
// set value
|
||||
ct->uid = uid;
|
||||
|
@ -220,8 +188,13 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
|||
|
||||
// ugly code, refactor later
|
||||
pthread_mutex_lock(&pCache->mtx);
|
||||
|
||||
indexCacheMakeRoomForWrite(pCache);
|
||||
tSkipListPut(pCache->mem, (char*)ct);
|
||||
MemTable* tbl = pCache->mem;
|
||||
indexMemRef(tbl);
|
||||
tSkipListPut(tbl->mem, (char*)ct);
|
||||
indexMemUnRef(tbl);
|
||||
|
||||
pthread_mutex_unlock(&pCache->mtx);
|
||||
|
||||
indexCacheUnRef(pCache);
|
||||
|
@ -233,27 +206,38 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
|
|||
return 0;
|
||||
}
|
||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
||||
if (cache == NULL) { return -1; }
|
||||
if (cache == NULL) {
|
||||
return -1;
|
||||
}
|
||||
IndexCache* pCache = cache;
|
||||
SIndexTerm* term = query->term;
|
||||
EIndexQueryType qtype = query->qType;
|
||||
|
||||
MemTable *mem = NULL, *imm = NULL;
|
||||
pthread_mutex_lock(&pCache->mtx);
|
||||
mem = pCache->mem;
|
||||
imm = pCache->imm;
|
||||
indexMemRef(mem);
|
||||
indexMemRef(imm);
|
||||
pthread_mutex_unlock(&pCache->mtx);
|
||||
|
||||
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
||||
if (ct == NULL) { return -1; }
|
||||
ct->nColVal = term->nColVal;
|
||||
ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1));
|
||||
memcpy(ct->colVal, term->colVal, ct->nColVal);
|
||||
if (ct == NULL) {
|
||||
return -1;
|
||||
}
|
||||
ct->colVal = calloc(1, sizeof(char) * (term->nColVal + 1));
|
||||
memcpy(ct->colVal, term->colVal, term->nColVal);
|
||||
ct->version = atomic_load_32(&pCache->version);
|
||||
|
||||
char* key = getIndexKey(ct);
|
||||
// TODO handle multi situation later, and refactor
|
||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
if (node != NULL) {
|
||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
|
||||
if (c->nColVal == ct->nColVal && strncmp(c->colVal, ct->colVal, c->nColVal) == 0) {
|
||||
if (strcmp(c->colVal, ct->colVal) == 0) {
|
||||
taosArrayPush(result, &c->uid);
|
||||
*s = kTypeValue;
|
||||
} else {
|
||||
|
@ -279,14 +263,104 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
|||
} else if (qtype == QUERY_REGEX) {
|
||||
//
|
||||
}
|
||||
indexMemUnRef(mem);
|
||||
indexMemUnRef(imm);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void indexCacheRef(IndexCache* cache) {
|
||||
if (cache == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_INC(cache);
|
||||
UNUSED(ref);
|
||||
}
|
||||
void indexCacheUnRef(IndexCache* cache) {
|
||||
if (cache == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_DEC(cache);
|
||||
if (ref == 0) { indexCacheDestroy(cache); }
|
||||
if (ref == 0) {
|
||||
indexCacheDestroy(cache);
|
||||
}
|
||||
}
|
||||
|
||||
void indexMemRef(MemTable* tbl) {
|
||||
if (tbl == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_INC(tbl);
|
||||
UNUSED(ref);
|
||||
}
|
||||
void indexMemUnRef(MemTable* tbl) {
|
||||
if (tbl == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_DEC(tbl);
|
||||
if (ref == 0) {
|
||||
SSkipList* slt = tbl->mem;
|
||||
indexCacheDestroySkiplist(slt);
|
||||
free(tbl);
|
||||
}
|
||||
}
|
||||
|
||||
static void cacheTermDestroy(CacheTerm* ct) {
|
||||
if (ct == NULL) {
|
||||
return;
|
||||
}
|
||||
free(ct->colVal);
|
||||
free(ct);
|
||||
}
|
||||
static char* getIndexKey(const void* pData) {
|
||||
CacheTerm* p = (CacheTerm*)pData;
|
||||
return (char*)p;
|
||||
}
|
||||
|
||||
static int32_t compareKey(const void* l, const void* r) {
|
||||
CacheTerm* lt = (CacheTerm*)l;
|
||||
CacheTerm* rt = (CacheTerm*)r;
|
||||
|
||||
// compare colVal
|
||||
int32_t cmp = strcmp(lt->colVal, rt->colVal);
|
||||
if (cmp == 0) {
|
||||
return rt->version - lt->version;
|
||||
}
|
||||
return cmp;
|
||||
}
|
||||
|
||||
static MemTable* indexInternalCacheCreate(int8_t type) {
|
||||
MemTable* tbl = calloc(1, sizeof(MemTable));
|
||||
indexMemRef(tbl);
|
||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
|
||||
}
|
||||
return tbl;
|
||||
}
|
||||
|
||||
static void doMergeWork(SSchedMsg* msg) {
|
||||
IndexCache* pCache = msg->ahandle;
|
||||
SIndex* sidx = (SIndex*)pCache->index;
|
||||
indexFlushCacheTFile(sidx, pCache);
|
||||
}
|
||||
static bool indexCacheIteratorNext(Iterate* itera) {
|
||||
SSkipListIterator* iter = itera->iter;
|
||||
if (iter == NULL) {
|
||||
return false;
|
||||
}
|
||||
IterateValue* iv = &itera->val;
|
||||
iterateValueDestroy(iv, false);
|
||||
|
||||
bool next = tSkipListIterNext(iter);
|
||||
if (next) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
|
||||
iv->type = ct->operaType;
|
||||
iv->colVal = ct->colVal;
|
||||
|
||||
taosArrayPush(iv->val, &ct->uid);
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; }
|
||||
|
|
|
@ -54,7 +54,9 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
|||
|
||||
TFileCache* tfileCacheCreate(const char* path) {
|
||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||
if (tcache == NULL) { return NULL; }
|
||||
if (tcache == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
tcache->capacity = 64;
|
||||
|
@ -83,7 +85,10 @@ TFileCache* tfileCacheCreate(const char* path) {
|
|||
tfileReaderRef(reader);
|
||||
// loader fst and validate it
|
||||
TFileHeader* header = &reader->header;
|
||||
TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
|
||||
TFileCacheKey key = {.suid = header->suid,
|
||||
.colName = header->colName,
|
||||
.nColName = strlen(header->colName),
|
||||
.colType = header->colType};
|
||||
|
||||
char buf[128] = {0};
|
||||
tfileSerialCacheKey(&key, buf);
|
||||
|
@ -97,13 +102,16 @@ End:
|
|||
return NULL;
|
||||
}
|
||||
void tfileCacheDestroy(TFileCache* tcache) {
|
||||
if (tcache == NULL) { return; }
|
||||
if (tcache == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// free table cache
|
||||
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
|
||||
while (reader) {
|
||||
TFileReader* p = *reader;
|
||||
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType);
|
||||
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName,
|
||||
p->header.colType);
|
||||
|
||||
tfileReaderUnRef(p);
|
||||
reader = taosHashIterate(tcache->tableCache, reader);
|
||||
|
@ -116,10 +124,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
|
|||
char buf[128] = {0};
|
||||
tfileSerialCacheKey(key, buf);
|
||||
|
||||
TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
|
||||
tfileReaderRef(reader);
|
||||
TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
|
||||
if (reader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
tfileReaderRef(*reader);
|
||||
|
||||
return reader;
|
||||
return *reader;
|
||||
}
|
||||
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
|
||||
char buf[128] = {0};
|
||||
|
@ -138,14 +149,17 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
|
|||
}
|
||||
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||
TFileReader* reader = calloc(1, sizeof(TFileReader));
|
||||
if (reader == NULL) { return NULL; }
|
||||
if (reader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// T_REF_INC(reader);
|
||||
reader->ctx = ctx;
|
||||
|
||||
if (0 != tfileReaderLoadHeader(reader)) {
|
||||
tfileReaderDestroy(reader);
|
||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
|
||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||
reader->header.colName);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -158,7 +172,9 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
return reader;
|
||||
}
|
||||
void tfileReaderDestroy(TFileReader* reader) {
|
||||
if (reader == NULL) { return; }
|
||||
if (reader == NULL) {
|
||||
return;
|
||||
}
|
||||
// T_REF_INC(reader);
|
||||
fstDestroy(reader->fst);
|
||||
writerCtxDestroy(reader->ctx);
|
||||
|
@ -175,10 +191,12 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
|
|||
uint64_t offset;
|
||||
FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
|
||||
if (fstGet(reader->fst, &key, &offset)) {
|
||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal);
|
||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName,
|
||||
term->colVal);
|
||||
ret = tfileReaderLoadTableIds(reader, offset, result);
|
||||
} else {
|
||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal);
|
||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
|
||||
term->colVal);
|
||||
}
|
||||
fstSliceDestroy(&key);
|
||||
} else if (qtype == QUERY_PREFIX) {
|
||||
|
@ -304,12 +322,16 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
|
|||
return 0;
|
||||
}
|
||||
void tfileWriteClose(TFileWriter* tw) {
|
||||
if (tw == NULL) { return; }
|
||||
if (tw == NULL) {
|
||||
return;
|
||||
}
|
||||
writerCtxDestroy(tw->ctx);
|
||||
free(tw);
|
||||
}
|
||||
void tfileWriterDestroy(TFileWriter* tw) {
|
||||
if (tw == NULL) { return; }
|
||||
if (tw == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
writerCtxDestroy(tw->ctx);
|
||||
free(tw);
|
||||
|
@ -317,29 +339,35 @@ void tfileWriterDestroy(TFileWriter* tw) {
|
|||
|
||||
IndexTFile* indexTFileCreate(const char* path) {
|
||||
IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
|
||||
if (tfile == NULL) { return NULL; }
|
||||
if (tfile == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tfile->cache = tfileCacheCreate(path);
|
||||
return tfile;
|
||||
}
|
||||
void IndexTFileDestroy(IndexTFile* tfile) {
|
||||
free(tfile);
|
||||
}
|
||||
void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); }
|
||||
|
||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
||||
int ret = -1;
|
||||
if (tfile == NULL) { return ret; }
|
||||
if (tfile == NULL) {
|
||||
return ret;
|
||||
}
|
||||
IndexTFile* pTfile = (IndexTFile*)tfile;
|
||||
|
||||
SIndexTerm* term = query->term;
|
||||
TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
||||
TFileCacheKey key = {
|
||||
.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
||||
if (reader == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return tfileReaderSearch(reader, query, result);
|
||||
}
|
||||
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
|
||||
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version =
|
||||
// 1};
|
||||
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName =
|
||||
// term->nColName, .version = 1};
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -353,7 +381,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
|
|||
|
||||
TFileFstIter* tIter = iiter->iter;
|
||||
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
|
||||
if (rt == NULL) { return false; }
|
||||
if (rt == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t sz = 0;
|
||||
char* ch = (char*)fstSliceData(&rt->data, &sz);
|
||||
|
@ -364,20 +394,22 @@ static bool tfileIteratorNext(Iterate* iiter) {
|
|||
|
||||
swsResultDestroy(rt);
|
||||
// set up iterate value
|
||||
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
|
||||
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
iv->colVal = colVal;
|
||||
|
||||
// std::string key(ch, sz);
|
||||
}
|
||||
|
||||
static IterateValue* tifileIterateGetValue(Iterate* iter) {
|
||||
return &iter->val;
|
||||
}
|
||||
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
|
||||
|
||||
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
||||
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
|
||||
if (tIter == NULL) { return NULL; }
|
||||
if (tIter == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
|
||||
tIter->fb = fstSearch(reader->fst, tIter->ctx);
|
||||
tIter->st = streamBuilderIntoStream(tIter->fb);
|
||||
|
@ -389,14 +421,18 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
|
|||
Iterate* iter = calloc(1, sizeof(Iterate));
|
||||
|
||||
iter->iter = tfileFstIteratorCreate(reader);
|
||||
if (iter->iter == NULL) { return NULL; }
|
||||
if (iter->iter == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
iter->next = tfileIteratorNext;
|
||||
iter->getValue = tifileIterateGetValue;
|
||||
return iter;
|
||||
}
|
||||
void tfileIteratorDestroy(Iterate* iter) {
|
||||
if (iter == NULL) { return; }
|
||||
if (iter == NULL) {
|
||||
return;
|
||||
}
|
||||
IterateValue* iv = &iter->val;
|
||||
iterateValueDestroy(iv, true);
|
||||
|
||||
|
@ -409,14 +445,18 @@ void tfileIteratorDestroy(Iterate* iter) {
|
|||
}
|
||||
|
||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
|
||||
if (tf == NULL) { return NULL; }
|
||||
if (tf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||
return tfileCacheGet(tf->cache, &key);
|
||||
}
|
||||
|
||||
static int tfileStrCompare(const void* a, const void* b) {
|
||||
int ret = strcmp((char*)a, (char*)b);
|
||||
if (ret == 0) { return ret; }
|
||||
if (ret == 0) {
|
||||
return ret;
|
||||
}
|
||||
return ret < 0 ? -1 : 1;
|
||||
}
|
||||
|
||||
|
@ -431,13 +471,17 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
|||
|
||||
TFileValue* tfileValueCreate(char* val) {
|
||||
TFileValue* tf = calloc(1, sizeof(TFileValue));
|
||||
if (tf == NULL) { return NULL; }
|
||||
if (tf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
||||
return tf;
|
||||
}
|
||||
int tfileValuePush(TFileValue* tf, uint64_t val) {
|
||||
if (tf == NULL) { return -1; }
|
||||
if (tf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
taosArrayPush(tf->tableId, &val);
|
||||
return 0;
|
||||
}
|
||||
|
@ -457,7 +501,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
|
|||
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
|
||||
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
|
||||
tw->header.fstOffset = fstOffset;
|
||||
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
|
||||
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) {
|
||||
return -1;
|
||||
}
|
||||
tw->offset += sizeof(fstOffset);
|
||||
return 0;
|
||||
}
|
||||
|
@ -468,7 +514,9 @@ static int tfileWriteHeader(TFileWriter* writer) {
|
|||
memcpy(buf, (char*)header, sizeof(buf));
|
||||
|
||||
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
|
||||
if (sizeof(buf) != nwrite) { return -1; }
|
||||
if (sizeof(buf) != nwrite) {
|
||||
return -1;
|
||||
}
|
||||
writer->offset = nwrite;
|
||||
return 0;
|
||||
}
|
||||
|
@ -502,7 +550,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
|||
static int FST_MAX_SIZE = 16 * 1024;
|
||||
|
||||
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
|
||||
if (buf == NULL) { return -1; }
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
WriterCtx* ctx = reader->ctx;
|
||||
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
|
||||
|
@ -525,7 +575,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
|||
|
||||
int32_t total = sizeof(uint64_t) * nid;
|
||||
char* buf = calloc(1, total);
|
||||
if (buf == NULL) { return -1; }
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
nread = ctx->read(ctx, buf, total);
|
||||
assert(total == nread);
|
||||
|
@ -543,12 +595,16 @@ void tfileReaderRef(TFileReader* reader) {
|
|||
|
||||
void tfileReaderUnRef(TFileReader* reader) {
|
||||
int ref = T_REF_DEC(reader);
|
||||
if (ref == 0) { tfileReaderDestroy(reader); }
|
||||
if (ref == 0) {
|
||||
tfileReaderDestroy(reader);
|
||||
}
|
||||
}
|
||||
|
||||
static int tfileGetFileList(const char* path, SArray* result) {
|
||||
DIR* dir = opendir(path);
|
||||
if (NULL == dir) { return -1; }
|
||||
if (NULL == dir) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct dirent* entry;
|
||||
while ((entry = readdir(dir)) != NULL) {
|
||||
|
@ -576,7 +632,9 @@ static int tfileCompare(const void* a, const void* b) {
|
|||
size_t bLen = strlen(bName);
|
||||
|
||||
int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
||||
if (ret == 0) { return ret; }
|
||||
if (ret == 0) {
|
||||
return ret;
|
||||
}
|
||||
return ret < 0 ? -1 : 1;
|
||||
}
|
||||
// tfile name suid-colId-version.tindex
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation.
|
||||
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
|
||||
* Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
|
@ -75,7 +76,9 @@ class FstReadMemory {
|
|||
bool init() {
|
||||
char* buf = (char*)calloc(1, sizeof(char) * _size);
|
||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||
if (nRead <= 0) { return false; }
|
||||
if (nRead <= 0) {
|
||||
return false;
|
||||
}
|
||||
_size = nRead;
|
||||
_s = fstSliceCreate((uint8_t*)buf, _size);
|
||||
_fst = fstCreate(&_s);
|
||||
|
@ -179,7 +182,9 @@ void checkFstPerf() {
|
|||
delete fw;
|
||||
|
||||
FstReadMemory* m = new FstReadMemory(1024 * 64);
|
||||
if (m->init()) { printf("success to init fst read"); }
|
||||
if (m->init()) {
|
||||
printf("success to init fst read");
|
||||
}
|
||||
Performance_fstReadRecords(m);
|
||||
delete m;
|
||||
}
|
||||
|
@ -283,7 +288,8 @@ class IndexEnv : public ::testing::Test {
|
|||
// / {
|
||||
// / std::string colName("tag1"), colVal("Hello world");
|
||||
// / SIndexTerm* term =
|
||||
// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), / colVal.size());
|
||||
// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), /
|
||||
// colVal.size());
|
||||
// SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||
// indexMultiTermAdd(terms, term);
|
||||
// / / for (size_t i = 0; i < 100; i++) {
|
||||
|
@ -301,14 +307,16 @@ class IndexEnv : public ::testing::Test {
|
|||
// / {
|
||||
// / std::string colName("tag1"), colVal("Hello world");
|
||||
// / SIndexTerm* term =
|
||||
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(),
|
||||
// colVal.size());
|
||||
// / indexMultiTermAdd(terms, term);
|
||||
// /
|
||||
// }
|
||||
// / {
|
||||
// / std::string colName("tag2"), colVal("Hello world");
|
||||
// / SIndexTerm* term =
|
||||
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(),
|
||||
// colVal.size());
|
||||
// / indexMultiTermAdd(terms, term);
|
||||
// /
|
||||
// }
|
||||
|
@ -327,7 +335,8 @@ class IndexEnv : public ::testing::Test {
|
|||
|
||||
class TFileObj {
|
||||
public:
|
||||
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") : path_(path), colName_(colName) {
|
||||
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage")
|
||||
: path_(path), colName_(colName) {
|
||||
colId_ = 10;
|
||||
// Do Nothing
|
||||
//
|
||||
|
@ -337,7 +346,9 @@ class TFileObj {
|
|||
tfileReaderDestroy(reader_);
|
||||
reader_ = NULL;
|
||||
}
|
||||
if (writer_ == NULL) { InitWriter(); }
|
||||
if (writer_ == NULL) {
|
||||
InitWriter();
|
||||
}
|
||||
return tfileWriterPut(writer_, tv);
|
||||
}
|
||||
bool InitWriter() {
|
||||
|
@ -377,8 +388,12 @@ class TFileObj {
|
|||
return tfileReaderSearch(reader_, query, result);
|
||||
}
|
||||
~TFileObj() {
|
||||
if (writer_) { tfileWriterDestroy(writer_); }
|
||||
if (reader_) { tfileReaderDestroy(reader_); }
|
||||
if (writer_) {
|
||||
tfileWriterDestroy(writer_);
|
||||
}
|
||||
if (reader_) {
|
||||
tfileReaderDestroy(reader_);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -455,9 +470,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
|
|||
}
|
||||
taosArrayDestroy(data);
|
||||
|
||||
std::string colName("voltage");
|
||||
std::string colVal("ab");
|
||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
std::string colName("voltage");
|
||||
std::string colVal("ab");
|
||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
||||
|
||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||
|
@ -525,54 +541,62 @@ TEST_F(IndexCacheEnv, cache_test) {
|
|||
std::string colName("voltage");
|
||||
{
|
||||
std::string colVal("v1");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, colId, version++, suid++);
|
||||
}
|
||||
{
|
||||
std::string colVal("v3");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, colId, version++, suid++);
|
||||
}
|
||||
{
|
||||
std::string colVal("v2");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, colId, version++, suid++);
|
||||
}
|
||||
{
|
||||
std::string colVal("v3");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, colId, version++, suid++);
|
||||
}
|
||||
{
|
||||
std::string colVal("v3");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, colId, version++, suid++);
|
||||
}
|
||||
|
||||
{
|
||||
std::string colVal("v3");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, othColId, version++, suid++);
|
||||
}
|
||||
{
|
||||
std::string colVal("v4");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, othColId, version++, suid++);
|
||||
}
|
||||
{
|
||||
std::string colVal("v4");
|
||||
for (size_t i = 0; i < 10; i++) {
|
||||
colVal[colVal.size() - 1] = 'a' + i;
|
||||
SIndexTerm* term =
|
||||
indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
coj->Put(term, colId, version++, suid++);
|
||||
}
|
||||
}
|
||||
coj->Debug();
|
||||
// begin query
|
||||
{
|
||||
std::string colVal("v3");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
std::string colVal("v3");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
||||
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
||||
STermValueType valType;
|
||||
|
@ -582,8 +606,9 @@ TEST_F(IndexCacheEnv, cache_test) {
|
|||
assert(taosArrayGetSize(ret) == 4);
|
||||
}
|
||||
{
|
||||
std::string colVal("v2");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
std::string colVal("v2");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
|
||||
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
||||
STermValueType valType;
|
||||
|
@ -592,3 +617,132 @@ TEST_F(IndexCacheEnv, cache_test) {
|
|||
assert(taosArrayGetSize(ret) == 1);
|
||||
}
|
||||
}
|
||||
class IndexObj {
|
||||
public:
|
||||
IndexObj() {
|
||||
// opt
|
||||
numOfWrite = 0;
|
||||
numOfRead = 0;
|
||||
indexInit();
|
||||
}
|
||||
int Init(const std::string& dir) {
|
||||
taosRemoveDir(dir.c_str());
|
||||
taosMkDir(dir.c_str());
|
||||
int ret = indexOpen(&opts, dir.c_str(), &idx);
|
||||
if (ret != 0) {
|
||||
// opt
|
||||
std::cout << "failed to open index: %s" << dir << std::endl;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int Put(SIndexMultiTerm* fvs, uint64_t uid) {
|
||||
numOfWrite += taosArrayGetSize(fvs);
|
||||
return indexPut(idx, fvs, uid);
|
||||
}
|
||||
int Search(SIndexMultiTermQuery* multiQ, SArray* result) {
|
||||
SArray* query = multiQ->query;
|
||||
numOfRead = taosArrayGetSize(query);
|
||||
return indexSearch(idx, multiQ, result);
|
||||
}
|
||||
|
||||
void Debug() {
|
||||
std::cout << "numOfWrite:" << numOfWrite << std::endl;
|
||||
std::cout << "numOfRead:" << numOfRead << std::endl;
|
||||
}
|
||||
|
||||
~IndexObj() {
|
||||
indexClose(idx);
|
||||
indexCleanUp();
|
||||
}
|
||||
|
||||
private:
|
||||
SIndexOpts opts;
|
||||
SIndex* idx;
|
||||
int numOfWrite;
|
||||
int numOfRead;
|
||||
};
|
||||
|
||||
class IndexEnv2 : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
index = new IndexObj();
|
||||
//
|
||||
}
|
||||
virtual void TearDown() {
|
||||
// r
|
||||
delete index;
|
||||
}
|
||||
IndexObj* index;
|
||||
};
|
||||
TEST_F(IndexEnv2, testIndexOpen) {
|
||||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {
|
||||
std::cout << "failed to init index" << std::endl;
|
||||
exit(1);
|
||||
}
|
||||
|
||||
int targetSize = 100;
|
||||
{
|
||||
std::string colName("tag1"), colVal("Hello world");
|
||||
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||
indexMultiTermAdd(terms, term);
|
||||
for (size_t i = 0; i < targetSize; i++) {
|
||||
int tableId = i;
|
||||
int ret = index->Put(terms, tableId);
|
||||
assert(ret == 0);
|
||||
}
|
||||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
{
|
||||
size_t size = 100;
|
||||
std::string colName("tag1"), colVal("hello world");
|
||||
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||
indexMultiTermAdd(terms, term);
|
||||
for (size_t i = 0; i < size; i++) {
|
||||
int tableId = i;
|
||||
int ret = index->Put(terms, tableId);
|
||||
assert(ret == 0);
|
||||
}
|
||||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
|
||||
{
|
||||
std::string colName("tag1"), colVal("Hello world");
|
||||
|
||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
||||
|
||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||
index->Search(mq, result);
|
||||
assert(taosArrayGetSize(result) == targetSize);
|
||||
}
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_CachePut) {
|
||||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(IndexEnv2, testIndexr_TFilePut) {
|
||||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_CacheSearch) {
|
||||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_TFileSearch) {
|
||||
std::string path = "/tmp";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
%default_type {SToken}
|
||||
%extra_argument {SSqlInfo* pInfo}
|
||||
|
||||
%fallback ID BOOL TINYINT SMALLINT INTEGER BIGINT FLOAT DOUBLE STRING TIMESTAMP BINARY NCHAR.
|
||||
%fallback ID BOOL INTEGER FLOAT STRING TIMESTAMP.
|
||||
|
||||
%left OR.
|
||||
%left AND.
|
||||
|
|
|
@ -1,231 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_TTOKENDEF_H
|
||||
#define TDENGINE_TTOKENDEF_H
|
||||
|
||||
#define TK_ID 1
|
||||
#define TK_BOOL 2
|
||||
#define TK_TINYINT 3
|
||||
#define TK_SMALLINT 4
|
||||
#define TK_INTEGER 5
|
||||
#define TK_BIGINT 6
|
||||
#define TK_FLOAT 7
|
||||
#define TK_DOUBLE 8
|
||||
#define TK_STRING 9
|
||||
#define TK_TIMESTAMP 10
|
||||
#define TK_BINARY 11
|
||||
#define TK_NCHAR 12
|
||||
#define TK_OR 13
|
||||
#define TK_AND 14
|
||||
#define TK_NOT 15
|
||||
#define TK_EQ 16
|
||||
#define TK_NE 17
|
||||
#define TK_ISNULL 18
|
||||
#define TK_NOTNULL 19
|
||||
#define TK_IS 20
|
||||
#define TK_LIKE 21
|
||||
#define TK_MATCH 22
|
||||
#define TK_NMATCH 23
|
||||
#define TK_GLOB 24
|
||||
#define TK_BETWEEN 25
|
||||
#define TK_IN 26
|
||||
#define TK_GT 27
|
||||
#define TK_GE 28
|
||||
#define TK_LT 29
|
||||
#define TK_LE 30
|
||||
#define TK_BITAND 31
|
||||
#define TK_BITOR 32
|
||||
#define TK_LSHIFT 33
|
||||
#define TK_RSHIFT 34
|
||||
#define TK_PLUS 35
|
||||
#define TK_MINUS 36
|
||||
#define TK_DIVIDE 37
|
||||
#define TK_TIMES 38
|
||||
#define TK_STAR 39
|
||||
#define TK_SLASH 40
|
||||
#define TK_REM 41
|
||||
#define TK_CONCAT 42
|
||||
#define TK_UMINUS 43
|
||||
#define TK_UPLUS 44
|
||||
#define TK_BITNOT 45
|
||||
#define TK_SHOW 46
|
||||
#define TK_DATABASES 47
|
||||
#define TK_TOPICS 48
|
||||
#define TK_FUNCTIONS 49
|
||||
#define TK_MNODES 50
|
||||
#define TK_DNODES 51
|
||||
#define TK_ACCOUNTS 52
|
||||
#define TK_USERS 53
|
||||
#define TK_MODULES 54
|
||||
#define TK_QUERIES 55
|
||||
#define TK_CONNECTIONS 56
|
||||
#define TK_STREAMS 57
|
||||
#define TK_VARIABLES 58
|
||||
#define TK_SCORES 59
|
||||
#define TK_GRANTS 60
|
||||
#define TK_VNODES 61
|
||||
#define TK_DOT 62
|
||||
#define TK_CREATE 63
|
||||
#define TK_TABLE 64
|
||||
#define TK_STABLE 65
|
||||
#define TK_DATABASE 66
|
||||
#define TK_TABLES 67
|
||||
#define TK_STABLES 68
|
||||
#define TK_VGROUPS 69
|
||||
#define TK_DROP 70
|
||||
#define TK_TOPIC 71
|
||||
#define TK_FUNCTION 72
|
||||
#define TK_DNODE 73
|
||||
#define TK_USER 74
|
||||
#define TK_ACCOUNT 75
|
||||
#define TK_USE 76
|
||||
#define TK_DESCRIBE 77
|
||||
#define TK_DESC 78
|
||||
#define TK_ALTER 79
|
||||
#define TK_PASS 80
|
||||
#define TK_PRIVILEGE 81
|
||||
#define TK_LOCAL 82
|
||||
#define TK_COMPACT 83
|
||||
#define TK_LP 84
|
||||
#define TK_RP 85
|
||||
#define TK_IF 86
|
||||
#define TK_EXISTS 87
|
||||
#define TK_PORT 88
|
||||
#define TK_IPTOKEN 89
|
||||
#define TK_AS 90
|
||||
#define TK_OUTPUTTYPE 91
|
||||
#define TK_AGGREGATE 92
|
||||
#define TK_BUFSIZE 93
|
||||
#define TK_PPS 94
|
||||
#define TK_TSERIES 95
|
||||
#define TK_DBS 96
|
||||
#define TK_STORAGE 97
|
||||
#define TK_QTIME 98
|
||||
#define TK_CONNS 99
|
||||
#define TK_STATE 100
|
||||
#define TK_COMMA 101
|
||||
#define TK_KEEP 102
|
||||
#define TK_CACHE 103
|
||||
#define TK_REPLICA 104
|
||||
#define TK_QUORUM 105
|
||||
#define TK_DAYS 106
|
||||
#define TK_MINROWS 107
|
||||
#define TK_MAXROWS 108
|
||||
#define TK_BLOCKS 109
|
||||
#define TK_CTIME 110
|
||||
#define TK_WAL 111
|
||||
#define TK_FSYNC 112
|
||||
#define TK_COMP 113
|
||||
#define TK_PRECISION 114
|
||||
#define TK_UPDATE 115
|
||||
#define TK_CACHELAST 116
|
||||
#define TK_UNSIGNED 117
|
||||
#define TK_TAGS 118
|
||||
#define TK_USING 119
|
||||
#define TK_NULL 120
|
||||
#define TK_NOW 121
|
||||
#define TK_SELECT 122
|
||||
#define TK_UNION 123
|
||||
#define TK_ALL 124
|
||||
#define TK_DISTINCT 125
|
||||
#define TK_FROM 126
|
||||
#define TK_VARIABLE 127
|
||||
#define TK_INTERVAL 128
|
||||
#define TK_EVERY 129
|
||||
#define TK_SESSION 130
|
||||
#define TK_STATE_WINDOW 131
|
||||
#define TK_FILL 132
|
||||
#define TK_SLIDING 133
|
||||
#define TK_ORDER 134
|
||||
#define TK_BY 135
|
||||
#define TK_ASC 136
|
||||
#define TK_GROUP 137
|
||||
#define TK_HAVING 138
|
||||
#define TK_LIMIT 139
|
||||
#define TK_OFFSET 140
|
||||
#define TK_SLIMIT 141
|
||||
#define TK_SOFFSET 142
|
||||
#define TK_WHERE 143
|
||||
#define TK_RESET 144
|
||||
#define TK_QUERY 145
|
||||
#define TK_SYNCDB 146
|
||||
#define TK_ADD 147
|
||||
#define TK_COLUMN 148
|
||||
#define TK_MODIFY 149
|
||||
#define TK_TAG 150
|
||||
#define TK_CHANGE 151
|
||||
#define TK_SET 152
|
||||
#define TK_KILL 153
|
||||
#define TK_CONNECTION 154
|
||||
#define TK_STREAM 155
|
||||
#define TK_COLON 156
|
||||
#define TK_ABORT 157
|
||||
#define TK_AFTER 158
|
||||
#define TK_ATTACH 159
|
||||
#define TK_BEFORE 160
|
||||
#define TK_BEGIN 161
|
||||
#define TK_CASCADE 162
|
||||
#define TK_CLUSTER 163
|
||||
#define TK_CONFLICT 164
|
||||
#define TK_COPY 165
|
||||
#define TK_DEFERRED 166
|
||||
#define TK_DELIMITERS 167
|
||||
#define TK_DETACH 168
|
||||
#define TK_EACH 169
|
||||
#define TK_END 170
|
||||
#define TK_EXPLAIN 171
|
||||
#define TK_FAIL 172
|
||||
#define TK_FOR 173
|
||||
#define TK_IGNORE 174
|
||||
#define TK_IMMEDIATE 175
|
||||
#define TK_INITIALLY 176
|
||||
#define TK_INSTEAD 177
|
||||
#define TK_KEY 178
|
||||
#define TK_OF 179
|
||||
#define TK_RAISE 180
|
||||
#define TK_REPLACE 181
|
||||
#define TK_RESTRICT 182
|
||||
#define TK_ROW 183
|
||||
#define TK_STATEMENT 184
|
||||
#define TK_TRIGGER 185
|
||||
#define TK_VIEW 186
|
||||
#define TK_SEMI 187
|
||||
#define TK_NONE 188
|
||||
#define TK_PREV 189
|
||||
#define TK_LINEAR 190
|
||||
#define TK_IMPORT 191
|
||||
#define TK_TBNAME 192
|
||||
#define TK_JOIN 193
|
||||
#define TK_INSERT 194
|
||||
#define TK_INTO 195
|
||||
#define TK_VALUES 196
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#define TK_SPACE 300
|
||||
#define TK_COMMENT 301
|
||||
#define TK_ILLEGAL 302
|
||||
#define TK_HEX 303 // hex number 0x123
|
||||
#define TK_OCT 304 // oct number
|
||||
#define TK_BIN 305 // bin format data 0b111
|
||||
#define TK_FILE 306
|
||||
#define TK_QUESTION 307 // denoting the placeholder of "?",when invoking statement bind query
|
||||
|
||||
#endif
|
||||
|
||||
|
|
@ -276,7 +276,7 @@ bool tSqlExprIsLeaf(tSqlExpr *pExpr) {
|
|||
return (pExpr->pRight == NULL && pExpr->pLeft == NULL) &&
|
||||
(pExpr->tokenId == 0 ||
|
||||
(pExpr->tokenId == TK_ID) ||
|
||||
(pExpr->tokenId >= TK_BOOL && pExpr->tokenId <= TK_NCHAR) ||
|
||||
(pExpr->tokenId == TK_BOOL || pExpr->tokenId == TK_STRING || pExpr->tokenId == TK_FLOAT) ||
|
||||
(pExpr->tokenId == TK_NULL) ||
|
||||
(pExpr->tokenId == TK_SET));
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
#include <tmsg.h>
|
||||
#include <ttime.h>
|
||||
#include "astToMsg.h"
|
||||
#include "parserInt.h"
|
||||
|
@ -283,7 +284,7 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) {
|
||||
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len, SEpSet* pEpSet) {
|
||||
const char* msg1 = "invalid table name";
|
||||
const char* msg2 = "tags number not matched";
|
||||
const char* msg3 = "tag value too long";
|
||||
|
@ -316,13 +317,14 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
|
|||
const char* pStableName = tNameGetTableName(&name);
|
||||
SArray* pValList = pCreateTableInfo->pTagVals;
|
||||
|
||||
size_t valSize = taosArrayGetSize(pValList);
|
||||
size_t numOfInputTag = taosArrayGetSize(pValList);
|
||||
STableMeta* pSuperTableMeta = NULL;
|
||||
|
||||
char dbName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, dbName);
|
||||
|
||||
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, pStableName, &pSuperTableMeta);
|
||||
assert(pSuperTableMeta != NULL);
|
||||
|
||||
// too long tag values will return invalid sql, not be truncated automatically
|
||||
SSchema *pTagSchema = getTableTagSchema(pSuperTableMeta);
|
||||
|
@ -342,7 +344,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
|
|||
pNameList = pCreateTableInfo->pTagNames;
|
||||
nameSize = taosArrayGetSize(pNameList);
|
||||
|
||||
if (valSize != nameSize || schemaSize < valSize) {
|
||||
if (numOfInputTag != nameSize || schemaSize < numOfInputTag) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
|
@ -418,33 +420,36 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
|
|||
}
|
||||
}
|
||||
} else {
|
||||
if (schemaSize != valSize) {
|
||||
if (schemaSize != numOfInputTag) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < valSize; ++i) {
|
||||
for (int32_t i = 0; i < numOfInputTag; ++i) {
|
||||
SSchema *pSchema = &pTagSchema[i];
|
||||
SListItem *pItem = taosArrayGet(pValList, i);
|
||||
SToken* pItem = taosArrayGet(pValList, i);
|
||||
|
||||
char tagVal[TSDB_MAX_TAGS_LEN];
|
||||
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (pItem->pVar.nLen > pSchema->bytes) {
|
||||
if (pItem->n > pSchema->bytes) {
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg3);
|
||||
}
|
||||
} else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) {
|
||||
// code = convertTimestampStrToInt64(&(pItem->pVar), tinfo.precision);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||
}
|
||||
} else if (pItem->pVar.nType == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
pItem->pVar.i = convertTimePrecision(pItem->pVar.i, TSDB_TIME_PRECISION_NANO, tinfo.precision);
|
||||
}
|
||||
// if (pItem->pVar.nType == TSDB_DATA_TYPE_BINARY) {
|
||||
//// code = convertTimestampStrToInt64(&(pItem->pVar), tinfo.precision);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// return buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||
// }
|
||||
// } else if (pItem->pVar.nType == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
// pItem->pVar.i = convertTimePrecision(pItem->pVar.i, TSDB_TIME_PRECISION_NANO, tinfo.precision);
|
||||
// }
|
||||
}
|
||||
|
||||
code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true);
|
||||
char* endPtr = NULL;
|
||||
int64_t v = strtoll(pItem->z, &endPtr, 10);
|
||||
*(int32_t*) tagVal = v;
|
||||
// code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true);
|
||||
|
||||
// check again after the convert since it may be converted from binary to nchar.
|
||||
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
|
@ -469,33 +474,37 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
|
|||
if (row == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
tdSortKVRowByColIdx(row);
|
||||
pTag->dataLen = kvRowLen(row);
|
||||
|
||||
if (pTag->data == NULL) {
|
||||
pTag->data = malloc(pTag->dataLen);
|
||||
tdSortKVRowByColIdx(row);
|
||||
|
||||
SName tableName = {0};
|
||||
code = createSName(&tableName, &pCreateTableInfo->name, pCtx, pMsgBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
kvRowCpy(pTag->data, row);
|
||||
free(row);
|
||||
struct SVCreateTbReq req = {0};
|
||||
req.type = TD_CHILD_TABLE;
|
||||
req.name = strdup(tNameGetTableName(&tableName));
|
||||
req.ctbCfg.suid = pSuperTableMeta->suid;
|
||||
req.ctbCfg.pTag = row;
|
||||
|
||||
bool dbIncluded2 = false;
|
||||
// table name
|
||||
// if (tscValidateName(&(pCreateTableInfo->name), true, &dbIncluded2) != TSDB_CODE_SUCCESS) {
|
||||
// return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
// }
|
||||
int32_t serLen = tSerializeSVCreateTbReq(NULL, &req);
|
||||
char* buf1 = calloc(1, serLen);
|
||||
char* p = buf1;
|
||||
tSerializeSVCreateTbReq((void*) &buf1, &req);
|
||||
*pOutput = p;
|
||||
*len = serLen;
|
||||
|
||||
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);
|
||||
// code = tscSetTableFullName(&pTableMetaInfo->name, &pCreateTableInfo->name, pSql, dbIncluded2);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// return code;
|
||||
// }
|
||||
SVgroupInfo info = {0};
|
||||
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, req.name, &info);
|
||||
|
||||
// pCreateTableInfo->fullname = calloc(1, tNameLen(&pTableMetaInfo->name) + 1);
|
||||
// code = tNameExtractFullName(&pTableMetaInfo->name, pCreateTableInfo->fullname);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
// }
|
||||
pEpSet->inUse = info.inUse;
|
||||
pEpSet->numOfEps = info.numOfEps;
|
||||
for(int32_t i = 0; i < pEpSet->numOfEps; ++i) {
|
||||
pEpSet->port[i] = info.epAddr[i].port;
|
||||
tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -692,10 +701,11 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
|
|||
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
|
||||
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE)? TDMT_VND_CREATE_TABLE:TDMT_MND_CREATE_STB;
|
||||
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
|
||||
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pDcl->msgType = TDMT_VND_CREATE_TABLE;
|
||||
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
|
||||
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
|
||||
// return code;
|
||||
|
|
|
@ -26,7 +26,7 @@ size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo) {
|
|||
}
|
||||
|
||||
SSchema* getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) {
|
||||
assert(pTableMeta != NULL && pTableMeta->schema != NULL && colIndex >= 0 && colIndex < getNumOfColumns(pTableMeta));
|
||||
assert(pTableMeta != NULL && pTableMeta->schema != NULL && colIndex >= 0 && colIndex < (getNumOfColumns(pTableMeta) + getNumOfTags(pTableMeta)));
|
||||
|
||||
SSchema* pSchema = (SSchema*) pTableMeta->schema;
|
||||
return &pSchema[colIndex];
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -31,17 +31,17 @@ typedef struct SKeyword {
|
|||
static SKeyword keywordTable[] = {
|
||||
{"ID", TK_ID},
|
||||
{"BOOL", TK_BOOL},
|
||||
{"TINYINT", TK_TINYINT},
|
||||
{"SMALLINT", TK_SMALLINT},
|
||||
// {"TINYINT", TK_TINYINT},
|
||||
// {"SMALLINT", TK_SMALLINT},
|
||||
{"INTEGER", TK_INTEGER},
|
||||
{"INT", TK_INTEGER},
|
||||
{"BIGINT", TK_BIGINT},
|
||||
// {"BIGINT", TK_BIGINT},
|
||||
{"FLOAT", TK_FLOAT},
|
||||
{"DOUBLE", TK_DOUBLE},
|
||||
// {"DOUBLE", TK_DOUBLE},
|
||||
{"STRING", TK_STRING},
|
||||
{"TIMESTAMP", TK_TIMESTAMP},
|
||||
{"BINARY", TK_BINARY},
|
||||
{"NCHAR", TK_NCHAR},
|
||||
// {"BINARY", TK_BINARY},
|
||||
// {"NCHAR", TK_NCHAR},
|
||||
{"OR", TK_OR},
|
||||
{"AND", TK_AND},
|
||||
{"NOT", TK_NOT},
|
||||
|
|
|
@ -791,9 +791,29 @@ void schDropJobAllTasks(SSchJob *job) {
|
|||
}
|
||||
}
|
||||
|
||||
uint64_t schGenSchId(void) {
|
||||
uint64_t sId = 0;
|
||||
|
||||
// TODO
|
||||
|
||||
qDebug("Gen sId:0x%"PRIx64, sId);
|
||||
|
||||
return sId;
|
||||
}
|
||||
|
||||
|
||||
int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||
if (schMgmt.jobs) {
|
||||
qError("scheduler already init");
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
if (cfg) {
|
||||
schMgmt.cfg = *cfg;
|
||||
|
||||
if (schMgmt.cfg.maxJobNum <= 0) {
|
||||
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
|
||||
}
|
||||
} else {
|
||||
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
|
||||
}
|
||||
|
@ -803,18 +823,14 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
|||
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
|
||||
}
|
||||
|
||||
schMgmt.sId = 1; //TODO GENERATE A UUID
|
||||
schMgmt.sId = schGenSchId();
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
|
||||
if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(qnodeList) <= 0) {
|
||||
if (qnodeList && taosArrayGetSize(qnodeList) <= 0) {
|
||||
qInfo("qnodeList is empty");
|
||||
}
|
||||
|
||||
|
@ -882,6 +898,10 @@ _return:
|
|||
}
|
||||
|
||||
int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) {
|
||||
if (NULL == transport || /* NULL == qnodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == numOfRows) {
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
*numOfRows = 0;
|
||||
|
||||
SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true));
|
||||
|
@ -894,6 +914,10 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
|
|||
}
|
||||
|
||||
int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
|
||||
if (NULL == transport || NULL == qnodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue