Merge branch 'master' of github.com:taosdata/TDengine into test/chr

This commit is contained in:
tomchon 2021-05-25 10:37:33 +08:00
commit 53bba6113c
3 changed files with 242 additions and 239 deletions

View File

@ -363,7 +363,7 @@ typedef struct SDbs_S {
typedef struct SpecifiedQueryInfo_S {
uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint64_t concurrent;
uint32_t concurrent;
uint64_t sqlCount;
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
@ -374,6 +374,9 @@ typedef struct SpecifiedQueryInfo_S {
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char topic[MAX_QUERY_SQL_COUNT][32];
int consumed[MAX_QUERY_SQL_COUNT];
TAOS_RES* res[MAX_QUERY_SQL_COUNT];
uint64_t totalQueried;
} SpecifiedQueryInfo;
@ -418,7 +421,8 @@ typedef struct SThreadInfo_S {
int threadID;
char db_name[MAX_DB_NAME_SIZE+1];
uint32_t time_precision;
char fp[4096];
char filePath[4096];
FILE *fp;
char tb_prefix[MAX_TB_NAME_SIZE];
uint64_t start_table_from;
uint64_t end_table_to;
@ -451,6 +455,7 @@ typedef struct SThreadInfo_S {
// seq of query or subscribe
uint64_t querySeq; // sequence number of sql command
TAOS_SUB* tsub;
} threadInfo;
@ -532,7 +537,7 @@ static int createDatabasesAndStables();
static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static int postProceSql(char *host, struct sockaddr_in *pServAddr,
uint16_t port, char* sqlstr, char *resultFile);
uint16_t port, char* sqlstr, threadInfo *pThreadInfo);
/* ************ Global variables ************ */
@ -1112,24 +1117,22 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
return 0;
}
static void appendResultBufToFile(char *resultBuf, char *resultFile)
static void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo)
{
FILE *fp = NULL;
if (resultFile[0] != 0) {
fp = fopen(resultFile, "at");
if (fp == NULL) {
errorPrint(
"%s() LN%d, failed to open result file: %s, result will not save to file\n",
__func__, __LINE__, resultFile);
return;
}
fprintf(fp, "%s", resultBuf);
tmfclose(fp);
}
pThreadInfo->fp = fopen(pThreadInfo->filePath, "at");
if (pThreadInfo->fp == NULL) {
errorPrint(
"%s() LN%d, failed to open result file: %s, result will not save to file\n",
__func__, __LINE__, pThreadInfo->filePath);
return;
}
fprintf(pThreadInfo->fp, "%s", resultBuf);
tmfclose(pThreadInfo->fp);
pThreadInfo->fp = NULL;
}
static void appendResultToFile(TAOS_RES *res, char* resultFile) {
static void fetchResult(TAOS_RES *res, threadInfo* pThreadInfo) {
TAOS_ROW row = NULL;
int num_rows = 0;
int num_fields = taos_field_count(res);
@ -1147,10 +1150,11 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
// fetch the records row by row
while((row = taos_fetch_row(res))) {
if (totalLen >= 100*1024*1024 - 32000) {
appendResultBufToFile(databuf, resultFile);
totalLen = 0;
memset(databuf, 0, 100*1024*1024);
if ((strlen(pThreadInfo->filePath) > 0)
&& (totalLen >= 100*1024*1024 - 32000)) {
appendResultBufToFile(databuf, pThreadInfo);
totalLen = 0;
memset(databuf, 0, 100*1024*1024);
}
num_rows++;
int len = taos_print_row(temp, row, fields, num_fields);
@ -1161,8 +1165,10 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
}
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n",
__func__, __LINE__, databuf, resultFile);
appendResultBufToFile(databuf, resultFile);
__func__, __LINE__, databuf, pThreadInfo->filePath);
if (strlen(pThreadInfo->filePath) > 0) {
appendResultBufToFile(databuf, pThreadInfo);
}
free(databuf);
}
@ -1178,16 +1184,14 @@ static void selectAndGetResult(
return;
}
if ((strlen(pThreadInfo->fp))) {
appendResultToFile(res, pThreadInfo->fp);
}
fetchResult(res, pThreadInfo);
taos_free_result(res);
} else if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port,
command,
pThreadInfo->fp);
pThreadInfo);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID);
}
@ -1720,7 +1724,7 @@ static void printfQueryMeta() {
printf("query interval: \033[33m%"PRIu64" ms\033[0m\n",
g_queryInfo.specifiedQueryInfo.queryInterval);
printf("top query times:\033[33m%"PRIu64"\033[0m\n", g_args.query_times);
printf("concurrent: \033[33m%"PRIu64"\033[0m\n",
printf("concurrent: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.concurrent);
printf("mod: \033[33m%s\033[0m\n",
(g_queryInfo.specifiedQueryInfo.asyncMode)?"async":"sync");
@ -2017,13 +2021,13 @@ static void printfQuerySystemInfo(TAOS * taos) {
// show variables
res = taos_query(taos, "show variables;");
//appendResultToFile(res, filename);
//fetchResult(res, filename);
xDumpResultToFile(filename, res);
// show dnodes
res = taos_query(taos, "show dnodes;");
xDumpResultToFile(filename, res);
//appendResultToFile(res, filename);
//fetchResult(res, filename);
// show databases
res = taos_query(taos, "show databases;");
@ -2059,7 +2063,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
}
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port,
char* sqlstr, char *resultFile)
char* sqlstr, threadInfo *pThreadInfo)
{
char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s";
@ -2195,8 +2199,8 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
response_buf[RESP_BUF_LEN - 1] = '\0';
printf("Response:\n%s\n", response_buf);
if (resultFile) {
appendResultBufToFile(response_buf, resultFile);
if (strlen(pThreadInfo->filePath) > 0) {
appendResultBufToFile(response_buf, pThreadInfo);
}
free(request_buf);
@ -2959,18 +2963,18 @@ static int startMultiThreadCreateChildTable(
b = ntables % threads;
for (int64_t i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
t_info->superTblInfo = superTblInfo;
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, MAX_DB_NAME_SIZE);
pThreadInfo->superTblInfo = superTblInfo;
verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);
t_info->taos = taos_connect(
pThreadInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
db_name,
g_Dbs.port);
if (t_info->taos == NULL) {
if (pThreadInfo->taos == NULL) {
errorPrint( "%s() LN%d, Failed to connect to TDengine, reason:%s\n",
__func__, __LINE__, taos_errstr(NULL));
free(pids);
@ -2978,14 +2982,14 @@ static int startMultiThreadCreateChildTable(
return -1;
}
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->use_metric = true;
t_info->cols = cols;
t_info->minDelay = UINT64_MAX;
pthread_create(pids + i, NULL, createTable, t_info);
pThreadInfo->start_table_from = startFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->use_metric = true;
pThreadInfo->cols = cols;
pThreadInfo->minDelay = UINT64_MAX;
pthread_create(pids + i, NULL, createTable, pThreadInfo);
}
for (int i = 0; i < threads; i++) {
@ -2993,8 +2997,8 @@ static int startMultiThreadCreateChildTable(
}
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
taos_close(t_info->taos);
threadInfo *pThreadInfo = infos + i;
taos_close(pThreadInfo->taos);
}
free(pids);
@ -4199,7 +4203,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (concurrent && concurrent->type == cJSON_Number) {
if (concurrent->valueint <= 0) {
errorPrint(
"%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n",
"%s() LN%d, query sqlCount %"PRIu64" or concurrent %d is not correct.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
@ -4266,24 +4270,28 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
}
// sqls
cJSON* superSqls = cJSON_GetObjectItem(specifiedQuery, "sqls");
if (!superSqls) {
cJSON* specifiedSqls = cJSON_GetObjectItem(specifiedQuery, "sqls");
if (!specifiedSqls) {
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) {
} else if (specifiedSqls->type != cJSON_Array) {
errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
int superSqlSize = cJSON_GetArraySize(specifiedSqls);
if (superSqlSize * g_queryInfo.specifiedQueryInfo.concurrent
> MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql(%d) * concurrent(%d) overflow, max is %d\n",
__func__, __LINE__,
superSqlSize,
g_queryInfo.specifiedQueryInfo.concurrent,
MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
g_queryInfo.specifiedQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
cJSON* sql = cJSON_GetArrayItem(specifiedSqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
@ -4459,16 +4467,16 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.resubAfterConsume = 1;
}
// sqls
cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!subsqls) {
// supert table sqls
cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!superSqls) {
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (subsqls->type != cJSON_Array) {
} else if (superSqls->type != cJSON_Array) {
errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(subsqls);
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
@ -4477,7 +4485,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(subsqls, j);
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
@ -5823,49 +5831,49 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
t_info->time_precision = timePrec;
t_info->superTblInfo = superTblInfo;
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, MAX_DB_NAME_SIZE);
pThreadInfo->time_precision = timePrec;
pThreadInfo->superTblInfo = superTblInfo;
t_info->start_time = start_time;
t_info->minDelay = UINT64_MAX;
pThreadInfo->start_time = start_time;
pThreadInfo->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) ||
(0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) {
//t_info->taos = taos;
t_info->taos = taos_connect(
//pThreadInfo->taos = taos;
pThreadInfo->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
if (NULL == pThreadInfo->taos) {
errorPrint(
"connect to server fail from insert sub thread, reason: %s\n",
taos_errstr(NULL));
exit(-1);
}
} else {
t_info->taos = NULL;
pThreadInfo->taos = NULL;
}
/* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
/* } else {
t_info->start_table_from = 0;
t_info->ntables = superTblInfo->childTblCount;
t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint();
pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount;
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
}
*/
tsem_init(&(t_info->lock_sem), 0, 0);
tsem_init(&(pThreadInfo->lock_sem), 0, 0);
if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, asyncWrite, t_info);
pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
} else {
pthread_create(pids + i, NULL, syncWrite, t_info);
pthread_create(pids + i, NULL, syncWrite, pThreadInfo);
}
}
@ -5880,27 +5888,27 @@ static void startMultiThreadInsertData(int threads, char* db_name,
double avgDelay = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
threadInfo *pThreadInfo = infos + i;
tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos);
tsem_destroy(&(pThreadInfo->lock_sem));
taos_close(pThreadInfo->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
__func__, __LINE__,
t_info->threadID, t_info->totalInsertRows,
t_info->totalAffectedRows);
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalInsertRows += t_info->totalInsertRows;
superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows;
} else {
g_args.totalAffectedRows += t_info->totalAffectedRows;
g_args.totalInsertRows += t_info->totalInsertRows;
g_args.totalAffectedRows += pThreadInfo->totalAffectedRows;
g_args.totalInsertRows += pThreadInfo->totalInsertRows;
}
totalDelay += t_info->totalDelay;
cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
totalDelay += pThreadInfo->totalDelay;
cntDelay += pThreadInfo->cntDelay;
if (pThreadInfo->maxDelay > maxDelay) maxDelay = pThreadInfo->maxDelay;
if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay;
}
cntDelay -= 1;
@ -5956,26 +5964,26 @@ static void startMultiThreadInsertData(int threads, char* db_name,
static void *readTable(void *sarg) {
#if 1
threadInfo *rinfo = (threadInfo *)sarg;
TAOS *taos = rinfo->taos;
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
char command[BUFFER_SIZE] = "\0";
uint64_t sTime = rinfo->start_time;
char *tb_prefix = rinfo->tb_prefix;
FILE *fp = fopen(rinfo->fp, "a");
uint64_t sTime = pThreadInfo->start_time;
char *tb_prefix = pThreadInfo->tb_prefix;
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
errorPrint( "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
errorPrint( "fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
return NULL;
}
int64_t num_of_DPT;
/* if (rinfo->superTblInfo) {
num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table;
/* if (pThreadInfo->superTblInfo) {
num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
// }
int64_t num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
@ -6028,17 +6036,17 @@ static void *readTable(void *sarg) {
static void *readMetric(void *sarg) {
#if 1
threadInfo *rinfo = (threadInfo *)sarg;
TAOS *taos = rinfo->taos;
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS *taos = pThreadInfo->taos;
char command[BUFFER_SIZE] = "\0";
FILE *fp = fopen(rinfo->fp, "a");
FILE *fp = fopen(pThreadInfo->filePath, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
printf("fopen %s fail, reason:%s.\n", pThreadInfo->filePath, strerror(errno));
return NULL;
}
int64_t num_of_DPT = rinfo->superTblInfo->insertRows;
int64_t num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
@ -6237,8 +6245,8 @@ static void *specifiedTableQuery(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq] != NULL) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
@ -6338,8 +6346,8 @@ static void *superTableQuery(void *sarg) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
if (g_queryInfo.superQueryInfo.result[j] != NULL) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID);
}
@ -6429,9 +6437,9 @@ static int queryTestProcess() {
for (uint64_t i = 0; i < nSqlCount; i++) {
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *t_info = infos + seq;
t_info->threadID = seq;
t_info->querySeq = i;
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
@ -6448,10 +6456,10 @@ static int queryTestProcess() {
}
}
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pThreadInfo->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedTableQuery,
t_info);
pThreadInfo);
}
}
} else {
@ -6491,15 +6499,15 @@ static int queryTestProcess() {
uint64_t startFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
t_info->threadID = i;
threadInfo *pThreadInfo = infosOfSub + i;
pThreadInfo->threadID = i;
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, t_info);
pThreadInfo->start_table_from = startFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
g_queryInfo.superQueryInfo.threadCnt = threads;
@ -6546,7 +6554,7 @@ static void stable_sub_callback(
}
if (param)
appendResultToFile(res, ((threadInfo *)param)->fp);
fetchResult(res, (threadInfo *)param);
// tao_unscribe() will free result.
}
@ -6559,7 +6567,7 @@ static void specified_sub_callback(
}
if (param)
appendResultToFile(res, ((threadInfo *)param)->fp);
fetchResult(res, (threadInfo *)param);
// tao_unscribe() will free result.
}
@ -6613,18 +6621,15 @@ static void *superSubscribe(void *sarg) {
}
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
}
}
@ -6654,7 +6659,7 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq],
subSqlstr, i);
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
@ -6697,16 +6702,16 @@ static void *superSubscribe(void *sarg) {
if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp);
fetchResult(res, pThreadInfo);
}
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp);
fetchResult(res, pThreadInfo);
}
consumed[tsubSeq] ++;
@ -6747,21 +6752,18 @@ static void *superSubscribe(void *sarg) {
static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS_SUB* tsub = NULL;
// TAOS_SUB* tsub = NULL;
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host,
pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (taos == NULL) {
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
} else {
pThreadInfo->taos = taos;
}
}
@ -6773,69 +6775,70 @@ static void *specifiedSubscribe(void *sarg) {
return NULL;
}
char topic[32] = {0};
sprintf(topic, "taosdemo-subscribe-%"PRIu64"", pThreadInfo->querySeq);
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
"taosdemo-subscribe-%"PRIu64"-%d",
pThreadInfo->querySeq,
pThreadInfo->threadID);
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq] != NULL) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
tsub = subscribeImpl(
SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl(
SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == tsub) {
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
}
// start loop to consume result
TAOS_RES* res = NULL;
int consumed;
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
while(1) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue;
}
res = taos_consume(tsub);
if (res) {
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]);
if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp);
fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo);
}
consumed ++;
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed >=
&& (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
consumed = 0;
taos_unsubscribe(tsub,
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub = subscribeImpl(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl(
SPECIFIED_CLASS,
pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == tsub) {
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
taos_free_result(res);
taos_unsubscribe(tsub, 0);
taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]);
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->querySeq], 0);
taos_close(pThreadInfo->taos);
return NULL;
@ -6905,18 +6908,18 @@ static int subscribeTestProcess() {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
threadInfo *t_info = infos + seq;
t_info->threadID = seq;
t_info->querySeq = i;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, t_info);
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, pThreadInfo);
}
}
}
//==== create threads for super table query
if (g_queryInfo.superQueryInfo.sqlCount <= 0) {
printf("%s() LN%d, super table query sqlCount %"PRIu64".\n",
debugPrint("%s() LN%d, super table query sqlCount %"PRIu64".\n",
__func__, __LINE__,
g_queryInfo.superQueryInfo.sqlCount);
} else {
@ -6955,17 +6958,17 @@ static int subscribeTestProcess() {
uint64_t startFrom = 0;
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
threadInfo *t_info = infosOfStable + seq;
t_info->threadID = seq;
t_info->querySeq = i;
threadInfo *pThreadInfo = infosOfStable + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
t_info->start_table_from = startFrom;
t_info->ntables = j<b?a+1:a;
t_info->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->ntables = j<b?a+1:a;
pThreadInfo->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, t_info);
NULL, superSubscribe, pThreadInfo);
}
}
@ -7244,47 +7247,47 @@ static void queryResult() {
// query data
pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo));
assert(rInfo);
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_from = 0;
threadInfo *pThreadInfo = malloc(sizeof(threadInfo));
assert(pThreadInfo);
pThreadInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
pThreadInfo->start_table_from = 0;
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
//pThreadInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
if (g_args.use_metric) {
rInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
rInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(rInfo->tb_prefix,
pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, MAX_TB_NAME_SIZE);
} else {
rInfo->ntables = g_args.num_of_tables;
rInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(rInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
pThreadInfo->ntables = g_args.num_of_tables;
pThreadInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(pThreadInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
}
rInfo->taos = taos_connect(
pThreadInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
if (rInfo->taos == NULL) {
if (pThreadInfo->taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
free(rInfo);
free(pThreadInfo);
exit(-1);
}
tstrncpy(rInfo->fp, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
if (!g_Dbs.use_metric) {
pthread_create(&read_id, NULL, readTable, rInfo);
pthread_create(&read_id, NULL, readTable, pThreadInfo);
} else {
pthread_create(&read_id, NULL, readMetric, rInfo);
pthread_create(&read_id, NULL, readMetric, pThreadInfo);
}
pthread_join(read_id, NULL);
taos_close(rInfo->taos);
free(rInfo);
taos_close(pThreadInfo->taos);
free(pThreadInfo);
}
static void testCmdLine() {

View File

@ -22,7 +22,7 @@ from queue import Queue, Empty
from .shared.config import Config
from .shared.db import DbTarget, DbConn
from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
from .shared.types import DirPath
from .shared.types import DirPath, IpcStream
# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
# from crash_gen.db import DbConn, DbTarget
@ -177,13 +177,12 @@ quorum 2
return "127.0.0.1"
def getServiceCmdLine(self): # to start the instance
cmdLine = []
if Config.getConfig().track_memory_leaks:
Logging.info("Invoking VALGRIND on service...")
cmdLine = ['valgrind', '--leak-check=yes']
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
return cmdLine
return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()]
else:
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
def _getDnodes(self, dbc):
dbc.query("show dnodes")
@ -281,16 +280,16 @@ class TdeSubProcess:
return '[TdeSubProc: pid = {}, status = {}]'.format(
self.getPid(), self.getStatus() )
def getStdOut(self) -> BinaryIO :
def getIpcStdOut(self) -> IpcStream :
if self._popen.universal_newlines : # alias of text_mode
raise CrashGenError("We need binary mode for STDOUT IPC")
# Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
return typing.cast(BinaryIO, self._popen.stdout)
return typing.cast(IpcStream, self._popen.stdout)
def getStdErr(self) -> BinaryIO :
def getIpcStdErr(self) -> IpcStream :
if self._popen.universal_newlines : # alias of text_mode
raise CrashGenError("We need binary mode for STDERR IPC")
return typing.cast(BinaryIO, self._popen.stderr)
return typing.cast(IpcStream, self._popen.stderr)
# Now it's always running, since we matched the life cycle
# def isRunning(self):
@ -301,11 +300,6 @@ class TdeSubProcess:
def _start(self, cmdLine) -> Popen :
ON_POSIX = 'posix' in sys.builtin_module_names
# Sanity check
# if self.subProcess: # already there
# raise RuntimeError("Corrupt process state")
# Prepare environment variables for coverage information
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
@ -314,9 +308,8 @@ class TdeSubProcess:
# print(myEnv)
# print("Starting TDengine with env: ", myEnv.items())
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
print("Starting TDengine: {}".format(cmdLine))
# useShell = True # Needed to pass environments into it
return Popen(
' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
shell=True, # Always use shell, since we need to pass ENV vars
@ -732,19 +725,19 @@ class ServiceManagerThread:
self._ipcQueue = Queue() # type: Queue
self._thread = threading.Thread( # First thread captures server OUTPUT
target=self.svcOutputReader,
args=(subProc.getStdOut(), self._ipcQueue, logDir))
args=(subProc.getIpcStdOut(), self._ipcQueue, logDir))
self._thread.daemon = True # thread dies with the program
self._thread.start()
time.sleep(0.01)
if not self._thread.is_alive(): # What happened?
Logging.info("Failed to started process to monitor STDOUT")
Logging.info("Failed to start process to monitor STDOUT")
self.stop()
raise CrashGenError("Failed to start thread to monitor STDOUT")
Logging.info("Successfully started process to monitor STDOUT")
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
target=self.svcErrorReader,
args=(subProc.getStdErr(), self._ipcQueue, logDir))
args=(subProc.getIpcStdErr(), self._ipcQueue, logDir))
self._thread2.daemon = True # thread dies with the program
self._thread2.start()
time.sleep(0.01)
@ -887,14 +880,19 @@ class ServiceManagerThread:
print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
return None
def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str
def _textChunkGenerator(self, streamIn: IpcStream, logDir: str, logFile: str
) -> Generator[TextChunk, None, None]:
'''
Take an input stream with binary data, produced a generator of decoded
"text chunks", and also save the original binary data in a log file.
Take an input stream with binary data (likely from Popen), produced a generator of decoded
"text chunks".
Side effect: it also save the original binary data in a log file.
'''
os.makedirs(logDir, exist_ok=True)
logF = open(os.path.join(logDir, logFile), 'wb')
if logF is None:
Logging.error("Failed to open log file (binary write): {}/{}".format(logDir, logFile))
return
for bChunk in iter(streamIn.readline, b''):
logF.write(bChunk) # Write to log file immediately
tChunk = self._decodeBinaryChunk(bChunk) # decode
@ -902,14 +900,14 @@ class ServiceManagerThread:
yield tChunk # TODO: split into actual text lines
# At the end...
streamIn.close() # Close the stream
logF.close() # Close the output file
streamIn.close() # Close the incoming stream
logF.close() # Close the log file
def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str):
def svcOutputReader(self, ipcStdOut: IpcStream, queue, logDir: str):
'''
The infinite routine that processes the STDOUT stream for the sub process being managed.
:param stdOut: the IO stream object used to fetch the data from
:param ipcStdOut: the IO stream object used to fetch the data from
:param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
:param logDir: where we should dump a verbatim output file
'''
@ -917,7 +915,7 @@ class ServiceManagerThread:
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# print("This is the svcOutput Reader...")
# stdOut.readline() # Skip the first output? TODO: remove?
for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') :
for tChunk in self._textChunkGenerator(ipcStdOut, logDir, 'stdout.log') :
queue.put(tChunk) # tChunk garanteed not to be None
self._printProgress("_i")
@ -940,12 +938,12 @@ class ServiceManagerThread:
Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
self.setStatus(Status.STATUS_STOPPED)
def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str):
def svcErrorReader(self, ipcStdErr: IpcStream, queue, logDir: str):
# os.makedirs(logDir, exist_ok=True)
# logFile = os.path.join(logDir,'stderr.log')
# fErr = open(logFile, 'wb')
# for line in iter(err.readline, b''):
for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') :
for tChunk in self._textChunkGenerator(ipcStdErr, logDir, 'stderr.log') :
queue.put(tChunk) # tChunk garanteed not to be None
# fErr.write(line)
Logging.info("TDengine STDERR: {}".format(tChunk))

View File

@ -1,4 +1,4 @@
from typing import Any, List, Dict, NewType
from typing import Any, BinaryIO, List, Dict, NewType
from enum import Enum
DirPath = NewType('DirPath', str)
@ -26,3 +26,5 @@ class TdDataType(Enum):
TdColumns = Dict[str, TdDataType]
TdTags = Dict[str, TdDataType]
IpcStream = NewType('IpcStream', BinaryIO)