Merge from origin/develop

This commit is contained in:
Shengliang Guan 2020-12-26 21:37:05 +08:00
commit df0a9a343f
32 changed files with 1804 additions and 1482 deletions

17
Jenkinsfile vendored
View File

@ -79,14 +79,25 @@ pipeline {
changeRequest() changeRequest()
} }
parallel { parallel {
stage('python') { stage('python_1') {
agent{label 'pytest'} agent{label 'p1'}
steps { steps {
pre_test() pre_test()
sh ''' sh '''
cd ${WKC}/tests cd ${WKC}/tests
./test-all.sh pytestfq ./test-all.sh p1
date'''
}
}
stage('python_2') {
agent{label 'p2'}
steps {
pre_test()
sh '''
cd ${WKC}/tests
./test-all.sh p2
date''' date'''
} }
} }

View File

@ -9,7 +9,7 @@ GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m' GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m' NC='\033[0m'
set -e # set -e
# set -x # set -x
corePath=$1 corePath=$1

View File

@ -5442,6 +5442,7 @@ static void setCreateDBOption(SCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg->quorum = pCreateDb->quorum; pMsg->quorum = pCreateDb->quorum;
pMsg->ignoreExist = pCreateDb->ignoreExists; pMsg->ignoreExist = pCreateDb->ignoreExists;
pMsg->update = pCreateDb->update; pMsg->update = pCreateDb->update;
pMsg->cacheLastRow = pCreateDb->cachelast;
} }
int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) { int32_t parseCreateDBOptions(SSqlCmd* pCmd, SCreateDBInfo* pCreateDbSql) {

View File

@ -94,6 +94,7 @@ extern int32_t tsFsyncPeriod;
extern int32_t tsReplications; extern int32_t tsReplications;
extern int32_t tsQuorum; extern int32_t tsQuorum;
extern int8_t tsUpdate; extern int8_t tsUpdate;
extern int8_t tsCacheLastRow;
// balance // balance
extern int8_t tsEnableBalance; extern int8_t tsEnableBalance;

View File

@ -127,6 +127,7 @@ int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION; int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
int8_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION; int8_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION;
int8_t tsCacheLastRow = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
int32_t tsMaxVgroupsPerDb = 0; int32_t tsMaxVgroupsPerDb = 0;
int32_t tsMinTablePerVnode = TSDB_TABLES_STEP; int32_t tsMinTablePerVnode = TSDB_TABLES_STEP;
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES; int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;

View File

@ -217,4 +217,4 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
dnodeStartMnode(&pCfg->mnodes); dnodeStartMnode(&pCfg->mnodes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -369,6 +369,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MAX_DB_UPDATE 1 #define TSDB_MAX_DB_UPDATE 1
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0 #define TSDB_DEFAULT_DB_UPDATE_OPTION 0
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
#define TSDB_MAX_DB_CACHE_LAST_ROW 1
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
#define TSDB_MIN_FSYNC_PERIOD 0 #define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond #define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second #define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second

View File

@ -549,7 +549,8 @@ typedef struct {
int8_t quorum; int8_t quorum;
int8_t ignoreExist; int8_t ignoreExist;
int8_t update; int8_t update;
int8_t reserve[9]; int8_t cacheLastRow;
int8_t reserve[8];
} SCreateDbMsg, SAlterDbMsg; } SCreateDbMsg, SAlterDbMsg;
typedef struct { typedef struct {
@ -665,8 +666,9 @@ typedef struct {
int8_t wals; int8_t wals;
int8_t quorum; int8_t quorum;
int8_t update; int8_t update;
int8_t reserved[11]; int8_t cacheLastRow;
int32_t vgCfgVersion; int32_t vgCfgVersion;
int8_t reserved[10];
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {

View File

@ -66,6 +66,7 @@ typedef struct {
int8_t precision; int8_t precision;
int8_t compression; int8_t compression;
int8_t update; int8_t update;
int8_t cacheLastRow;
} STsdbCfg; } STsdbCfg;
// --------- TSDB REPOSITORY USAGE STATISTICS // --------- TSDB REPOSITORY USAGE STATISTICS
@ -119,7 +120,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); // TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);

View File

@ -114,114 +114,115 @@
#define TK_COMP 96 #define TK_COMP 96
#define TK_PRECISION 97 #define TK_PRECISION 97
#define TK_UPDATE 98 #define TK_UPDATE 98
#define TK_LP 99 #define TK_CACHELAST 99
#define TK_RP 100 #define TK_LP 100
#define TK_TAGS 101 #define TK_RP 101
#define TK_USING 102 #define TK_TAGS 102
#define TK_AS 103 #define TK_USING 103
#define TK_COMMA 104 #define TK_AS 104
#define TK_NULL 105 #define TK_COMMA 105
#define TK_SELECT 106 #define TK_NULL 106
#define TK_UNION 107 #define TK_SELECT 107
#define TK_ALL 108 #define TK_UNION 108
#define TK_FROM 109 #define TK_ALL 109
#define TK_VARIABLE 110 #define TK_FROM 110
#define TK_INTERVAL 111 #define TK_VARIABLE 111
#define TK_FILL 112 #define TK_INTERVAL 112
#define TK_SLIDING 113 #define TK_FILL 113
#define TK_ORDER 114 #define TK_SLIDING 114
#define TK_BY 115 #define TK_ORDER 115
#define TK_ASC 116 #define TK_BY 116
#define TK_DESC 117 #define TK_ASC 117
#define TK_GROUP 118 #define TK_DESC 118
#define TK_HAVING 119 #define TK_GROUP 119
#define TK_LIMIT 120 #define TK_HAVING 120
#define TK_OFFSET 121 #define TK_LIMIT 121
#define TK_SLIMIT 122 #define TK_OFFSET 122
#define TK_SOFFSET 123 #define TK_SLIMIT 123
#define TK_WHERE 124 #define TK_SOFFSET 124
#define TK_NOW 125 #define TK_WHERE 125
#define TK_RESET 126 #define TK_NOW 126
#define TK_QUERY 127 #define TK_RESET 127
#define TK_ADD 128 #define TK_QUERY 128
#define TK_COLUMN 129 #define TK_ADD 129
#define TK_TAG 130 #define TK_COLUMN 130
#define TK_CHANGE 131 #define TK_TAG 131
#define TK_SET 132 #define TK_CHANGE 132
#define TK_KILL 133 #define TK_SET 133
#define TK_CONNECTION 134 #define TK_KILL 134
#define TK_STREAM 135 #define TK_CONNECTION 135
#define TK_COLON 136 #define TK_STREAM 136
#define TK_ABORT 137 #define TK_COLON 137
#define TK_AFTER 138 #define TK_ABORT 138
#define TK_ATTACH 139 #define TK_AFTER 139
#define TK_BEFORE 140 #define TK_ATTACH 140
#define TK_BEGIN 141 #define TK_BEFORE 141
#define TK_CASCADE 142 #define TK_BEGIN 142
#define TK_CLUSTER 143 #define TK_CASCADE 143
#define TK_CONFLICT 144 #define TK_CLUSTER 144
#define TK_COPY 145 #define TK_CONFLICT 145
#define TK_DEFERRED 146 #define TK_COPY 146
#define TK_DELIMITERS 147 #define TK_DEFERRED 147
#define TK_DETACH 148 #define TK_DELIMITERS 148
#define TK_EACH 149 #define TK_DETACH 149
#define TK_END 150 #define TK_EACH 150
#define TK_EXPLAIN 151 #define TK_END 151
#define TK_FAIL 152 #define TK_EXPLAIN 152
#define TK_FOR 153 #define TK_FAIL 153
#define TK_IGNORE 154 #define TK_FOR 154
#define TK_IMMEDIATE 155 #define TK_IGNORE 155
#define TK_INITIALLY 156 #define TK_IMMEDIATE 156
#define TK_INSTEAD 157 #define TK_INITIALLY 157
#define TK_MATCH 158 #define TK_INSTEAD 158
#define TK_KEY 159 #define TK_MATCH 159
#define TK_OF 160 #define TK_KEY 160
#define TK_RAISE 161 #define TK_OF 161
#define TK_REPLACE 162 #define TK_RAISE 162
#define TK_RESTRICT 163 #define TK_REPLACE 163
#define TK_ROW 164 #define TK_RESTRICT 164
#define TK_STATEMENT 165 #define TK_ROW 165
#define TK_TRIGGER 166 #define TK_STATEMENT 166
#define TK_VIEW 167 #define TK_TRIGGER 167
#define TK_COUNT 168 #define TK_VIEW 168
#define TK_SUM 169 #define TK_COUNT 169
#define TK_AVG 170 #define TK_SUM 170
#define TK_MIN 171 #define TK_AVG 171
#define TK_MAX 172 #define TK_MIN 172
#define TK_FIRST 173 #define TK_MAX 173
#define TK_LAST 174 #define TK_FIRST 174
#define TK_TOP 175 #define TK_LAST 175
#define TK_BOTTOM 176 #define TK_TOP 176
#define TK_STDDEV 177 #define TK_BOTTOM 177
#define TK_PERCENTILE 178 #define TK_STDDEV 178
#define TK_APERCENTILE 179 #define TK_PERCENTILE 179
#define TK_LEASTSQUARES 180 #define TK_APERCENTILE 180
#define TK_HISTOGRAM 181 #define TK_LEASTSQUARES 181
#define TK_DIFF 182 #define TK_HISTOGRAM 182
#define TK_SPREAD 183 #define TK_DIFF 183
#define TK_TWA 184 #define TK_SPREAD 184
#define TK_INTERP 185 #define TK_TWA 185
#define TK_LAST_ROW 186 #define TK_INTERP 186
#define TK_RATE 187 #define TK_LAST_ROW 187
#define TK_IRATE 188 #define TK_RATE 188
#define TK_SUM_RATE 189 #define TK_IRATE 189
#define TK_SUM_IRATE 190 #define TK_SUM_RATE 190
#define TK_AVG_RATE 191 #define TK_SUM_IRATE 191
#define TK_AVG_IRATE 192 #define TK_AVG_RATE 192
#define TK_TBID 193 #define TK_AVG_IRATE 193
#define TK_SEMI 194 #define TK_TBID 194
#define TK_NONE 195 #define TK_SEMI 195
#define TK_PREV 196 #define TK_NONE 196
#define TK_LINEAR 197 #define TK_PREV 197
#define TK_IMPORT 198 #define TK_LINEAR 198
#define TK_METRIC 199 #define TK_IMPORT 199
#define TK_TBNAME 200 #define TK_METRIC 200
#define TK_JOIN 201 #define TK_TBNAME 201
#define TK_METRICS 202 #define TK_JOIN 202
#define TK_STABLE 203 #define TK_METRICS 203
#define TK_INSERT 204 #define TK_STABLE 204
#define TK_INTO 205 #define TK_INSERT 205
#define TK_VALUES 206 #define TK_INTO 206
#define TK_VALUES 207
#define TK_SPACE 300 #define TK_SPACE 300

View File

@ -95,7 +95,7 @@ typedef struct DemoArguments {
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3}, {0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
#endif #endif
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3}, {0, 'd', "database", 0, "Destination database. Default is 'test'.", 3},
{0, 'a', "replica", 0, "Set the replica parameters of the database, Default 1, min: 1, max: 3.", 3}, {0, 'a', "replica", 0, "Set the replica parameters of the database, Default 1, min: 1, max: 3.", 3},
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3}, {0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3},
{0, 's', "sql file", 0, "The select sql file.", 3}, {0, 's', "sql file", 0, "The select sql file.", 3},
{0, 'M', 0, 0, "Use metric flag.", 13}, {0, 'M', 0, 0, "Use metric flag.", 13},
@ -205,10 +205,10 @@ typedef struct DemoArguments {
arguments->tb_prefix = arg; arguments->tb_prefix = arg;
break; break;
case 'M': case 'M':
arguments->use_metric = false; arguments->use_metric = true;
break; break;
case 'x': case 'x':
arguments->insert_only = false; arguments->insert_only = true;
break; break;
case 'c': case 'c':
if (wordexp(arg, &full_path, 0) != 0) { if (wordexp(arg, &full_path, 0) != 0) {
@ -406,9 +406,9 @@ typedef struct DemoArguments {
} else if (strcmp(argv[i], "-m") == 0) { } else if (strcmp(argv[i], "-m") == 0) {
arguments->tb_prefix = argv[++i]; arguments->tb_prefix = argv[++i];
} else if (strcmp(argv[i], "-M") == 0) { } else if (strcmp(argv[i], "-M") == 0) {
arguments->use_metric = false; arguments->use_metric = true;
} else if (strcmp(argv[i], "-x") == 0) { } else if (strcmp(argv[i], "-x") == 0) {
arguments->insert_only = false; arguments->insert_only = true;
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]); strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) { } else if (strcmp(argv[i], "-O") == 0) {
@ -476,6 +476,14 @@ typedef struct {
int notFinished; int notFinished;
tsem_t lock_sem; tsem_t lock_sem;
int counter; int counter;
// insert delay statitics
int64_t cntDelay;
int64_t totalDelay;
int64_t avgDelay;
int64_t maxDelay;
int64_t minDelay;
} info; } info;
typedef struct { typedef struct {
@ -575,7 +583,7 @@ int main(int argc, char *argv[]) {
arguments.num_of_DPT = 100000; arguments.num_of_DPT = 100000;
arguments.num_of_RPR = 1000; arguments.num_of_RPR = 1000;
arguments.use_metric = true; arguments.use_metric = true;
arguments.insert_only = true; arguments.insert_only = false;
// end change // end change
parse_args(argc, argv, &arguments); parse_args(argc, argv, &arguments);
@ -739,6 +747,9 @@ int main(int argc, char *argv[]) {
printf("Inserting data......\n"); printf("Inserting data......\n");
pthread_t *pids = malloc(threads * sizeof(pthread_t)); pthread_t *pids = malloc(threads * sizeof(pthread_t));
info *infos = malloc(threads * sizeof(info)); info *infos = malloc(threads * sizeof(info));
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(info));
int a = ntables / threads; int a = ntables / threads;
if (a < 1) { if (a < 1) {
@ -768,6 +779,7 @@ int main(int argc, char *argv[]) {
t_info->end_table_id = i < b ? last + a : last + a - 1; t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1; last = t_info->end_table_id + 1;
t_info->counter = 0; t_info->counter = 0;
t_info->minDelay = INT16_MAX;
tsem_init(&(t_info->mutex_sem), 0, 1); tsem_init(&(t_info->mutex_sem), 0, 1);
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1; t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
@ -799,12 +811,29 @@ int main(int argc, char *argv[]) {
t, (int64_t)ntables * nrecords_per_table, nrecords_per_request, t, (int64_t)ntables * nrecords_per_table, nrecords_per_request,
(int64_t)ntables * nrecords_per_table / t); (int64_t)ntables * nrecords_per_table / t);
int64_t totalDelay = 0;
int64_t maxDelay = 0;
int64_t minDelay = INT16_MAX;
int64_t cntDelay = 0;
double avgDelay = 0;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
info *t_info = infos + i; info *t_info = infos + i;
taos_close(t_info->taos); taos_close(t_info->taos);
tsem_destroy(&(t_info->mutex_sem)); tsem_destroy(&(t_info->mutex_sem));
tsem_destroy(&(t_info->lock_sem)); tsem_destroy(&(t_info->lock_sem));
totalDelay += t_info->totalDelay;
cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
} }
avgDelay = (double)totalDelay / cntDelay;
fprintf(fp, "insert delay, avg:%10.6fms, max: %10.6fms, min: %10.6fms\n\n",
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);
free(pids); free(pids);
free(infos); free(infos);
@ -859,7 +888,7 @@ int main(int argc, char *argv[]) {
} }
if (!insert_only) { if (false == insert_only) {
// query data // query data
pthread_t read_id; pthread_t read_id;
info *rInfo = malloc(sizeof(info)); info *rInfo = malloc(sizeof(info));
@ -998,7 +1027,7 @@ void * createTable(void *sarg)
/* Create all the tables; */ /* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols); snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s);", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
queryDB(winfo->taos, command); queryDB(winfo->taos, command);
} }
} else { } else {
@ -1204,6 +1233,41 @@ void *readMetric(void *sarg) {
return NULL; return NULL;
} }
static int queryDbExec(TAOS *taos, char *command, int type) {
int i;
TAOS_RES *res = NULL;
int32_t code = -1;
for (i = 0; i < 5; i++) {
if (NULL != res) {
taos_free_result(res);
res = NULL;
}
res = taos_query(taos, command);
code = taos_errno(res);
if (0 == code) {
break;
}
}
if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res));
taos_free_result(res);
//taos_close(taos);
return -1;
}
if (1 == type) {
int affectedRows = taos_affected_rows(res);
taos_free_result(res);
return affectedRows;
}
taos_free_result(res);
return 0;
}
void queryDB(TAOS *taos, char *command) { void queryDB(TAOS *taos, char *command) {
int i; int i;
TAOS_RES *pSql = NULL; TAOS_RES *pSql = NULL;
@ -1273,7 +1337,21 @@ void *syncWrite(void *sarg) {
} }
/* puts(buffer); */ /* puts(buffer); */
queryDB(winfo->taos, buffer); int64_t startTs;
int64_t endTs;
startTs = taosGetTimestampUs();
//queryDB(winfo->taos, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1);
if (0 <= affectedRows){
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++;
winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
}
if (tID == winfo->end_table_id) { if (tID == winfo->end_table_id) {
i = inserted; i = inserted;

View File

@ -174,7 +174,8 @@ typedef struct {
int8_t replications; int8_t replications;
int8_t quorum; int8_t quorum;
int8_t update; int8_t update;
int8_t reserved[11]; int8_t cacheLastRow;
int8_t reserved[10];
} SDbCfg; } SDbCfg;
typedef struct SDbObj { typedef struct SDbObj {

View File

@ -322,6 +322,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return TSDB_CODE_MND_INVALID_DB_OPTION; return TSDB_CODE_MND_INVALID_DB_OPTION;
} }
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) {
mError("invalid db option cacheLastRow:%d valid range: [%d, %d]", pCfg->cacheLastRow, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW);
return TSDB_CODE_MND_INVALID_DB_OPTION;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -343,6 +348,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->replications < 0) pCfg->replications = tsReplications; if (pCfg->replications < 0) pCfg->replications = tsReplications;
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum; if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
if (pCfg->update < 0) pCfg->update = tsUpdate; if (pCfg->update < 0) pCfg->update = tsUpdate;
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
} }
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
@ -396,7 +402,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
.walLevel = pCreate->walLevel, .walLevel = pCreate->walLevel,
.replications = pCreate->replications, .replications = pCreate->replications,
.quorum = pCreate->quorum, .quorum = pCreate->quorum,
.update = pCreate->update .update = pCreate->update,
.cacheLastRow = pCreate->cacheLastRow
}; };
mnodeSetDefaultDbCfg(&pDb->cfg); mnodeSetDefaultDbCfg(&pDb->cfg);
@ -605,6 +612,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
strcpy(pSchema[cols].name, "comp"); strcpy(pSchema[cols].name, "comp");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "cachelast");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
#ifndef __CLOUD_VERSION__ #ifndef __CLOUD_VERSION__
} }
#endif #endif
@ -750,6 +763,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.compression; *(int8_t *)pWrite = pDb->cfg.compression;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
cols++;
#ifndef __CLOUD_VERSION__ #ifndef __CLOUD_VERSION__
} }
#endif #endif
@ -864,6 +881,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
int8_t quorum = pAlter->quorum; int8_t quorum = pAlter->quorum;
int8_t precision = pAlter->precision; int8_t precision = pAlter->precision;
int8_t update = pAlter->update; int8_t update = pAlter->update;
int8_t cacheLastRow = pAlter->cacheLastRow;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
@ -976,6 +994,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
#endif #endif
} }
if (cacheLastRow >= 0 && cacheLastRow != pDb->cfg.cacheLastRow) {
mDebug("db:%s, cacheLastRow:%d change to %d", pDb->name, pDb->cfg.cacheLastRow, cacheLastRow);
newCfg.cacheLastRow = cacheLastRow;
}
return newCfg; return newCfg;
} }

View File

@ -863,6 +863,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
pCfg->wals = 3; pCfg->wals = 3;
pCfg->quorum = pDb->cfg.quorum; pCfg->quorum = pDb->cfg.quorum;
pCfg->update = pDb->cfg.update; pCfg->update = pDb->cfg.update;
pCfg->cacheLastRow = pDb->cfg.cacheLastRow;
SVnodeDesc *pNodes = pVnode->nodes; SVnodeDesc *pNodes = pVnode->nodes;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {

View File

@ -120,7 +120,8 @@ typedef struct SCreateDBInfo {
int32_t compressionLevel; int32_t compressionLevel;
SStrToken precision; SStrToken precision;
bool ignoreExists; bool ignoreExists;
int8_t update; int8_t update;
int8_t cachelast;
SArray *keep; SArray *keep;
} SCreateDBInfo; } SCreateDBInfo;

View File

@ -112,29 +112,29 @@ cmd ::= SHOW dbPrefix(X) STABLES. {
cmd ::= SHOW dbPrefix(X) STABLES LIKE ids(Y). { cmd ::= SHOW dbPrefix(X) STABLES LIKE ids(Y). {
SStrToken token; SStrToken token;
setDBName(&token, &X); setDbName(&token, &X);
setShowOptions(pInfo, TSDB_MGMT_TABLE_METRIC, &token, &Y); setShowOptions(pInfo, TSDB_MGMT_TABLE_METRIC, &token, &Y);
} }
cmd ::= SHOW dbPrefix(X) VGROUPS. { cmd ::= SHOW dbPrefix(X) VGROUPS. {
SStrToken token; SStrToken token;
setDBName(&token, &X); setDbName(&token, &X);
setShowOptions(pInfo, TSDB_MGMT_TABLE_VGROUP, &token, 0); setShowOptions(pInfo, TSDB_MGMT_TABLE_VGROUP, &token, 0);
} }
cmd ::= SHOW dbPrefix(X) VGROUPS ids(Y). { cmd ::= SHOW dbPrefix(X) VGROUPS ids(Y). {
SStrToken token; SStrToken token;
setDBName(&token, &X); setDbName(&token, &X);
setShowOptions(pInfo, TSDB_MGMT_TABLE_VGROUP, &token, &Y); setShowOptions(pInfo, TSDB_MGMT_TABLE_VGROUP, &token, &Y);
} }
//drop configure for tables //drop configure for tables
cmd ::= DROP TABLE ifexists(Y) ids(X) cpxName(Z). { cmd ::= DROP TABLE ifexists(Y) ids(X) cpxName(Z). {
X.n += Z.n; X.n += Z.n;
setDropDBTableInfo(pInfo, TSDB_SQL_DROP_TABLE, &X, &Y); setDropDbTableInfo(pInfo, TSDB_SQL_DROP_TABLE, &X, &Y);
} }
cmd ::= DROP DATABASE ifexists(Y) ids(X). { setDropDBTableInfo(pInfo, TSDB_SQL_DROP_DB, &X, &Y); } cmd ::= DROP DATABASE ifexists(Y) ids(X). { setDropDbTableInfo(pInfo, TSDB_SQL_DROP_DB, &X, &Y); }
cmd ::= DROP DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_DNODE, 1, &X); } cmd ::= DROP DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_DNODE, 1, &X); }
cmd ::= DROP USER ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_USER, 1, &X); } cmd ::= DROP USER ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_USER, 1, &X); }
cmd ::= DROP ACCOUNT ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_ACCT, 1, &X); } cmd ::= DROP ACCOUNT ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_ACCT, 1, &X); }
@ -149,16 +149,16 @@ cmd ::= DESCRIBE ids(X) cpxName(Y). {
} }
/////////////////////////////////THE ALTER STATEMENT//////////////////////////////////////// /////////////////////////////////THE ALTER STATEMENT////////////////////////////////////////
cmd ::= ALTER USER ids(X) PASS ids(Y). { setAlterUserSQL(pInfo, TSDB_ALTER_USER_PASSWD, &X, &Y, NULL); } cmd ::= ALTER USER ids(X) PASS ids(Y). { setAlterUserSql(pInfo, TSDB_ALTER_USER_PASSWD, &X, &Y, NULL); }
cmd ::= ALTER USER ids(X) PRIVILEGE ids(Y). { setAlterUserSQL(pInfo, TSDB_ALTER_USER_PRIVILEGES, &X, NULL, &Y);} cmd ::= ALTER USER ids(X) PRIVILEGE ids(Y). { setAlterUserSql(pInfo, TSDB_ALTER_USER_PRIVILEGES, &X, NULL, &Y);}
cmd ::= ALTER DNODE ids(X) ids(Y). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_DNODE, 2, &X, &Y); } cmd ::= ALTER DNODE ids(X) ids(Y). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_DNODE, 2, &X, &Y); }
cmd ::= ALTER DNODE ids(X) ids(Y) ids(Z). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_DNODE, 3, &X, &Y, &Z); } cmd ::= ALTER DNODE ids(X) ids(Y) ids(Z). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_DNODE, 3, &X, &Y, &Z); }
cmd ::= ALTER LOCAL ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_LOCAL, 1, &X); } cmd ::= ALTER LOCAL ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_LOCAL, 1, &X); }
cmd ::= ALTER LOCAL ids(X) ids(Y). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_LOCAL, 2, &X, &Y); } cmd ::= ALTER LOCAL ids(X) ids(Y). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_LOCAL, 2, &X, &Y); }
cmd ::= ALTER DATABASE ids(X) alter_db_optr(Y). { SStrToken t = {0}; setCreateDBSQL(pInfo, TSDB_SQL_ALTER_DB, &X, &Y, &t);} cmd ::= ALTER DATABASE ids(X) alter_db_optr(Y). { SStrToken t = {0}; setCreateDBSQL(pInfo, TSDB_SQL_ALTER_DB, &X, &Y, &t);}
cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSQL(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);} cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);}
cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSQL(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);} cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);}
// An IDENTIFIER can be a generic identifier, or one of several keywords. // An IDENTIFIER can be a generic identifier, or one of several keywords.
// Any non-standard keyword can also be an identifier. // Any non-standard keyword can also be an identifier.
@ -179,9 +179,9 @@ ifnotexists(X) ::= . { X.n = 0;}
//create option for dnode/db/user/account //create option for dnode/db/user/account
cmd ::= CREATE DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &X);} cmd ::= CREATE DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &X);}
cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z).
{ setCreateAcctSQL(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);} { setCreateAcctSql(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);}
cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDBSQL(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);} cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDBSQL(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
cmd ::= CREATE USER ids(X) PASS ids(Y). { setCreateUserSQL(pInfo, &X, &Y);} cmd ::= CREATE USER ids(X) PASS ids(Y). { setCreateUserSql(pInfo, &X, &Y);}
pps(Y) ::= . { Y.n = 0; } pps(Y) ::= . { Y.n = 0; }
pps(Y) ::= PPS INTEGER(X). { Y = X; } pps(Y) ::= PPS INTEGER(X). { Y = X; }
@ -240,6 +240,7 @@ fsync(Y) ::= FSYNC INTEGER(X). { Y = X; }
comp(Y) ::= COMP INTEGER(X). { Y = X; } comp(Y) ::= COMP INTEGER(X). { Y = X; }
prec(Y) ::= PRECISION STRING(X). { Y = X; } prec(Y) ::= PRECISION STRING(X). { Y = X; }
update(Y) ::= UPDATE INTEGER(X). { Y = X; } update(Y) ::= UPDATE INTEGER(X). { Y = X; }
cachelast(Y) ::= CACHELAST INTEGER(X). { Y = X; }
%type db_optr {SCreateDBInfo} %type db_optr {SCreateDBInfo}
db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);} db_optr(Y) ::= . {setDefaultCreateDbOption(&Y);}
@ -258,6 +259,7 @@ db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strto
db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; } db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; }
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; } db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); } db_optr(Y) ::= db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z, NULL, 10); }
%type alter_db_optr {SCreateDBInfo} %type alter_db_optr {SCreateDBInfo}
alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y);} alter_db_optr(Y) ::= . { setDefaultCreateDbOption(&Y);}
@ -270,21 +272,22 @@ alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLeve
alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); } alter_db_optr(Y) ::= alter_db_optr(Z) update(X). { Y = Z; Y.update = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) cachelast(X). { Y = Z; Y.cachelast = strtol(X.z, NULL, 10); }
%type typename {TAOS_FIELD} %type typename {TAOS_FIELD}
typename(A) ::= ids(X). { typename(A) ::= ids(X). {
X.type = 0; X.type = 0;
tSQLSetColumnType (&A, &X); tSqlSetColumnType (&A, &X);
} }
//define binary type, e.g., binary(10), nchar(10) //define binary type, e.g., binary(10), nchar(10)
typename(A) ::= ids(X) LP signed(Y) RP. { typename(A) ::= ids(X) LP signed(Y) RP. {
if (Y <= 0) { if (Y <= 0) {
X.type = 0; X.type = 0;
tSQLSetColumnType(&A, &X); tSqlSetColumnType(&A, &X);
} else { } else {
X.type = -Y; // negative value of name length X.type = -Y; // negative value of name length
tSQLSetColumnType(&A, &X); tSqlSetColumnType(&A, &X);
} }
} }
@ -315,8 +318,8 @@ create_table_list(A) ::= create_table_list(X) create_from_stable(Z). {
%type create_table_args{SCreateTableSQL*} %type create_table_args{SCreateTableSQL*}
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. { create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
A = tSetCreateSQLElems(X, NULL, NULL, TSQL_CREATE_TABLE); A = tSetCreateSqlElems(X, NULL, NULL, TSQL_CREATE_TABLE);
setSQLInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
V.n += Z.n; V.n += Z.n;
setCreatedTableName(pInfo, &V, &U); setCreatedTableName(pInfo, &V, &U);
@ -324,8 +327,8 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
// create super table // create super table
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. { create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
A = tSetCreateSQLElems(X, Y, NULL, TSQL_CREATE_STABLE); A = tSetCreateSqlElems(X, Y, NULL, TSQL_CREATE_STABLE);
setSQLInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
V.n += Z.n; V.n += Z.n;
setCreatedTableName(pInfo, &V, &U); setCreatedTableName(pInfo, &V, &U);
@ -343,8 +346,8 @@ create_from_stable(A) ::= ifnotexists(U) ids(V) cpxName(Z) USING ids(X) cpxName(
// create stream // create stream
// create table table_name as select count(*) from super_table_name interval(time) // create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). { create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
A = tSetCreateSQLElems(NULL, NULL, S, TSQL_CREATE_STREAM); A = tSetCreateSqlElems(NULL, NULL, S, TSQL_CREATE_STREAM);
setSQLInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE); setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
V.n += Z.n; V.n += Z.n;
setCreatedTableName(pInfo, &V, &U); setCreatedTableName(pInfo, &V, &U);
@ -359,7 +362,7 @@ columnlist(A) ::= column(X). {A = taosArrayInit(4, sizeof(T
// The information used for a column is the name and type of column: // The information used for a column is the name and type of column:
// tinyint smallint int bigint float double bool timestamp binary(x) nchar(x) // tinyint smallint int bigint float double bool timestamp binary(x) nchar(x)
column(A) ::= ids(X) typename(Y). { column(A) ::= ids(X) typename(Y). {
tSQLSetColumnInfo(&A, &X, &Y); tSqlSetColumnInfo(&A, &X, &Y);
} }
%type tagitemlist {SArray*} %type tagitemlist {SArray*}
@ -407,7 +410,7 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
%type select {SQuerySQL*} %type select {SQuerySQL*}
%destructor select {doDestroyQuerySql($$);} %destructor select {doDestroyQuerySql($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySQLElems(&T, W, X, Y, P, Z, &K, &S, F, &L, &G); A = tSetQuerySqlElems(&T, W, X, Y, P, Z, &K, &S, F, &L, &G);
} }
%type union {SSubclauseInfo*} %type union {SSubclauseInfo*}
@ -418,33 +421,33 @@ union(Y) ::= LP union(X) RP. { Y = X; }
union(Y) ::= union(Z) UNION ALL select(X). { Y = appendSelectClause(Z, X); } union(Y) ::= union(Z) UNION ALL select(X). { Y = appendSelectClause(Z, X); }
union(Y) ::= union(Z) UNION ALL LP select(X) RP. { Y = appendSelectClause(Z, X); } union(Y) ::= union(Z) UNION ALL LP select(X) RP. { Y = appendSelectClause(Z, X); }
cmd ::= union(X). { setSQLInfo(pInfo, X, NULL, TSDB_SQL_SELECT); } cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// Support for the SQL exprssion without from & where subclauses, e.g., // Support for the SQL exprssion without from & where subclauses, e.g.,
// select current_database(), // select current_database(),
// select server_version(), select client_version(), // select server_version(), select client_version(),
// select server_state(); // select server_state();
select(A) ::= SELECT(T) selcollist(W). { select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySQLElems(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); A = tSetQuerySqlElems(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
} }
// selcollist is a list of expressions that are to become the return // selcollist is a list of expressions that are to become the return
// values of the SELECT statement. The "*" in statements like // values of the SELECT statement. The "*" in statements like
// "SELECT * FROM ..." is encoded as a special expression with an opcode of TK_ALL. // "SELECT * FROM ..." is encoded as a special expression with an opcode of TK_ALL.
%type selcollist {tSQLExprList*} %type selcollist {tSQLExprList*}
%destructor selcollist {tSQLExprListDestroy($$);} %destructor selcollist {tSqlExprListDestroy($$);}
%type sclp {tSQLExprList*} %type sclp {tSQLExprList*}
%destructor sclp {tSQLExprListDestroy($$);} %destructor sclp {tSqlExprListDestroy($$);}
sclp(A) ::= selcollist(X) COMMA. {A = X;} sclp(A) ::= selcollist(X) COMMA. {A = X;}
sclp(A) ::= . {A = 0;} sclp(A) ::= . {A = 0;}
selcollist(A) ::= sclp(P) expr(X) as(Y). { selcollist(A) ::= sclp(P) expr(X) as(Y). {
A = tSQLExprListAppend(P, X, Y.n?&Y:0); A = tSqlExprListAppend(P, X, Y.n?&Y:0);
} }
selcollist(A) ::= sclp(P) STAR. { selcollist(A) ::= sclp(P) STAR. {
tSQLExpr *pNode = tSQLExprIdValueCreate(NULL, TK_ALL); tSQLExpr *pNode = tSqlExprIdValueCreate(NULL, TK_ALL);
A = tSQLExprListAppend(P, pNode, 0); A = tSqlExprListAppend(P, pNode, 0);
} }
// An option "AS <id>" phrase that can follow one of the expressions that // An option "AS <id>" phrase that can follow one of the expressions that
@ -573,7 +576,7 @@ grouplist(A) ::= item(X). {
//having clause, ignore the input condition in having //having clause, ignore the input condition in having
%type having_opt {tSQLExpr*} %type having_opt {tSQLExpr*}
%destructor having_opt {tSQLExprDestroy($$);} %destructor having_opt {tSqlExprDestroy($$);}
having_opt(A) ::=. {A = 0;} having_opt(A) ::=. {A = 0;}
having_opt(A) ::= HAVING expr(X). {A = X;} having_opt(A) ::= HAVING expr(X). {A = X;}
@ -595,7 +598,7 @@ slimit_opt(A) ::= SLIMIT signed(X) COMMA signed(Y).
{A.limit = Y; A.offset = X;} {A.limit = Y; A.offset = X;}
%type where_opt {tSQLExpr*} %type where_opt {tSQLExpr*}
%destructor where_opt {tSQLExprDestroy($$);} %destructor where_opt {tSqlExprDestroy($$);}
where_opt(A) ::= . {A = 0;} where_opt(A) ::= . {A = 0;}
where_opt(A) ::= WHERE expr(X). {A = X;} where_opt(A) ::= WHERE expr(X). {A = X;}
@ -603,67 +606,67 @@ where_opt(A) ::= WHERE expr(X). {A = X;}
/////////////////////////// Expression Processing ///////////////////////////// /////////////////////////// Expression Processing /////////////////////////////
// //
%type expr {tSQLExpr*} %type expr {tSQLExpr*}
%destructor expr {tSQLExprDestroy($$);} %destructor expr {tSqlExprDestroy($$);}
expr(A) ::= LP(X) expr(Y) RP(Z). {A = Y; A->token.z = X.z; A->token.n = (Z.z - X.z + 1);} expr(A) ::= LP(X) expr(Y) RP(Z). {A = Y; A->token.z = X.z; A->token.n = (Z.z - X.z + 1);}
expr(A) ::= ID(X). { A = tSQLExprIdValueCreate(&X, TK_ID);} expr(A) ::= ID(X). { A = tSqlExprIdValueCreate(&X, TK_ID);}
expr(A) ::= ID(X) DOT ID(Y). { X.n += (1+Y.n); A = tSQLExprIdValueCreate(&X, TK_ID);} expr(A) ::= ID(X) DOT ID(Y). { X.n += (1+Y.n); A = tSqlExprIdValueCreate(&X, TK_ID);}
expr(A) ::= ID(X) DOT STAR(Y). { X.n += (1+Y.n); A = tSQLExprIdValueCreate(&X, TK_ALL);} expr(A) ::= ID(X) DOT STAR(Y). { X.n += (1+Y.n); A = tSqlExprIdValueCreate(&X, TK_ALL);}
expr(A) ::= INTEGER(X). { A = tSQLExprIdValueCreate(&X, TK_INTEGER);} expr(A) ::= INTEGER(X). { A = tSqlExprIdValueCreate(&X, TK_INTEGER);}
expr(A) ::= MINUS(X) INTEGER(Y). { X.n += Y.n; X.type = TK_INTEGER; A = tSQLExprIdValueCreate(&X, TK_INTEGER);} expr(A) ::= MINUS(X) INTEGER(Y). { X.n += Y.n; X.type = TK_INTEGER; A = tSqlExprIdValueCreate(&X, TK_INTEGER);}
expr(A) ::= PLUS(X) INTEGER(Y). { X.n += Y.n; X.type = TK_INTEGER; A = tSQLExprIdValueCreate(&X, TK_INTEGER);} expr(A) ::= PLUS(X) INTEGER(Y). { X.n += Y.n; X.type = TK_INTEGER; A = tSqlExprIdValueCreate(&X, TK_INTEGER);}
expr(A) ::= FLOAT(X). { A = tSQLExprIdValueCreate(&X, TK_FLOAT);} expr(A) ::= FLOAT(X). { A = tSqlExprIdValueCreate(&X, TK_FLOAT);}
expr(A) ::= MINUS(X) FLOAT(Y). { X.n += Y.n; X.type = TK_FLOAT; A = tSQLExprIdValueCreate(&X, TK_FLOAT);} expr(A) ::= MINUS(X) FLOAT(Y). { X.n += Y.n; X.type = TK_FLOAT; A = tSqlExprIdValueCreate(&X, TK_FLOAT);}
expr(A) ::= PLUS(X) FLOAT(Y). { X.n += Y.n; X.type = TK_FLOAT; A = tSQLExprIdValueCreate(&X, TK_FLOAT);} expr(A) ::= PLUS(X) FLOAT(Y). { X.n += Y.n; X.type = TK_FLOAT; A = tSqlExprIdValueCreate(&X, TK_FLOAT);}
expr(A) ::= STRING(X). { A = tSQLExprIdValueCreate(&X, TK_STRING);} expr(A) ::= STRING(X). { A = tSqlExprIdValueCreate(&X, TK_STRING);}
expr(A) ::= NOW(X). { A = tSQLExprIdValueCreate(&X, TK_NOW); } expr(A) ::= NOW(X). { A = tSqlExprIdValueCreate(&X, TK_NOW); }
expr(A) ::= VARIABLE(X). { A = tSQLExprIdValueCreate(&X, TK_VARIABLE);} expr(A) ::= VARIABLE(X). { A = tSqlExprIdValueCreate(&X, TK_VARIABLE);}
expr(A) ::= BOOL(X). { A = tSQLExprIdValueCreate(&X, TK_BOOL);} expr(A) ::= BOOL(X). { A = tSqlExprIdValueCreate(&X, TK_BOOL);}
// ordinary functions: min(x), max(x), top(k, 20) // ordinary functions: min(x), max(x), top(k, 20)
expr(A) ::= ID(X) LP exprlist(Y) RP(E). { A = tSQLExprCreateFunction(Y, &X, &E, X.type); } expr(A) ::= ID(X) LP exprlist(Y) RP(E). { A = tSqlExprCreateFunction(Y, &X, &E, X.type); }
// for parsing sql functions with wildcard for parameters. e.g., count(*)/first(*)/last(*) operation // for parsing sql functions with wildcard for parameters. e.g., count(*)/first(*)/last(*) operation
expr(A) ::= ID(X) LP STAR RP(Y). { A = tSQLExprCreateFunction(NULL, &X, &Y, X.type); } expr(A) ::= ID(X) LP STAR RP(Y). { A = tSqlExprCreateFunction(NULL, &X, &Y, X.type); }
// is (not) null expression // is (not) null expression
expr(A) ::= expr(X) IS NULL. {A = tSQLExprCreate(X, NULL, TK_ISNULL);} expr(A) ::= expr(X) IS NULL. {A = tSqlExprCreate(X, NULL, TK_ISNULL);}
expr(A) ::= expr(X) IS NOT NULL. {A = tSQLExprCreate(X, NULL, TK_NOTNULL);} expr(A) ::= expr(X) IS NOT NULL. {A = tSqlExprCreate(X, NULL, TK_NOTNULL);}
// relational expression // relational expression
expr(A) ::= expr(X) LT expr(Y). {A = tSQLExprCreate(X, Y, TK_LT);} expr(A) ::= expr(X) LT expr(Y). {A = tSqlExprCreate(X, Y, TK_LT);}
expr(A) ::= expr(X) GT expr(Y). {A = tSQLExprCreate(X, Y, TK_GT);} expr(A) ::= expr(X) GT expr(Y). {A = tSqlExprCreate(X, Y, TK_GT);}
expr(A) ::= expr(X) LE expr(Y). {A = tSQLExprCreate(X, Y, TK_LE);} expr(A) ::= expr(X) LE expr(Y). {A = tSqlExprCreate(X, Y, TK_LE);}
expr(A) ::= expr(X) GE expr(Y). {A = tSQLExprCreate(X, Y, TK_GE);} expr(A) ::= expr(X) GE expr(Y). {A = tSqlExprCreate(X, Y, TK_GE);}
expr(A) ::= expr(X) NE expr(Y). {A = tSQLExprCreate(X, Y, TK_NE);} expr(A) ::= expr(X) NE expr(Y). {A = tSqlExprCreate(X, Y, TK_NE);}
expr(A) ::= expr(X) EQ expr(Y). {A = tSQLExprCreate(X, Y, TK_EQ);} expr(A) ::= expr(X) EQ expr(Y). {A = tSqlExprCreate(X, Y, TK_EQ);}
expr(A) ::= expr(X) AND expr(Y). {A = tSQLExprCreate(X, Y, TK_AND);} expr(A) ::= expr(X) AND expr(Y). {A = tSqlExprCreate(X, Y, TK_AND);}
expr(A) ::= expr(X) OR expr(Y). {A = tSQLExprCreate(X, Y, TK_OR); } expr(A) ::= expr(X) OR expr(Y). {A = tSqlExprCreate(X, Y, TK_OR); }
// binary arithmetic expression // binary arithmetic expression
expr(A) ::= expr(X) PLUS expr(Y). {A = tSQLExprCreate(X, Y, TK_PLUS); } expr(A) ::= expr(X) PLUS expr(Y). {A = tSqlExprCreate(X, Y, TK_PLUS); }
expr(A) ::= expr(X) MINUS expr(Y). {A = tSQLExprCreate(X, Y, TK_MINUS); } expr(A) ::= expr(X) MINUS expr(Y). {A = tSqlExprCreate(X, Y, TK_MINUS); }
expr(A) ::= expr(X) STAR expr(Y). {A = tSQLExprCreate(X, Y, TK_STAR); } expr(A) ::= expr(X) STAR expr(Y). {A = tSqlExprCreate(X, Y, TK_STAR); }
expr(A) ::= expr(X) SLASH expr(Y). {A = tSQLExprCreate(X, Y, TK_DIVIDE);} expr(A) ::= expr(X) SLASH expr(Y). {A = tSqlExprCreate(X, Y, TK_DIVIDE);}
expr(A) ::= expr(X) REM expr(Y). {A = tSQLExprCreate(X, Y, TK_REM); } expr(A) ::= expr(X) REM expr(Y). {A = tSqlExprCreate(X, Y, TK_REM); }
// like expression // like expression
expr(A) ::= expr(X) LIKE expr(Y). {A = tSQLExprCreate(X, Y, TK_LIKE); } expr(A) ::= expr(X) LIKE expr(Y). {A = tSqlExprCreate(X, Y, TK_LIKE); }
//in expression //in expression
expr(A) ::= expr(X) IN LP exprlist(Y) RP. {A = tSQLExprCreate(X, (tSQLExpr*)Y, TK_IN); } expr(A) ::= expr(X) IN LP exprlist(Y) RP. {A = tSqlExprCreate(X, (tSQLExpr*)Y, TK_IN); }
%type exprlist {tSQLExprList*} %type exprlist {tSQLExprList*}
%destructor exprlist {tSQLExprListDestroy($$);} %destructor exprlist {tSqlExprListDestroy($$);}
%type expritem {tSQLExpr*} %type expritem {tSQLExpr*}
%destructor expritem {tSQLExprDestroy($$);} %destructor expritem {tSqlExprDestroy($$);}
exprlist(A) ::= exprlist(X) COMMA expritem(Y). {A = tSQLExprListAppend(X,Y,0);} exprlist(A) ::= exprlist(X) COMMA expritem(Y). {A = tSqlExprListAppend(X,Y,0);}
exprlist(A) ::= expritem(X). {A = tSQLExprListAppend(0,X,0);} exprlist(A) ::= expritem(X). {A = tSqlExprListAppend(0,X,0);}
expritem(A) ::= expr(X). {A = X;} expritem(A) ::= expr(X). {A = X;}
expritem(A) ::= . {A = 0;} expritem(A) ::= . {A = 0;}
@ -673,8 +676,8 @@ cmd ::= RESET QUERY CACHE. { setDCLSQLElems(pInfo, TSDB_SQL_RESET_CACHE, 0);}
///////////////////////////////////ALTER TABLE statement////////////////////////////////// ///////////////////////////////////ALTER TABLE statement//////////////////////////////////
cmd ::= ALTER TABLE ids(X) cpxName(F) ADD COLUMN columnlist(A). { cmd ::= ALTER TABLE ids(X) cpxName(F) ADD COLUMN columnlist(A). {
X.n += F.n; X.n += F.n;
SAlterTableSQL* pAlterTable = tAlterTableSQLElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_COLUMN);
setSQLInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). { cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
@ -683,15 +686,15 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
toTSDBType(A.type); toTSDBType(A.type);
SArray* K = tVariantListAppendToken(NULL, &A, -1); SArray* K = tVariantListAppendToken(NULL, &A, -1);
SAlterTableSQL* pAlterTable = tAlterTableSQLElems(&X, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, K, TSDB_ALTER_TABLE_DROP_COLUMN);
setSQLInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
//////////////////////////////////ALTER TAGS statement///////////////////////////////////// //////////////////////////////////ALTER TAGS statement/////////////////////////////////////
cmd ::= ALTER TABLE ids(X) cpxName(Y) ADD TAG columnlist(A). { cmd ::= ALTER TABLE ids(X) cpxName(Y) ADD TAG columnlist(A). {
X.n += Y.n; X.n += Y.n;
SAlterTableSQL* pAlterTable = tAlterTableSQLElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, A, NULL, TSDB_ALTER_TABLE_ADD_TAG_COLUMN);
setSQLInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). { cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
X.n += Z.n; X.n += Z.n;
@ -699,8 +702,8 @@ cmd ::= ALTER TABLE ids(X) cpxName(Z) DROP TAG ids(Y). {
toTSDBType(Y.type); toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1); SArray* A = tVariantListAppendToken(NULL, &Y, -1);
SAlterTableSQL* pAlterTable = tAlterTableSQLElems(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_DROP_TAG_COLUMN);
setSQLInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). { cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
@ -712,8 +715,8 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
toTSDBType(Z.type); toTSDBType(Z.type);
A = tVariantListAppendToken(A, &Z, -1); A = tVariantListAppendToken(A, &Z, -1);
SAlterTableSQL* pAlterTable = tAlterTableSQLElems(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN);
setSQLInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). { cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
@ -723,14 +726,14 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
SArray* A = tVariantListAppendToken(NULL, &Y, -1); SArray* A = tVariantListAppendToken(NULL, &Y, -1);
A = tVariantListAppend(A, &Z, -1); A = tVariantListAppend(A, &Z, -1);
SAlterTableSQL* pAlterTable = tAlterTableSQLElems(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL); SAlterTableSQL* pAlterTable = tAlterTableSqlElems(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL);
setSQLInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE); setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
} }
////////////////////////////////////////kill statement/////////////////////////////////////// ////////////////////////////////////////kill statement///////////////////////////////////////
cmd ::= KILL CONNECTION INTEGER(Y). {setKillSQL(pInfo, TSDB_SQL_KILL_CONNECTION, &Y);} cmd ::= KILL CONNECTION INTEGER(Y). {setKillSql(pInfo, TSDB_SQL_KILL_CONNECTION, &Y);}
cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_STREAM, &X);} cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_STREAM, &X);}
cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSQL(pInfo, TSDB_SQL_KILL_QUERY, &X);} cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_QUERY, &X);}
%fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED %fallback ID ABORT AFTER ASC ATTACH BEFORE BEGIN CASCADE CLUSTER CONFLICT COPY DATABASE DEFERRED
DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD DELIMITERS DESC DETACH EACH END EXPLAIN FAIL FOR GLOB IGNORE IMMEDIATE INITIALLY INSTEAD

View File

@ -4662,7 +4662,6 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
// update the query time window // update the query time window
pQuery->window = cond.twindow; pQuery->window = cond.twindow;
if (pQInfo->tableGroupInfo.numOfTables == 0) { if (pQInfo->tableGroupInfo.numOfTables == 0) {
pQInfo->tableqinfoGroupInfo.numOfTables = 0; pQInfo->tableqinfoGroupInfo.numOfTables = 0;
} else { } else {

View File

@ -846,5 +846,6 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
pDBInfo->keep = NULL; pDBInfo->keep = NULL;
pDBInfo->update = -1; pDBInfo->update = -1;
pDBInfo->cachelast = 0;
memset(&pDBInfo->precision, 0, sizeof(SStrToken)); memset(&pDBInfo->precision, 0, sizeof(SStrToken));
} }

View File

@ -238,6 +238,7 @@ static SKeyword keywordTable[] = {
{"SUM_IRATE", TK_SUM_IRATE}, {"SUM_IRATE", TK_SUM_IRATE},
{"AVG_RATE", TK_AVG_RATE}, {"AVG_RATE", TK_AVG_RATE},
{"AVG_IRATE", TK_AVG_IRATE}, {"AVG_IRATE", TK_AVG_IRATE},
{"CACHELAST", TK_CACHELAST},
}; };
static const char isIdChar[] = { static const char isIdChar[] = {

File diff suppressed because it is too large Load Diff

View File

@ -66,7 +66,8 @@ typedef struct STable {
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void* eventHandler; // TODO void* eventHandler; // TODO
void* streamHandler; // TODO void* streamHandler; // TODO
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure TSKEY lastKey;
SDataRow lastRow;
char* sql; char* sql;
void* cqhandle; void* cqhandle;
SRWLatch latch; // TODO: implementa latch functions SRWLatch latch; // TODO: implementa latch functions
@ -360,8 +361,11 @@ typedef struct {
#define TABLE_UID(t) (t)->tableId.uid #define TABLE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid #define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid #define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore) #define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta); void tsdbFreeMeta(STsdbMeta* pMeta);
@ -391,7 +395,7 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
STSchema* pTSchema = NULL; STSchema* pTSchema = NULL;
if (lock) taosRLockLatch(&(pDTable->latch)); if (lock) TSDB_RLOCK_TABLE(pDTable);
if (version < 0) { // get the latest version of schema if (version < 0) { // get the latest version of schema
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
} else { // get the schema with version } else { // get the schema with version
@ -413,7 +417,7 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
} }
_exit: _exit:
if (lock) taosRUnLockLatch(&(pDTable->latch)); if (lock) TSDB_RUNLOCK_TABLE(pDTable);
return pSchema; return pSchema;
} }
@ -433,6 +437,11 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
} }
} }
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow));
return pTable->lastKey;
}
// ------------------ tsdbBuffer.c // ------------------ tsdbBuffer.c
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold #define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold

View File

@ -225,7 +225,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
SCommitIter *pIter = iters + tid; SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue; if (pIter->pTable == NULL) continue;
taosRLockLatch(&(pIter->pTable->latch)); TSDB_RLOCK_TABLE(pIter->pTable);
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
@ -236,7 +236,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
} }
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch)); TSDB_RUNLOCK_TABLE(pIter->pTable);
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno)); tstrerror(terrno));
@ -244,7 +244,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
} }
} }
taosRUnLockLatch(&(pIter->pTable->latch)); TSDB_RUNLOCK_TABLE(pIter->pTable);
// Move the last block to the new .l file if neccessary // Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {

View File

@ -77,9 +77,9 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
tsdbDebug( tsdbDebug(
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep " "vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep "
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d", "%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d update %d cacheLastRow %d",
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock,
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression); pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression, pCfg->update, pCfg->cacheLastRow);
return 0; return 0;
} }
@ -281,6 +281,10 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
config.totalBlocks = pCfg->totalBlocks; config.totalBlocks = pCfg->totalBlocks;
configChanged = true; configChanged = true;
} }
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
config.cacheLastRow = pCfg->cacheLastRow;
configChanged = true;
}
if (configChanged) { if (configChanged) {
if (tsdbSaveConfig(pRepo->rootDir, &config) < 0) { if (tsdbSaveConfig(pRepo->rootDir, &config) < 0) {
@ -475,6 +479,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// update check // update check
if (pCfg->update != 0) pCfg->update = 1; if (pCfg->update != 0) pCfg->update = 1;
// update cacheLastRow
if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
return 0; return 0;
_err: _err:
@ -692,10 +699,12 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
} }
} }
static int tsdbRestoreInfo(STsdbRepo *pRepo) { static int tsdbRestoreInfo(STsdbRepo *pRepo) { // TODO
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pFGroup = NULL; SFileGroup *pFGroup = NULL;
STsdbCfg * pCfg = &(pRepo->config);
SCompBlock *pBlock = NULL;
SFileGroupIter iter; SFileGroupIter iter;
SRWHelper rhelper = {0}; SRWHelper rhelper = {0};
@ -713,7 +722,32 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err; if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err;
SCompIdx *pIdx = &(rhelper.curCompIdx); SCompIdx *pIdx = &(rhelper.curCompIdx);
if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey; TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable);
if (pIdx->offset > 0 && lastKey < pIdx->maxKey) {
pTable->lastKey = pIdx->maxKey;
if (pCfg->cacheLastRow) { // load the block of data
if (tsdbLoadCompInfo(&rhelper, NULL) < 0) goto _err;
pBlock = rhelper.pCompInfo->blocks + pIdx->numOfBlocks - 1;
if (tsdbLoadBlockData(&rhelper, pBlock, NULL) < 0) goto _err;
// construct the data row
ASSERT(pTable->lastRow == NULL);
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(schemaTLen(pSchema));
if (pTable->lastRow == NULL) {
goto _err;
}
tdInitDataRow(pTable->lastRow, pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = rhelper.pDataCols[0]->cols + icol;
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
pCol->offset);
}
}
}
} }
} }
@ -800,6 +834,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
tlen += taosEncodeFixedI8(buf, pCfg->precision); tlen += taosEncodeFixedI8(buf, pCfg->precision);
tlen += taosEncodeFixedI8(buf, pCfg->compression); tlen += taosEncodeFixedI8(buf, pCfg->compression);
tlen += taosEncodeFixedI8(buf, pCfg->update); tlen += taosEncodeFixedI8(buf, pCfg->update);
tlen += taosEncodeFixedI8(buf, pCfg->cacheLastRow);
return tlen; return tlen;
} }
@ -817,6 +852,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeFixedI8(buf, &(pCfg->precision)); buf = taosDecodeFixedI8(buf, &(pCfg->precision));
buf = taosDecodeFixedI8(buf, &(pCfg->compression)); buf = taosDecodeFixedI8(buf, &(pCfg->compression));
buf = taosDecodeFixedI8(buf, &(pCfg->update)); buf = taosDecodeFixedI8(buf, &(pCfg->update));
buf = taosDecodeFixedI8(buf, &(pCfg->cacheLastRow));
return buf; return buf;
} }

