Merge branch 'master' of github.com:taosdata/TDengine into test/chr

This commit is contained in:
tomchon 2021-05-19 13:23:22 +08:00
commit abe9b75667
4 changed files with 285 additions and 122 deletions

View File

@ -5,13 +5,33 @@
"port": 6030, "port": 6030,
"user": "root", "user": "root",
"password": "taosdata", "password": "taosdata",
"databases": "dbx", "databases": "test",
"specified_table_query": "specified_table_query": {
{"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes", "concurrent": 1,
"sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}] "mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"resubAfterConsume": 10,
"sqls": [
{
"sql": "select avg(col1) from meters where col1 > 1;",
"result": "./subscribe_res0.txt"
}
]
}, },
"super_table_query": "super_table_query": {
{"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes", "stblname": "meters",
"sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}] "threads": 1,
"mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"sqls": [
{
"sql": "select col1 from xxxx where col1 > 10;",
"result": "./subscribe_res1.txt"
}
]
} }
} }

View File

@ -360,10 +360,11 @@ typedef struct SpecifiedQueryInfo_S {
uint32_t asyncMode; // 0: sync, 1: async uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms uint64_t subscribeInterval; // ms
uint64_t queryTimes; uint64_t queryTimes;
int subscribeRestart; bool subscribeRestart;
int subscribeKeepProgress; int subscribeKeepProgress;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
uint64_t totalQueried; uint64_t totalQueried;
} SpecifiedQueryInfo; } SpecifiedQueryInfo;
@ -374,7 +375,7 @@ typedef struct SuperQueryInfo_S {
uint32_t threadCnt; uint32_t threadCnt;
uint32_t asyncMode; // 0: sync, 1: async uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms uint64_t subscribeInterval; // ms
int subscribeRestart; bool subscribeRestart;
int subscribeKeepProgress; int subscribeKeepProgress;
uint64_t queryTimes; uint64_t queryTimes;
int64_t childTblCount; int64_t childTblCount;
@ -382,6 +383,7 @@ typedef struct SuperQueryInfo_S {
uint64_t sqlCount; uint64_t sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName; char* childTblName;
@ -516,11 +518,13 @@ static int taosRandom()
#endif // ifdef Windows #endif // ifdef Windows
static void prompt();
static void prompt2();
static int createDatabasesAndStables(); static int createDatabasesAndStables();
static void createChildTables(); static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, static int postProceSql(char *host, struct sockaddr_in *pServAddr,
char* sqlstr, char *resultFile); uint16_t port, char* sqlstr, char *resultFile);
/* ************ Global variables ************ */ /* ************ Global variables ************ */
@ -1031,10 +1035,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
printf("# Print debug info: %d\n", arguments->debug_print); printf("# Print debug info: %d\n", arguments->debug_print);
printf("# Print verbose info: %d\n", arguments->verbose_print); printf("# Print verbose info: %d\n", arguments->verbose_print);
printf("###################################################################\n"); printf("###################################################################\n");
if (!arguments->answer_yes) {
printf("Press enter key to continue\n\n"); prompt();
(void) getchar();
}
} }
} }
@ -1141,12 +1143,14 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
totalLen += len; totalLen += len;
} }
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", __func__, __LINE__, databuf, resultFile); verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n",
__func__, __LINE__, databuf, resultFile);
appendResultBufToFile(databuf, resultFile); appendResultBufToFile(databuf, resultFile);
free(databuf); free(databuf);
} }
static void selectAndGetResult(threadInfo *pThreadInfo, char *command, char* resultFile) static void selectAndGetResult(
threadInfo *pThreadInfo, char *command, char* resultFile)
{ {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
TAOS_RES *res = taos_query(pThreadInfo->taos, command); TAOS_RES *res = taos_query(pThreadInfo->taos, command);
@ -1291,13 +1295,15 @@ static void init_rand_data() {
static int printfInsertMeta() { static int printfInsertMeta() {
SHOW_PARSE_RESULT_START(); SHOW_PARSE_RESULT_START();
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("host: \033[33m%s:%u\033[0m\n",
g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
printf("password: \033[33m%s\033[0m\n", g_Dbs.password); printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
printf("configDir: \033[33m%s\033[0m\n", configDir); printf("configDir: \033[33m%s\033[0m\n", configDir);
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount); printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); printf("thread num of create table: \033[33m%d\033[0m\n",
g_Dbs.threadCountByCreateTbl);
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n", printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
g_args.insert_interval); g_args.insert_interval);
printf("number of records per req: \033[33m%"PRIu64"\033[0m\n", printf("number of records per req: \033[33m%"PRIu64"\033[0m\n",
@ -1309,7 +1315,8 @@ static int printfInsertMeta() {
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
printf("database[\033[33m%d\033[0m]:\n", i); printf("database[\033[33m%d\033[0m]:\n", i);
printf(" database[%d] name: \033[33m%s\033[0m\n", i, g_Dbs.db[i].dbName); printf(" database[%d] name: \033[33m%s\033[0m\n",
i, g_Dbs.db[i].dbName);
if (0 == g_Dbs.db[i].drop) { if (0 == g_Dbs.db[i].drop) {
printf(" drop: \033[33mno\033[0m\n"); printf(" drop: \033[33mno\033[0m\n");
} else { } else {
@ -1317,40 +1324,51 @@ static int printfInsertMeta() {
} }
if (g_Dbs.db[i].dbCfg.blocks > 0) { if (g_Dbs.db[i].dbCfg.blocks > 0) {
printf(" blocks: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.blocks); printf(" blocks: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.blocks);
} }
if (g_Dbs.db[i].dbCfg.cache > 0) { if (g_Dbs.db[i].dbCfg.cache > 0) {
printf(" cache: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.cache); printf(" cache: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.cache);
} }
if (g_Dbs.db[i].dbCfg.days > 0) { if (g_Dbs.db[i].dbCfg.days > 0) {
printf(" days: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.days); printf(" days: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.days);
} }
if (g_Dbs.db[i].dbCfg.keep > 0) { if (g_Dbs.db[i].dbCfg.keep > 0) {
printf(" keep: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.keep); printf(" keep: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.keep);
} }
if (g_Dbs.db[i].dbCfg.replica > 0) { if (g_Dbs.db[i].dbCfg.replica > 0) {
printf(" replica: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.replica); printf(" replica: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.replica);
} }
if (g_Dbs.db[i].dbCfg.update > 0) { if (g_Dbs.db[i].dbCfg.update > 0) {
printf(" update: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.update); printf(" update: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.update);
} }
if (g_Dbs.db[i].dbCfg.minRows > 0) { if (g_Dbs.db[i].dbCfg.minRows > 0) {
printf(" minRows: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.minRows); printf(" minRows: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.minRows);
} }
if (g_Dbs.db[i].dbCfg.maxRows > 0) { if (g_Dbs.db[i].dbCfg.maxRows > 0) {
printf(" maxRows: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.maxRows); printf(" maxRows: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.maxRows);
} }
if (g_Dbs.db[i].dbCfg.comp > 0) { if (g_Dbs.db[i].dbCfg.comp > 0) {
printf(" comp: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.comp); printf(" comp: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.comp);
} }
if (g_Dbs.db[i].dbCfg.walLevel > 0) { if (g_Dbs.db[i].dbCfg.walLevel > 0) {
printf(" walLevel: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.walLevel); printf(" walLevel: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.walLevel);
} }
if (g_Dbs.db[i].dbCfg.fsync > 0) { if (g_Dbs.db[i].dbCfg.fsync > 0) {
printf(" fsync: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.fsync); printf(" fsync: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.fsync);
} }
if (g_Dbs.db[i].dbCfg.quorum > 0) { if (g_Dbs.db[i].dbCfg.quorum > 0) {
printf(" quorum: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.quorum); printf(" quorum: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.quorum);
} }
if (g_Dbs.db[i].dbCfg.precision[0] != 0) { if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
@ -1543,21 +1561,26 @@ static void printfInsertMetaToFile(FILE* fp) {
if (g_Dbs.db[i].dbCfg.precision[0] != 0) { if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
fprintf(fp, " precision: %s\n", g_Dbs.db[i].dbCfg.precision); fprintf(fp, " precision: %s\n",
g_Dbs.db[i].dbCfg.precision);
} else { } else {
fprintf(fp, " precision error: %s\n", g_Dbs.db[i].dbCfg.precision); fprintf(fp, " precision error: %s\n",
g_Dbs.db[i].dbCfg.precision);
} }
} }
fprintf(fp, " super table count: %"PRIu64"\n", g_Dbs.db[i].superTblCount); fprintf(fp, " super table count: %"PRIu64"\n",
g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
fprintf(fp, " super table[%d]:\n", j); fprintf(fp, " super table[%d]:\n", j);
fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName); fprintf(fp, " stbName: %s\n",
g_Dbs.db[i].superTbls[j].sTblName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
fprintf(fp, " autoCreateTable: %s\n", "no"); fprintf(fp, " autoCreateTable: %s\n", "no");
} else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { } else if (AUTO_CREATE_SUBTBL
== g_Dbs.db[i].superTbls[j].autoCreateTable) {
fprintf(fp, " autoCreateTable: %s\n", "yes"); fprintf(fp, " autoCreateTable: %s\n", "yes");
} else { } else {
fprintf(fp, " autoCreateTable: %s\n", "error"); fprintf(fp, " autoCreateTable: %s\n", "error");
@ -1565,7 +1588,8 @@ static void printfInsertMetaToFile(FILE* fp) {
if (TBL_NO_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { if (TBL_NO_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
fprintf(fp, " childTblExists: %s\n", "no"); fprintf(fp, " childTblExists: %s\n", "no");
} else if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { } else if (TBL_ALREADY_EXISTS
== g_Dbs.db[i].superTbls[j].childTblExists) {
fprintf(fp, " childTblExists: %s\n", "yes"); fprintf(fp, " childTblExists: %s\n", "yes");
} else { } else {
fprintf(fp, " childTblExists: %s\n", "error"); fprintf(fp, " childTblExists: %s\n", "error");
@ -1596,8 +1620,10 @@ static void printfInsertMetaToFile(FILE* fp) {
*/ */
fprintf(fp, " interlaceRows: %"PRIu64"\n", fprintf(fp, " interlaceRows: %"PRIu64"\n",
g_Dbs.db[i].superTbls[j].interlaceRows); g_Dbs.db[i].superTbls[j].interlaceRows);
fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); fprintf(fp, " disorderRange: %d\n",
fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio); g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n",
g_Dbs.db[i].superTbls[j].disorderRatio);
fprintf(fp, " maxSqlLen: %"PRIu64"\n", fprintf(fp, " maxSqlLen: %"PRIu64"\n",
g_Dbs.db[i].superTbls[j].maxSqlLen); g_Dbs.db[i].superTbls[j].maxSqlLen);
@ -1605,23 +1631,29 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs.db[i].superTbls[j].timeStampStep); g_Dbs.db[i].superTbls[j].timeStampStep);
fprintf(fp, " startTimestamp: %s\n", fprintf(fp, " startTimestamp: %s\n",
g_Dbs.db[i].superTbls[j].startTimestamp); g_Dbs.db[i].superTbls[j].startTimestamp);
fprintf(fp, " sampleFormat: %s\n", g_Dbs.db[i].superTbls[j].sampleFormat); fprintf(fp, " sampleFormat: %s\n",
fprintf(fp, " sampleFile: %s\n", g_Dbs.db[i].superTbls[j].sampleFile); g_Dbs.db[i].superTbls[j].sampleFormat);
fprintf(fp, " tagsFile: %s\n", g_Dbs.db[i].superTbls[j].tagsFile); fprintf(fp, " sampleFile: %s\n",
g_Dbs.db[i].superTbls[j].sampleFile);
fprintf(fp, " tagsFile: %s\n",
g_Dbs.db[i].superTbls[j].tagsFile);
fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount); fprintf(fp, " columnCount: %d\n ",
g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
if ((0 == strncasecmp( if ((0 == strncasecmp(
g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataType,
"binary", strlen("binary"))) "binary", strlen("binary")))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, || (0 == strncasecmp(
g_Dbs.db[i].superTbls[j].columns[k].dataType,
"nchar", strlen("nchar")))) { "nchar", strlen("nchar")))) {
fprintf(fp, "column[%d]:%s(%d) ", k, fprintf(fp, "column[%d]:%s(%d) ", k,
g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataType,
g_Dbs.db[i].superTbls[j].columns[k].dataLen); g_Dbs.db[i].superTbls[j].columns[k].dataLen);
} else { } else {
fprintf(fp, "column[%d]:%s ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType); fprintf(fp, "column[%d]:%s ",
k, g_Dbs.db[i].superTbls[j].columns[k].dataType);
} }
} }
fprintf(fp, "\n"); fprintf(fp, "\n");
@ -1634,7 +1666,8 @@ static void printfInsertMetaToFile(FILE* fp) {
"binary", strlen("binary"))) "binary", strlen("binary")))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType,
"nchar", strlen("nchar")))) { "nchar", strlen("nchar")))) {
fprintf(fp, "tag[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType, fprintf(fp, "tag[%d]:%s(%d) ",
k, g_Dbs.db[i].superTbls[j].tags[k].dataType,
g_Dbs.db[i].superTbls[j].tags[k].dataLen); g_Dbs.db[i].superTbls[j].tags[k].dataLen);
} else { } else {
fprintf(fp, "tag[%d]:%s ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType); fprintf(fp, "tag[%d]:%s ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType);
@ -3410,10 +3443,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_args.interlace_rows, g_args.num_of_RPR); g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
g_args.num_of_RPR); g_args.num_of_RPR);
if (!g_args.answer_yes) { prompt2();
printf(" press Enter key to continue or Ctrl-C to stop.");
(void)getchar();
}
g_args.interlace_rows = g_args.num_of_RPR; g_args.interlace_rows = g_args.num_of_RPR;
} }
} else if (!interlaceRows) { } else if (!interlaceRows) {
@ -3470,9 +3500,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_args.answer_yes = false; g_args.answer_yes = false;
} }
} else if (!answerPrompt) { } else if (!answerPrompt) {
g_args.answer_yes = false; g_args.answer_yes = true; // default is no, mean answer_yes.
} else { } else {
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n"); errorPrint("%s", "failed to read json, confirm_parameter_prompt input mistake\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -4131,7 +4161,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
"query_times"); "query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
if (specifiedQueryTimes->valueint <= 0) { if (specifiedQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", errorPrint(
"%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__, specifiedQueryTimes->valueint); __func__, __LINE__, specifiedQueryTimes->valueint);
goto PARSE_OVER; goto PARSE_OVER;
@ -4148,7 +4179,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent"); cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) { if (concurrent && concurrent->type == cJSON_Number) {
if (concurrent->valueint <= 0) { if (concurrent->valueint <= 0) {
errorPrint("%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n", errorPrint(
"%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount, g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent); g_queryInfo.specifiedQueryInfo.concurrent);
@ -4187,15 +4219,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart"); cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart");
if (restart && restart->type == cJSON_String && restart->valuestring != NULL) { if (restart && restart->type == cJSON_String && restart->valuestring != NULL) {
if (0 == strcmp("yes", restart->valuestring)) { if (0 == strcmp("yes", restart->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeRestart = 1; g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", restart->valuestring)) { } else if (0 == strcmp("no", restart->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeRestart = 0; g_queryInfo.specifiedQueryInfo.subscribeRestart = false;
} else { } else {
printf("ERROR: failed to read json, subscribe restart error\n"); printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.specifiedQueryInfo.subscribeRestart = 1; g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
} }
cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress"); cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress");
@ -4240,13 +4272,29 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
printf("ERROR: failed to read json, sql not found\n"); printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if (resubAfterConsume
&& resubAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
} else if (!resubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1;
}
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) { if ((NULL != result) && (result->type == cJSON_String)
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); && (result->valuestring != NULL)) {
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) { } else if (NULL == result) {
memset(g_queryInfo.specifiedQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); memset(g_queryInfo.specifiedQueryInfo.result[j],
0, MAX_FILE_NAME_LEN);
} else { } else {
printf("ERROR: failed to read json, super query result file not found\n"); printf("ERROR: failed to read json, super query result file not found\n");
goto PARSE_OVER; goto PARSE_OVER;
@ -4353,27 +4401,27 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (subrestart && subrestart->type == cJSON_String if (subrestart && subrestart->type == cJSON_String
&& subrestart->valuestring != NULL) { && subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) { if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 1; g_queryInfo.superQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", subrestart->valuestring)) { } else if (0 == strcmp("no", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 0; g_queryInfo.superQueryInfo.subscribeRestart = false;
} else { } else {
printf("ERROR: failed to read json, subscribe restart error\n"); printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.superQueryInfo.subscribeRestart = 1; g_queryInfo.superQueryInfo.subscribeRestart = true;
} }
cJSON* subkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
if (subkeepProgress && if (superkeepProgress &&
subkeepProgress->type == cJSON_String superkeepProgress->type == cJSON_String
&& subkeepProgress->valuestring != NULL) { && superkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", subkeepProgress->valuestring)) { if (0 == strcmp("yes", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; g_queryInfo.superQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", subkeepProgress->valuestring)) { } else if (0 == strcmp("no", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} else { } else {
printf("ERROR: failed to read json, subscribe keepProgress error\n"); printf("ERROR: failed to read json, subscribe super table keepProgress error\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
@ -4411,6 +4459,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH); MAX_QUERY_SQL_LENGTH);
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(sql, "resubAfterConsume");
if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.resubAfterConsume[j] =
superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.superQueryInfo.resubAfterConsume[j] = 1;
}
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){ && result->valuestring != NULL){
@ -5342,6 +5402,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t start_time = pThreadInfo->start_time; int64_t start_time = pThreadInfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (uint64_t i = 0; i < insertRows;) { for (uint64_t i = 0; i < insertRows;) {
@ -5508,7 +5569,8 @@ static void callBack(void *param, TAOS_RES *res, int code) {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->superTblInfo->disorderRatio if (0 != pThreadInfo->superTblInfo->disorderRatio
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) { && rand_num < pThreadInfo->superTblInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateRowData(data, d, pThreadInfo->superTblInfo); generateRowData(data, d, pThreadInfo->superTblInfo);
} else { } else {
generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo); generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo);
@ -6022,6 +6084,21 @@ static void *readMetric(void *sarg) {
return NULL; return NULL;
} }
static void prompt()
{
if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n");
(void)getchar();
}
}
static void prompt2()
{
if (!g_args.answer_yes) {
printf(" press Enter key to continue or Ctrl-C to stop.");
(void)getchar();
}
}
static int insertTestProcess() { static int insertTestProcess() {
@ -6042,10 +6119,7 @@ static int insertTestProcess() {
if (g_fpOfInsertResult) if (g_fpOfInsertResult)
printfInsertMetaToFile(g_fpOfInsertResult); printfInsertMetaToFile(g_fpOfInsertResult);
if (!g_args.answer_yes) { prompt();
printf("Press enter key to continue\n\n");
(void)getchar();
}
init_rand_data(); init_rand_data();
@ -6317,10 +6391,7 @@ static int queryTestProcess() {
&g_queryInfo.superQueryInfo.childTblCount); &g_queryInfo.superQueryInfo.childTblCount);
} }
if (!g_args.answer_yes) { prompt();
printf("Press enter key to continue\n\n");
(void)getchar();
}
if (g_args.debug_print || g_args.verbose_print) { if (g_args.debug_print || g_args.verbose_print) {
printfQuerySystemInfo(taos); printfQuerySystemInfo(taos);
@ -6473,17 +6544,18 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
} }
static TAOS_SUB* subscribeImpl( static TAOS_SUB* subscribeImpl(
TAOS *taos, char *sql, char* topic, char* resultFileName) { TAOS *taos, char *sql, char* topic, bool restart,
char* resultFileName) {
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart, restart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, subscribe_callback, (void*)resultFileName,
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else { } else {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart, restart,
topic, sql, NULL, NULL, 0); topic, sql, NULL, NULL, 0);
} }
@ -6503,6 +6575,15 @@ static void *superSubscribe(void *sarg) {
if (g_queryInfo.superQueryInfo.sqlCount == 0) if (g_queryInfo.superQueryInfo.sqlCount == 0)
return NULL; return NULL;
if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n",
g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables,
g_queryInfo.superQueryInfo.sqlCount,
pThreadInfo->ntables,
MAX_QUERY_SQL_COUNT);
exit(-1);
}
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
@ -6544,7 +6625,9 @@ static void *superSubscribe(void *sarg) {
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr); __func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile); tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) { if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
@ -6552,17 +6635,22 @@ static void *superSubscribe(void *sarg) {
} }
} }
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
continue; continue;
} }
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taosMsleep(100); // ms taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[subSeq]); res = taos_consume(tsub[subSeq]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
@ -6572,6 +6660,28 @@ static void *superSubscribe(void *sarg) {
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, tmpFile); appendResultToFile(res, tmpFile);
} }
consumed[j] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[j] >=
g_queryInfo.superQueryInfo.resubAfterConsume[j])) {
printf("keepProgress:%d, resub super table query: %d\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress, j);
taos_unsubscribe(tsub[subSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[j]= 0;
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(
pThreadInfo->taos, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
} }
} }
} }
@ -6582,8 +6692,7 @@ static void *superSubscribe(void *sarg) {
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taos_unsubscribe(tsub[subSeq], taos_unsubscribe(tsub[subSeq], 0);
g_queryInfo.superQueryInfo.subscribeKeepProgress);
} }
} }
@ -6628,40 +6737,68 @@ static void *specifiedSubscribe(void *sarg) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); g_queryInfo.specifiedQueryInfo.result[i],
pThreadInfo->threadID);
} }
tsub[i] = subscribeImpl(pThreadInfo->taos, tsub[i] = subscribeImpl(pThreadInfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile); g_queryInfo.specifiedQueryInfo.sql[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) { if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue; continue;
} }
taosMsleep(1000); // ms taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[i]); res = taos_consume(tsub[i]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); g_queryInfo.specifiedQueryInfo.result[i],
pThreadInfo->threadID);
appendResultToFile(res, tmpFile); appendResultToFile(res, tmpFile);
} }
consumed[i] ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed[i] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) {
printf("keepProgress:%d, resub specified query: %d\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i);
consumed[i] = 0;
taos_unsubscribe(tsub[i],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub[i] = subscribeImpl(pThreadInfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
} }
} }
} }
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i], taos_unsubscribe(tsub[i], 0);
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
@ -6673,10 +6810,7 @@ static int subscribeTestProcess() {
printfQueryMeta(); printfQueryMeta();
resetAfterAnsiEscape(); resetAfterAnsiEscape();
if (!g_args.answer_yes) { prompt();
printf("Press enter key to continue\n\n");
(void) getchar();
}
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
@ -6715,8 +6849,10 @@ static int subscribeTestProcess() {
exit(-1); exit(-1);
} }
pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); pids = malloc(
infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(
g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1); exit(-1);
@ -6901,17 +7037,21 @@ static void setParaFromArg(){
if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) { if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) {
g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR; g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR;
} else { } else {
for (int i = g_Dbs.db[0].superTbls[0].columnCount; i < g_args.num_of_CPR; i++) { for (int i = g_Dbs.db[0].superTbls[0].columnCount;
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, "INT", MAX_TB_NAME_SIZE); i < g_args.num_of_CPR; i++) {
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
"INT", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0; g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0;
g_Dbs.db[0].superTbls[0].columnCount++; g_Dbs.db[0].superTbls[0].columnCount++;
} }
} }
tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType,
"INT", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0; g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;
tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType, "BINARY", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType,
"BINARY", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary; g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
g_Dbs.db[0].superTbls[0].tagCount = 2; g_Dbs.db[0].superTbls[0].tagCount = 2;
} else { } else {

View File

@ -340,8 +340,11 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
if (pWrite->processedCount >= 100) { if (pWrite->processedCount >= 100) {
vError("vgId:%d, msg:%p, failed to process since %s, retry:%d", pVnode->vgId, pWrite, tstrerror(code), vError("vgId:%d, msg:%p, failed to process since %s, retry:%d", pVnode->vgId, pWrite, tstrerror(code),
pWrite->processedCount); pWrite->processedCount);
pWrite->processedCount = 1; void *handle = pWrite->rpcMsg.handle;
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); taosFreeQitem(pWrite);
vnodeRelease(pVnode);
SRpcMsg rpcRsp = {.handle = handle, .code = code};
rpcSendResponse(&rpcRsp);
} else { } else {
code = vnodePerformFlowCtrl(pWrite); code = vnodePerformFlowCtrl(pWrite);
if (code == 0) { if (code == 0) {

View File

@ -27,15 +27,15 @@ class TDTestCase:
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath): if "community" in selfPath:
projPath = selfPath[: selfPath.find("community")] projPath = selfPath[: selfPath.find("community")]
else: else:
projPath = selfPath[: selfPath.find("tests")] projPath = selfPath[: selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files): if "taosd" in files:
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if "packaging" not in rootRealPath:
buildPath = root[: len(root) - len("/build/bin")] buildPath = root[: len(root) - len("/build/bin")]
break break
return buildPath return buildPath
@ -43,12 +43,12 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
buildPath = self.getBuildPath() buildPath = self.getBuildPath()
if (buildPath == ""): if buildPath == "":
tdLog.exit("taosd not found!") tdLog.exit("taosd not found!")
else: else:
tdLog.info("taosd found in %s" % buildPath) tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath + "/build/bin/" binPath = buildPath + "/build/bin/"
os.system("yes | %staosdemo -f tools/insert.json" % binPath) os.system("%staosdemo -f tools/insert.json -y" % binPath)
tdSql.execute("use db01") tdSql.execute("use db01")
tdSql.query("select count(*) from stb01") tdSql.query("select count(*) from stb01")