|
|
|
@ -366,6 +366,7 @@ typedef struct SpecifiedQueryInfo_S {
|
|
|
|
|
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
|
|
|
|
|
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
|
|
|
|
|
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
|
|
|
|
|
int totalQueried;
|
|
|
|
|
} SpecifiedQueryInfo;
|
|
|
|
|
|
|
|
|
|
typedef struct SuperQueryInfo_S {
|
|
|
|
@ -385,6 +386,7 @@ typedef struct SuperQueryInfo_S {
|
|
|
|
|
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
|
|
|
|
|
|
|
|
|
|
char* childTblName;
|
|
|
|
|
int totalQueried;
|
|
|
|
|
} SuperQueryInfo;
|
|
|
|
|
|
|
|
|
|
typedef struct SQueryMetaInfo_S {
|
|
|
|
@ -398,6 +400,7 @@ typedef struct SQueryMetaInfo_S {
|
|
|
|
|
|
|
|
|
|
SpecifiedQueryInfo specifiedQueryInfo;
|
|
|
|
|
SuperQueryInfo superQueryInfo;
|
|
|
|
|
int totalQueried;
|
|
|
|
|
} SQueryMetaInfo;
|
|
|
|
|
|
|
|
|
|
typedef struct SThreadInfo_S {
|
|
|
|
@ -2602,8 +2605,8 @@ static int createDatabasesAndStables() {
|
|
|
|
|
|
|
|
|
|
static void* createTable(void *sarg)
|
|
|
|
|
{
|
|
|
|
|
threadInfo *winfo = (threadInfo *)sarg;
|
|
|
|
|
SSuperTable* superTblInfo = winfo->superTblInfo;
|
|
|
|
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
|
|
|
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
|
|
|
|
|
|
|
|
|
int64_t lastPrintTime = taosGetTimestampMs();
|
|
|
|
|
|
|
|
|
@ -2621,15 +2624,15 @@ static void* createTable(void *sarg)
|
|
|
|
|
|
|
|
|
|
verbosePrint("%s() LN%d: Creating table from %d to %d\n",
|
|
|
|
|
__func__, __LINE__,
|
|
|
|
|
winfo->start_table_from, winfo->end_table_to);
|
|
|
|
|
pThreadInfo->start_table_from, pThreadInfo->end_table_to);
|
|
|
|
|
|
|
|
|
|
for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) {
|
|
|
|
|
for (int i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
|
|
|
|
|
if (0 == g_Dbs.use_metric) {
|
|
|
|
|
snprintf(buffer, buff_len,
|
|
|
|
|
"create table if not exists %s.%s%d %s;",
|
|
|
|
|
winfo->db_name,
|
|
|
|
|
pThreadInfo->db_name,
|
|
|
|
|
g_args.tb_prefix, i,
|
|
|
|
|
winfo->cols);
|
|
|
|
|
pThreadInfo->cols);
|
|
|
|
|
} else {
|
|
|
|
|
if (superTblInfo == NULL) {
|
|
|
|
|
errorPrint("%s() LN%d, use metric, but super table info is NULL\n",
|
|
|
|
@ -2658,8 +2661,8 @@ static void* createTable(void *sarg)
|
|
|
|
|
len += snprintf(buffer + len,
|
|
|
|
|
buff_len - len,
|
|
|
|
|
"if not exists %s.%s%d using %s.%s tags %s ",
|
|
|
|
|
winfo->db_name, superTblInfo->childTblPrefix,
|
|
|
|
|
i, winfo->db_name,
|
|
|
|
|
pThreadInfo->db_name, superTblInfo->childTblPrefix,
|
|
|
|
|
i, pThreadInfo->db_name,
|
|
|
|
|
superTblInfo->sTblName, tagsValBuf);
|
|
|
|
|
free(tagsValBuf);
|
|
|
|
|
batchNum++;
|
|
|
|
@ -2673,7 +2676,7 @@ static void* createTable(void *sarg)
|
|
|
|
|
|
|
|
|
|
len = 0;
|
|
|
|
|
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
|
|
|
|
|
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE, false)){
|
|
|
|
|
if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)){
|
|
|
|
|
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
|
|
|
|
|
free(buffer);
|
|
|
|
|
return NULL;
|
|
|
|
@ -2682,14 +2685,14 @@ static void* createTable(void *sarg)
|
|
|
|
|
int64_t currentPrintTime = taosGetTimestampMs();
|
|
|
|
|
if (currentPrintTime - lastPrintTime > 30*1000) {
|
|
|
|
|
printf("thread[%d] already create %d - %d tables\n",
|
|
|
|
|
winfo->threadID, winfo->start_table_from, i);
|
|
|
|
|
pThreadInfo->threadID, pThreadInfo->start_table_from, i);
|
|
|
|
|
lastPrintTime = currentPrintTime;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (0 != len) {
|
|
|
|
|
verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer);
|
|
|
|
|
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE, false)) {
|
|
|
|
|
if (0 != queryDbExec(pThreadInfo->taos, buffer, NO_INSERT_TYPE, false)) {
|
|
|
|
|
errorPrint( "queryDbExec() failed. buffer:\n%s\n", buffer);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -3694,7 +3697,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|
|
|
|
__func__, __LINE__);
|
|
|
|
|
goto PARSE_OVER;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
cJSON *multiThreadWriteOneTbl =
|
|
|
|
|
cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
|
|
|
|
|
if (multiThreadWriteOneTbl
|
|
|
|
@ -3711,7 +3714,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|
|
|
|
printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
|
|
|
|
|
goto PARSE_OVER;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
|
|
|
|
|
if (interlaceRows && interlaceRows->type == cJSON_Number) {
|
|
|
|
|
g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint;
|
|
|
|
@ -5159,45 +5162,45 @@ free_and_statistics_2:
|
|
|
|
|
|
|
|
|
|
static void* syncWrite(void *sarg) {
|
|
|
|
|
|
|
|
|
|
threadInfo *winfo = (threadInfo *)sarg;
|
|
|
|
|
SSuperTable* superTblInfo = winfo->superTblInfo;
|
|
|
|
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
|
|
|
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
|
|
|
|
|
|
|
|
|
int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
|
|
|
|
|
|
|
|
|
|
if (interlaceRows > 0) {
|
|
|
|
|
// interlace mode
|
|
|
|
|
return syncWriteInterlace(winfo);
|
|
|
|
|
return syncWriteInterlace(pThreadInfo);
|
|
|
|
|
} else {
|
|
|
|
|
// progressive mode
|
|
|
|
|
return syncWriteProgressive(winfo);
|
|
|
|
|
return syncWriteProgressive(pThreadInfo);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void callBack(void *param, TAOS_RES *res, int code) {
|
|
|
|
|
threadInfo* winfo = (threadInfo*)param;
|
|
|
|
|
SSuperTable* superTblInfo = winfo->superTblInfo;
|
|
|
|
|
threadInfo* pThreadInfo = (threadInfo*)param;
|
|
|
|
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
|
|
|
|
|
|
|
|
|
int insert_interval =
|
|
|
|
|
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
|
|
|
|
if (insert_interval) {
|
|
|
|
|
winfo->et = taosGetTimestampUs();
|
|
|
|
|
if (((winfo->et - winfo->st)/1000) < insert_interval) {
|
|
|
|
|
taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms
|
|
|
|
|
pThreadInfo->et = taosGetTimestampUs();
|
|
|
|
|
if (((pThreadInfo->et - pThreadInfo->st)/1000) < insert_interval) {
|
|
|
|
|
taosMsleep(insert_interval - (pThreadInfo->et - pThreadInfo->st)/1000); // ms
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen);
|
|
|
|
|
char *buffer = calloc(1, pThreadInfo->superTblInfo->maxSqlLen);
|
|
|
|
|
char data[MAX_DATA_SIZE];
|
|
|
|
|
char *pstr = buffer;
|
|
|
|
|
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix,
|
|
|
|
|
winfo->start_table_from);
|
|
|
|
|
// if (winfo->counter >= winfo->superTblInfo->insertRows) {
|
|
|
|
|
if (winfo->counter >= g_args.num_of_RPR) {
|
|
|
|
|
winfo->start_table_from++;
|
|
|
|
|
winfo->counter = 0;
|
|
|
|
|
pstr += sprintf(pstr, "insert into %s.%s%d values", pThreadInfo->db_name, pThreadInfo->tb_prefix,
|
|
|
|
|
pThreadInfo->start_table_from);
|
|
|
|
|
// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
|
|
|
|
|
if (pThreadInfo->counter >= g_args.num_of_RPR) {
|
|
|
|
|
pThreadInfo->start_table_from++;
|
|
|
|
|
pThreadInfo->counter = 0;
|
|
|
|
|
}
|
|
|
|
|
if (winfo->start_table_from > winfo->end_table_to) {
|
|
|
|
|
tsem_post(&winfo->lock_sem);
|
|
|
|
|
if (pThreadInfo->start_table_from > pThreadInfo->end_table_to) {
|
|
|
|
|
tsem_post(&pThreadInfo->lock_sem);
|
|
|
|
|
free(buffer);
|
|
|
|
|
taos_free_result(res);
|
|
|
|
|
return;
|
|
|
|
@ -5205,46 +5208,46 @@ static void callBack(void *param, TAOS_RES *res, int code) {
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < g_args.num_of_RPR; i++) {
|
|
|
|
|
int rand_num = taosRandom() % 100;
|
|
|
|
|
if (0 != winfo->superTblInfo->disorderRatio
|
|
|
|
|
&& rand_num < winfo->superTblInfo->disorderRatio) {
|
|
|
|
|
int64_t d = winfo->lastTs - (taosRandom() % winfo->superTblInfo->disorderRange + 1);
|
|
|
|
|
generateRowData(data, d, winfo->superTblInfo);
|
|
|
|
|
if (0 != pThreadInfo->superTblInfo->disorderRatio
|
|
|
|
|
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
|
|
|
|
|
int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
|
|
|
|
|
generateRowData(data, d, pThreadInfo->superTblInfo);
|
|
|
|
|
} else {
|
|
|
|
|
generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo);
|
|
|
|
|
generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo);
|
|
|
|
|
}
|
|
|
|
|
pstr += sprintf(pstr, "%s", data);
|
|
|
|
|
winfo->counter++;
|
|
|
|
|
pThreadInfo->counter++;
|
|
|
|
|
|
|
|
|
|
if (winfo->counter >= winfo->superTblInfo->insertRows) {
|
|
|
|
|
if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (insert_interval) {
|
|
|
|
|
winfo->st = taosGetTimestampUs();
|
|
|
|
|
pThreadInfo->st = taosGetTimestampUs();
|
|
|
|
|
}
|
|
|
|
|
taos_query_a(winfo->taos, buffer, callBack, winfo);
|
|
|
|
|
taos_query_a(pThreadInfo->taos, buffer, callBack, pThreadInfo);
|
|
|
|
|
free(buffer);
|
|
|
|
|
|
|
|
|
|
taos_free_result(res);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void *asyncWrite(void *sarg) {
|
|
|
|
|
threadInfo *winfo = (threadInfo *)sarg;
|
|
|
|
|
SSuperTable* superTblInfo = winfo->superTblInfo;
|
|
|
|
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
|
|
|
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
|
|
|
|
|
|
|
|
|
winfo->st = 0;
|
|
|
|
|
winfo->et = 0;
|
|
|
|
|
winfo->lastTs = winfo->start_time;
|
|
|
|
|
pThreadInfo->st = 0;
|
|
|
|
|
pThreadInfo->et = 0;
|
|
|
|
|
pThreadInfo->lastTs = pThreadInfo->start_time;
|
|
|
|
|
|
|
|
|
|
int insert_interval =
|
|
|
|
|
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
|
|
|
|
|
if (insert_interval) {
|
|
|
|
|
winfo->st = taosGetTimestampUs();
|
|
|
|
|
pThreadInfo->st = taosGetTimestampUs();
|
|
|
|
|
}
|
|
|
|
|
taos_query_a(winfo->taos, "show databases", callBack, winfo);
|
|
|
|
|
taos_query_a(pThreadInfo->taos, "show databases", callBack, pThreadInfo);
|
|
|
|
|
|
|
|
|
|
tsem_wait(&(winfo->lock_sem));
|
|
|
|
|
tsem_wait(&(pThreadInfo->lock_sem));
|
|
|
|
|
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
@ -5774,10 +5777,10 @@ static int insertTestProcess() {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void *specifiedQueryProcess(void *sarg) {
|
|
|
|
|
threadInfo *winfo = (threadInfo *)sarg;
|
|
|
|
|
static void *specifiedTableQuery(void *sarg) {
|
|
|
|
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
|
|
|
|
|
|
|
|
|
if (winfo->taos == NULL) {
|
|
|
|
|
if (pThreadInfo->taos == NULL) {
|
|
|
|
|
TAOS * taos = NULL;
|
|
|
|
|
taos = taos_connect(g_queryInfo.host,
|
|
|
|
|
g_queryInfo.user,
|
|
|
|
@ -5786,17 +5789,17 @@ static void *specifiedQueryProcess(void *sarg) {
|
|
|
|
|
g_queryInfo.port);
|
|
|
|
|
if (taos == NULL) {
|
|
|
|
|
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
|
|
|
|
|
winfo->threadID, taos_errstr(NULL));
|
|
|
|
|
pThreadInfo->threadID, taos_errstr(NULL));
|
|
|
|
|
return NULL;
|
|
|
|
|
} else {
|
|
|
|
|
winfo->taos = taos;
|
|
|
|
|
pThreadInfo->taos = taos;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char sqlStr[MAX_DB_NAME_SIZE + 5];
|
|
|
|
|
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
|
|
|
|
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
|
|
|
|
|
taos_close(winfo->taos);
|
|
|
|
|
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
|
|
|
|
|
taos_close(pThreadInfo->taos);
|
|
|
|
|
errorPrint( "use database %s failed!\n\n",
|
|
|
|
|
g_queryInfo.dbName);
|
|
|
|
|
return NULL;
|
|
|
|
@ -5807,11 +5810,15 @@ static void *specifiedQueryProcess(void *sarg) {
|
|
|
|
|
|
|
|
|
|
int queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
|
|
|
|
|
|
|
|
|
|
int totalQueried = 0;
|
|
|
|
|
int64_t lastPrintTime = taosGetTimestampMs();
|
|
|
|
|
int64_t startTs = taosGetTimestampMs();
|
|
|
|
|
|
|
|
|
|
while(queryTimes --) {
|
|
|
|
|
if (g_queryInfo.specifiedQueryInfo.rate && (et - st) <
|
|
|
|
|
(int64_t)g_queryInfo.specifiedQueryInfo.rate*1000) {
|
|
|
|
|
taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms
|
|
|
|
|
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
|
|
|
|
|
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
st = taosGetTimestampUs();
|
|
|
|
@ -5819,13 +5826,13 @@ static void *specifiedQueryProcess(void *sarg) {
|
|
|
|
|
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
|
|
|
|
|
int64_t t1 = taosGetTimestampUs();
|
|
|
|
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
|
|
|
|
if (g_queryInfo.specifiedQueryInfo.result[winfo->querySeq][0] != 0) {
|
|
|
|
|
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
|
|
|
|
sprintf(tmpFile, "%s-%d",
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.result[winfo->querySeq],
|
|
|
|
|
winfo->threadID);
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
|
|
|
|
|
pThreadInfo->threadID);
|
|
|
|
|
}
|
|
|
|
|
selectAndGetResult(winfo->taos,
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq], tmpFile);
|
|
|
|
|
selectAndGetResult(pThreadInfo->taos,
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile);
|
|
|
|
|
int64_t t2 = taosGetTimestampUs();
|
|
|
|
|
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
|
|
|
|
|
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
|
|
|
|
@ -5833,25 +5840,37 @@ static void *specifiedQueryProcess(void *sarg) {
|
|
|
|
|
int64_t t1 = taosGetTimestampUs();
|
|
|
|
|
int retCode = postProceSql(g_queryInfo.host,
|
|
|
|
|
g_queryInfo.port,
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq]);
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
|
|
|
|
|
if (0 != retCode) {
|
|
|
|
|
printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
int64_t t2 = taosGetTimestampUs();
|
|
|
|
|
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
|
|
|
|
|
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
|
|
|
|
|
|
|
|
|
|
if (0 != retCode) {
|
|
|
|
|
printf("====restful return fail, threadID[%d]\n", winfo->threadID);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
totalQueried ++;
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.totalQueried ++;
|
|
|
|
|
|
|
|
|
|
et = taosGetTimestampUs();
|
|
|
|
|
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
|
|
|
|
|
taosGetSelfPthreadId(), (double)(et - st)/1000.0);
|
|
|
|
|
|
|
|
|
|
int64_t currentPrintTime = taosGetTimestampMs();
|
|
|
|
|
int64_t endTs = taosGetTimestampMs();
|
|
|
|
|
if (currentPrintTime - lastPrintTime > 30*1000) {
|
|
|
|
|
printf("thread[%d] has currently completed queries: %d, QPS: %10.2f\n",
|
|
|
|
|
pThreadInfo->threadID,
|
|
|
|
|
totalQueried,
|
|
|
|
|
totalQueried/((endTs-startTs)/1000.0));
|
|
|
|
|
}
|
|
|
|
|
lastPrintTime = currentPrintTime;
|
|
|
|
|
}
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
|
|
|
|
|
static void replaceChildTblName(char* inSql, char* outSql, int tblIndex) {
|
|
|
|
|
char sourceString[32] = "xxxx";
|
|
|
|
|
char subTblName[MAX_TB_NAME_SIZE*3];
|
|
|
|
|
sprintf(subTblName, "%s.%s",
|
|
|
|
@ -5873,11 +5892,11 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
|
|
|
|
|
//printf("3: %s\n", outSql);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void *superQueryProcess(void *sarg) {
|
|
|
|
|
static void *superTableQuery(void *sarg) {
|
|
|
|
|
char sqlstr[1024];
|
|
|
|
|
threadInfo *winfo = (threadInfo *)sarg;
|
|
|
|
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
|
|
|
|
|
|
|
|
|
if (winfo->taos == NULL) {
|
|
|
|
|
if (pThreadInfo->taos == NULL) {
|
|
|
|
|
TAOS * taos = NULL;
|
|
|
|
|
taos = taos_connect(g_queryInfo.host,
|
|
|
|
|
g_queryInfo.user,
|
|
|
|
@ -5886,10 +5905,10 @@ static void *superQueryProcess(void *sarg) {
|
|
|
|
|
g_queryInfo.port);
|
|
|
|
|
if (taos == NULL) {
|
|
|
|
|
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
|
|
|
|
|
winfo->threadID, taos_errstr(NULL));
|
|
|
|
|
pThreadInfo->threadID, taos_errstr(NULL));
|
|
|
|
|
return NULL;
|
|
|
|
|
} else {
|
|
|
|
|
winfo->taos = taos;
|
|
|
|
|
pThreadInfo->taos = taos;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -5897,33 +5916,49 @@ static void *superQueryProcess(void *sarg) {
|
|
|
|
|
int64_t et = (int64_t)g_queryInfo.superQueryInfo.rate*1000;
|
|
|
|
|
|
|
|
|
|
int queryTimes = g_queryInfo.superQueryInfo.queryTimes;
|
|
|
|
|
int totalQueried = 0;
|
|
|
|
|
int64_t startTs = taosGetTimestampMs();
|
|
|
|
|
|
|
|
|
|
int64_t lastPrintTime = taosGetTimestampMs();
|
|
|
|
|
while(queryTimes --) {
|
|
|
|
|
if (g_queryInfo.superQueryInfo.rate
|
|
|
|
|
&& (et - st) < (int64_t)g_queryInfo.superQueryInfo.rate*1000) {
|
|
|
|
|
taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms
|
|
|
|
|
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
|
|
|
|
|
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
st = taosGetTimestampUs();
|
|
|
|
|
for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) {
|
|
|
|
|
for (int i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
|
|
|
|
|
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
|
|
|
|
|
memset(sqlstr,0,sizeof(sqlstr));
|
|
|
|
|
replaceSubTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
|
|
|
|
|
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
|
|
|
|
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
|
|
|
|
if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
|
|
|
|
|
sprintf(tmpFile, "%s-%d",
|
|
|
|
|
g_queryInfo.superQueryInfo.result[j],
|
|
|
|
|
winfo->threadID);
|
|
|
|
|
pThreadInfo->threadID);
|
|
|
|
|
}
|
|
|
|
|
selectAndGetResult(winfo->taos, sqlstr, tmpFile);
|
|
|
|
|
selectAndGetResult(pThreadInfo->taos, sqlstr, tmpFile);
|
|
|
|
|
|
|
|
|
|
totalQueried++;
|
|
|
|
|
g_queryInfo.superQueryInfo.totalQueried ++;
|
|
|
|
|
|
|
|
|
|
int64_t currentPrintTime = taosGetTimestampMs();
|
|
|
|
|
int64_t endTs = taosGetTimestampMs();
|
|
|
|
|
if (currentPrintTime - lastPrintTime > 30*1000) {
|
|
|
|
|
printf("thread[%d] has currently completed queries: %d, QPS: %10.2f\n",
|
|
|
|
|
pThreadInfo->threadID,
|
|
|
|
|
totalQueried,
|
|
|
|
|
totalQueried/((endTs-startTs)/1000.0));
|
|
|
|
|
}
|
|
|
|
|
lastPrintTime = currentPrintTime;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
et = taosGetTimestampUs();
|
|
|
|
|
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n",
|
|
|
|
|
taosGetSelfPthreadId(),
|
|
|
|
|
winfo->start_table_from,
|
|
|
|
|
winfo->end_table_to,
|
|
|
|
|
pThreadInfo->start_table_from,
|
|
|
|
|
pThreadInfo->end_table_to,
|
|
|
|
|
(double)(et - st)/1000000.0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -5969,6 +6004,8 @@ static int queryTestProcess() {
|
|
|
|
|
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
|
|
|
|
|
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
|
|
|
|
|
|
|
|
|
|
int64_t startTs = taosGetTimestampMs();
|
|
|
|
|
|
|
|
|
|
if ((nSqlCount > 0) && (nConcurrent > 0)) {
|
|
|
|
|
|
|
|
|
|
pids = malloc(nConcurrent * nSqlCount * sizeof(pthread_t));
|
|
|
|
@ -6002,7 +6039,7 @@ static int queryTestProcess() {
|
|
|
|
|
|
|
|
|
|
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
|
|
|
|
|
|
|
|
|
|
pthread_create(pids + i * nSqlCount + j, NULL, specifiedQueryProcess,
|
|
|
|
|
pthread_create(pids + i * nSqlCount + j, NULL, specifiedTableQuery,
|
|
|
|
|
t_info);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -6051,7 +6088,7 @@ static int queryTestProcess() {
|
|
|
|
|
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
|
|
|
|
|
startFrom = t_info->end_table_to + 1;
|
|
|
|
|
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
|
|
|
|
|
pthread_create(pidsOfSub + i, NULL, superQueryProcess, t_info);
|
|
|
|
|
pthread_create(pidsOfSub + i, NULL, superTableQuery, t_info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g_queryInfo.superQueryInfo.threadCnt = threads;
|
|
|
|
@ -6078,6 +6115,14 @@ static int queryTestProcess() {
|
|
|
|
|
tmfree((char*)infosOfSub);
|
|
|
|
|
|
|
|
|
|
// taos_close(taos);// TODO: workaround to use separate taos connection;
|
|
|
|
|
int64_t endTs = taosGetTimestampMs();
|
|
|
|
|
|
|
|
|
|
int totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
|
|
|
|
|
g_queryInfo.superQueryInfo.totalQueried;
|
|
|
|
|
|
|
|
|
|
printf("==== completed total queries: %d, the QPS of all threads: %10.2f====\n",
|
|
|
|
|
totalQueried,
|
|
|
|
|
totalQueried/((endTs-startTs)/1000.0));
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -6114,12 +6159,12 @@ static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultF
|
|
|
|
|
return tsub;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void *subSubscribeProcess(void *sarg) {
|
|
|
|
|
threadInfo *winfo = (threadInfo *)sarg;
|
|
|
|
|
static void *superSubscribe(void *sarg) {
|
|
|
|
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
|
|
|
|
char subSqlstr[1024];
|
|
|
|
|
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
|
|
|
|
|
|
|
|
|
|
if (winfo->taos == NULL) {
|
|
|
|
|
if (pThreadInfo->taos == NULL) {
|
|
|
|
|
TAOS * taos = NULL;
|
|
|
|
|
taos = taos_connect(g_queryInfo.host,
|
|
|
|
|
g_queryInfo.user,
|
|
|
|
@ -6128,17 +6173,17 @@ static void *subSubscribeProcess(void *sarg) {
|
|
|
|
|
g_queryInfo.port);
|
|
|
|
|
if (taos == NULL) {
|
|
|
|
|
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
|
|
|
|
|
winfo->threadID, taos_errstr(NULL));
|
|
|
|
|
pThreadInfo->threadID, taos_errstr(NULL));
|
|
|
|
|
return NULL;
|
|
|
|
|
} else {
|
|
|
|
|
winfo->taos = taos;
|
|
|
|
|
pThreadInfo->taos = taos;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char sqlStr[MAX_TB_NAME_SIZE*2];
|
|
|
|
|
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
|
|
|
|
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
|
|
|
|
|
taos_close(winfo->taos);
|
|
|
|
|
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
|
|
|
|
|
taos_close(pThreadInfo->taos);
|
|
|
|
|
errorPrint( "use database %s failed!\n\n",
|
|
|
|
|
g_queryInfo.dbName);
|
|
|
|
|
return NULL;
|
|
|
|
@ -6149,7 +6194,7 @@ static void *subSubscribeProcess(void *sarg) {
|
|
|
|
|
do {
|
|
|
|
|
//if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < g_queryInfo.specifiedQueryInfo.rate*1000) {
|
|
|
|
|
// taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms
|
|
|
|
|
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
|
|
|
|
|
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
//st = taosGetTimestampMs();
|
|
|
|
@ -6157,15 +6202,15 @@ static void *subSubscribeProcess(void *sarg) {
|
|
|
|
|
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
|
|
|
|
sprintf(topic, "taosdemo-subscribe-%d", i);
|
|
|
|
|
memset(subSqlstr,0,sizeof(subSqlstr));
|
|
|
|
|
replaceSubTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i);
|
|
|
|
|
replaceChildTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i);
|
|
|
|
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
|
|
|
|
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
|
|
|
|
sprintf(tmpFile, "%s-%d",
|
|
|
|
|
g_queryInfo.superQueryInfo.result[i], winfo->threadID);
|
|
|
|
|
g_queryInfo.superQueryInfo.result[i], pThreadInfo->threadID);
|
|
|
|
|
}
|
|
|
|
|
tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile);
|
|
|
|
|
tsub[i] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile);
|
|
|
|
|
if (NULL == tsub[i]) {
|
|
|
|
|
taos_close(winfo->taos);
|
|
|
|
|
taos_close(pThreadInfo->taos);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -6187,7 +6232,7 @@ static void *subSubscribeProcess(void *sarg) {
|
|
|
|
|
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
|
|
|
|
sprintf(tmpFile, "%s-%d",
|
|
|
|
|
g_queryInfo.superQueryInfo.result[i],
|
|
|
|
|
winfo->threadID);
|
|
|
|
|
pThreadInfo->threadID);
|
|
|
|
|
}
|
|
|
|
|
getResult(res, tmpFile);
|
|
|
|
|
}
|
|
|
|
@ -6199,15 +6244,15 @@ static void *subSubscribeProcess(void *sarg) {
|
|
|
|
|
taos_unsubscribe(tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taos_close(winfo->taos);
|
|
|
|
|
taos_close(pThreadInfo->taos);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void *superSubscribeProcess(void *sarg) {
|
|
|
|
|
threadInfo *winfo = (threadInfo *)sarg;
|
|
|
|
|
static void *specifiedSubscribe(void *sarg) {
|
|
|
|
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
|
|
|
|
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
|
|
|
|
|
|
|
|
|
|
if (winfo->taos == NULL) {
|
|
|
|
|
if (pThreadInfo->taos == NULL) {
|
|
|
|
|
TAOS * taos = NULL;
|
|
|
|
|
taos = taos_connect(g_queryInfo.host,
|
|
|
|
|
g_queryInfo.user,
|
|
|
|
@ -6216,18 +6261,18 @@ static void *superSubscribeProcess(void *sarg) {
|
|
|
|
|
g_queryInfo.port);
|
|
|
|
|
if (taos == NULL) {
|
|
|
|
|
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
|
|
|
|
|
winfo->threadID, taos_errstr(NULL));
|
|
|
|
|
pThreadInfo->threadID, taos_errstr(NULL));
|
|
|
|
|
return NULL;
|
|
|
|
|
} else {
|
|
|
|
|
winfo->taos = taos;
|
|
|
|
|
pThreadInfo->taos = taos;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char sqlStr[MAX_TB_NAME_SIZE*2];
|
|
|
|
|
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
|
|
|
|
|
debugPrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
|
|
|
|
|
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
|
|
|
|
|
taos_close(winfo->taos);
|
|
|
|
|
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
|
|
|
|
|
taos_close(pThreadInfo->taos);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -6236,7 +6281,7 @@ static void *superSubscribeProcess(void *sarg) {
|
|
|
|
|
do {
|
|
|
|
|
//if (g_queryInfo.specifiedQueryInfo.rate && (et - st) < g_queryInfo.specifiedQueryInfo.rate*1000) {
|
|
|
|
|
// taosMsleep(g_queryInfo.specifiedQueryInfo.rate*1000 - (et - st)); // ms
|
|
|
|
|
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
|
|
|
|
|
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
|
|
|
|
|
//}
|
|
|
|
|
|
|
|
|
|
//st = taosGetTimestampMs();
|
|
|
|
@ -6246,12 +6291,12 @@ static void *superSubscribeProcess(void *sarg) {
|
|
|
|
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
|
|
|
|
if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
|
|
|
|
|
sprintf(tmpFile, "%s-%d",
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
|
|
|
|
|
}
|
|
|
|
|
tsub[i] = subscribeImpl(winfo->taos,
|
|
|
|
|
tsub[i] = subscribeImpl(pThreadInfo->taos,
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
|
|
|
|
|
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) {
|
|
|
|
|
taos_close(winfo->taos);
|
|
|
|
|
taos_close(pThreadInfo->taos);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -6272,7 +6317,7 @@ static void *superSubscribeProcess(void *sarg) {
|
|
|
|
|
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
|
|
|
|
|
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
|
|
|
|
|
sprintf(tmpFile, "%s-%d",
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
|
|
|
|
|
}
|
|
|
|
|
getResult(res, tmpFile);
|
|
|
|
|
}
|
|
|
|
@ -6285,7 +6330,7 @@ static void *superSubscribeProcess(void *sarg) {
|
|
|
|
|
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taos_close(winfo->taos);
|
|
|
|
|
taos_close(pThreadInfo->taos);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -6343,7 +6388,7 @@ static int subscribeTestProcess() {
|
|
|
|
|
threadInfo *t_info = infos + i;
|
|
|
|
|
t_info->threadID = i;
|
|
|
|
|
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
|
|
|
|
|
pthread_create(pids + i, NULL, superSubscribeProcess, t_info);
|
|
|
|
|
pthread_create(pids + i, NULL, specifiedSubscribe, t_info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//==== create sub threads for query from sub table
|
|
|
|
@ -6386,7 +6431,7 @@ static int subscribeTestProcess() {
|
|
|
|
|
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
|
|
|
|
|
startFrom = t_info->end_table_to + 1;
|
|
|
|
|
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
|
|
|
|
|
pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info);
|
|
|
|
|
pthread_create(pidsOfSub + i, NULL, superSubscribe, t_info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g_queryInfo.superQueryInfo.threadCnt = threads;
|
|
|
|
|