View File

@ -36,6 +36,7 @@ static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPB
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter); static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter); static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now); TSKEY now);
@ -663,9 +664,10 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void
return -1; return -1;
} }
if (key > TABLE_LASTKEY(pTable)) { TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable);
if (key > lastKey) {
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64, tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable)); REPO_ID(pRepo), key, lastKey);
return 0; return 0;
} }
} }
@ -846,8 +848,10 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro
if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]); if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]);
pTableData->numOfRows += dsize; pTableData->numOfRows += dsize;
// TODO: impl delete row thing // update table latest info
if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]); if (tsdbUpdateTableLatestInfo(pRepo, pTable, rows[rowCounter - 1]) < 0) {
return -1;
}
return 0; return 0;
} }
@ -889,4 +893,38 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
} }
} }
} }
}
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
STsdbCfg *pCfg = &pRepo->config;
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) {
if (pCfg->cacheLastRow || pTable->lastRow != NULL) {
SDataRow nrow = pTable->lastRow;
if (taosTSizeof(nrow) < dataRowLen(row)) {
SDataRow orow = nrow;
nrow = taosTMalloc(dataRowLen(row));
if (nrow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
dataRowCpy(nrow, row);
TSDB_WLOCK_TABLE(pTable);
pTable->lastKey = dataRowKey(row);
pTable->lastRow = nrow;
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(orow);
} else {
TSDB_WLOCK_TABLE(pTable);
pTable->lastKey = dataRowKey(row);
dataRowCpy(nrow, row);
TSDB_WUNLOCK_TABLE(pTable);
}
} else {
pTable->lastKey = dataRowKey(row);
}
}
return 0;
} }

View File

@ -377,11 +377,11 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
// Chage in memory // Chage in memory
if (pNewSchema != NULL) { // change super table tag schema if (pNewSchema != NULL) { // change super table tag schema
taosWLockLatch(&(pTable->pSuper->latch)); TSDB_WLOCK_TABLE(pTable->pSuper);
STSchema *pOldSchema = pTable->pSuper->tagSchema; STSchema *pOldSchema = pTable->pSuper->tagSchema;
pTable->pSuper->tagSchema = pNewSchema; pTable->pSuper->tagSchema = pNewSchema;
tdFreeSchema(pOldSchema); tdFreeSchema(pOldSchema);
taosWUnLockLatch(&(pTable->pSuper->latch)); TSDB_WUNLOCK_TABLE(pTable->pSuper);
} }
bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0))); bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
@ -392,9 +392,9 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
tsdbWLockRepoMeta(pRepo); tsdbWLockRepoMeta(pRepo);
tsdbRemoveTableFromIndex(pMeta, pTable); tsdbRemoveTableFromIndex(pMeta, pTable);
} }
taosWLockLatch(&(pTable->latch)); TSDB_WLOCK_TABLE(pTable);
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen)); tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
taosWUnLockLatch(&(pTable->latch)); TSDB_WUNLOCK_TABLE(pTable);
if (isChangeIndexCol) { if (isChangeIndexCol) {
tsdbAddTableIntoIndex(pMeta, pTable, false); tsdbAddTableIntoIndex(pMeta, pTable, false);
tsdbUnlockRepoMeta(pRepo); tsdbUnlockRepoMeta(pRepo);
@ -587,7 +587,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1])); ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1]));
taosWLockLatch(&(pCTable->latch)); TSDB_WLOCK_TABLE(pCTable);
if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) { if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
pCTable->schema[pCTable->numOfSchemas++] = pSchema; pCTable->schema[pCTable->numOfSchemas++] = pSchema;
} else { } else {
@ -599,7 +599,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema); if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema); if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
taosWUnLockLatch(&(pCTable->latch)); TSDB_WUNLOCK_TABLE(pCTable);
if (insertAct) { if (insertAct) {
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable); int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable);
@ -775,6 +775,7 @@ static void tsdbFreeTable(STable *pTable) {
kvRowFree(pTable->tagVal); kvRowFree(pTable->tagVal);
tSkipListDestroy(pTable->pIndex); tSkipListDestroy(pTable->pIndex);
taosTZfree(pTable->lastRow);
tfree(pTable->sql); tfree(pTable->sql);
free(pTable); free(pTable);
} }

