Merge branch 'develop' of https://github.com/taosdata/TDengine into develop
This commit is contained in:
commit
7bc2f86137
|
@ -4962,6 +4962,7 @@ static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
|
|||
pMsg->commitTime = htonl(pCreateDb->commitTime);
|
||||
pMsg->minRowsPerFileBlock = htonl(pCreateDb->minRowsPerBlock);
|
||||
pMsg->maxRowsPerFileBlock = htonl(pCreateDb->maxRowsPerBlock);
|
||||
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
|
||||
pMsg->compression = pCreateDb->compressionLevel;
|
||||
pMsg->walLevel = (char)pCreateDb->walLevel;
|
||||
pMsg->replications = pCreateDb->replica;
|
||||
|
@ -5529,6 +5530,13 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
|
|||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
|
||||
}
|
||||
|
||||
val = htonl(pCreate->fsyncPeriod);
|
||||
if (val != -1 && (val < TSDB_MIN_FSYNC_PERIOD || val > TSDB_MAX_FSYNC_PERIOD)) {
|
||||
snprintf(msg, tListLen(msg), "invalid db option fsyncPeriod: %d valid range: [%d, %d]", val,
|
||||
TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
|
||||
}
|
||||
|
||||
if (pCreate->compression != -1 &&
|
||||
(pCreate->compression < TSDB_MIN_COMP_LEVEL || pCreate->compression > TSDB_MAX_COMP_LEVEL)) {
|
||||
snprintf(msg, tListLen(msg), "invalid db option compression: %d valid range: [%d, %d]", pCreate->compression,
|
||||
|
|
|
@ -80,6 +80,7 @@ extern int16_t tsCommitTime; // seconds
|
|||
extern int32_t tsTimePrecision;
|
||||
extern int16_t tsCompression;
|
||||
extern int16_t tsWAL;
|
||||
extern int32_t tsFsyncPeriod;
|
||||
extern int32_t tsReplications;
|
||||
|
||||
// balance
|
||||
|
|
|
@ -384,9 +384,11 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|||
}
|
||||
|
||||
void tdResetDataCols(SDataCols *pCols) {
|
||||
pCols->numOfRows = 0;
|
||||
for (int i = 0; i < pCols->maxCols; i++) {
|
||||
dataColReset(pCols->cols + i);
|
||||
if (pCols != NULL) {
|
||||
pCols->numOfRows = 0;
|
||||
for (int i = 0; i < pCols->maxCols; i++) {
|
||||
dataColReset(pCols->cols + i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -110,6 +110,7 @@ int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds
|
|||
int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION;
|
||||
int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
|
||||
int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
|
||||
int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
||||
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
|
||||
int32_t tsMaxVgroupsPerDb = 0;
|
||||
int32_t tsMinTablePerVnode = 100;
|
||||
|
@ -715,6 +716,16 @@ static void doInitGlobalConfig() {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "fsync";
|
||||
cfg.ptr = &tsFsyncPeriod;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||
cfg.minValue = TSDB_MIN_FSYNC_PERIOD;
|
||||
cfg.maxValue = TSDB_MAX_FSYNC_PERIOD;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "replica";
|
||||
cfg.ptr = &tsReplications;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
|
|
|
@ -401,6 +401,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
|||
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
|
||||
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
|
||||
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
|
||||
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
|
||||
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
|
||||
|
||||
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
|
||||
|
|
|
@ -332,6 +332,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
|||
#define TSDB_MAX_WAL_LEVEL 2
|
||||
#define TSDB_DEFAULT_WAL_LEVEL 1
|
||||
|
||||
#define TSDB_MIN_FSYNC_PERIOD 0
|
||||
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
|
||||
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
|
||||
|
||||
#define TSDB_MIN_DB_REPLICA_OPTION 1
|
||||
#define TSDB_MAX_DB_REPLICA_OPTION 3
|
||||
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1
|
||||
|
|
|
@ -515,6 +515,7 @@ typedef struct {
|
|||
int32_t minRowsPerFileBlock;
|
||||
int32_t maxRowsPerFileBlock;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
uint8_t precision; // time resolution
|
||||
int8_t compression;
|
||||
int8_t walLevel;
|
||||
|
@ -608,6 +609,7 @@ typedef struct {
|
|||
int32_t minRowsPerFileBlock;
|
||||
int32_t maxRowsPerFileBlock;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t walLevel;
|
||||
|
|
|
@ -110,117 +110,117 @@
|
|||
#define TK_BLOCKS 92
|
||||
#define TK_CTIME 93
|
||||
#define TK_WAL 94
|
||||
#define TK_COMP 95
|
||||
#define TK_PRECISION 96
|
||||
#define TK_LP 97
|
||||
#define TK_RP 98
|
||||
#define TK_TAGS 99
|
||||
#define TK_USING 100
|
||||
#define TK_AS 101
|
||||
#define TK_COMMA 102
|
||||
#define TK_NULL 103
|
||||
#define TK_SELECT 104
|
||||
#define TK_UNION 105
|
||||
#define TK_ALL 106
|
||||
#define TK_FROM 107
|
||||
#define TK_VARIABLE 108
|
||||
#define TK_INTERVAL 109
|
||||
#define TK_FILL 110
|
||||
#define TK_SLIDING 111
|
||||
#define TK_ORDER 112
|
||||
#define TK_BY 113
|
||||
#define TK_ASC 114
|
||||
#define TK_DESC 115
|
||||
#define TK_GROUP 116
|
||||
#define TK_HAVING 117
|
||||
#define TK_LIMIT 118
|
||||
#define TK_OFFSET 119
|
||||
#define TK_SLIMIT 120
|
||||
#define TK_SOFFSET 121
|
||||
#define TK_WHERE 122
|
||||
#define TK_NOW 123
|
||||
#define TK_RESET 124
|
||||
#define TK_QUERY 125
|
||||
#define TK_ADD 126
|
||||
#define TK_COLUMN 127
|
||||
#define TK_TAG 128
|
||||
#define TK_CHANGE 129
|
||||
#define TK_SET 130
|
||||
#define TK_KILL 131
|
||||
#define TK_CONNECTION 132
|
||||
#define TK_STREAM 133
|
||||
#define TK_COLON 134
|
||||
#define TK_ABORT 135
|
||||
#define TK_AFTER 136
|
||||
#define TK_ATTACH 137
|
||||
#define TK_BEFORE 138
|
||||
#define TK_BEGIN 139
|
||||
#define TK_CASCADE 140
|
||||
#define TK_CLUSTER 141
|
||||
#define TK_CONFLICT 142
|
||||
#define TK_COPY 143
|
||||
#define TK_DEFERRED 144
|
||||
#define TK_DELIMITERS 145
|
||||
#define TK_DETACH 146
|
||||
#define TK_EACH 147
|
||||
#define TK_END 148
|
||||
#define TK_EXPLAIN 149
|
||||
#define TK_FAIL 150
|
||||
#define TK_FOR 151
|
||||
#define TK_IGNORE 152
|
||||
#define TK_IMMEDIATE 153
|
||||
#define TK_INITIALLY 154
|
||||
#define TK_INSTEAD 155
|
||||
#define TK_MATCH 156
|
||||
#define TK_KEY 157
|
||||
#define TK_OF 158
|
||||
#define TK_RAISE 159
|
||||
#define TK_REPLACE 160
|
||||
#define TK_RESTRICT 161
|
||||
#define TK_ROW 162
|
||||
#define TK_STATEMENT 163
|
||||
#define TK_TRIGGER 164
|
||||
#define TK_VIEW 165
|
||||
#define TK_COUNT 166
|
||||
#define TK_SUM 167
|
||||
#define TK_AVG 168
|
||||
#define TK_MIN 169
|
||||
#define TK_MAX 170
|
||||
#define TK_FIRST 171
|
||||
#define TK_LAST 172
|
||||
#define TK_TOP 173
|
||||
#define TK_BOTTOM 174
|
||||
#define TK_STDDEV 175
|
||||
#define TK_PERCENTILE 176
|
||||
#define TK_APERCENTILE 177
|
||||
#define TK_LEASTSQUARES 178
|
||||
#define TK_HISTOGRAM 179
|
||||
#define TK_DIFF 180
|
||||
#define TK_SPREAD 181
|
||||
#define TK_TWA 182
|
||||
#define TK_INTERP 183
|
||||
#define TK_LAST_ROW 184
|
||||
#define TK_RATE 185
|
||||
#define TK_IRATE 186
|
||||
#define TK_SUM_RATE 187
|
||||
#define TK_SUM_IRATE 188
|
||||
#define TK_AVG_RATE 189
|
||||
#define TK_AVG_IRATE 190
|
||||
#define TK_TBID 191
|
||||
#define TK_SEMI 192
|
||||
#define TK_NONE 193
|
||||
#define TK_PREV 194
|
||||
#define TK_LINEAR 195
|
||||
#define TK_IMPORT 196
|
||||
#define TK_METRIC 197
|
||||
#define TK_TBNAME 198
|
||||
#define TK_JOIN 199
|
||||
#define TK_METRICS 200
|
||||
#define TK_STABLE 201
|
||||
#define TK_INSERT 202
|
||||
#define TK_INTO 203
|
||||
#define TK_VALUES 204
|
||||
|
||||
#define TK_FSYNC 95
|
||||
#define TK_COMP 96
|
||||
#define TK_PRECISION 97
|
||||
#define TK_LP 98
|
||||
#define TK_RP 99
|
||||
#define TK_TAGS 100
|
||||
#define TK_USING 101
|
||||
#define TK_AS 102
|
||||
#define TK_COMMA 103
|
||||
#define TK_NULL 104
|
||||
#define TK_SELECT 105
|
||||
#define TK_UNION 106
|
||||
#define TK_ALL 107
|
||||
#define TK_FROM 108
|
||||
#define TK_VARIABLE 109
|
||||
#define TK_INTERVAL 110
|
||||
#define TK_FILL 111
|
||||
#define TK_SLIDING 112
|
||||
#define TK_ORDER 113
|
||||
#define TK_BY 114
|
||||
#define TK_ASC 115
|
||||
#define TK_DESC 116
|
||||
#define TK_GROUP 117
|
||||
#define TK_HAVING 118
|
||||
#define TK_LIMIT 119
|
||||
#define TK_OFFSET 120
|
||||
#define TK_SLIMIT 121
|
||||
#define TK_SOFFSET 122
|
||||
#define TK_WHERE 123
|
||||
#define TK_NOW 124
|
||||
#define TK_RESET 125
|
||||
#define TK_QUERY 126
|
||||
#define TK_ADD 127
|
||||
#define TK_COLUMN 128
|
||||
#define TK_TAG 129
|
||||
#define TK_CHANGE 130
|
||||
#define TK_SET 131
|
||||
#define TK_KILL 132
|
||||
#define TK_CONNECTION 133
|
||||
#define TK_STREAM 134
|
||||
#define TK_COLON 135
|
||||
#define TK_ABORT 136
|
||||
#define TK_AFTER 137
|
||||
#define TK_ATTACH 138
|
||||
#define TK_BEFORE 139
|
||||
#define TK_BEGIN 140
|
||||
#define TK_CASCADE 141
|
||||
#define TK_CLUSTER 142
|
||||
#define TK_CONFLICT 143
|
||||
#define TK_COPY 144
|
||||
#define TK_DEFERRED 145
|
||||
#define TK_DELIMITERS 146
|
||||
#define TK_DETACH 147
|
||||
#define TK_EACH 148
|
||||
#define TK_END 149
|
||||
#define TK_EXPLAIN 150
|
||||
#define TK_FAIL 151
|
||||
#define TK_FOR 152
|
||||
#define TK_IGNORE 153
|
||||
#define TK_IMMEDIATE 154
|
||||
#define TK_INITIALLY 155
|
||||
#define TK_INSTEAD 156
|
||||
#define TK_MATCH 157
|
||||
#define TK_KEY 158
|
||||
#define TK_OF 159
|
||||
#define TK_RAISE 160
|
||||
#define TK_REPLACE 161
|
||||
#define TK_RESTRICT 162
|
||||
#define TK_ROW 163
|
||||
#define TK_STATEMENT 164
|
||||
#define TK_TRIGGER 165
|
||||
#define TK_VIEW 166
|
||||
#define TK_COUNT 167
|
||||
#define TK_SUM 168
|
||||
#define TK_AVG 169
|
||||
#define TK_MIN 170
|
||||
#define TK_MAX 171
|
||||
#define TK_FIRST 172
|
||||
#define TK_LAST 173
|
||||
#define TK_TOP 174
|
||||
#define TK_BOTTOM 175
|
||||
#define TK_STDDEV 176
|
||||
#define TK_PERCENTILE 177
|
||||
#define TK_APERCENTILE 178
|
||||
#define TK_LEASTSQUARES 179
|
||||
#define TK_HISTOGRAM 180
|
||||
#define TK_DIFF 181
|
||||
#define TK_SPREAD 182
|
||||
#define TK_TWA 183
|
||||
#define TK_INTERP 184
|
||||
#define TK_LAST_ROW 185
|
||||
#define TK_RATE 186
|
||||
#define TK_IRATE 187
|
||||
#define TK_SUM_RATE 188
|
||||
#define TK_SUM_IRATE 189
|
||||
#define TK_AVG_RATE 190
|
||||
#define TK_AVG_IRATE 191
|
||||
#define TK_TBID 192
|
||||
#define TK_SEMI 193
|
||||
#define TK_NONE 194
|
||||
#define TK_PREV 195
|
||||
#define TK_LINEAR 196
|
||||
#define TK_IMPORT 197
|
||||
#define TK_METRIC 198
|
||||
#define TK_TBNAME 199
|
||||
#define TK_JOIN 200
|
||||
#define TK_METRICS 201
|
||||
#define TK_STABLE 202
|
||||
#define TK_INSERT 203
|
||||
#define TK_INTO 204
|
||||
#define TK_VALUES 205
|
||||
|
||||
#define TK_SPACE 300
|
||||
#define TK_COMMENT 301
|
||||
|
|
|
@ -35,6 +35,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int8_t walLevel; // wal level
|
||||
int32_t fsyncPeriod; // millisecond
|
||||
int8_t wals; // number of WAL files;
|
||||
int8_t keep; // keep the wal file when closed
|
||||
} SWalCfg;
|
||||
|
|
|
@ -179,8 +179,8 @@ static struct argp_option options[] = {
|
|||
{"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3},
|
||||
{"end-time", 'E', "END_TIME", 0, "End time to dump.", 3},
|
||||
{"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3},
|
||||
{"table-batch", 'T', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3},
|
||||
{"thread_num", 't', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3},
|
||||
{"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3},
|
||||
{"thread_num", 'T', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3},
|
||||
{"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3},
|
||||
{0}};
|
||||
|
||||
|
@ -304,10 +304,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
|||
case 'N':
|
||||
arguments->data_batch = atoi(arg);
|
||||
break;
|
||||
case 'T':
|
||||
case 't':
|
||||
arguments->table_batch = atoi(arg);
|
||||
break;
|
||||
case 't':
|
||||
case 'T':
|
||||
arguments->thread_num = atoi(arg);
|
||||
break;
|
||||
case OPT_ABORT:
|
||||
|
@ -406,7 +406,7 @@ int main(int argc, char *argv[]) {
|
|||
printf("password: %s\n", tsArguments.password);
|
||||
printf("port: %u\n", tsArguments.port);
|
||||
printf("cversion: %s\n", tsArguments.cversion);
|
||||
printf("mysqlFlag: %d", tsArguments.mysqlFlag);
|
||||
printf("mysqlFlag: %d\n", tsArguments.mysqlFlag);
|
||||
printf("outpath: %s\n", tsArguments.outpath);
|
||||
printf("inpath: %s\n", tsArguments.inpath);
|
||||
printf("encode: %s\n", tsArguments.encode);
|
||||
|
@ -821,7 +821,7 @@ _exit_failure:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) {
|
||||
int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon, bool isSuperTable) {
|
||||
TAOS_ROW row = NULL;
|
||||
TAOS_RES *tmpResult = NULL;
|
||||
int count = 0;
|
||||
|
@ -832,6 +832,13 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
char* tbuf = (char *)malloc(COMMAND_SIZE);
|
||||
if (tbuf == NULL) {
|
||||
fprintf(stderr, "failed to allocate memory\n");
|
||||
free(tempCommand);
|
||||
return -1;
|
||||
}
|
||||
|
||||
sprintf(tempCommand, "describe %s", table);
|
||||
|
||||
tmpResult = taos_query(taosCon, tempCommand);
|
||||
|
@ -862,6 +869,92 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) {
|
|||
taos_free_result(tmpResult);
|
||||
tmpResult = NULL;
|
||||
|
||||
if (isSuperTable) {
|
||||
free(tempCommand);
|
||||
return count;
|
||||
}
|
||||
|
||||
// if chidl-table have tag, using select tagName from table to get tagValue
|
||||
for (int i = 0 ; i < count; i++) {
|
||||
if (strcmp(tableDes->cols[i].note, "TAG") != 0) continue;
|
||||
|
||||
|
||||
sprintf(tempCommand, "select %s from %s", tableDes->cols[i].field, table);
|
||||
|
||||
tmpResult = taos_query(taosCon, tempCommand);
|
||||
code = taos_errno(tmpResult);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to run command %s\n", tempCommand);
|
||||
free(tempCommand);
|
||||
taos_free_result(tmpResult);
|
||||
return -1;
|
||||
}
|
||||
|
||||
fields = taos_fetch_fields(tmpResult);
|
||||
|
||||
row = taos_fetch_row(tmpResult);
|
||||
if (NULL == row) {
|
||||
fprintf(stderr, " fetch failed to run command %s\n", tempCommand);
|
||||
free(tempCommand);
|
||||
taos_free_result(tmpResult);
|
||||
return -1;
|
||||
}
|
||||
|
||||
switch (fields[0].type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
sprintf(tableDes->cols[i].note, "%d", ((((int)(*((char *)row[0]))) == 1) ? 1 : 0));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
sprintf(tableDes->cols[i].note, "%d", (int)(*((char *)row[0])));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
sprintf(tableDes->cols[i].note, "%d", (int)(*((short *)row[0])));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
sprintf(tableDes->cols[i].note, "%d", *((int *)row[0]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
sprintf(tableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[0]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
sprintf(tableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[0]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
sprintf(tableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[0]));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
tableDes->cols[i].note[0] = '\'';
|
||||
converStringToReadable((char *)row[0], fields[0].bytes, tbuf, COMMAND_SIZE);
|
||||
char* pstr = stpcpy(&(tableDes->cols[i].note[1]), tbuf);
|
||||
*(pstr++) = '\'';
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
convertNCharToReadable((char *)row[0], fields[0].bytes, tbuf, COMMAND_SIZE);
|
||||
sprintf(tableDes->cols[i].note, "\'%s\'", tbuf);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
|
||||
#if 0
|
||||
if (!arguments->mysqlFlag) {
|
||||
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
|
||||
} else {
|
||||
char buf[64] = "\0";
|
||||
int64_t ts = *((int64_t *)row[0]);
|
||||
time_t tt = (time_t)(ts / 1000);
|
||||
struct tm *ptm = localtime(&tt);
|
||||
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
|
||||
sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000));
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
taos_free_result(tmpResult);
|
||||
tmpResult = NULL;
|
||||
}
|
||||
|
||||
free(tempCommand);
|
||||
|
||||
return count;
|
||||
|
@ -886,23 +979,25 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI
|
|||
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
|
||||
*/
|
||||
|
||||
count = taosGetTableDes(table, tableDes, taosCon);
|
||||
count = taosGetTableDes(table, tableDes, taosCon, false);
|
||||
|
||||
if (count < 0) {
|
||||
free(tableDes);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// create child-table using super-table
|
||||
taosDumpCreateMTableClause(tableDes, metric, count, fp);
|
||||
|
||||
} else { // dump table definition
|
||||
count = taosGetTableDes(table, tableDes, taosCon);
|
||||
count = taosGetTableDes(table, tableDes, taosCon, false);
|
||||
|
||||
if (count < 0) {
|
||||
free(tableDes);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// create normal-table or super-table
|
||||
taosDumpCreateTableClause(tableDes, count, fp);
|
||||
}
|
||||
|
||||
|
@ -1033,7 +1128,7 @@ int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon) {
|
|||
exit(-1);
|
||||
}
|
||||
|
||||
count = taosGetTableDes(table, tableDes, taosCon);
|
||||
count = taosGetTableDes(table, tableDes, taosCon, true);
|
||||
|
||||
if (count < 0) {
|
||||
free(tableDes);
|
||||
|
@ -1083,7 +1178,6 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
|
|||
taos_free_result(tmpResult);
|
||||
exit(-1);
|
||||
}
|
||||
taos_free_result(tmpResult);
|
||||
|
||||
TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
|
||||
|
||||
|
@ -1291,14 +1385,16 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols
|
|||
if (counter != count_temp) {
|
||||
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
|
||||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
|
||||
pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
|
||||
//pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
|
||||
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note);
|
||||
} else {
|
||||
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note);
|
||||
}
|
||||
} else {
|
||||
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
|
||||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
|
||||
pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
|
||||
//pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
|
||||
pstr += sprintf(pstr, "%s", tableDes->cols[counter].note);
|
||||
} else {
|
||||
pstr += sprintf(pstr, "%s", tableDes->cols[counter].note);
|
||||
}
|
||||
|
@ -1363,7 +1459,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
|
|||
return -1;
|
||||
}
|
||||
|
||||
numFields = taos_field_count(taosCon);
|
||||
numFields = taos_field_count(tmpResult);
|
||||
assert(numFields > 0);
|
||||
TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
|
||||
tbuf = (char *)malloc(COMMAND_SIZE);
|
||||
|
@ -2015,6 +2111,7 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c
|
|||
}
|
||||
|
||||
memcpy(cmd + cmd_len, line, read_len);
|
||||
cmd[read_len + cmd_len]= '\0';
|
||||
if (queryDB(taos, cmd)) {
|
||||
fprintf(stderr, "error sql: linenu:%d, file:%s\n", lineNo, fileName);
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel);
|
||||
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pVnode->walCfg.fsyncPeriod);
|
||||
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnode->syncCfg.replica);
|
||||
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnode->walCfg.wals);
|
||||
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnode->syncCfg.quorum);
|
||||
|
@ -212,6 +213,13 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
}
|
||||
pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
|
||||
|
||||
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync");
|
||||
if (!fsyncPeriod || fsyncPeriod->type != cJSON_Number) {
|
||||
printf("vgId:%d, failed to read vnode cfg, fsyncPeriod not found\n", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint;
|
||||
|
||||
cJSON *wals = cJSON_GetObjectItem(root, "wals");
|
||||
if (!wals || wals->type != cJSON_Number) {
|
||||
printf("vgId:%d, failed to read vnode cfg, wals not found\n", pVnode->vgId);
|
||||
|
|
|
@ -160,6 +160,7 @@ typedef struct {
|
|||
int32_t minRowsPerFileBlock;
|
||||
int32_t maxRowsPerFileBlock;
|
||||
int32_t commitTime;
|
||||
int32_t fsyncPeriod;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t walLevel;
|
||||
|
|
|
@ -287,14 +287,14 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
|
|||
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) {
|
||||
mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION,
|
||||
TSDB_MAX_DB_REPLICA_OPTION);
|
||||
if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) {
|
||||
mError("invalid db option fsyncPeriod:%d, valid range: [%d, %d]", pCfg->fsyncPeriod, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
|
||||
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL) {
|
||||
mError("invalid db option walLevel:%d must be greater than 0", pCfg->walLevel);
|
||||
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) {
|
||||
mError("invalid db option replications:%d valid range: [%d, %d]", pCfg->replications, TSDB_MIN_DB_REPLICA_OPTION,
|
||||
TSDB_MAX_DB_REPLICA_OPTION);
|
||||
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
|
@ -318,6 +318,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = pCfg->daysToKeep;
|
||||
if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = tsMinRowsInFileBlock;
|
||||
if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = tsMaxRowsInFileBlock;
|
||||
if (pCfg->fsyncPeriod <0) pCfg->fsyncPeriod = tsFsyncPeriod;
|
||||
if (pCfg->commitTime < 0) pCfg->commitTime = tsCommitTime;
|
||||
if (pCfg->precision < 0) pCfg->precision = tsTimePrecision;
|
||||
if (pCfg->compression < 0) pCfg->compression = tsCompression;
|
||||
|
@ -367,6 +368,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs
|
|||
.daysToKeep2 = pCreate->daysToKeep2,
|
||||
.minRowsPerFileBlock = pCreate->minRowsPerFileBlock,
|
||||
.maxRowsPerFileBlock = pCreate->maxRowsPerFileBlock,
|
||||
.fsyncPeriod = pCreate->fsyncPeriod,
|
||||
.commitTime = pCreate->commitTime,
|
||||
.precision = pCreate->precision,
|
||||
.compression = pCreate->compression,
|
||||
|
@ -559,6 +561,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
|
|||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "fsync");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 1;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema[cols].name, "comp");
|
||||
|
@ -682,6 +690,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
|
|||
*(int8_t *)pWrite = pDb->cfg.walLevel;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int8_t *)pWrite = pDb->cfg.compression;
|
||||
cols++;
|
||||
|
@ -758,6 +770,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
|
|||
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
|
||||
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
|
||||
pCreate->commitTime = htonl(pCreate->commitTime);
|
||||
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
|
||||
pCreate->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
|
||||
pCreate->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
|
||||
|
||||
|
@ -785,6 +798,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
|||
int32_t minRows = htonl(pAlter->minRowsPerFileBlock);
|
||||
int32_t maxRows = htonl(pAlter->maxRowsPerFileBlock);
|
||||
int32_t commitTime = htonl(pAlter->commitTime);
|
||||
int32_t fsyncPeriod = htonl(pAlter->fsyncPeriod);
|
||||
int8_t compression = pAlter->compression;
|
||||
int8_t walLevel = pAlter->walLevel;
|
||||
int8_t replications = pAlter->replications;
|
||||
|
@ -861,6 +875,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
|||
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (fsyncPeriod >= 0 && fsyncPeriod != pDb->cfg.fsyncPeriod) {
|
||||
mError("db:%s, can't alter fsyncPeriod option", pDb->name);
|
||||
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||
}
|
||||
|
||||
if (replications > 0 && replications != pDb->cfg.replications) {
|
||||
mDebug("db:%s, replications:%d change to %d", pDb->name, pDb->cfg.replications, replications);
|
||||
newCfg.replications = replications;
|
||||
|
|
|
@ -170,7 +170,7 @@ static void *sdbGetTableFromId(int32_t tableId) {
|
|||
}
|
||||
|
||||
static int32_t sdbInitWal() {
|
||||
SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1};
|
||||
SWalCfg walCfg = {.walLevel = 2, .wals = 2, .keep = 1, .fsyncPeriod = 0};
|
||||
char temp[TSDB_FILENAME_LEN];
|
||||
sprintf(temp, "%s/wal", tsMnodeDir);
|
||||
tsSdbObj.wal = walOpen(temp, &walCfg);
|
||||
|
@ -252,13 +252,15 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
|||
int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1);
|
||||
if (processedCount <= 1) {
|
||||
if (pMsg != NULL) {
|
||||
sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount);
|
||||
sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d result:%s", pMsg->rpcMsg.ahandle, pMsg,
|
||||
processedCount, tstrerror(code));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (pMsg != NULL) {
|
||||
sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg);
|
||||
sdbDebug("app:%p:%p, is confirmed and will do callback func, result:%s", pMsg->rpcMsg.ahandle, pMsg,
|
||||
tstrerror(code));
|
||||
}
|
||||
|
||||
if (pOper->cb != NULL) {
|
||||
|
|
|
@ -757,6 +757,7 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
|||
pCfg->daysToKeep2 = htonl(pDb->cfg.daysToKeep2);
|
||||
pCfg->minRowsPerFileBlock = htonl(pDb->cfg.minRowsPerFileBlock);
|
||||
pCfg->maxRowsPerFileBlock = htonl(pDb->cfg.maxRowsPerFileBlock);
|
||||
pCfg->fsyncPeriod = htonl(pDb->cfg.fsyncPeriod);
|
||||
pCfg->commitTime = htonl(pDb->cfg.commitTime);
|
||||
pCfg->precision = pDb->cfg.precision;
|
||||
pCfg->compression = pDb->cfg.compression;
|
||||
|
|
|
@ -116,6 +116,7 @@ typedef struct SCreateDBInfo {
|
|||
int32_t daysPerFile;
|
||||
int32_t minRowsPerBlock;
|
||||
int32_t maxRowsPerBlock;
|
||||
int32_t fsyncPeriod;
|
||||
int64_t commitTime;
|
||||
int32_t walLevel;
|
||||
int32_t compressionLevel;
|
||||
|
|
|
@ -221,6 +221,7 @@ maxrows(Y) ::= MAXROWS INTEGER(X). { Y = X; }
|
|||
blocks(Y) ::= BLOCKS INTEGER(X). { Y = X; }
|
||||
ctime(Y) ::= CTIME INTEGER(X). { Y = X; }
|
||||
wal(Y) ::= WAL INTEGER(X). { Y = X; }
|
||||
fsync(Y) ::= FSYNC INTEGER(X). { Y = X; }
|
||||
comp(Y) ::= COMP INTEGER(X). { Y = X; }
|
||||
prec(Y) ::= PRECISION STRING(X). { Y = X; }
|
||||
|
||||
|
@ -236,6 +237,7 @@ db_optr(Y) ::= db_optr(Z) maxrows(X). { Y = Z; Y.maxRowsPerBlock = strtod
|
|||
db_optr(Y) ::= db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) ctime(X). { Y = Z; Y.commitTime = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
|
||||
db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; }
|
||||
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
|
||||
|
@ -249,6 +251,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
|
|||
alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
|
||||
alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
|
||||
alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
|
||||
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtod(X.z, NULL, 10); }
|
||||
|
||||
%type typename {TAOS_FIELD}
|
||||
typename(A) ::= ids(X). {
|
||||
|
|
|
@ -2202,7 +2202,13 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
while (true) {
|
||||
if (!tsdbNextDataBlock(pQueryHandle)) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
break;
|
||||
}
|
||||
summary->totalBlocks += 1;
|
||||
|
||||
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
|
@ -3188,6 +3194,9 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
|||
|
||||
// add ref for table
|
||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
switchCtxOrder(pRuntimeEnv);
|
||||
|
@ -3260,6 +3269,9 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
|||
}
|
||||
|
||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
|
||||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
|
@ -3916,7 +3928,14 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
|
||||
|
||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
while (true) {
|
||||
if (!tsdbNextDataBlock(pQueryHandle)) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
|
@ -3960,7 +3979,14 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
|||
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
||||
|
||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||
while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
|
||||
while (true) {
|
||||
if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle, &blockInfo);
|
||||
|
||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
|
@ -4059,16 +4085,16 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||
static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
if (onlyQueryTags(pQuery)) {
|
||||
return;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pRuntimeEnv))) {
|
||||
return;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STsdbQueryCond cond = {
|
||||
|
@ -4090,6 +4116,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
|||
cond.twindow = pCheckInfo->win;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
if (isFirstLastRowQuery(pQuery)) {
|
||||
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||
} else if (isPointInterpoQuery(pQuery)) {
|
||||
|
@ -4097,6 +4124,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
|||
} else {
|
||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||
}
|
||||
return terrno;
|
||||
}
|
||||
|
||||
static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
|
||||
|
@ -4133,7 +4161,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
|
||||
setScanLimitationByResultBuffer(pQuery);
|
||||
changeExecuteScanOrder(pQInfo, false);
|
||||
setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pQInfo->tsdb = tsdb;
|
||||
pQInfo->vgId = vgId;
|
||||
|
@ -4257,7 +4288,14 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
while (true) {
|
||||
if (!tsdbNextDataBlock(pQueryHandle)) {
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
summary->totalBlocks += 1;
|
||||
|
||||
if (IS_QUERY_KILLED(pQInfo)) {
|
||||
|
@ -4338,6 +4376,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
|||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||
taosArrayDestroy(tx);
|
||||
taosArrayDestroy(g1);
|
||||
if (pRuntimeEnv->pQueryHandle == NULL) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
|
||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||
if (pRuntimeEnv->cur.vgroupIndex == -1) {
|
||||
|
@ -4405,6 +4446,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|||
} else {
|
||||
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||
}
|
||||
|
||||
taosArrayDestroy(tx);
|
||||
taosArrayDestroy(g1);
|
||||
if (pRuntimeEnv->pQueryHandle == NULL) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
|
||||
initCtxOutputBuf(pRuntimeEnv);
|
||||
|
||||
|
@ -4469,6 +4516,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||
taosArrayDestroy(g1);
|
||||
taosArrayDestroy(tx);
|
||||
if (pRuntimeEnv->pQueryHandle == NULL) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
|
||||
SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle);
|
||||
assert(taosArrayGetSize(s) >= 1);
|
||||
|
@ -4664,6 +4714,9 @@ static void doSaveContext(SQInfo *pQInfo) {
|
|||
|
||||
pRuntimeEnv->prevGroupId = INT32_MIN;
|
||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||
longjmp(pRuntimeEnv->env, terrno);
|
||||
}
|
||||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
switchCtxOrder(pRuntimeEnv);
|
||||
|
|
|
@ -896,6 +896,7 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
|
|||
pDBInfo->compressionLevel = -1;
|
||||
|
||||
pDBInfo->walLevel = -1;
|
||||
pDBInfo->fsyncPeriod = -1;
|
||||
pDBInfo->commitTime = -1;
|
||||
pDBInfo->maxTablesPerVnode = -1;
|
||||
|
||||
|
|
|
@ -124,6 +124,7 @@ static SKeyword keywordTable[] = {
|
|||
{"CACHE", TK_CACHE},
|
||||
{"CTIME", TK_CTIME},
|
||||
{"WAL", TK_WAL},
|
||||
{"FSYNC", TK_FSYNC},
|
||||
{"COMP", TK_COMP},
|
||||
{"PRECISION", TK_PRECISION},
|
||||
{"LP", TK_LP},
|
||||
|
|
2253
src/query/src/sql.c
2253
src/query/src/sql.c
File diff suppressed because it is too large
Load Diff
|
@ -96,6 +96,11 @@ typedef struct {
|
|||
} STsdbBufPool;
|
||||
|
||||
// ------------------ tsdbMemTable.c
|
||||
typedef struct {
|
||||
STable * pTable;
|
||||
SSkipListIterator *pIter;
|
||||
} SCommitIter;
|
||||
|
||||
typedef struct {
|
||||
uint64_t uid;
|
||||
TSKEY keyFirst;
|
||||
|
@ -206,10 +211,10 @@ typedef struct {
|
|||
int64_t offset : 63;
|
||||
int32_t algorithm : 8;
|
||||
int32_t numOfRows : 24;
|
||||
int32_t sversion;
|
||||
int32_t len;
|
||||
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SCompData)+sizeof(SCompCol)*numOfCols
|
||||
int16_t numOfSubBlocks;
|
||||
int16_t numOfCols;
|
||||
int16_t numOfCols; // not including timestamp column
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
} SCompBlock;
|
||||
|
@ -377,6 +382,24 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
|||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||
TSKEY* filterKeys, int nFilterKeys);
|
||||
|
||||
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
||||
if (pIter == NULL) return NULL;
|
||||
|
||||
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||
if (node == NULL) return NULL;
|
||||
|
||||
return SL_GET_NODE_DATA(node);
|
||||
}
|
||||
|
||||
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
||||
SDataRow row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL) return -1;
|
||||
|
||||
return dataRowKey(row);
|
||||
}
|
||||
|
||||
// ------------------ tsdbFile.c
|
||||
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
||||
|
@ -421,25 +444,36 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
|||
#define helperType(h) (h)->type
|
||||
#define helperRepo(h) (h)->pRepo
|
||||
#define helperState(h) (h)->state
|
||||
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
|
||||
|
||||
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
void tsdbDestroyHelper(SRWHelper* pHelper);
|
||||
void tsdbResetHelper(SRWHelper* pHelper);
|
||||
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
|
||||
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
|
||||
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
|
||||
int tsdbWriteDataBlock(SRWHelper* pHelper, SDataCols* pDataCols);
|
||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
|
||||
int tsdbWriteCompInfo(SRWHelper* pHelper);
|
||||
int tsdbWriteCompIdx(SRWHelper* pHelper);
|
||||
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
|
||||
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
|
||||
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds,
|
||||
int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
|
||||
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||
void tsdbDestroyHelper(SRWHelper* pHelper);
|
||||
void tsdbResetHelper(SRWHelper* pHelper);
|
||||
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
|
||||
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
|
||||
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
|
||||
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
|
||||
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
|
||||
int tsdbWriteCompInfo(SRWHelper* pHelper);
|
||||
int tsdbWriteCompIdx(SRWHelper* pHelper);
|
||||
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
|
||||
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
|
||||
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
|
||||
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds,
|
||||
int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
|
||||
|
||||
static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
|
||||
if (*(TSKEY*)key1 > *(TSKEY*)key2) {
|
||||
return 1;
|
||||
} else if (*(TSKEY*)key1 == *(TSKEY*)key2) {
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------ tsdbMain.c
|
||||
#define REPO_ID(r) (r)->config.tsdbId
|
||||
|
|
|
@ -18,11 +18,6 @@
|
|||
|
||||
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
||||
|
||||
typedef struct {
|
||||
STable * pTable;
|
||||
SSkipListIterator *pIter;
|
||||
} SCommitIter;
|
||||
|
||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
|
||||
|
||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
||||
|
@ -34,14 +29,11 @@ static char * tsdbGetTsTupleKey(const void *data);
|
|||
static void * tsdbCommitData(void *arg);
|
||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||
static TSKEY tsdbNextIterKey(SCommitIter *pIter);
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo);
|
||||
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables);
|
||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey,
|
||||
int maxRowsToRead, SDataCols *pCols);
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||
|
@ -252,6 +244,66 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
||||
TSKEY *filterKeys, int nFilterKeys) {
|
||||
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
|
||||
if (pIter == NULL) return 0;
|
||||
STSchema *pSchema = NULL;
|
||||
int numOfRows = 0;
|
||||
TSKEY keyNext = 0;
|
||||
int filterIter = 0;
|
||||
|
||||
if (nFilterKeys != 0) { // for filter purpose
|
||||
ASSERT(filterKeys != NULL);
|
||||
keyNext = tsdbNextIterKey(pIter);
|
||||
if (keyNext < 0 || keyNext > maxKey) return numOfRows;
|
||||
void *ptr = taosbsearch((void *)(&keyNext), (void *)filterKeys, nFilterKeys, sizeof(TSKEY), compTSKEY, TD_GE);
|
||||
filterIter = (ptr == NULL) ? nFilterKeys : (POINTER_DISTANCE(ptr, filterKeys) / sizeof(TSKEY));
|
||||
}
|
||||
|
||||
do {
|
||||
if (numOfRows >= maxRowsToRead) break;
|
||||
|
||||
SDataRow row = tsdbNextIterRow(pIter);
|
||||
if (row == NULL) break;
|
||||
|
||||
keyNext = dataRowKey(row);
|
||||
if (keyNext < 0 || keyNext > maxKey) break;
|
||||
|
||||
bool keyFiltered = false;
|
||||
if (nFilterKeys != 0) {
|
||||
while (true) {
|
||||
if (filterIter >= nFilterKeys) break;
|
||||
if (keyNext == filterKeys[filterIter]) {
|
||||
keyFiltered = true;
|
||||
filterIter++;
|
||||
break;
|
||||
} else if (keyNext < filterKeys[filterIter]) {
|
||||
break;
|
||||
} else {
|
||||
filterIter++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!keyFiltered) {
|
||||
if (pCols) {
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||
}
|
||||
numOfRows++;
|
||||
}
|
||||
} while (tSkipListIterNext(pIter));
|
||||
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
// ---------------- LOCAL FUNCTIONS ----------------
|
||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
|
||||
ASSERT(pRepo != NULL);
|
||||
|
@ -378,7 +430,7 @@ static void *tsdbCommitData(void *arg) {
|
|||
|
||||
// Create the iterator to read from cache
|
||||
if (pMem->numOfRows > 0) {
|
||||
iters = tsdbCreateTableIters(pRepo);
|
||||
iters = tsdbCreateCommitIters(pRepo);
|
||||
if (iters == NULL) {
|
||||
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
goto _exit;
|
||||
|
@ -418,7 +470,7 @@ static void *tsdbCommitData(void *arg) {
|
|||
|
||||
_exit:
|
||||
tdFreeDataCols(pDataCols);
|
||||
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyHelper(&whelper);
|
||||
tsdbEndCommit(pRepo);
|
||||
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
|
||||
|
@ -479,19 +531,9 @@ static void tsdbEndCommit(STsdbRepo *pRepo) {
|
|||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
|
||||
}
|
||||
|
||||
static TSKEY tsdbNextIterKey(SCommitIter *pIter) {
|
||||
if (pIter == NULL) return -1;
|
||||
|
||||
SSkipListNode *node = tSkipListIterGet(pIter->pIter);
|
||||
if (node == NULL) return -1;
|
||||
|
||||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
return dataRowKey(row);
|
||||
}
|
||||
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
|
||||
for (int i = 0; i < nIters; i++) {
|
||||
TSKEY nextKey = tsdbNextIterKey(iters + i);
|
||||
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
|
||||
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
|
||||
}
|
||||
return 0;
|
||||
|
@ -504,7 +546,6 @@ static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
|
|||
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||
char * dataDir = NULL;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pGroup = NULL;
|
||||
|
@ -549,33 +590,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
if (pIter->pIter != NULL) {
|
||||
tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1));
|
||||
|
||||
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||
int nLoop = 0;
|
||||
while (true) {
|
||||
int rowsRead = tsdbReadRowsFromCache(pMeta, pIter->pTable, pIter->pIter, maxKey, maxRowsToRead, pDataCols);
|
||||
ASSERT(rowsRead >= 0);
|
||||
if (pDataCols->numOfRows == 0) break;
|
||||
nLoop++;
|
||||
|
||||
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
|
||||
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
|
||||
|
||||
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
|
||||
ASSERT(rowsWritten != 0);
|
||||
if (rowsWritten < 0) {
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||
tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
ASSERT(rowsWritten <= pDataCols->numOfRows);
|
||||
|
||||
tdPopDataColsPoints(pDataCols, rowsWritten);
|
||||
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
|
||||
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||
tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(pDataCols->numOfRows == 0);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
|
@ -615,7 +636,7 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
SMemTable *pMem = pRepo->imem;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
@ -645,21 +666,18 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (!tSkipListIterNext(iters[i].pIter)) {
|
||||
terrno = TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM;
|
||||
goto _err;
|
||||
}
|
||||
tSkipListIterNext(iters[i].pIter);
|
||||
}
|
||||
}
|
||||
|
||||
return iters;
|
||||
|
||||
_err:
|
||||
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
|
||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
|
||||
if (iters == NULL) return;
|
||||
|
||||
for (int i = 1; i < maxTables; i++) {
|
||||
|
@ -670,35 +688,4 @@ static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
|
|||
}
|
||||
|
||||
free(iters);
|
||||
}
|
||||
|
||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
||||
ASSERT(maxRowsToRead > 0);
|
||||
if (pIter == NULL) return 0;
|
||||
STSchema *pSchema = NULL;
|
||||
|
||||
int numOfRows = 0;
|
||||
|
||||
do {
|
||||
if (numOfRows >= maxRowsToRead) break;
|
||||
|
||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
||||
if (node == NULL) break;
|
||||
|
||||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
if (dataRowKey(row) > maxKey) break;
|
||||
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
// TODO: deal with the error here
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||
numOfRows++;
|
||||
} while (tSkipListIterNext(pIter));
|
||||
|
||||
return numOfRows;
|
||||
}
|
|
@ -727,7 +727,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
|
|||
|
||||
T_REF_INC(pTable);
|
||||
|
||||
tsdbDebug("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||
tsdbTrace("table %s tid %d uid %" PRIu64 " is created", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||
TABLE_UID(pTable));
|
||||
|
||||
return pTable;
|
||||
|
@ -740,7 +740,7 @@ _err:
|
|||
static void tsdbFreeTable(STable *pTable) {
|
||||
if (pTable) {
|
||||
if (pTable->name != NULL)
|
||||
tsdbDebug("table %s tid %d uid %" PRIu64 " is destroyed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||
tsdbTrace("table %s tid %d uid %" PRIu64 " is freed", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable),
|
||||
TABLE_UID(pTable));
|
||||
tfree(TABLE_NAME(pTable));
|
||||
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -179,7 +179,10 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
|
||||
pQueryHandle->allocSize = 0;
|
||||
|
||||
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
|
||||
if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
|
||||
free(pQueryHandle);
|
||||
return NULL;
|
||||
}
|
||||
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
|
||||
|
||||
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
|
||||
|
@ -238,11 +241,11 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
|||
|
||||
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
|
||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
|
||||
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
|
||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
||||
|
||||
changeQueryHandleForLastrowQuery(pQueryHandle);
|
||||
if (pQueryHandle != NULL) {
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
|
||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
||||
changeQueryHandleForLastrowQuery(pQueryHandle);
|
||||
}
|
||||
return pQueryHandle;
|
||||
}
|
||||
|
||||
|
@ -264,9 +267,10 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
|
|||
|
||||
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
|
||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
|
||||
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
|
||||
changeQueryHandleForInterpQuery(pQueryHandle);
|
||||
if (pQueryHandle != NULL) {
|
||||
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
|
||||
changeQueryHandleForInterpQuery(pQueryHandle);
|
||||
}
|
||||
return pQueryHandle;
|
||||
}
|
||||
|
||||
|
@ -1522,7 +1526,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
pSecQueryHandle->activeIndex = 0;
|
||||
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
|
||||
|
||||
tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb);
|
||||
if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) {
|
||||
free(pSecQueryHandle);
|
||||
return false;
|
||||
}
|
||||
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
|
@ -1606,7 +1613,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
}
|
||||
|
||||
// TODO: opt by consider the scan order
|
||||
return doHasDataInBuffer(pQueryHandle);
|
||||
bool ret = doHasDataInBuffer(pQueryHandle);
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
|
||||
|
|
|
@ -69,6 +69,7 @@ int32_t vnodeInitResources() {
|
|||
}
|
||||
|
||||
void vnodeCleanupResources() {
|
||||
|
||||
if (tsDnodeVnodesHash != NULL) {
|
||||
taosHashCleanup(tsDnodeVnodesHash);
|
||||
tsDnodeVnodesHash = NULL;
|
||||
|
@ -137,7 +138,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
return TSDB_CODE_VND_INIT_FAILED;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel);
|
||||
vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod);
|
||||
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
|
||||
|
||||
return code;
|
||||
|
@ -618,6 +619,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
|
||||
len += snprintf(content + len, maxLen - len, " \"fsync\": %d,\n", pVnodeCfg->cfg.fsyncPeriod);
|
||||
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
|
||||
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
|
||||
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
|
||||
|
@ -782,6 +784,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|||
}
|
||||
pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
|
||||
|
||||
cJSON *fsyncPeriod = cJSON_GetObjectItem(root, "fsync");
|
||||
if (!walLevel || walLevel->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read vnode cfg, fsyncPeriod not found", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->walCfg.fsyncPeriod = fsyncPeriod->valueint;
|
||||
|
||||
cJSON *wals = cJSON_GetObjectItem(root, "wals");
|
||||
if (!wals || wals->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read vnode cfg, wals not found", pVnode->vgId);
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "tlog.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tutil.h"
|
||||
#include "ttimer.h"
|
||||
#include "taoserror.h"
|
||||
#include "twal.h"
|
||||
#include "tqueue.h"
|
||||
|
@ -44,6 +45,9 @@ typedef struct {
|
|||
int fd;
|
||||
int keep;
|
||||
int level;
|
||||
int32_t fsyncPeriod;
|
||||
void *timer;
|
||||
void *signature;
|
||||
int max; // maximum number of wal files
|
||||
uint32_t id; // increase continuously
|
||||
int num; // number of wal files
|
||||
|
@ -52,10 +56,23 @@ typedef struct {
|
|||
pthread_mutex_t mutex;
|
||||
} SWal;
|
||||
|
||||
static void *walTmrCtrl = NULL;
|
||||
static int tsWalNum = 0;
|
||||
static pthread_once_t walModuleInit = PTHREAD_ONCE_INIT;
|
||||
static uint32_t walSignature = 0xFAFBFDFE;
|
||||
static int walHandleExistingFiles(const char *path);
|
||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
|
||||
static int walRemoveWalFiles(const char *path);
|
||||
static int walHandleExistingFiles(const char *path);
|
||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
|
||||
static int walRemoveWalFiles(const char *path);
|
||||
static void walProcessFsyncTimer(void *param, void *tmrId);
|
||||
static void walRelease(SWal *pWal);
|
||||
|
||||
static void walModuleInitFunc() {
|
||||
walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
|
||||
if (walTmrCtrl == NULL)
|
||||
walModuleInit = PTHREAD_ONCE_INIT;
|
||||
else
|
||||
wDebug("WAL module is initialized");
|
||||
}
|
||||
|
||||
void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||
|
@ -64,20 +81,38 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pthread_once(&walModuleInit, walModuleInitFunc);
|
||||
if (walTmrCtrl == NULL) {
|
||||
free(pWal);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&tsWalNum, 1);
|
||||
pWal->fd = -1;
|
||||
pWal->max = pCfg->wals;
|
||||
pWal->id = 0;
|
||||
pWal->num = 0;
|
||||
pWal->level = pCfg->walLevel;
|
||||
pWal->keep = pCfg->keep;
|
||||
pWal->fsyncPeriod = pCfg->fsyncPeriod;
|
||||
pWal->signature = pWal;
|
||||
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
||||
pthread_mutex_init(&pWal->mutex, NULL);
|
||||
|
||||
if (pWal->fsyncPeriod > 0 && pWal->level == TAOS_WAL_FSYNC) {
|
||||
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
|
||||
if (pWal->timer == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
walRelease(pWal);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (tmkdir(path, 0755) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("wal:%s, failed to create directory(%s)", path, strerror(errno));
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
free(pWal);
|
||||
walRelease(pWal);
|
||||
pWal = NULL;
|
||||
}
|
||||
|
||||
|
@ -89,12 +124,11 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
|||
if (pWal && pWal->fd <0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("wal:%s, failed to open(%s)", path, strerror(errno));
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
free(pWal);
|
||||
walRelease(pWal);
|
||||
pWal = NULL;
|
||||
}
|
||||
|
||||
if (pWal) wDebug("wal:%s, it is open, level:%d", path, pWal->level);
|
||||
if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
|
||||
return pWal;
|
||||
}
|
||||
|
||||
|
@ -102,7 +136,8 @@ void walClose(void *handle) {
|
|||
if (handle == NULL) return;
|
||||
|
||||
SWal *pWal = handle;
|
||||
close(pWal->fd);
|
||||
tclose(pWal->fd);
|
||||
if (pWal->timer) taosTmrStopA(&pWal->timer);
|
||||
|
||||
if (pWal->keep == 0) {
|
||||
// remove all files in the directory
|
||||
|
@ -118,9 +153,7 @@ void walClose(void *handle) {
|
|||
wDebug("wal:%s, it is closed and kept", pWal->name);
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
|
||||
free(pWal);
|
||||
walRelease(pWal);
|
||||
}
|
||||
|
||||
int walRenew(void *handle) {
|
||||
|
@ -194,9 +227,9 @@ int walWrite(void *handle, SWalHead *pHead) {
|
|||
void walFsync(void *handle) {
|
||||
|
||||
SWal *pWal = handle;
|
||||
if (pWal == NULL) return;
|
||||
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
|
||||
|
||||
if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) {
|
||||
if (pWal->fsyncPeriod == 0) {
|
||||
if (fsync(pWal->fd) < 0) {
|
||||
wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
|
||||
}
|
||||
|
@ -303,6 +336,20 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void walRelease(SWal *pWal) {
|
||||
|
||||
pthread_mutex_destroy(&pWal->mutex);
|
||||
pWal->signature = NULL;
|
||||
free(pWal);
|
||||
|
||||
if (atomic_sub_fetch_32(&tsWalNum, 1) == 0) {
|
||||
if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl);
|
||||
walTmrCtrl = NULL;
|
||||
walModuleInit = PTHREAD_ONCE_INIT;
|
||||
wDebug("WAL module is cleaned up");
|
||||
}
|
||||
}
|
||||
|
||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
|
||||
char *name = pWal->name;
|
||||
|
||||
|
@ -433,3 +480,15 @@ static int walRemoveWalFiles(const char *path) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
static void walProcessFsyncTimer(void *param, void *tmrId) {
|
||||
SWal *pWal = param;
|
||||
|
||||
if (pWal->signature != pWal) return;
|
||||
if (pWal->fd < 0) return;
|
||||
|
||||
if (fsync(pWal->fd) < 0) {
|
||||
wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
|
||||
}
|
||||
|
||||
pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
|
||||
}
|
||||
|
|
|
@ -84,8 +84,17 @@ class WorkerThread:
|
|||
# Let us have a DB connection of our own
|
||||
if (gConfig.per_thread_db_connection): # type: ignore
|
||||
# print("connector_type = {}".format(gConfig.connector_type))
|
||||
self._dbConn = DbConn.createNative() if (
|
||||
gConfig.connector_type == 'native') else DbConn.createRest()
|
||||
if gConfig.connector_type == 'native':
|
||||
self._dbConn = DbConn.createNative()
|
||||
elif gConfig.connector_type == 'rest':
|
||||
self._dbConn = DbConn.createRest()
|
||||
elif gConfig.connector_type == 'mixed':
|
||||
if Dice.throw(2) == 0: # 1/2 chance
|
||||
self._dbConn = DbConn.createNative()
|
||||
else:
|
||||
self._dbConn = DbConn.createRest()
|
||||
else:
|
||||
raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))
|
||||
|
||||
self._dbInUse = False # if "use db" was executed already
|
||||
|
||||
|
@ -130,22 +139,15 @@ class WorkerThread:
|
|||
while True:
|
||||
tc = self._tc # Thread Coordinator, the overall master
|
||||
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
|
||||
logger.debug(
|
||||
"[TRD] Worker thread [{}] exited barrier...".format(
|
||||
self._tid))
|
||||
logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
|
||||
self.crossStepGate() # then per-thread gate, after being tapped
|
||||
logger.debug(
|
||||
"[TRD] Worker thread [{}] exited step gate...".format(
|
||||
self._tid))
|
||||
logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
|
||||
if not self._tc.isRunning():
|
||||
logger.debug(
|
||||
"[TRD] Thread Coordinator not running any more, worker thread now stopping...")
|
||||
logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
|
||||
break
|
||||
|
||||
# Fetch a task from the Thread Coordinator
|
||||
logger.debug(
|
||||
"[TRD] Worker thread [{}] about to fetch task".format(
|
||||
self._tid))
|
||||
logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
|
||||
task = tc.fetchTask()
|
||||
|
||||
# Execute such a task
|
||||
|
@ -154,9 +156,7 @@ class WorkerThread:
|
|||
self._tid, task.__class__.__name__))
|
||||
task.execute(self)
|
||||
tc.saveExecutedTask(task)
|
||||
logger.debug(
|
||||
"[TRD] Worker thread [{}] finished executing task".format(
|
||||
self._tid))
|
||||
logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))
|
||||
|
||||
self._dbInUse = False # there may be changes between steps
|
||||
|
||||
|
@ -255,101 +255,124 @@ class ThreadCoordinator:
|
|||
self._runStatus = MainExec.STATUS_STOPPING
|
||||
self._execStats.registerFailure("User Interruption")
|
||||
|
||||
def _runShouldEnd(self, transitionFailed, hasAbortedTask):
|
||||
maxSteps = gConfig.max_steps # type: ignore
|
||||
if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
|
||||
return True
|
||||
if self._runStatus != MainExec.STATUS_RUNNING:
|
||||
return True
|
||||
if transitionFailed:
|
||||
return True
|
||||
if hasAbortedTask:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _hasAbortedTask(self): # from execution of previous step
|
||||
for task in self._executedTasks:
|
||||
if task.isAborted():
|
||||
# print("Task aborted: {}".format(task))
|
||||
# hasAbortedTask = True
|
||||
return True
|
||||
return False
|
||||
|
||||
def _releaseAllWorkerThreads(self, transitionFailed):
|
||||
self._curStep += 1 # we are about to get into next step. TODO: race condition here!
|
||||
# Now not all threads had time to go to sleep
|
||||
logger.debug(
|
||||
"--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
|
||||
|
||||
# A new TE for the new step
|
||||
self._te = None # set to empty first, to signal worker thread to stop
|
||||
if not transitionFailed: # only if not failed
|
||||
self._te = TaskExecutor(self._curStep)
|
||||
|
||||
logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
|
||||
self._curStep)) # Now not all threads had time to go to sleep
|
||||
# Worker threads will wake up at this point, and each execute it's own task
|
||||
self.tapAllThreads() # release all worker thread from their "gate"
|
||||
|
||||
def _syncAtBarrier(self):
|
||||
# Now main thread (that's us) is ready to enter a step
|
||||
# let other threads go past the pool barrier, but wait at the
|
||||
# thread gate
|
||||
logger.debug("[TRD] Main thread about to cross the barrier")
|
||||
self.crossStepBarrier()
|
||||
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
|
||||
logger.debug("[TRD] Main thread finished crossing the barrier")
|
||||
|
||||
def _doTransition(self):
|
||||
transitionFailed = False
|
||||
try:
|
||||
sm = self._dbManager.getStateMachine()
|
||||
logger.debug("[STT] starting transitions")
|
||||
# at end of step, transiton the DB state
|
||||
sm.transition(self._executedTasks)
|
||||
logger.debug("[STT] transition ended")
|
||||
# Due to limitation (or maybe not) of the Python library,
|
||||
# we cannot share connections across threads
|
||||
if sm.hasDatabase():
|
||||
for t in self._pool.threadList:
|
||||
logger.debug("[DB] use db for all worker threads")
|
||||
t.useDb()
|
||||
# t.execSql("use db") # main thread executing "use
|
||||
# db" on behalf of every worker thread
|
||||
except taos.error.ProgrammingError as err:
|
||||
if (err.msg == 'network unavailable'): # broken DB connection
|
||||
logger.info("DB connection broken, execution failed")
|
||||
traceback.print_stack()
|
||||
transitionFailed = True
|
||||
self._te = None # Not running any more
|
||||
self._execStats.registerFailure("Broken DB Connection")
|
||||
# continue # don't do that, need to tap all threads at
|
||||
# end, and maybe signal them to stop
|
||||
else:
|
||||
raise
|
||||
|
||||
self.resetExecutedTasks() # clear the tasks after we are done
|
||||
# Get ready for next step
|
||||
logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
|
||||
return transitionFailed
|
||||
|
||||
def run(self):
|
||||
self._pool.createAndStartThreads(self)
|
||||
|
||||
# Coordinate all threads step by step
|
||||
self._curStep = -1 # not started yet
|
||||
maxSteps = gConfig.max_steps # type: ignore
|
||||
|
||||
self._execStats.startExec() # start the stop watch
|
||||
transitionFailed = False
|
||||
hasAbortedTask = False
|
||||
while(self._curStep < maxSteps - 1 and
|
||||
(not transitionFailed) and
|
||||
(self._runStatus == MainExec.STATUS_RUNNING) and
|
||||
(not hasAbortedTask)): # maxStep==10, last curStep should be 9
|
||||
|
||||
if not gConfig.debug:
|
||||
# print this only if we are not in debug mode
|
||||
while not self._runShouldEnd(transitionFailed, hasAbortedTask):
|
||||
if not gConfig.debug: # print this only if we are not in debug mode
|
||||
print(".", end="", flush=True)
|
||||
logger.debug("[TRD] Main thread going to sleep")
|
||||
|
||||
# Now main thread (that's us) is ready to enter a step
|
||||
# let other threads go past the pool barrier, but wait at the
|
||||
# thread gate
|
||||
self.crossStepBarrier()
|
||||
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
|
||||
|
||||
self._syncAtBarrier() # For now just cross the barrier
|
||||
|
||||
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
|
||||
# We use this period to do house keeping work, when all worker
|
||||
# threads are QUIET.
|
||||
hasAbortedTask = False
|
||||
for task in self._executedTasks:
|
||||
if task.isAborted():
|
||||
print("Task aborted: {}".format(task))
|
||||
hasAbortedTask = True
|
||||
break
|
||||
|
||||
if hasAbortedTask: # do transition only if tasks are error free
|
||||
hasAbortedTask = self._hasAbortedTask() # from previous step
|
||||
if hasAbortedTask:
|
||||
logger.info("Aborted task encountered, exiting test program")
|
||||
self._execStats.registerFailure("Aborted Task Encountered")
|
||||
else:
|
||||
try:
|
||||
sm = self._dbManager.getStateMachine()
|
||||
logger.debug("[STT] starting transitions")
|
||||
# at end of step, transiton the DB state
|
||||
sm.transition(self._executedTasks)
|
||||
logger.debug("[STT] transition ended")
|
||||
# Due to limitation (or maybe not) of the Python library,
|
||||
# we cannot share connections across threads
|
||||
if sm.hasDatabase():
|
||||
for t in self._pool.threadList:
|
||||
logger.debug("[DB] use db for all worker threads")
|
||||
t.useDb()
|
||||
# t.execSql("use db") # main thread executing "use
|
||||
# db" on behalf of every worker thread
|
||||
except taos.error.ProgrammingError as err:
|
||||
if (err.msg == 'network unavailable'): # broken DB connection
|
||||
logger.info("DB connection broken, execution failed")
|
||||
traceback.print_stack()
|
||||
transitionFailed = True
|
||||
self._te = None # Not running any more
|
||||
self._execStats.registerFailure("Broken DB Connection")
|
||||
# continue # don't do that, need to tap all threads at
|
||||
# end, and maybe signal them to stop
|
||||
else:
|
||||
raise
|
||||
# finally:
|
||||
# pass
|
||||
break # do transition only if tasks are error free
|
||||
|
||||
self.resetExecutedTasks() # clear the tasks after we are done
|
||||
# Ending previous step
|
||||
transitionFailed = self._doTransition() # To start, we end step -1 first
|
||||
# Then we move on to the next step
|
||||
self._releaseAllWorkerThreads(transitionFailed)
|
||||
|
||||
# Get ready for next step
|
||||
logger.debug("<-- Step {} finished".format(self._curStep))
|
||||
self._curStep += 1 # we are about to get into next step. TODO: race condition here!
|
||||
# Now not all threads had time to go to sleep
|
||||
logger.debug(
|
||||
"\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))
|
||||
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
|
||||
logger.debug("Abnormal ending of main thraed")
|
||||
else: # regular ending, workers waiting at "barrier"
|
||||
logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
|
||||
self._syncAtBarrier()
|
||||
|
||||
# A new TE for the new step
|
||||
if not transitionFailed: # only if not failed
|
||||
self._te = TaskExecutor(self._curStep)
|
||||
|
||||
logger.debug(
|
||||
"[TRD] Main thread waking up at step {}, tapping worker threads".format(
|
||||
self._curStep)) # Now not all threads had time to go to sleep
|
||||
# Worker threads will wake up at this point, and each execute it's
|
||||
# own task
|
||||
self.tapAllThreads()
|
||||
|
||||
logger.debug("Main thread ready to finish up...")
|
||||
if not transitionFailed: # only in regular situations
|
||||
self.crossStepBarrier() # Cross it one last time, after all threads finish
|
||||
self._stepBarrier.reset()
|
||||
logger.debug("Main thread in exclusive zone...")
|
||||
self._te = None # No more executor, time to end
|
||||
logger.debug("Main thread tapping all threads one last time...")
|
||||
self.tapAllThreads() # Let the threads run one last time
|
||||
self._te = None # No more executor, time to end
|
||||
logger.debug("Main thread tapping all threads one last time...")
|
||||
self.tapAllThreads() # Let the threads run one last time
|
||||
|
||||
logger.debug("\r\n\n--> Main thread ready to finish up...")
|
||||
logger.debug("Main thread joining all threads")
|
||||
self._pool.joinAll() # Get all threads to finish
|
||||
logger.info("\nAll worker threads finished")
|
||||
|
@ -514,7 +537,7 @@ class LinearQueue():
|
|||
|
||||
class DbConn:
|
||||
TYPE_NATIVE = "native-c"
|
||||
TYPE_REST = "rest-api"
|
||||
TYPE_REST = "rest-api"
|
||||
TYPE_INVALID = "invalid"
|
||||
|
||||
@classmethod
|
||||
|
@ -620,9 +643,13 @@ class DbConnRest(DbConn):
|
|||
self.isOpen = False
|
||||
|
||||
def _doSql(self, sql):
|
||||
r = requests.post(self._url,
|
||||
data=sql,
|
||||
auth=HTTPBasicAuth('root', 'taosdata'))
|
||||
try:
|
||||
r = requests.post(self._url,
|
||||
data = sql,
|
||||
auth = HTTPBasicAuth('root', 'taosdata'))
|
||||
except:
|
||||
print("REST API Failure (TODO: more info here)")
|
||||
raise
|
||||
rj = r.json()
|
||||
# Sanity check for the "Json Result"
|
||||
if ('status' not in rj):
|
||||
|
@ -717,7 +744,7 @@ class MyTDSql:
|
|||
class DbConnNative(DbConn):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._type = self.TYPE_REST
|
||||
self._type = self.TYPE_NATIVE
|
||||
self._conn = None
|
||||
self._cursor = None
|
||||
|
||||
|
@ -2254,8 +2281,9 @@ class ClientManager:
|
|||
|
||||
def sigIntHandler(self, signalNumber, frame):
|
||||
if self._status != MainExec.STATUS_RUNNING:
|
||||
print("Ignoring repeated SIGINT...")
|
||||
return # do nothing if it's already not running
|
||||
print("Repeated SIGINT received, forced exit...")
|
||||
# return # do nothing if it's already not running
|
||||
sys.exit(-1)
|
||||
self._status = MainExec.STATUS_STOPPING # immediately set our status
|
||||
|
||||
print("Terminating program...")
|
||||
|
@ -2394,6 +2422,27 @@ def main():
|
|||
|
||||
'''))
|
||||
|
||||
# parser.add_argument('-a', '--auto-start-service', action='store_true',
|
||||
# help='Automatically start/stop the TDengine service (default: false)')
|
||||
# parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
|
||||
# help='Connector type to use: native, rest, or mixed (default: 10)')
|
||||
# parser.add_argument('-d', '--debug', action='store_true',
|
||||
# help='Turn on DEBUG mode for more logging (default: false)')
|
||||
# parser.add_argument('-e', '--run-tdengine', action='store_true',
|
||||
# help='Run TDengine service in foreground (default: false)')
|
||||
# parser.add_argument('-l', '--larger-data', action='store_true',
|
||||
# help='Write larger amount of data during write operations (default: false)')
|
||||
# parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
|
||||
# help='Use a single shared db connection (default: false)')
|
||||
# parser.add_argument('-r', '--record-ops', action='store_true',
|
||||
# help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
|
||||
# parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
|
||||
# help='Maximum number of steps to run (default: 100)')
|
||||
# parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
|
||||
# help='Number of threads to run (default: 10)')
|
||||
# parser.add_argument('-x', '--continue-on-exception', action='store_true',
|
||||
# help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
|
||||
|
||||
parser.add_argument(
|
||||
'-a',
|
||||
'--auto-start-service',
|
||||
|
|
|
@ -111,29 +111,29 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
|
|||
echo "dataDir $DATA_DIR" >> $TAOS_CFG
|
||||
echo "logDir $LOG_DIR" >> $TAOS_CFG
|
||||
echo "debugFlag 131" >> $TAOS_CFG
|
||||
echo "mDebugFlag 135" >> $TAOS_CFG
|
||||
echo "sdbDebugFlag 135" >> $TAOS_CFG
|
||||
echo "dDebugFlag 135" >> $TAOS_CFG
|
||||
echo "vDebugFlag 135" >> $TAOS_CFG
|
||||
echo "tsdbDebugFlag 135" >> $TAOS_CFG
|
||||
echo "mDebugFlag 131" >> $TAOS_CFG
|
||||
echo "sdbDebugFlag 131" >> $TAOS_CFG
|
||||
echo "dDebugFlag 131" >> $TAOS_CFG
|
||||
echo "vDebugFlag 131" >> $TAOS_CFG
|
||||
echo "tsdbDebugFlag 131" >> $TAOS_CFG
|
||||
echo "cDebugFlag 135" >> $TAOS_CFG
|
||||
echo "jnidebugFlag 135" >> $TAOS_CFG
|
||||
echo "odbcdebugFlag 135" >> $TAOS_CFG
|
||||
echo "httpDebugFlag 135" >> $TAOS_CFG
|
||||
echo "jnidebugFlag 131" >> $TAOS_CFG
|
||||
echo "odbcdebugFlag 131" >> $TAOS_CFG
|
||||
echo "httpDebugFlag 143" >> $TAOS_CFG
|
||||
echo "monitorDebugFlag 131" >> $TAOS_CFG
|
||||
echo "mqttDebugFlag 131" >> $TAOS_CFG
|
||||
echo "qdebugFlag 135" >> $TAOS_CFG
|
||||
echo "rpcDebugFlag 135" >> $TAOS_CFG
|
||||
echo "qdebugFlag 131" >> $TAOS_CFG
|
||||
echo "rpcDebugFlag 131" >> $TAOS_CFG
|
||||
echo "tmrDebugFlag 131" >> $TAOS_CFG
|
||||
echo "udebugFlag 135" >> $TAOS_CFG
|
||||
echo "sdebugFlag 135" >> $TAOS_CFG
|
||||
echo "wdebugFlag 135" >> $TAOS_CFG
|
||||
echo "udebugFlag 131" >> $TAOS_CFG
|
||||
echo "sdebugFlag 131" >> $TAOS_CFG
|
||||
echo "wdebugFlag 131" >> $TAOS_CFG
|
||||
echo "monitor 0" >> $TAOS_CFG
|
||||
echo "monitorInterval 1" >> $TAOS_CFG
|
||||
echo "http 0" >> $TAOS_CFG
|
||||
echo "numOfThreadsPerCore 2.0" >> $TAOS_CFG
|
||||
echo "defaultPass taosdata" >> $TAOS_CFG
|
||||
echo "numOfLogLines 10000000" >> $TAOS_CFG
|
||||
echo "numOfLogLines 20000000" >> $TAOS_CFG
|
||||
echo "mnodeEqualVnodeNum 0" >> $TAOS_CFG
|
||||
echo "clog 2" >> $TAOS_CFG
|
||||
echo "statusInterval 1" >> $TAOS_CFG
|
||||
|
@ -145,6 +145,6 @@ echo "tableIncStepPerVnode 10000" >> $TAOS_CFG
|
|||
echo "asyncLog 0" >> $TAOS_CFG
|
||||
echo "numOfMnodes 1" >> $TAOS_CFG
|
||||
echo "locale en_US.UTF-8" >> $TAOS_CFG
|
||||
echo "anyIp 0" >> $TAOS_CFG
|
||||
echo "fsync 0" >> $TAOS_CFG
|
||||
|
||||
|
||||
|
|
|
@ -52,9 +52,9 @@ system sh/cfg.sh -n dnode1 -c qdebugFlag -v 131
|
|||
system sh/cfg.sh -n dnode2 -c qdebugFlag -v 131
|
||||
system sh/cfg.sh -n dnode3 -c qdebugFlag -v 131
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c cDebugFlag -v 131
|
||||
system sh/cfg.sh -n dnode2 -c cDebugFlag -v 131
|
||||
system sh/cfg.sh -n dnode3 -c cDebugFlag -v 131
|
||||
system sh/cfg.sh -n dnode1 -c cDebugFlag -v 135
|
||||
system sh/cfg.sh -n dnode2 -c cDebugFlag -v 135
|
||||
system sh/cfg.sh -n dnode3 -c cDebugFlag -v 135
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c udebugFlag -v 131
|
||||
system sh/cfg.sh -n dnode2 -c udebugFlag -v 131
|
||||
|
@ -64,6 +64,10 @@ system sh/cfg.sh -n dnode1 -c wdebugFlag -v 131
|
|||
system sh/cfg.sh -n dnode2 -c wdebugFlag -v 131
|
||||
system sh/cfg.sh -n dnode3 -c wdebugFlag -v 131
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 1000000
|
||||
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 1000000
|
||||
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 1000000
|
||||
|
||||
print ============== deploy
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "taos.h"
|
||||
#include "tulog.h"
|
||||
#include "ttime.h"
|
||||
|
@ -154,7 +155,7 @@ void *threadFunc(void *param) {
|
|||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con));
|
||||
pError("failed to create table %s%d, reason:%s", stableName, t, tstrerror(code));
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue