[TD-3192] <feature>: support supertable limit and offset.

This commit is contained in:
Shuduo Sang 2021-03-08 19:37:33 +08:00
commit 75fe4a1d1b
3 changed files with 142 additions and 89 deletions

View File

@ -56,6 +56,7 @@
#include "cJSON.h" #include "cJSON.h"
#include "taos.h" #include "taos.h"
#include "taoserror.h"
#include "tutil.h" #include "tutil.h"
#define REQ_EXTRA_BUF_LEN 1024 #define REQ_EXTRA_BUF_LEN 1024
@ -184,6 +185,7 @@ typedef struct SArguments_S {
bool insert_only; bool insert_only;
bool answer_yes; bool answer_yes;
bool debug_print; bool debug_print;
bool verbose_print;
char * output_file; char * output_file;
int mode; int mode;
char * datatype[MAX_NUM_DATATYPE + 1]; char * datatype[MAX_NUM_DATATYPE + 1];
@ -490,6 +492,7 @@ SArguments g_args = {
false, // use_metric false, // use_metric
false, // insert_only false, // insert_only
false, // debug_print false, // debug_print
false, // verbose_print
false, // answer_yes; false, // answer_yes;
"./output.txt", // output_file "./output.txt", // output_file
0, // mode : sync or async 0, // mode : sync or async
@ -527,7 +530,11 @@ static SQueryMetaInfo g_queryInfo;
static FILE * g_fpOfInsertResult = NULL; static FILE * g_fpOfInsertResult = NULL;
#define debugPrint(fmt, ...) \ #define debugPrint(fmt, ...) \
do { if (g_args.debug_print) fprintf(stderr, fmt, __VA_ARGS__); } while(0) do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBUG:" fmt, __VA_ARGS__); } while(0)
#define verbosePrint(fmt, ...) \
do { if (g_args.verbose_print) \
fprintf(stderr, "VERB:" fmt, __VA_ARGS__); } while(0)
/////////////////////////////////////////////////// ///////////////////////////////////////////////////
void printHelp() { void printHelp() {
@ -692,6 +699,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->answer_yes = true; arguments->answer_yes = true;
} else if (strcmp(argv[i], "-g") == 0) { } else if (strcmp(argv[i], "-g") == 0) {
arguments->debug_print = true; arguments->debug_print = true;
} else if (strcmp(argv[i], "-gg") == 0) {
arguments->verbose_print = true;
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]); strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) { } else if (strcmp(argv[i], "-O") == 0) {
@ -730,7 +739,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
} }
if (arguments->debug_print) { if (arguments->debug_print || arguments->verbose_print) {
printf("###################################################################\n"); printf("###################################################################\n");
printf("# meta file: %s\n", arguments->metaFile); printf("# meta file: %s\n", arguments->metaFile);
printf("# Server IP: %s:%hu\n", printf("# Server IP: %s:%hu\n",
@ -763,6 +772,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printf("# Delete method: %d\n", arguments->method_of_delete); printf("# Delete method: %d\n", arguments->method_of_delete);
printf("# Answer yes when prompt: %d\n", arguments->answer_yes); printf("# Answer yes when prompt: %d\n", arguments->answer_yes);
printf("# Print debug info: %d\n", arguments->debug_print); printf("# Print debug info: %d\n", arguments->debug_print);
printf("# Print verbose info: %d\n", arguments->verbose_print);
printf("###################################################################\n"); printf("###################################################################\n");
if (!arguments->answer_yes) { if (!arguments->answer_yes) {
printf("Press enter key to continue\n\n"); printf("Press enter key to continue\n\n");
@ -806,7 +816,7 @@ static int queryDbExec(TAOS *taos, char *command, int type) {
} }
if (code != 0) { if (code != 0) {
debugPrint("DEBUG %s() LN%d - command: %s\n", __func__, __LINE__, command); debugPrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res));
taos_free_result(res); taos_free_result(res);
//taos_close(taos); //taos_close(taos);
@ -840,7 +850,8 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
char* databuf = (char*) calloc(1, 100*1024*1024); char* databuf = (char*) calloc(1, 100*1024*1024);
if (databuf == NULL) { if (databuf == NULL) {
fprintf(stderr, "failed to malloc, warning: save result to file slowly!\n"); fprintf(stderr, "failed to malloc, warning: save result to file slowly!\n");
fclose(fp); if (fp)
fclose(fp);
return ; return ;
} }
@ -998,7 +1009,7 @@ static int printfInsertMeta() {
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
printf("database[\033[33m%d\033[0m]:\n", i); printf("database[\033[33m%d\033[0m]:\n", i);
printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName); printf(" database[%d] name: \033[33m%s\033[0m\n", i, g_Dbs.db[i].dbName);
if (0 == g_Dbs.db[i].drop) { if (0 == g_Dbs.db[i].drop) {
printf(" drop: \033[33mno\033[0m\n"); printf(" drop: \033[33mno\033[0m\n");
}else { }else {
@ -1153,7 +1164,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "database count: %d\n", g_Dbs.dbCount); fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
fprintf(fp, "database[%d]:\n", i); fprintf(fp, "database[%d]:\n", i);
fprintf(fp, " database name: %s\n", g_Dbs.db[i].dbName); fprintf(fp, " database[%d] name: %s\n", i, g_Dbs.db[i].dbName);
if (0 == g_Dbs.db[i].drop) { if (0 == g_Dbs.db[i].drop) {
fprintf(fp, " drop: no\n"); fprintf(fp, " drop: no\n");
}else { }else {
@ -1677,6 +1688,7 @@ int postProceSql(char* host, uint16_t port, char* sqlstr)
ERROR_EXIT("ERROR storing complete response from socket"); ERROR_EXIT("ERROR storing complete response from socket");
} }
response_buf[RESP_BUF_LEN - 1] = '\0';
printf("Response:\n%s\n", response_buf); printf("Response:\n%s\n", response_buf);
free(request_buf); free(request_buf);
@ -1900,7 +1912,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* supe
TAOS_RES * res; TAOS_RES * res;
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
int count = 0; int count = 0;
//get schema use cmd: describe superTblName; //get schema use cmd: describe superTblName;
snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName); snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName);
res = taos_query(taos, command); res = taos_query(taos, command);
@ -1921,16 +1933,30 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* supe
} }
if (strcmp((char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "TAG") == 0) { if (strcmp((char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "TAG") == 0) {
tstrncpy(superTbls->tags[tagIndex].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); tstrncpy(superTbls->tags[tagIndex].field,
tstrncpy(superTbls->tags[tagIndex].dataType, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes); (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
superTbls->tags[tagIndex].dataLen = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->tags[tagIndex].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes); tstrncpy(superTbls->tags[tagIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
superTbls->tags[tagIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->tags[tagIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
tagIndex++; tagIndex++;
} else { } else {
tstrncpy(superTbls->columns[columnIndex].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); tstrncpy(superTbls->columns[columnIndex].field,
tstrncpy(superTbls->columns[columnIndex].dataType, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes); (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
superTbls->columns[columnIndex].dataLen = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->columns[columnIndex].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes); tstrncpy(superTbls->columns[columnIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
superTbls->columns[columnIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->columns[columnIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
columnIndex++; columnIndex++;
} }
count++; count++;
@ -1945,7 +1971,9 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* supe
if (TBL_ALREADY_EXISTS == superTbls->childTblExists) { if (TBL_ALREADY_EXISTS == superTbls->childTblExists) {
//get all child table name use cmd: select tbname from superTblName; //get all child table name use cmd: select tbname from superTblName;
getAllChildNameOfSuperTable(taos, dbName, getAllChildNameOfSuperTable(taos, dbName,
superTbls->sTblName, &superTbls->childTblName, &superTbls->childTblCount); superTbls->sTblName,
&superTbls->childTblName,
&superTbls->childTblCount);
} }
return 0; return 0;
} }
@ -2013,7 +2041,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
exit(-1); exit(-1);
} }
snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols);
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable);
if (use_metric) { if (use_metric) {
char tags[STRING_LEN] = "\0"; char tags[STRING_LEN] = "\0";
@ -2066,13 +2094,13 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
snprintf(command, BUFFER_SIZE, snprintf(command, BUFFER_SIZE,
"create table if not exists %s.%s (ts timestamp%s) tags %s", "create table if not exists %s.%s (ts timestamp%s) tags %s",
dbName, superTbls->sTblName, cols, tags); dbName, superTbls->sTblName, cols, tags);
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, command); verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName);
return -1; return -1;
} }
debugPrint("DEBUG - create supertable %s success!\n\n", superTbls->sTblName); verbosePrint("create supertable %s success!\n\n", superTbls->sTblName);
} }
return 0; return 0;
} }
@ -2091,7 +2119,7 @@ static int createDatabases() {
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].drop) { if (g_Dbs.db[i].drop) {
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName); sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos); taos_close(taos);
return -1; return -1;
@ -2159,7 +2187,7 @@ static int createDatabases() {
"precision \'%s\';", g_Dbs.db[i].dbCfg.precision); "precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
} }
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos); taos_close(taos);
printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName); printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
@ -2167,11 +2195,11 @@ static int createDatabases() {
} }
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
debugPrint("DEBUG %s() %d count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); verbosePrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// describe super table, if exists // describe super table, if exists
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS;
ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
@ -2259,7 +2287,7 @@ static void* createTable(void *sarg)
} }
len = 0; len = 0;
debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
free(buffer); free(buffer);
return NULL; return NULL;
@ -2274,7 +2302,7 @@ static void* createTable(void *sarg)
} }
if (0 != len) { if (0 != len) {
debugPrint("DEBUG %s() %d buffer: %s\n", __func__, __LINE__, buffer); debugPrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE); (void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
} }
@ -2312,6 +2340,7 @@ int startMultiThreadCreateChildTable(
t_info->threadID = i; t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE); tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
t_info->superTblInfo = superTblInfo; t_info->superTblInfo = superTblInfo;
debugPrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);
t_info->taos = taos_connect( t_info->taos = taos_connect(
g_Dbs.host, g_Dbs.host,
g_Dbs.user, g_Dbs.user,
@ -2320,6 +2349,8 @@ int startMultiThreadCreateChildTable(
g_Dbs.port); g_Dbs.port);
if (t_info->taos == NULL) { if (t_info->taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
free(pids);
free(infos);
return -1; return -1;
} }
t_info->start_table_id = last; t_info->start_table_id = last;
@ -2360,7 +2391,7 @@ static void createChildTables() {
continue; continue;
} }
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
@ -2370,26 +2401,25 @@ static void createChildTables() {
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
} }
} else { } else {
// normal table // normal table
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
for (int i = 0; i < MAX_COLUMN_COUNT; i++) { int j = 0;
if (g_args.datatype[i]) { while (g_args.datatype[j]) {
if ((strncasecmp(g_args.datatype[i], "BINARY", strlen("BINARY")) == 0) if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[i], "NCHAR", strlen("NCHAR")) == 0)) { || (strncasecmp(g_args.datatype[j], "NCHAR", strlen("NCHAR")) == 0)) {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", i, g_args.datatype[i]); len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", j, g_args.datatype[j]);
} else { } else {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", i, g_args.datatype[i]); len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", j, g_args.datatype[j]);
} }
len = strlen(tblColsBuf); len = strlen(tblColsBuf);
} else { j++;
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ")");
break;
} }
}
debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")");
tblColsBuf);
startMultiThreadCreateChildTable( verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__,
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
startMultiThreadCreateChildTable(
tblColsBuf, tblColsBuf,
g_Dbs.threadCountByCreateTbl, g_Dbs.threadCountByCreateTbl,
g_args.num_of_tables, g_args.num_of_tables,
@ -3633,7 +3663,7 @@ PARSE_OVER:
} }
static bool getInfoFromJsonFile(char* file) { static bool getInfoFromJsonFile(char* file) {
debugPrint("DEBUG - %s %d %s\n", __func__, __LINE__, file); debugPrint("%s %d %s\n", __func__, __LINE__, file);
FILE *fp = fopen(file, "r"); FILE *fp = fopen(file, "r");
if (!fp) { if (!fp) {
@ -3911,7 +3941,7 @@ static void syncWriteForNumberOfTblInOneSql(
} }
tmp_time = time_counter; tmp_time = time_counter;
for (k = 0; k < superTblInfo->rowsPerTbl;) { for (int j = 0; j < superTblInfo->rowsPerTbl;) {
int retLen = 0; int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) { "sample", strlen("sample"))) {
@ -3947,7 +3977,7 @@ static void syncWriteForNumberOfTblInOneSql(
} }
len += retLen; len += retLen;
//inserted++; //inserted++;
k++; j++;
totalRowsInserted++; totalRowsInserted++;
if (inserted >= superTblInfo->insertRows || if (inserted >= superTblInfo->insertRows ||
@ -3966,7 +3996,7 @@ static void syncWriteForNumberOfTblInOneSql(
send_to_server: send_to_server:
if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { if (g_args.insert_interval && (g_args.insert_interval > (et - st))) {
int sleep_time = g_args.insert_interval - (et -st); int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time); debugPrint("sleep: %d ms\n", sleep_time);
taosMsleep(sleep_time); // ms taosMsleep(sleep_time); // ms
} }
@ -3983,7 +4013,7 @@ send_to_server:
int64_t endTs; int64_t endTs;
startTs = taosGetTimestampUs(); startTs = taosGetTimestampUs();
debugPrint("DEBUG %s() LN%d buff: %s\n", __func__, __LINE__, buffer); debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec( int affectedRows = queryDbExec(
winfo->taos, buffer, INSERT_TYPE); winfo->taos, buffer, INSERT_TYPE);
@ -4124,11 +4154,10 @@ static void* syncWrite(void *sarg) {
int len_of_binary = g_args.len_of_binary; int len_of_binary = g_args.len_of_binary;
int ncols_per_record = 1; // count first col ts int ncols_per_record = 1; // count first col ts
for (int i = 0; i < MAX_COLUMN_COUNT; i ++) { int i = 0;
if (NULL == g_args.datatype[i]) while(g_args.datatype[i]) {
break; i ++;
else ncols_per_record ++;
ncols_per_record ++;
} }
srand((uint32_t)time(NULL)); srand((uint32_t)time(NULL));
@ -4184,14 +4213,14 @@ static void* syncWrite(void *sarg) {
if (i > 0 && g_args.insert_interval if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) { && (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st); int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time); debugPrint("sleep: %d ms\n", sleep_time);
taosMsleep(sleep_time); // ms taosMsleep(sleep_time); // ms
} }
if (g_args.insert_interval) { if (g_args.insert_interval) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
} }
debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1); int affectedRows = queryDbExec(winfo->taos, buffer, 1);
if (0 <= affectedRows){ if (0 <= affectedRows){
@ -4283,7 +4312,7 @@ static void* syncWriteWithStb(void *sarg) {
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0; uint64_t et = 0;
debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
@ -4291,9 +4320,20 @@ static void* syncWriteWithStb(void *sarg) {
uint64_t inserted = i; uint64_t inserted = i;
uint64_t tmp_time = time_counter; uint64_t tmp_time = time_counter;
if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st);
debugPrint("sleep: %d ms\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
int sampleUsePos = samplePos; int sampleUsePos = samplePos;
int k = 0; int k = 0;
debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) { for (k = 0; k < g_args.num_of_RPR;) {
int len = 0; int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen); memset(buffer, 0, superTblInfo->maxSqlLen);
@ -4375,6 +4415,7 @@ static void* syncWriteWithStb(void *sarg) {
*/ */
inserted++; inserted++;
k++; k++;
i++;
totalRowsInserted++; totalRowsInserted++;
if (inserted > superTblInfo->insertRows) if (inserted > superTblInfo->insertRows)
@ -4383,17 +4424,6 @@ static void* syncWriteWithStb(void *sarg) {
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
break; break;
*/ */
if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
//printf("===== sql: %s \n\n", buffer); //printf("===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs(); //int64_t t1 = taosGetTimestampMs();
@ -4401,7 +4431,7 @@ static void* syncWriteWithStb(void *sarg) {
int64_t endTs; int64_t endTs;
startTs = taosGetTimestampUs(); startTs = taosGetTimestampUs();
debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
if (0 > affectedRows){ if (0 > affectedRows){
@ -4566,6 +4596,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
memset(pids, 0, threads * sizeof(pthread_t)); memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo)); memset(infos, 0, threads * sizeof(threadInfo));
/*
xxx getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount);
*/
int ntables = 0; int ntables = 0;
if (superTblInfo) if (superTblInfo)
ntables = superTblInfo->childTblCount; ntables = superTblInfo->childTblCount;
@ -4609,11 +4646,14 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
start_time = taosGetTimestamp(timePrec); start_time = taosGetTimestamp(timePrec);
} else { } else {
taosParseTime( if (TSDB_CODE_SUCCESS != taosParseTime(
superTblInfo->startTimestamp, superTblInfo->startTimestamp,
&start_time, &start_time,
strlen(superTblInfo->startTimestamp), strlen(superTblInfo->startTimestamp),
timePrec, 0); timePrec, 0)) {
printf("ERROR to parse time!\n");
exit(-1);
}
} }
} else { } else {
start_time = 1500000000000; start_time = 1500000000000;
@ -4881,7 +4921,7 @@ static int insertTestProcess() {
if (ret == -1) if (ret == -1)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile); debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile);
g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a"); g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
if (NULL == g_fpOfInsertResult) { if (NULL == g_fpOfInsertResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile);
@ -5079,7 +5119,7 @@ static int queryTestProcess() {
} }
if (0 != g_queryInfo.subQueryInfo.sqlCount) { if (0 != g_queryInfo.subQueryInfo.sqlCount) {
(void)getAllChildNameOfSuperTable(taos, getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName, g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName, g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblName,
@ -5117,7 +5157,7 @@ static int queryTestProcess() {
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
(void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE); (void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE);
} else { } else {
t_info->taos = NULL; t_info->taos = NULL;
@ -5228,7 +5268,7 @@ void *subSubscribeProcess(void *sarg) {
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){ if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){
return NULL; return NULL;
} }
@ -5294,7 +5334,7 @@ void *superSubscribeProcess(void *sarg) {
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) { if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) {
return NULL; return NULL;
} }
@ -5376,7 +5416,7 @@ static int subscribeTestProcess() {
} }
if (0 != g_queryInfo.subQueryInfo.sqlCount) { if (0 != g_queryInfo.subQueryInfo.sqlCount) {
(void)getAllChildNameOfSuperTable(taos, getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName, g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName, g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblName,
@ -5659,7 +5699,7 @@ void querySqlFile(TAOS* taos, char* sqlFile)
} }
memcpy(cmd + cmd_len, line, read_len); memcpy(cmd + cmd_len, line, read_len);
debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd); debugPrint("%s() LN%d cmd: %s\n", __func__, __LINE__, cmd);
queryDbExec(taos, cmd, NO_INSERT_TYPE); queryDbExec(taos, cmd, NO_INSERT_TYPE);
memset(cmd, 0, MAX_SQL_SIZE); memset(cmd, 0, MAX_SQL_SIZE);
cmd_len = 0; cmd_len = 0;
@ -5691,14 +5731,7 @@ static void testMetaFile() {
} }
} }
static void testCmdLine() { static void queryResult() {
g_args.test_mode = INSERT_MODE;
insertTestProcess();
if (g_Dbs.insert_only)
return;
// select // select
if (false == g_Dbs.insert_only) { if (false == g_Dbs.insert_only) {
// query data // query data
@ -5744,10 +5777,21 @@ static void testCmdLine() {
} }
} }
static void testCmdLine() {
g_args.test_mode = INSERT_MODE;
insertTestProcess();
if (g_Dbs.insert_only)
return;
else
queryResult();
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
parse_args(argc, argv, &g_args); parse_args(argc, argv, &g_args);
debugPrint("DEBUG - meta file: %s\n", g_args.metaFile); debugPrint("meta file: %s\n", g_args.metaFile);
if (g_args.metaFile) { if (g_args.metaFile) {
initOfInsertMeta(); initOfInsertMeta();

View File

@ -769,6 +769,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
} }
sprintf(tmpBuf, ".select-tbname.tmp"); sprintf(tmpBuf, ".select-tbname.tmp");
(void)remove(tmpBuf); (void)remove(tmpBuf);
free(tblBuf);
close(fd); close(fd);
return -1; return -1;
} }
@ -1523,6 +1524,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
} }
sprintf(tmpBuf, ".show-tables.tmp"); sprintf(tmpBuf, ".show-tables.tmp");
(void)remove(tmpBuf); (void)remove(tmpBuf);
free(tblBuf);
close(fd); close(fd);
return -1; return -1;
} }

View File

@ -28,12 +28,13 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
tdSql.execute("CREATE TABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int)") tdSql.execute("CREATE TABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int, t3 float, t4 double)")
tdSql.execute("CREATE TABLE D1001 USING meters TAGS ('Beijing.Chaoyang', 2)") tdSql.execute("CREATE TABLE D1001 USING meters TAGS ('Beijing.Chaoyang', 2 , NULL, NULL)")
tdSql.execute("CREATE TABLE D1002 USING meters TAGS ('Beijing.Chaoyang', 3)") tdSql.execute("CREATE TABLE D1002 USING meters TAGS ('Beijing.Chaoyang', 3 , NULL , 1.7)")
tdSql.execute("CREATE TABLE D1003 USING meters TAGS ('Beijing.Chaoyang', 3 , 1.1 , 1.7)")
tdSql.execute("INSERT INTO D1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, 218, 0.33) (1538548696800, 12.3, 221, 0.31)") tdSql.execute("INSERT INTO D1001 VALUES (1538548685000, 10.3, 219, 0.31) (1538548695000, 12.6, 218, 0.33) (1538548696800, 12.3, 221, 0.31)")
tdSql.execute("INSERT INTO D1002 VALUES (1538548685001, 10.5, 220, 0.28) (1538548696800, 12.3, 221, 0.31)") tdSql.execute("INSERT INTO D1002 VALUES (1538548685001, 10.5, 220, 0.28) (1538548696800, 12.3, 221, 0.31)")
tdSql.execute("INSERT INTO D1003 VALUES (1538548685001, 10.5, 220, 0.28) (1538548696800, 12.3, 221, 0.31)")
tdSql.query("SELECT SUM(current), AVG(voltage) FROM meters WHERE groupId > 1 INTERVAL(1s) GROUP BY location order by ts DESC") tdSql.query("SELECT SUM(current), AVG(voltage) FROM meters WHERE groupId > 1 INTERVAL(1s) GROUP BY location order by ts DESC")
tdSql.checkRows(3) tdSql.checkRows(3)
tdSql.checkData(0, 0, "2018-10-03 14:38:16") tdSql.checkData(0, 0, "2018-10-03 14:38:16")
@ -49,6 +50,12 @@ class TDTestCase:
tdSql.error("SELECT SUM(current) as s, AVG(voltage) FROM meters WHERE groupId > 1 INTERVAL(1s) GROUP BY location order by s ASC") tdSql.error("SELECT SUM(current) as s, AVG(voltage) FROM meters WHERE groupId > 1 INTERVAL(1s) GROUP BY location order by s ASC")
tdSql.error("SELECT SUM(current) as s, AVG(voltage) FROM meters WHERE groupId > 1 INTERVAL(1s) GROUP BY location order by s DESC") tdSql.error("SELECT SUM(current) as s, AVG(voltage) FROM meters WHERE groupId > 1 INTERVAL(1s) GROUP BY location order by s DESC")
#add for TD-3170
tdSql.query("select avg(current) from meters group by t3;")
tdSql.checkData(0, 0, 11.6)
tdSql.query("select avg(current) from meters group by t4;")
tdSql.query("select avg(current) from meters group by t3,t4;")
def stop(self): def stop(self):
tdSql.close() tdSql.close()