View File

@ -110,6 +110,7 @@ typedef struct STsdbQueryHandle {
SArray* pTableCheckInfo; // SArray<STableCheckInfo> SArray* pTableCheckInfo; // SArray<STableCheckInfo>
int32_t activeIndex; int32_t activeIndex;
bool checkFiles; // check file stage bool checkFiles; // check file stage
bool cachelastrow; // check if last row cached
void* qinfo; // query info handle, for debug purpose void* qinfo; // query info handle, for debug purpose
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SFileGroup* pFileGroup; SFileGroup* pFileGroup;
@ -133,7 +134,9 @@ typedef struct STableGroupSupporter {
STSchema* pTagSchema; STSchema* pTagSchema;
} STableGroupSupporter; } STableGroupSupporter;
static STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
@ -370,7 +373,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
} }
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) {
pCond->twindow = changeTableGroupByLastrow(groupList); pCond->twindow = updateLastrowForEachGroup(groupList);
// no qualified table // no qualified table
if (groupList->numOfTables == 0) { if (groupList->numOfTables == 0) {
@ -378,8 +381,14 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
} }
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pMemRef); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pMemRef);
int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code;
return NULL;
}
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
return pQueryHandle; return pQueryHandle;
} }
@ -2144,6 +2153,39 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
// restore the pMemRef // restore the pMemRef
pQueryHandle->pMemRef = pMemRef; pQueryHandle->pMemRef = pMemRef;
return ret; return ret;
} else if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) {
// the last row is cached in buffer, return it directly.
// here note that the pQueryHandle->window must be the TS_INITIALIZER
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
SQueryFilePos* cur = &pQueryHandle->cur;
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
if (++pQueryHandle->activeIndex < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
if (ret != TSDB_CODE_SUCCESS) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
tfree(pRow);
// update the last key value
pCheckInfo->lastKey = key + step;
cur->rows = 1; // only one row
cur->lastKey = key + step;
cur->mixBlock = true;
cur->win.skey = key;
cur->win.ekey = key;
return true;
}
return false;
} }
if (pQueryHandle->checkFiles) { if (pQueryHandle->checkFiles) {
@ -2176,7 +2218,57 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return ret; return ret;
} }
STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) { /*
* 1. no data at all (pTable->lastKey = TSKEY_INITIAL_VAL), just return TSKEY_INITIAL_VAL
* 2. has data but not loaded, just return lastKey but not set pRes
* 3. has data and loaded, return lastKey and set pRes
*/
int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) {
TSDB_RLOCK_TABLE(pTable);
*lastKey = pTable->lastKey;
if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) {
*pRes = tdDataRowDup(pTable->lastRow);
if (*pRes == NULL) {
TSDB_RUNLOCK_TABLE(pTable);
return TSDB_CODE_TDB_OUT_OF_MEMORY;
}
}
TSDB_RUNLOCK_TABLE(pTable);
return TSDB_CODE_SUCCESS;
}
int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) {
assert(pQueryHandle != NULL && groupList != NULL);
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
SArray* group = taosArrayGetP(groupList->pGroupList, 0);
assert(group != NULL);
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(group, 0);
int32_t code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key);
if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->cachelastrow = false;
} else {
pQueryHandle->cachelastrow = (pRow != NULL);
}
// update the tsdb query time range
if (pQueryHandle->cachelastrow) {
pQueryHandle->window = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = false;
pQueryHandle->activeIndex = -1; // start from -1
}
tfree(pRow);
return code;
}
STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
STimeWindow window = {INT64_MAX, INT64_MIN}; STimeWindow window = {INT64_MAX, INT64_MIN};
int32_t totalNumOfTable = 0; int32_t totalNumOfTable = 0;
@ -2191,16 +2283,16 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
size_t numOfTables = taosArrayGetSize(pGroup); size_t numOfTables = taosArrayGetSize(pGroup);
for(int32_t i = 0; i < numOfTables; ++i) { for(int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pGroup, i); STableKeyInfo* pInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);
// if the lastKey equals to INT64_MIN, there is no data in this table // if the lastKey equals to INT64_MIN, there is no data in this table
TSKEY lastKey = ((STable*)(pKeyInfo->pTable))->lastKey; TSKEY lastKey = ((STable*)(pInfo->pTable))->lastKey;
if (key < lastKey) { if (key < lastKey) {
key = lastKey; key = lastKey;
keyInfo.pTable = pKeyInfo->pTable; keyInfo.pTable = pInfo->pTable;
keyInfo.lastKey = key; keyInfo.lastKey = key;
pKeyInfo->lastKey = key; pInfo->lastKey = key;
if (key < window.skey) { if (key < window.skey) {
window.skey = key; window.skey = key;
@ -2214,11 +2306,11 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
// clear current group, unref unused table // clear current group, unref unused table
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
// keyInfo.pTable may be NULL here. // keyInfo.pTable may be NULL here.
if (pKeyInfo->pTable != keyInfo.pTable) { if (pInfo->pTable != keyInfo.pTable) {
tsdbUnRefTable(pKeyInfo->pTable); tsdbUnRefTable(pInfo->pTable);
} }
} }

View File

@ -34,6 +34,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock; pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock;
pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision; pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision;
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression; pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
pVnode->tsdbCfg.cacheLastRow = vnodeMsg->cfg.cacheLastRow;
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel; pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod; pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP; pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP;
@ -216,6 +217,15 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
vnodeMsg.cfg.quorum = (int8_t)quorum->valueint; vnodeMsg.cfg.quorum = (int8_t)quorum->valueint;
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
//goto PARSE_VCFG_ERROR;
vnodeMsg.cfg.cacheLastRow = 0;
} else {
vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint;
}
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) { if (!nodeInfos || nodeInfos->type != cJSON_Array) {
vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file);
@ -304,6 +314,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pMsg->cfg.replications; i++) { for (int32_t i = 0; i < pMsg->cfg.replications; i++) {
SVnodeDesc *node = &pMsg->nodes[i]; SVnodeDesc *node = &pMsg->nodes[i];

View File

@ -86,6 +86,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression; tsdbCfg.compression = pVnodeCfg->cfg.compression;
tsdbCfg.update = pVnodeCfg->cfg.update; tsdbCfg.update = pVnodeCfg->cfg.update;
tsdbCfg.cacheLastRow = pVnodeCfg->cfg.cacheLastRow;
char tsdbDir[TSDB_FILENAME_LEN] = {0}; char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);

View File

@ -19,7 +19,8 @@ python3 ./test.py -f insert/randomNullCommit.py
python3 insert/retentionpolicy.py python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f insert/insertIntoTwoTables.py python3 ./test.py -f insert/insertIntoTwoTables.py
python3 ./test.py -f query/isNullTest.py #python3 ./test.py -f insert/before_1970.py
python3 bug2265.py
#table #table
python3 ./test.py -f table/alter_wal0.py python3 ./test.py -f table/alter_wal0.py
@ -33,7 +34,7 @@ python3 ./test.py -f table/alter_column.py
python3 ./test.py -f table/boundary.py python3 ./test.py -f table/boundary.py
python3 ./test.py -f table/create.py python3 ./test.py -f table/create.py
python3 ./test.py -f table/del_stable.py python3 ./test.py -f table/del_stable.py
python3 ./test.py -f table/queryWithTaosdKilled.py
# tag # tag
python3 ./test.py -f tag_lite/filter.py python3 ./test.py -f tag_lite/filter.py
@ -163,10 +164,16 @@ python3 ./test.py -f query/bug1471.py
python3 ./test.py -f query/bug1874.py python3 ./test.py -f query/bug1874.py
python3 ./test.py -f query/bug1875.py python3 ./test.py -f query/bug1875.py
python3 ./test.py -f query/bug1876.py python3 ./test.py -f query/bug1876.py
python3 ./test.py -f query/bug2218.py python3 ./test.py -f query/bug2218.py
python3 ./test.py -f query/bug2117.py
python3 ./test.py -f query/bug2118.py
python3 ./test.py -f query/bug2143.py
python3 ./test.py -f query/sliding.py
python3 ./test.py -f query/unionAllTest.py
python3 ./test.py -f query/bug2281.py python3 ./test.py -f query/bug2281.py
python3 ./test.py -f query/bug2119.py python3 ./test.py -f query/bug2119.py
python3 bug2265.py python3 ./test.py -f query/isNullTest.py
python3 ./test.py -f query/queryWithTaosdKilled.py
python3 ./test.py -f query/floatCompare.py python3 ./test.py -f query/floatCompare.py
#stream #stream
@ -184,6 +191,7 @@ python3 ./test.py -f alter/alter_table_crash.py
python3 ./test.py -f client/client.py python3 ./test.py -f client/client.py
python3 ./test.py -f client/version.py python3 ./test.py -f client/version.py
python3 ./test.py -f client/alterDatabase.py python3 ./test.py -f client/alterDatabase.py
python3 ./test.py -f client/noConnectionErrorTest.py
# Misc # Misc
python3 testCompress.py python3 testCompress.py
@ -207,7 +215,7 @@ python3 ./test.py -f functions/function_spread.py -r 1
python3 ./test.py -f functions/function_stddev.py -r 1 python3 ./test.py -f functions/function_stddev.py -r 1
python3 ./test.py -f functions/function_sum.py -r 1 python3 ./test.py -f functions/function_sum.py -r 1
python3 ./test.py -f functions/function_top.py -r 1 python3 ./test.py -f functions/function_top.py -r 1
#python3 ./test.py -f functions/function_twa.py -r 1 python3 ./test.py -f functions/function_twa.py -r 1
python3 ./test.py -f functions/function_twa_test2.py python3 ./test.py -f functions/function_twa_test2.py
python3 queryCount.py python3 queryCount.py
python3 ./test.py -f query/queryGroupbyWithInterval.py python3 ./test.py -f query/queryGroupbyWithInterval.py
@ -219,10 +227,10 @@ python3 test.py -f query/queryFillTest.py
python3 test.py -f tools/taosdemoTest.py python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdumpTest.py python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/lowaTest.py python3 test.py -f tools/lowaTest.py
python3 test.py -f tools/taosdemoTest2.py
# subscribe # subscribe
python3 test.py -f subscribe/singlemeter.py python3 test.py -f subscribe/singlemeter.py
#python3 test.py -f subscribe/stability.py #python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py python3 test.py -f subscribe/supertable.py

View File

@ -12,6 +12,7 @@ python3 ./test.py -f update/merge_commit_data2.py
python3 ./test.py -f update/merge_commit_data2_update0.py python3 ./test.py -f update/merge_commit_data2_update0.py
python3 ./test.py -f update/merge_commit_last-0.py python3 ./test.py -f update/merge_commit_last-0.py
python3 ./test.py -f update/merge_commit_last.py python3 ./test.py -f update/merge_commit_last.py
python3 ./test.py -f update/bug_td2279.py
# wal # wal
python3 ./test.py -f wal/addOldWalTest.py python3 ./test.py -f wal/addOldWalTest.py

View File

@ -81,7 +81,7 @@ print =============== step2 - no db
#11 #11
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:7111/rest/sql system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'show databases' 127.0.0.1:7111/rest/sql
print 11-> $system_content print 11-> $system_content
if $system_content != @{"status":"succ","head":["name","created_time","ntables","vgroups","replica","quorum","days","keep1,keep2,keep(D)","cache(MB)","blocks","minrows","maxrows","wallevel","fsync","comp","precision","update","status"],"data":[],"rows":0}@ then if $system_content != @{"status":"succ","head":["name","created_time","ntables","vgroups","replica","quorum","days","keep1,keep2,keep(D)","cache(MB)","blocks","minrows","maxrows","wallevel","fsync","comp","cachelast","precision","update","status"],"data":[],"rows":0}@ then
return -1 return -1
endi endi

View File

@ -202,10 +202,10 @@ if [ "$2" != "sim" ]; then
runPyCaseOneByOnefq fulltest.sh runPyCaseOneByOnefq fulltest.sh
elif [ "$1" == "p1" ]; then elif [ "$1" == "p1" ]; then
echo "### run Python_1 test ###" echo "### run Python_1 test ###"
runPyCaseOneByOne pytest_1.sh runPyCaseOneByOnefq pytest_1.sh
elif [ "$1" == "p2" ]; then elif [ "$1" == "p2" ]; then
echo "### run Python_2 test ###" echo "### run Python_2 test ###"
runPyCaseOneByOne pytest_2.sh runPyCaseOneByOnefq pytest_2.sh
elif [ "$1" == "b2" ] || [ "$1" == "b3" ]; then elif [ "$1" == "b2" ] || [ "$1" == "b3" ]; then
exit $(($totalFailed + $totalPyFailed)) exit $(($totalFailed + $totalPyFailed))
elif [ "$1" == "smoke" ] || [ -z "$1" ]; then elif [ "$1" == "smoke" ] || [ -z "$1" ]; then