Feature/sangshuduo/td 3408 taosdemo async query (#5730)
* [TD-3408]<feature>: taosdemo support async query. * [TD-3408]<feature>: taosdemo support async query. refactor * [TD-3408]<feature>: taosdemo support async query. refactor 2 * [TD-3408]<feature>: taosdemo support async query. refactor 3 * [TD-3408]<feature>: taosdemo support async query. refactor 4 * [TD-3408]<feature>: taosdemo support specified sql more than one line. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
910b78f290
commit
6a0af1593f
|
@ -67,6 +67,12 @@ enum TEST_MODE {
|
|||
INVAID_TEST
|
||||
};
|
||||
|
||||
enum QUERY_MODE {
|
||||
SYNC_QUERY_MODE, // 0
|
||||
ASYNC_QUERY_MODE, // 1
|
||||
INVALID_MODE
|
||||
};
|
||||
|
||||
#define MAX_SQL_SIZE 65536
|
||||
#define BUFFER_SIZE (65536*2)
|
||||
#define MAX_USERNAME_SIZE 64
|
||||
|
@ -198,7 +204,7 @@ typedef struct SArguments_S {
|
|||
bool verbose_print;
|
||||
bool performance_print;
|
||||
char * output_file;
|
||||
int mode;
|
||||
int query_mode;
|
||||
char * datatype[MAX_NUM_DATATYPE + 1];
|
||||
int len_of_binary;
|
||||
int num_of_CPR;
|
||||
|
@ -351,7 +357,7 @@ typedef struct SpecifiedQueryInfo_S {
|
|||
int rate; // 0: unlimit > 0 loop/s
|
||||
int concurrent;
|
||||
int sqlCount;
|
||||
int subscribeMode; // 0: sync, 1: async
|
||||
int mode; // 0: sync, 1: async
|
||||
int subscribeInterval; // ms
|
||||
int queryTimes;
|
||||
int subscribeRestart;
|
||||
|
@ -365,7 +371,7 @@ typedef struct SuperQueryInfo_S {
|
|||
char sTblName[MAX_TB_NAME_SIZE+1];
|
||||
int rate; // 0: unlimit > 0 loop/s
|
||||
int threadCnt;
|
||||
int subscribeMode; // 0: sync, 1: async
|
||||
int mode; // 0: sync, 1: async
|
||||
int subscribeInterval; // ms
|
||||
int subscribeRestart;
|
||||
int subscribeKeepProgress;
|
||||
|
@ -429,6 +435,8 @@ typedef struct SThreadInfo_S {
|
|||
int64_t maxDelay;
|
||||
int64_t minDelay;
|
||||
|
||||
// query
|
||||
int querySeq; // sequence number of sql command
|
||||
} threadInfo;
|
||||
|
||||
#ifdef WINDOWS
|
||||
|
@ -714,7 +722,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
|||
} else if (strcmp(argv[i], "-s") == 0) {
|
||||
arguments->sqlFile = argv[++i];
|
||||
} else if (strcmp(argv[i], "-q") == 0) {
|
||||
arguments->mode = atoi(argv[++i]);
|
||||
arguments->query_mode = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-T") == 0) {
|
||||
arguments->num_of_threads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-i") == 0) {
|
||||
|
@ -758,7 +766,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
|||
char *dupstr = strdup(argv[i]);
|
||||
char *running = dupstr;
|
||||
char *token = strsep(&running, ",");
|
||||
while (token != NULL) {
|
||||
while(token != NULL) {
|
||||
if (strcasecmp(token, "INT")
|
||||
&& strcasecmp(token, "FLOAT")
|
||||
&& strcasecmp(token, "TINYINT")
|
||||
|
@ -964,7 +972,7 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
|
|||
char temp[16000];
|
||||
|
||||
// fetch the records row by row
|
||||
while ((row = taos_fetch_row(res))) {
|
||||
while((row = taos_fetch_row(res))) {
|
||||
if (totalLen >= 100*1024*1024 - 32000) {
|
||||
if (fp) fprintf(fp, "%s", databuf);
|
||||
totalLen = 0;
|
||||
|
@ -986,7 +994,8 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
|
|||
static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) {
|
||||
TAOS_RES *res = taos_query(taos, command);
|
||||
if (res == NULL || taos_errno(res) != 0) {
|
||||
printf("failed to sql:%s, reason:%s\n", command, taos_errstr(res));
|
||||
errorPrint("%s() LN%d, failed to execute sql:%s, reason:%s\n",
|
||||
__func__, __LINE__, command, taos_errstr(res));
|
||||
taos_free_result(res);
|
||||
return;
|
||||
}
|
||||
|
@ -1163,7 +1172,8 @@ static int printfInsertMeta() {
|
|||
if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
|
||||
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|
||||
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
|
||||
printf(" precision: \033[33m%s\033[0m\n", g_Dbs.db[i].dbCfg.precision);
|
||||
printf(" precision: \033[33m%s\033[0m\n",
|
||||
g_Dbs.db[i].dbCfg.precision);
|
||||
} else {
|
||||
printf("\033[1m\033[40;31m precision error: %s\033[0m\n",
|
||||
g_Dbs.db[i].dbCfg.precision);
|
||||
|
@ -1171,11 +1181,13 @@ static int printfInsertMeta() {
|
|||
}
|
||||
}
|
||||
|
||||
printf(" super table count: \033[33m%d\033[0m\n", g_Dbs.db[i].superTblCount);
|
||||
printf(" super table count: \033[33m%d\033[0m\n",
|
||||
g_Dbs.db[i].superTblCount);
|
||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||
printf(" super table[\033[33m%d\033[0m]:\n", j);
|
||||
|
||||
printf(" stbName: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sTblName);
|
||||
printf(" stbName: \033[33m%s\033[0m\n",
|
||||
g_Dbs.db[i].superTbls[j].sTblName);
|
||||
|
||||
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
|
||||
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
|
||||
|
@ -1241,7 +1253,7 @@ static int printfInsertMeta() {
|
|||
g_Dbs.db[i].superTbls[j].sampleFile);
|
||||
printf(" tagsFile: \033[33m%s\033[0m\n",
|
||||
g_Dbs.db[i].superTbls[j].tagsFile);
|
||||
printf(" columnCount: \033[33m%d\033[0m\n ",
|
||||
printf(" columnCount: \033[33m%d\033[0m\n",
|
||||
g_Dbs.db[i].superTbls[j].columnCount);
|
||||
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
|
||||
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
|
||||
|
@ -1459,41 +1471,61 @@ static void printfQueryMeta() {
|
|||
|
||||
printf("\n");
|
||||
printf("specified table query info: \n");
|
||||
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.rate);
|
||||
printf("query interval: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.specifiedQueryInfo.rate);
|
||||
printf("top query times:\033[33m%d\033[0m\n", g_args.query_times);
|
||||
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.concurrent);
|
||||
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount);
|
||||
printf("concurrent: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.specifiedQueryInfo.concurrent);
|
||||
printf("sqlCount: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.specifiedQueryInfo.sqlCount);
|
||||
printf("specified tbl query times:\n");
|
||||
printf(" \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.queryTimes);
|
||||
printf(" \033[33m%d\033[0m\n",
|
||||
g_queryInfo.specifiedQueryInfo.queryTimes);
|
||||
|
||||
if (SUBSCRIBE_TEST == g_args.test_mode) {
|
||||
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeMode);
|
||||
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
||||
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeRestart);
|
||||
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||
printf("mod: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.specifiedQueryInfo.mode);
|
||||
printf("interval: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
||||
printf("restart: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.specifiedQueryInfo.subscribeRestart);
|
||||
printf("keepProgress: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||
}
|
||||
|
||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
||||
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.specifiedQueryInfo.sql[i]);
|
||||
printf(" sql[%d]: \033[33m%s\033[0m\n",
|
||||
i, g_queryInfo.specifiedQueryInfo.sql[i]);
|
||||
}
|
||||
printf("\n");
|
||||
printf("super table query info: \n");
|
||||
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate);
|
||||
printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.threadCnt);
|
||||
printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.childTblCount);
|
||||
printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.superQueryInfo.sTblName);
|
||||
printf("stb query times:\033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.queryTimes);
|
||||
printf("super table query info:\n");
|
||||
printf("query interval: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.rate);
|
||||
printf("threadCnt: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.threadCnt);
|
||||
printf("childTblCount: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.childTblCount);
|
||||
printf("stable name: \033[33m%s\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.sTblName);
|
||||
printf("stb query times:\033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.queryTimes);
|
||||
|
||||
if (SUBSCRIBE_TEST == g_args.test_mode) {
|
||||
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode);
|
||||
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval);
|
||||
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart);
|
||||
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||
printf("mod: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.mode);
|
||||
printf("interval: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.subscribeInterval);
|
||||
printf("restart: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.subscribeRestart);
|
||||
printf("keepProgress: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||
}
|
||||
|
||||
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount);
|
||||
printf("sqlCount: \033[33m%d\033[0m\n",
|
||||
g_queryInfo.superQueryInfo.sqlCount);
|
||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]);
|
||||
printf(" sql[%d]: \033[33m%s\033[0m\n",
|
||||
i, g_queryInfo.superQueryInfo.sql[i]);
|
||||
}
|
||||
printf("\n");
|
||||
|
||||
|
@ -1637,7 +1669,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
|
|||
|
||||
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||
|
||||
while ((row = taos_fetch_row(res)) != NULL) {
|
||||
while((row = taos_fetch_row(res)) != NULL) {
|
||||
// sys database name : 'log'
|
||||
if (strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log",
|
||||
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) {
|
||||
|
@ -1670,7 +1702,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
|
|||
dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]);
|
||||
dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]);
|
||||
dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
|
||||
dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
|
||||
dbInfos[count]->cachelast =
|
||||
(int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
|
||||
|
||||
tstrncpy(dbInfos[count]->precision,
|
||||
(char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
|
||||
|
@ -1681,7 +1714,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
|
|||
|
||||
count++;
|
||||
if (count > MAX_DATABASE_COUNT) {
|
||||
errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT);
|
||||
errorPrint("%s() LN%d, The database count overflow than %d\n",
|
||||
__func__, __LINE__, MAX_DATABASE_COUNT);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1691,6 +1725,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
|
|||
|
||||
static void printfDbInfoForQueryToFile(
|
||||
char* filename, SDbInfo* dbInfos, int index) {
|
||||
|
||||
if (filename[0] == 0)
|
||||
return;
|
||||
|
||||
|
@ -1909,7 +1944,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
|
|||
if (bytes == 0)
|
||||
break;
|
||||
sent+=bytes;
|
||||
} while (sent < req_str_len);
|
||||
} while(sent < req_str_len);
|
||||
|
||||
memset(response_buf, 0, RESP_BUF_LEN);
|
||||
resp_len = sizeof(response_buf) - 1;
|
||||
|
@ -1927,7 +1962,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
|
|||
if (bytes == 0)
|
||||
break;
|
||||
received += bytes;
|
||||
} while (received < resp_len);
|
||||
} while(received < resp_len);
|
||||
|
||||
if (received == resp_len) {
|
||||
free(request_buf);
|
||||
|
@ -1951,7 +1986,8 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
|
|||
static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
|
||||
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
|
||||
if (NULL == dataBuf) {
|
||||
errorPrint("%s() LN%d, calloc failed! size:%d\n", __func__, __LINE__, TSDB_MAX_SQL_LEN+1);
|
||||
errorPrint("%s() LN%d, calloc failed! size:%d\n",
|
||||
__func__, __LINE__, TSDB_MAX_SQL_LEN+1);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -2155,7 +2191,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
|
|||
}
|
||||
|
||||
char* pTblName = childTblName;
|
||||
while ((row = taos_fetch_row(res)) != NULL) {
|
||||
while((row = taos_fetch_row(res)) != NULL) {
|
||||
int32_t* len = taos_fetch_lengths(res);
|
||||
tstrncpy(pTblName, (char *)row[0], len[0]+1);
|
||||
//printf("==== sub table name: %s\n", pTblName);
|
||||
|
@ -2218,7 +2254,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
|
|||
int tagIndex = 0;
|
||||
int columnIndex = 0;
|
||||
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||
while ((row = taos_fetch_row(res)) != NULL) {
|
||||
while((row = taos_fetch_row(res)) != NULL) {
|
||||
if (0 == count) {
|
||||
count++;
|
||||
continue;
|
||||
|
@ -2765,7 +2801,7 @@ static void createChildTables() {
|
|||
// normal table
|
||||
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
|
||||
int j = 0;
|
||||
while (g_args.datatype[j]) {
|
||||
while(g_args.datatype[j]) {
|
||||
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|
||||
|| (strncasecmp(g_args.datatype[j],
|
||||
"NCHAR", strlen("NCHAR")) == 0)) {
|
||||
|
@ -2824,7 +2860,7 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
while ((readLen = tgetline(&line, &n, fp)) != -1) {
|
||||
while((readLen = tgetline(&line, &n, fp)) != -1) {
|
||||
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
|
||||
line[--readLen] = 0;
|
||||
}
|
||||
|
@ -2888,7 +2924,7 @@ static int readSampleFromCsvFileToMem(
|
|||
assert(superTblInfo->sampleDataBuf);
|
||||
memset(superTblInfo->sampleDataBuf, 0,
|
||||
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
|
||||
while (1) {
|
||||
while(1) {
|
||||
readLen = tgetline(&line, &n, fp);
|
||||
if (-1 == readLen) {
|
||||
if(0 != fseek(fp, 0, SEEK_SET)) {
|
||||
|
@ -2967,7 +3003,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
|
|||
if (countObj && countObj->type == cJSON_Number) {
|
||||
count = countObj->valueint;
|
||||
} else if (countObj && countObj->type != cJSON_Number) {
|
||||
errorPrint("%s() LN%d, failed to read json, column count not found\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, column count not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
} else {
|
||||
count = 1;
|
||||
|
@ -2976,8 +3013,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
|
|||
// column info
|
||||
memset(&columnCase, 0, sizeof(StrColumn));
|
||||
cJSON *dataType = cJSON_GetObjectItem(column, "type");
|
||||
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
|
||||
errorPrint("%s() LN%d: failed to read json, column type not found\n", __func__, __LINE__);
|
||||
if (!dataType || dataType->type != cJSON_String
|
||||
|| dataType->valuestring == NULL) {
|
||||
errorPrint("%s() LN%d: failed to read json, column type not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
|
||||
|
@ -2987,7 +3026,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
|
|||
if (dataLen && dataLen->type == cJSON_Number) {
|
||||
columnCase.dataLen = dataLen->valueint;
|
||||
} else if (dataLen && dataLen->type != cJSON_Number) {
|
||||
debugPrint("%s() LN%d: failed to read json, column len not found\n", __func__, __LINE__);
|
||||
debugPrint("%s() LN%d: failed to read json, column len not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
} else {
|
||||
columnCase.dataLen = 8;
|
||||
|
@ -3007,13 +3047,15 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
|
|||
// tags
|
||||
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
|
||||
if (!tags || tags->type != cJSON_Array) {
|
||||
debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, tags not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
int tagSize = cJSON_GetArraySize(tags);
|
||||
if (tagSize > MAX_TAG_COUNT) {
|
||||
debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT);
|
||||
errorPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n",
|
||||
__func__, __LINE__, MAX_TAG_COUNT);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3036,8 +3078,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
|
|||
// column info
|
||||
memset(&columnCase, 0, sizeof(StrColumn));
|
||||
cJSON *dataType = cJSON_GetObjectItem(tag, "type");
|
||||
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
|
||||
printf("ERROR: failed to read json, tag type not found\n");
|
||||
if (!dataType || dataType->type != cJSON_String
|
||||
|| dataType->valuestring == NULL) {
|
||||
errorPrint("%s() LN%d, failed to read json, tag type not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
|
||||
|
@ -3046,14 +3090,16 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
|
|||
if (dataLen && dataLen->type == cJSON_Number) {
|
||||
columnCase.dataLen = dataLen->valueint;
|
||||
} else if (dataLen && dataLen->type != cJSON_Number) {
|
||||
printf("ERROR: failed to read json, column len not found\n");
|
||||
errorPrint("%s() LN%d, failed to read json, column len not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
} else {
|
||||
columnCase.dataLen = 0;
|
||||
}
|
||||
|
||||
for (int n = 0; n < count; ++n) {
|
||||
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType, MAX_TB_NAME_SIZE);
|
||||
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType,
|
||||
MAX_TB_NAME_SIZE);
|
||||
superTbls->tags[index].dataLen = columnCase.dataLen;
|
||||
index++;
|
||||
}
|
||||
|
@ -3063,9 +3109,6 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
|
|||
ret = true;
|
||||
|
||||
PARSE_OVER:
|
||||
//free(content);
|
||||
//cJSON_Delete(root);
|
||||
//fclose(fp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -3142,7 +3185,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
} else if (!gInsertInterval) {
|
||||
g_args.insert_interval = 0;
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3163,7 +3207,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
} else if (!interlaceRows) {
|
||||
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3173,7 +3218,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
} else if (!maxSqlLen) {
|
||||
g_args.max_sql_len = TSDB_PAYLOAD_SIZE;
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3183,7 +3229,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
} else if (!numRecPerReq) {
|
||||
g_args.num_of_RPR = 0xffff;
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3509,7 +3556,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
} else if (!dataSource) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE);
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, data_source not found\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, data_source not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3584,7 +3632,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
}
|
||||
|
||||
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
|
||||
if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) {
|
||||
if (sampleFile && sampleFile->type == cJSON_String
|
||||
&& sampleFile->valuestring != NULL) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
|
||||
sampleFile->valuestring, MAX_FILE_NAME_LEN);
|
||||
} else if (!sampleFile) {
|
||||
|
@ -3727,9 +3776,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
ret = true;
|
||||
|
||||
PARSE_OVER:
|
||||
//free(content);
|
||||
//cJSON_Delete(root);
|
||||
//fclose(fp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -3795,7 +3841,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
} else if (!gQueryTimes) {
|
||||
g_args.query_times = 1;
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3833,35 +3880,45 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
g_queryInfo.specifiedQueryInfo.rate = 0;
|
||||
}
|
||||
|
||||
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, "query_times");
|
||||
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
|
||||
"query_times");
|
||||
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
|
||||
g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint;
|
||||
} else if (!specifiedQueryTimes) {
|
||||
g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times;
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
|
||||
if (concurrent && concurrent->type == cJSON_Number) {
|
||||
g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
|
||||
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
|
||||
errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
|
||||
__func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount,
|
||||
g_queryInfo.specifiedQueryInfo.concurrent);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
} else if (!concurrent) {
|
||||
g_queryInfo.specifiedQueryInfo.concurrent = 1;
|
||||
}
|
||||
|
||||
cJSON* mode = cJSON_GetObjectItem(specifiedQuery, "mode");
|
||||
if (mode && mode->type == cJSON_String && mode->valuestring != NULL) {
|
||||
if (0 == strcmp("sync", mode->valuestring)) {
|
||||
g_queryInfo.specifiedQueryInfo.subscribeMode = 0;
|
||||
} else if (0 == strcmp("async", mode->valuestring)) {
|
||||
g_queryInfo.specifiedQueryInfo.subscribeMode = 1;
|
||||
cJSON* queryMode = cJSON_GetObjectItem(specifiedQuery, "mode");
|
||||
if (queryMode && queryMode->type == cJSON_String
|
||||
&& queryMode->valuestring != NULL) {
|
||||
if (0 == strcmp("sync", queryMode->valuestring)) {
|
||||
g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE;
|
||||
} else if (0 == strcmp("async", queryMode->valuestring)) {
|
||||
g_queryInfo.specifiedQueryInfo.mode = ASYNC_QUERY_MODE;
|
||||
} else {
|
||||
printf("ERROR: failed to read json, subscribe mod error\n");
|
||||
errorPrint("%s() LN%d, failed to read json, query mode input error\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
} else {
|
||||
g_queryInfo.specifiedQueryInfo.subscribeMode = 0;
|
||||
g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE;
|
||||
}
|
||||
|
||||
cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
|
||||
|
@ -3908,12 +3965,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
if (!superSqls) {
|
||||
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
|
||||
} else if (superSqls->type != cJSON_Array) {
|
||||
printf("ERROR: failed to read json, super sqls not found\n");
|
||||
errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
} else {
|
||||
int superSqlSize = cJSON_GetArraySize(superSqls);
|
||||
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
|
||||
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
|
||||
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
|
||||
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3965,7 +4024,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
} else if (!superQueryTimes) {
|
||||
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
|
||||
} else {
|
||||
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
|
||||
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -3984,25 +4044,30 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
//}
|
||||
|
||||
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
|
||||
if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) {
|
||||
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE);
|
||||
if (stblname && stblname->type == cJSON_String
|
||||
&& stblname->valuestring != NULL) {
|
||||
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
|
||||
MAX_TB_NAME_SIZE);
|
||||
} else {
|
||||
printf("ERROR: failed to read json, super table name not found\n");
|
||||
errorPrint("%s() LN%d, failed to read json, super table name input error\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON* submode = cJSON_GetObjectItem(superQuery, "mode");
|
||||
if (submode && submode->type == cJSON_String && submode->valuestring != NULL) {
|
||||
if (submode && submode->type == cJSON_String
|
||||
&& submode->valuestring != NULL) {
|
||||
if (0 == strcmp("sync", submode->valuestring)) {
|
||||
g_queryInfo.superQueryInfo.subscribeMode = 0;
|
||||
g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE;
|
||||
} else if (0 == strcmp("async", submode->valuestring)) {
|
||||
g_queryInfo.superQueryInfo.subscribeMode = 1;
|
||||
g_queryInfo.superQueryInfo.mode = ASYNC_QUERY_MODE;
|
||||
} else {
|
||||
printf("ERROR: failed to read json, subscribe mod error\n");
|
||||
errorPrint("%s() LN%d, failed to read json, query mode input error\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
} else {
|
||||
g_queryInfo.superQueryInfo.subscribeMode = 0;
|
||||
g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE;
|
||||
}
|
||||
|
||||
cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval");
|
||||
|
@ -4015,7 +4080,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
}
|
||||
|
||||
cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart");
|
||||
if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) {
|
||||
if (subrestart && subrestart->type == cJSON_String
|
||||
&& subrestart->valuestring != NULL) {
|
||||
if (0 == strcmp("yes", subrestart->valuestring)) {
|
||||
g_queryInfo.superQueryInfo.subscribeRestart = 1;
|
||||
} else if (0 == strcmp("no", subrestart->valuestring)) {
|
||||
|
@ -4049,12 +4115,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
if (!subsqls) {
|
||||
g_queryInfo.superQueryInfo.sqlCount = 0;
|
||||
} else if (subsqls->type != cJSON_Array) {
|
||||
printf("ERROR: failed to read json, super sqls not found\n");
|
||||
errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
} else {
|
||||
int superSqlSize = cJSON_GetArraySize(subsqls);
|
||||
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
|
||||
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
|
||||
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
|
||||
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
|
@ -4064,19 +4132,25 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
if (sql == NULL) continue;
|
||||
|
||||
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
|
||||
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
|
||||
printf("ERROR: failed to read json, sql not found\n");
|
||||
if (!sqlStr || sqlStr->type != cJSON_String
|
||||
|| sqlStr->valuestring == NULL) {
|
||||
errorPrint("%s() LN%d, failed to read json, sql not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
|
||||
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
|
||||
MAX_QUERY_SQL_LENGTH);
|
||||
|
||||
cJSON *result = cJSON_GetObjectItem(sql, "result");
|
||||
if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){
|
||||
tstrncpy(g_queryInfo.superQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN);
|
||||
if (result != NULL && result->type == cJSON_String
|
||||
&& result->valuestring != NULL){
|
||||
tstrncpy(g_queryInfo.superQueryInfo.result[j],
|
||||
result->valuestring, MAX_FILE_NAME_LEN);
|
||||
} else if (NULL == result) {
|
||||
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
|
||||
} else {
|
||||
printf("ERROR: failed to read json, sub query result file not found\n");
|
||||
errorPrint("%s() LN%d, failed to read json, sub query result file not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
}
|
||||
|
@ -4086,9 +4160,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
|||
ret = true;
|
||||
|
||||
PARSE_OVER:
|
||||
//free(content);
|
||||
//cJSON_Delete(root);
|
||||
//fclose(fp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -5415,7 +5486,7 @@ static void *readTable(void *sarg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
while (taos_fetch_row(pSql) != NULL) {
|
||||
while(taos_fetch_row(pSql) != NULL) {
|
||||
count++;
|
||||
}
|
||||
|
||||
|
@ -5491,7 +5562,7 @@ static void *readMetric(void *sarg) {
|
|||
return NULL;
|
||||
}
|
||||
int count = 0;
|
||||
while (taos_fetch_row(pSql) != NULL) {
|
||||
while(taos_fetch_row(pSql) != NULL) {
|
||||
count++;
|
||||
}
|
||||
t = getCurrentTimeUs() - t;
|
||||
|
@ -5602,7 +5673,7 @@ static int insertTestProcess() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void *superQueryProcess(void *sarg) {
|
||||
static void *specifiedQueryProcess(void *sarg) {
|
||||
threadInfo *winfo = (threadInfo *)sarg;
|
||||
|
||||
if (winfo->taos == NULL) {
|
||||
|
@ -5643,22 +5714,25 @@ static void *superQueryProcess(void *sarg) {
|
|||
}
|
||||
|
||||
st = taosGetTimestampUs();
|
||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
||||
|
||||
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
|
||||
int64_t t1 = taosGetTimestampUs();
|
||||
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
||||
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
|
||||
if (g_queryInfo.specifiedQueryInfo.result[winfo->querySeq][0] != 0) {
|
||||
sprintf(tmpFile, "%s-%d",
|
||||
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
|
||||
g_queryInfo.specifiedQueryInfo.result[winfo->querySeq],
|
||||
winfo->threadID);
|
||||
}
|
||||
selectAndGetResult(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], tmpFile);
|
||||
selectAndGetResult(winfo->taos,
|
||||
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq], tmpFile);
|
||||
int64_t t2 = taosGetTimestampUs();
|
||||
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
|
||||
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
|
||||
} else {
|
||||
int64_t t1 = taosGetTimestampUs();
|
||||
int retCode = postProceSql(g_queryInfo.host,
|
||||
g_queryInfo.port, g_queryInfo.specifiedQueryInfo.sql[i]);
|
||||
g_queryInfo.port,
|
||||
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq]);
|
||||
int64_t t2 = taosGetTimestampUs();
|
||||
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
|
||||
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
|
||||
|
@ -5668,7 +5742,7 @@ static void *superQueryProcess(void *sarg) {
|
|||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
et = taosGetTimestampUs();
|
||||
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
|
||||
taosGetSelfPthreadId(), (double)(et - st)/1000.0);
|
||||
|
@ -5698,7 +5772,7 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
|
|||
//printf("3: %s\n", outSql);
|
||||
}
|
||||
|
||||
static void *subQueryProcess(void *sarg) {
|
||||
static void *superQueryProcess(void *sarg) {
|
||||
char sqlstr[1024];
|
||||
threadInfo *winfo = (threadInfo *)sarg;
|
||||
|
||||
|
@ -5791,24 +5865,24 @@ static int queryTestProcess() {
|
|||
pthread_t *pids = NULL;
|
||||
threadInfo *infos = NULL;
|
||||
//==== create sub threads for query from specify table
|
||||
if (g_queryInfo.specifiedQueryInfo.sqlCount > 0
|
||||
&& g_queryInfo.specifiedQueryInfo.concurrent > 0) {
|
||||
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
|
||||
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
|
||||
|
||||
pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
|
||||
if (NULL == pids) {
|
||||
if ((nSqlCount > 0) && (nConcurrent > 0)) {
|
||||
|
||||
pids = malloc(nConcurrent * nSqlCount * sizeof(pthread_t));
|
||||
infos = malloc(nConcurrent * nSqlCount * sizeof(threadInfo));
|
||||
|
||||
if ((NULL == pids) || (NULL == infos)) {
|
||||
taos_close(taos);
|
||||
ERROR_EXIT("memory allocation failed\n");
|
||||
}
|
||||
infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
|
||||
if (NULL == infos) {
|
||||
taos_close(taos);
|
||||
free(pids);
|
||||
ERROR_EXIT("memory allocation failed for create threads\n");
|
||||
}
|
||||
|
||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
|
||||
threadInfo *t_info = infos + i;
|
||||
t_info->threadID = i;
|
||||
for (int i = 0; i < nConcurrent; i++) {
|
||||
for (int j = 0; j < nSqlCount; j++) {
|
||||
threadInfo *t_info = infos + i * nSqlCount + j;
|
||||
t_info->threadID = i * nSqlCount + j;
|
||||
t_info->querySeq = j;
|
||||
|
||||
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
|
||||
|
||||
|
@ -5827,7 +5901,9 @@ static int queryTestProcess() {
|
|||
|
||||
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
|
||||
|
||||
pthread_create(pids + i, NULL, superQueryProcess, t_info);
|
||||
pthread_create(pids + i * nSqlCount + j, NULL, specifiedQueryProcess,
|
||||
t_info);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
g_queryInfo.specifiedQueryInfo.concurrent = 0;
|
||||
|
@ -5841,18 +5917,12 @@ static int queryTestProcess() {
|
|||
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
|
||||
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) {
|
||||
pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t));
|
||||
if (NULL == pidsOfSub) {
|
||||
free(infos);
|
||||
free(pids);
|
||||
|
||||
ERROR_EXIT("memory allocation failed for create threads\n");
|
||||
}
|
||||
|
||||
infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo));
|
||||
if (NULL == infosOfSub) {
|
||||
free(pidsOfSub);
|
||||
|
||||
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
|
||||
free(infos);
|
||||
free(pids);
|
||||
|
||||
ERROR_EXIT("memory allocation failed for create threads\n");
|
||||
}
|
||||
|
||||
|
@ -5880,7 +5950,7 @@ static int queryTestProcess() {
|
|||
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
|
||||
startFrom = t_info->end_table_to + 1;
|
||||
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
|
||||
pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info);
|
||||
pthread_create(pidsOfSub + i, NULL, superQueryProcess, t_info);
|
||||
}
|
||||
|
||||
g_queryInfo.superQueryInfo.threadCnt = threads;
|
||||
|
@ -5888,8 +5958,12 @@ static int queryTestProcess() {
|
|||
g_queryInfo.superQueryInfo.threadCnt = 0;
|
||||
}
|
||||
|
||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
|
||||
pthread_join(pids[i], NULL);
|
||||
if ((nSqlCount > 0) && (nConcurrent > 0)) {
|
||||
for (int i = 0; i < nConcurrent; i++) {
|
||||
for (int j = 0; j < nSqlCount; j++) {
|
||||
pthread_join(pids[i * nSqlCount + j], NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tmfree((char*)pids);
|
||||
|
@ -5920,7 +5994,7 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
|
|||
static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) {
|
||||
TAOS_SUB* tsub = NULL;
|
||||
|
||||
if (g_queryInfo.specifiedQueryInfo.subscribeMode) {
|
||||
if (g_queryInfo.specifiedQueryInfo.mode) {
|
||||
tsub = taos_subscribe(taos,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
||||
topic, sql, subscribe_callback, (void*)resultFileName,
|
||||
|
@ -5996,13 +6070,13 @@ static void *subSubscribeProcess(void *sarg) {
|
|||
}
|
||||
//et = taosGetTimestampMs();
|
||||
//printf("========thread[%"PRId64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
|
||||
} while (0);
|
||||
} while(0);
|
||||
|
||||
// start loop to consume result
|
||||
TAOS_RES* res = NULL;
|
||||
while (1) {
|
||||
while(1) {
|
||||
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||
if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
|
||||
if (1 == g_queryInfo.superQueryInfo.mode) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -6073,7 +6147,8 @@ static void *superSubscribeProcess(void *sarg) {
|
|||
sprintf(tmpFile, "%s-%d",
|
||||
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
|
||||
}
|
||||
tsub[i] = subscribeImpl(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
|
||||
tsub[i] = subscribeImpl(winfo->taos,
|
||||
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
|
||||
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) {
|
||||
taos_close(winfo->taos);
|
||||
return NULL;
|
||||
|
@ -6081,13 +6156,13 @@ static void *superSubscribeProcess(void *sarg) {
|
|||
}
|
||||
//et = taosGetTimestampMs();
|
||||
//printf("========thread[%"PRId64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
|
||||
} while (0);
|
||||
} while(0);
|
||||
|
||||
// start loop to consume result
|
||||
TAOS_RES* res = NULL;
|
||||
while (1) {
|
||||
while(1) {
|
||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
||||
if (1 == g_queryInfo.specifiedQueryInfo.subscribeMode) {
|
||||
if (SYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -6105,7 +6180,8 @@ static void *superSubscribeProcess(void *sarg) {
|
|||
taos_free_result(res);
|
||||
|
||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
||||
taos_unsubscribe(tsub[i], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||
taos_unsubscribe(tsub[i],
|
||||
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||
}
|
||||
|
||||
taos_close(winfo->taos);
|
||||
|
@ -6308,7 +6384,7 @@ static void setParaFromArg(){
|
|||
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
|
||||
g_Dbs.threadCount = g_args.num_of_threads;
|
||||
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
|
||||
g_Dbs.queryMode = g_args.mode;
|
||||
g_Dbs.queryMode = g_args.query_mode;
|
||||
|
||||
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
|
||||
|
@ -6410,7 +6486,7 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
|
|||
|
||||
double t = getCurrentTimeUs();
|
||||
|
||||
while ((read_len = tgetline(&line, &line_len, fp)) != -1) {
|
||||
while((read_len = tgetline(&line, &line_len, fp)) != -1) {
|
||||
if (read_len >= MAX_SQL_SIZE) continue;
|
||||
line[--read_len] = '\0';
|
||||
|
||||
|
@ -6473,12 +6549,11 @@ static void testMetaFile() {
|
|||
}
|
||||
|
||||
static void queryResult() {
|
||||
// select
|
||||
if (false == g_Dbs.insert_only) {
|
||||
// query data
|
||||
|
||||
pthread_t read_id;
|
||||
threadInfo *rInfo = malloc(sizeof(threadInfo));
|
||||
assert(rInfo);
|
||||
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
|
||||
rInfo->start_table_from = 0;
|
||||
|
||||
|
@ -6518,7 +6593,6 @@ static void queryResult() {
|
|||
pthread_join(read_id, NULL);
|
||||
taos_close(rInfo->taos);
|
||||
free(rInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void testCmdLine() {
|
||||
|
@ -6536,9 +6610,7 @@ static void testCmdLine() {
|
|||
g_args.test_mode = INSERT_TEST;
|
||||
insertTestProcess();
|
||||
|
||||
if (g_Dbs.insert_only)
|
||||
return;
|
||||
else
|
||||
if (false == g_Dbs.insert_only)
|
||||
queryResult();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue