Merge branch 'master' of github.com:taosdata/TDengine into test/chr

This commit is contained in:
tomchon 2021-05-19 17:55:36 +08:00
commit e0a896ffe0
2 changed files with 267 additions and 204 deletions

View File

@ -0,0 +1,41 @@
{
"filetype": "subscribe",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"databases": "test",
"specified_table_query": {
"concurrent": 1,
"mode": "async",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"resubAfterConsume": 10,
"sqls": [
{
"sql": "select col1 from meters where col1 > 1;",
"result": "./subscribe_res0.txt"
},
{
"sql": "select col2 from meters where col2 > 1;",
"result": "./subscribe_res2.txt"
}
]
},
"super_table_query": {
"stblname": "meters",
"threads": 1,
"mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"sqls": [
{
"sql": "select col1 from xxxx where col1 > 10;",
"result": "./subscribe_res1.txt"
}
]
}
}

View File

@ -441,8 +441,9 @@ typedef struct SThreadInfo_S {
uint64_t maxDelay; uint64_t maxDelay;
uint64_t minDelay; uint64_t minDelay;
// query // seq of query or subscribe
uint64_t querySeq; // sequence number of sql command uint64_t querySeq; // sequence number of sql command
} threadInfo; } threadInfo;
#ifdef WINDOWS #ifdef WINDOWS
@ -1107,7 +1108,6 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile)
} }
} }
fprintf(fp, "%s", resultBuf); fprintf(fp, "%s", resultBuf);
tmfclose(fp); tmfclose(fp);
} }
@ -1150,7 +1150,7 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
} }
static void selectAndGetResult( static void selectAndGetResult(
threadInfo *pThreadInfo, char *command, char* resultFile) threadInfo *pThreadInfo, char *command)
{ {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
TAOS_RES *res = taos_query(pThreadInfo->taos, command); TAOS_RES *res = taos_query(pThreadInfo->taos, command);
@ -1161,8 +1161,8 @@ static void selectAndGetResult(
return; return;
} }
if ((resultFile) && (strlen(resultFile))) { if ((strlen(pThreadInfo->fp))) {
appendResultToFile(res, resultFile); appendResultToFile(res, pThreadInfo->fp);
} }
taos_free_result(res); taos_free_result(res);
@ -1170,7 +1170,7 @@ static void selectAndGetResult(
int retCode = postProceSql( int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port, g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port,
command, command,
resultFile); pThreadInfo->fp);
if (0 != retCode) { if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID); printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID);
} }
@ -6230,23 +6230,22 @@ static void *specifiedTableQuery(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
while(queryTimes --) { while(queryTimes --) {
if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) <
(int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) { (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) {
taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms
} }
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
st = taosGetTimestampMs(); st = taosGetTimestampMs();
selectAndGetResult(pThreadInfo, selectAndGetResult(pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile); g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
et = taosGetTimestampMs(); et = taosGetTimestampMs();
printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n", printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n",
@ -6332,13 +6331,12 @@ static void *superTableQuery(void *sarg) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr)); memset(sqlstr,0,sizeof(sqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i); replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[j][0] != 0) { if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[j], g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID); pThreadInfo->threadID);
} }
selectAndGetResult(pThreadInfo, sqlstr, tmpFile); selectAndGetResult(pThreadInfo, sqlstr);
totalQueried++; totalQueried++;
g_queryInfo.superQueryInfo.totalQueried ++; g_queryInfo.superQueryInfo.totalQueried ++;
@ -6407,7 +6405,7 @@ static int queryTestProcess() {
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from specify table //==== create sub threads for query from specify table
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
uint64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
@ -6421,11 +6419,12 @@ static int queryTestProcess() {
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
for (int i = 0; i < nConcurrent; i++) { for (uint64_t i = 0; i < nSqlCount; i++) {
for (int j = 0; j < nSqlCount; j++) { for (int j = 0; j < nConcurrent; j++) {
threadInfo *t_info = infos + i * nSqlCount + j; uint64_t seq = i * nConcurrent + j;
t_info->threadID = i * nSqlCount + j; threadInfo *t_info = infos + seq;
t_info->querySeq = j; t_info->threadID = seq;
t_info->querySeq = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
@ -6444,7 +6443,7 @@ static int queryTestProcess() {
t_info->taos = NULL;// TODO: workaround to use separate taos connection; t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + i * nSqlCount + j, NULL, specifiedTableQuery, pthread_create(pids + seq, NULL, specifiedTableQuery,
t_info); t_info);
} }
} }
@ -6531,7 +6530,8 @@ static int queryTestProcess() {
return 0; return 0;
} }
static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { static void stable_sub_callback(
TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
if (res == NULL || taos_errno(res) != 0) { if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n", errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res)); __func__, __LINE__, code, taos_errstr(res));
@ -6539,22 +6539,44 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
} }
if (param) if (param)
appendResultToFile(res, (char*)param); appendResultToFile(res, ((threadInfo *)param)->fp);
// tao_unscribe() will free result.
}
static void specified_sub_callback(
TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res));
return;
}
if (param)
appendResultToFile(res, ((threadInfo *)param)->fp);
// tao_unscribe() will free result. // tao_unscribe() will free result.
} }
static TAOS_SUB* subscribeImpl( static TAOS_SUB* subscribeImpl(
TAOS *taos, char *sql, char* topic, bool restart, threadInfo *pThreadInfo,
char* resultFileName) { char *sql, char* topic, bool restart)
{
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
tsub = taos_subscribe(taos, tsub = taos_subscribe(
pThreadInfo->taos,
restart, restart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, specified_sub_callback, (void*)pThreadInfo,
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, stable_sub_callback, (void*)pThreadInfo,
g_queryInfo.superQueryInfo.subscribeInterval);
} else { } else {
tsub = taos_subscribe(taos, tsub = taos_subscribe(
pThreadInfo->taos,
restart, restart,
topic, sql, NULL, NULL, 0); topic, sql, NULL, NULL, 0);
} }
@ -6572,13 +6594,8 @@ static void *superSubscribe(void *sarg) {
char subSqlstr[MAX_QUERY_SQL_LENGTH]; char subSqlstr[MAX_QUERY_SQL_LENGTH];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (g_queryInfo.superQueryInfo.sqlCount == 0) if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
return NULL; errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n",
g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables,
g_queryInfo.superQueryInfo.sqlCount,
pThreadInfo->ntables, pThreadInfo->ntables,
MAX_QUERY_SQL_COUNT); MAX_QUERY_SQL_COUNT);
exit(-1); exit(-1);
@ -6612,72 +6629,68 @@ static void *superSubscribe(void *sarg) {
char topic[32] = {0}; char topic[32] = {0};
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"",
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%d", i, j); i, pThreadInfo->querySeq);
memset(subSqlstr,0,sizeof(subSqlstr)); memset(subSqlstr,0,sizeof(subSqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], subSqlstr, i); replaceChildTblName(
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq],
if (g_queryInfo.superQueryInfo.result[j][0] != 0) { subSqlstr, i);
sprintf(tmpFile, "%s-%d", if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
} }
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", __func__, __LINE__, pThreadInfo->threadID, subSqlstr);
__func__, __LINE__, subSeq, subSqlstr); tsub[i] = subscribeImpl(
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeRestart);
tmpFile); if (NULL == tsub[i]) {
if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
}
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
// start loop to consume result // start loop to consume result
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) {
consumed[i] = 0;
}
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
continue; continue;
} }
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[subSeq]); res = taos_consume(tsub[i]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
if (g_queryInfo.superQueryInfo.result[j][0] != 0) { sprintf(pThreadInfo->fp, "%s-%d",
sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, tmpFile); appendResultToFile(res, pThreadInfo->fp);
} }
consumed[j] ++; consumed[i] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[j] >= && (consumed[i] >=
g_queryInfo.superQueryInfo.resubAfterConsume[j])) { g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub super table query: %d\n", printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress, j); g_queryInfo.superQueryInfo.subscribeKeepProgress,
taos_unsubscribe(tsub[subSeq], pThreadInfo->querySeq);
taos_unsubscribe(tsub,
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[j]= 0; consumed[i]= 0;
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; tsub[i] = subscribeImpl(
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", pThreadInfo, subSqlstr, topic,
__func__, __LINE__, subSeq, subSqlstr); g_queryInfo.superQueryInfo.subscribeRestart
tsub[subSeq] = subscribeImpl( );
pThreadInfo->taos, subSqlstr, topic, if (NULL == tsub[i]) {
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
@ -6685,15 +6698,11 @@ static void *superSubscribe(void *sarg) {
} }
} }
} }
}
taos_free_result(res); taos_free_result(res);
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { taos_unsubscribe(tsub[i], 0);
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taos_unsubscribe(tsub[subSeq], 0);
}
} }
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
@ -6702,10 +6711,7 @@ static void *superSubscribe(void *sarg) {
static void *specifiedSubscribe(void *sarg) { static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub = NULL;
if (g_queryInfo.specifiedQueryInfo.sqlCount == 0)
return NULL;
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
@ -6732,76 +6738,61 @@ static void *specifiedSubscribe(void *sarg) {
} }
char topic[32] = {0}; char topic[32] = {0};
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { sprintf(topic, "taosdemo-subscribe-%"PRIu64"", pThreadInfo->querySeq);
sprintf(topic, "taosdemo-subscribe-%d", i); if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; sprintf(pThreadInfo->fp, "%s-%d",
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i],
pThreadInfo->threadID); pThreadInfo->threadID);
} }
tsub[i] = subscribeImpl(pThreadInfo->taos, tsub = subscribeImpl(pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[i], topic, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
g_queryInfo.specifiedQueryInfo.subscribeRestart, topic,
tmpFile); g_queryInfo.specifiedQueryInfo.subscribeRestart);
if (NULL == tsub[i]) { if (NULL == tsub) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
}
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
int consumed[MAX_QUERY_SQL_COUNT]; int consumed;
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue; continue;
} }
taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[i]); res = taos_consume(tsub);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; consumed ++;
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i],
pThreadInfo->threadID);
appendResultToFile(res, tmpFile);
}
consumed[i] ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed[i] >= && (consumed >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) { g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %d\n", printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i); g_queryInfo.specifiedQueryInfo.subscribeKeepProgress,
consumed[i] = 0; pThreadInfo->querySeq);
taos_unsubscribe(tsub[i], consumed = 0;
taos_unsubscribe(tsub,
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub[i] = subscribeImpl(pThreadInfo->taos, tsub = subscribeImpl(
g_queryInfo.specifiedQueryInfo.sql[i], topic, pThreadInfo,
g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
tmpFile); topic,
if (NULL == tsub[i]) { g_queryInfo.specifiedQueryInfo.subscribeRestart
);
if (NULL == tsub) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
} }
} }
}
taos_free_result(res); taos_free_result(res);
taos_unsubscribe(tsub, 0);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i], 0);
}
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
@ -6836,7 +6827,11 @@ static int subscribeTestProcess() {
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query for specified table
pthread_t *pidsOfStable = NULL;
threadInfo *infosOfStable = NULL;
//==== create threads for query for specified table
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__, __func__, __LINE__,
@ -6850,32 +6845,47 @@ static int subscribeTestProcess() {
} }
pids = malloc( pids = malloc(
g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(pthread_t));
infos = malloc( infos = malloc(
g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1); exit(-1);
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
threadInfo *t_info = infos + i; for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
t_info->threadID = i; uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
threadInfo *t_info = infos + seq;
t_info->threadID = seq;
t_info->querySeq = i;
t_info->taos = NULL; // TODO: workaround to use separate taos connection; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, specifiedSubscribe, t_info); pthread_create(pids + seq, NULL, specifiedSubscribe, t_info);
}
} }
} }
//==== create sub threads for super table query //==== create threads for super table query
pthread_t *pidsOfSub = NULL; if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
threadInfo *infosOfSub = NULL; printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
} else {
if ((g_queryInfo.superQueryInfo.sqlCount > 0) if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) { && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * pidsOfStable = malloc(
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(pthread_t)); sizeof(pthread_t));
infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * infosOfStable = malloc(
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(threadInfo)); sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { if ((NULL == pidsOfStable) || (NULL == infosOfStable)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", errorPrint("%s() LN%d, malloc failed for create threads\n",
__func__, __LINE__); __func__, __LINE__);
// taos_close(taos); // taos_close(taos);
@ -6897,34 +6907,46 @@ static int subscribeTestProcess() {
} }
uint64_t startFrom = 0; uint64_t startFrom = 0;
for (int i = 0; i < threads; i++) { for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
threadInfo *t_info = infosOfSub + i; for (int j = 0; j < threads; j++) {
t_info->threadID = i; uint64_t seq = i * threads + j;
threadInfo *t_info = infosOfStable + seq;
t_info->threadID = seq;
t_info->querySeq = i;
t_info->start_table_from = startFrom; t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a; t_info->ntables = j<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; t_info->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = t_info->end_table_to + 1; startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superSubscribe, t_info); pthread_create(pidsOfStable + seq,
NULL, superSubscribe, t_info);
}
} }
g_queryInfo.superQueryInfo.threadCnt = threads; g_queryInfo.superQueryInfo.threadCnt = threads;
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
pthread_join(pidsOfSub[i], NULL); for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
pthread_join(pidsOfStable[seq], NULL);
}
}
} }
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
pthread_join(pids[i], NULL); for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
pthread_join(pids[seq], NULL);
}
} }
tmfree((char*)pids); tmfree((char*)pids);
tmfree((char*)infos); tmfree((char*)infos);
tmfree((char*)pidsOfSub); tmfree((char*)pidsOfStable);
tmfree((char*)infosOfSub); tmfree((char*)infosOfStable);
// taos_close(taos); // taos_close(taos);
return 0; return 0;
} }