Hotfix/sangshuduo/td 4187 taosdemo keepprogress for develop (#6158)
* [TD-4240]<fix>: taosdemo subscribe more than max query sql count. * add consumed. * [TD-4187]<fix>: taosdemo keepProgress. add resubAfterConsume process. * [TD-4187]<fix>: taosdemo support keepProgress. * [TD-4187]<fix>: taosdemo support keepProgress. for develop branch. Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
a06be90f6f
commit
6f1b157b17
|
@ -5,13 +5,33 @@
|
||||||
"port": 6030,
|
"port": 6030,
|
||||||
"user": "root",
|
"user": "root",
|
||||||
"password": "taosdata",
|
"password": "taosdata",
|
||||||
"databases": "dbx",
|
"databases": "test",
|
||||||
"specified_table_query":
|
"specified_table_query": {
|
||||||
{"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes",
|
"concurrent": 1,
|
||||||
"sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}]
|
"mode": "sync",
|
||||||
},
|
"interval": 1000,
|
||||||
"super_table_query":
|
"restart": "yes",
|
||||||
{"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes",
|
"keepProgress": "yes",
|
||||||
"sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}]
|
"resubAfterConsume": 10,
|
||||||
}
|
"sqls": [
|
||||||
|
{
|
||||||
|
"sql": "select avg(col1) from meters where col1 > 1;",
|
||||||
|
"result": "./subscribe_res0.txt"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"super_table_query": {
|
||||||
|
"stblname": "meters",
|
||||||
|
"threads": 1,
|
||||||
|
"mode": "sync",
|
||||||
|
"interval": 1000,
|
||||||
|
"restart": "yes",
|
||||||
|
"keepProgress": "yes",
|
||||||
|
"sqls": [
|
||||||
|
{
|
||||||
|
"sql": "select col1 from xxxx where col1 > 10;",
|
||||||
|
"result": "./subscribe_res1.txt"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -372,6 +372,7 @@ typedef struct SpecifiedQueryInfo_S {
|
||||||
int subscribeKeepProgress;
|
int subscribeKeepProgress;
|
||||||
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
|
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
|
||||||
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
|
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
|
||||||
|
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
|
||||||
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
|
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
|
||||||
uint64_t totalQueried;
|
uint64_t totalQueried;
|
||||||
} SpecifiedQueryInfo;
|
} SpecifiedQueryInfo;
|
||||||
|
@ -390,6 +391,7 @@ typedef struct SuperQueryInfo_S {
|
||||||
uint64_t sqlCount;
|
uint64_t sqlCount;
|
||||||
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
|
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
|
||||||
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
|
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
|
||||||
|
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
|
||||||
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
|
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
|
||||||
|
|
||||||
char* childTblName;
|
char* childTblName;
|
||||||
|
@ -407,7 +409,7 @@ typedef struct SQueryMetaInfo_S {
|
||||||
char queryMode[MAX_TB_NAME_SIZE]; // taosc, rest
|
char queryMode[MAX_TB_NAME_SIZE]; // taosc, rest
|
||||||
|
|
||||||
SpecifiedQueryInfo specifiedQueryInfo;
|
SpecifiedQueryInfo specifiedQueryInfo;
|
||||||
SuperQueryInfo superQueryInfo;
|
SuperQueryInfo superQueryInfo;
|
||||||
uint64_t totalQueried;
|
uint64_t totalQueried;
|
||||||
} SQueryMetaInfo;
|
} SQueryMetaInfo;
|
||||||
|
|
||||||
|
@ -4287,6 +4289,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
||||||
}
|
}
|
||||||
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
|
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
|
||||||
|
|
||||||
|
cJSON* resubAfterConsume =
|
||||||
|
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
|
||||||
|
if (resubAfterConsume
|
||||||
|
&& resubAfterConsume->type == cJSON_Number) {
|
||||||
|
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
|
||||||
|
= resubAfterConsume->valueint;
|
||||||
|
} else if (!resubAfterConsume) {
|
||||||
|
//printf("failed to read json, subscribe interval no found\n");
|
||||||
|
//goto PARSE_OVER;
|
||||||
|
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1;
|
||||||
|
}
|
||||||
|
|
||||||
cJSON *result = cJSON_GetObjectItem(sql, "result");
|
cJSON *result = cJSON_GetObjectItem(sql, "result");
|
||||||
if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) {
|
if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) {
|
||||||
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN);
|
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN);
|
||||||
|
@ -4456,6 +4470,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
||||||
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
|
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
|
||||||
MAX_QUERY_SQL_LENGTH);
|
MAX_QUERY_SQL_LENGTH);
|
||||||
|
|
||||||
|
cJSON* superResubAfterConsume =
|
||||||
|
cJSON_GetObjectItem(sql, "resubAfterConsume");
|
||||||
|
if (superResubAfterConsume
|
||||||
|
&& superResubAfterConsume->type == cJSON_Number) {
|
||||||
|
g_queryInfo.superQueryInfo.resubAfterConsume[j] =
|
||||||
|
superResubAfterConsume->valueint;
|
||||||
|
} else if (!superResubAfterConsume) {
|
||||||
|
//printf("failed to read json, subscribe interval no found\n");
|
||||||
|
//goto PARSE_OVER;
|
||||||
|
g_queryInfo.superQueryInfo.resubAfterConsume[j] = 1;
|
||||||
|
}
|
||||||
|
|
||||||
cJSON *result = cJSON_GetObjectItem(sql, "result");
|
cJSON *result = cJSON_GetObjectItem(sql, "result");
|
||||||
if (result != NULL && result->type == cJSON_String
|
if (result != NULL && result->type == cJSON_String
|
||||||
&& result->valuestring != NULL){
|
&& result->valuestring != NULL){
|
||||||
|
@ -6546,17 +6572,18 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
|
||||||
}
|
}
|
||||||
|
|
||||||
static TAOS_SUB* subscribeImpl(
|
static TAOS_SUB* subscribeImpl(
|
||||||
TAOS *taos, char *sql, char* topic, char* resultFileName) {
|
TAOS *taos, char *sql, char* topic, bool restart,
|
||||||
|
char* resultFileName) {
|
||||||
TAOS_SUB* tsub = NULL;
|
TAOS_SUB* tsub = NULL;
|
||||||
|
|
||||||
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
|
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
|
||||||
tsub = taos_subscribe(taos,
|
tsub = taos_subscribe(taos,
|
||||||
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
restart,
|
||||||
topic, sql, subscribe_callback, (void*)resultFileName,
|
topic, sql, subscribe_callback, (void*)resultFileName,
|
||||||
g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
||||||
} else {
|
} else {
|
||||||
tsub = taos_subscribe(taos,
|
tsub = taos_subscribe(taos,
|
||||||
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
restart,
|
||||||
topic, sql, NULL, NULL, 0);
|
topic, sql, NULL, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6610,6 +6637,7 @@ static void *superSubscribe(void *sarg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t subSeq;
|
||||||
char topic[32] = {0};
|
char topic[32] = {0};
|
||||||
for (uint64_t i = pThreadInfo->start_table_from;
|
for (uint64_t i = pThreadInfo->start_table_from;
|
||||||
i <= pThreadInfo->end_table_to; i++) {
|
i <= pThreadInfo->end_table_to; i++) {
|
||||||
|
@ -6623,10 +6651,12 @@ static void *superSubscribe(void *sarg) {
|
||||||
g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID);
|
g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
|
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
|
||||||
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
|
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
|
||||||
__func__, __LINE__, subSeq, subSqlstr);
|
__func__, __LINE__, subSeq, subSqlstr);
|
||||||
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile);
|
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic,
|
||||||
|
g_queryInfo.superQueryInfo.subscribeRestart,
|
||||||
|
tmpFile);
|
||||||
if (NULL == tsub[subSeq]) {
|
if (NULL == tsub[subSeq]) {
|
||||||
taos_close(pThreadInfo->taos);
|
taos_close(pThreadInfo->taos);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -6634,6 +6664,10 @@ static void *superSubscribe(void *sarg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int consumed[MAX_QUERY_SQL_COUNT];
|
||||||
|
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
|
||||||
|
consumed[i] = 0;
|
||||||
|
|
||||||
// start loop to consume result
|
// start loop to consume result
|
||||||
TAOS_RES* res = NULL;
|
TAOS_RES* res = NULL;
|
||||||
while(1) {
|
while(1) {
|
||||||
|
@ -6643,7 +6677,7 @@ static void *superSubscribe(void *sarg) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
|
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
|
||||||
taosMsleep(100); // ms
|
taosMsleep(100); // ms
|
||||||
res = taos_consume(tsub[subSeq]);
|
res = taos_consume(tsub[subSeq]);
|
||||||
if (res) {
|
if (res) {
|
||||||
|
@ -6654,6 +6688,28 @@ static void *superSubscribe(void *sarg) {
|
||||||
pThreadInfo->threadID);
|
pThreadInfo->threadID);
|
||||||
appendResultToFile(res, tmpFile);
|
appendResultToFile(res, tmpFile);
|
||||||
}
|
}
|
||||||
|
consumed[j] ++;
|
||||||
|
|
||||||
|
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
|
||||||
|
&& (consumed[j] >=
|
||||||
|
g_queryInfo.superQueryInfo.resubAfterConsume[j])) {
|
||||||
|
printf("keepProgress:%d, resub super table query: %d\n",
|
||||||
|
g_queryInfo.superQueryInfo.subscribeKeepProgress, j);
|
||||||
|
taos_unsubscribe(tsub[subSeq],
|
||||||
|
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||||
|
consumed[j]= 0;
|
||||||
|
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
|
||||||
|
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
|
||||||
|
__func__, __LINE__, subSeq, subSqlstr);
|
||||||
|
tsub[subSeq] = subscribeImpl(
|
||||||
|
pThreadInfo->taos, subSqlstr, topic,
|
||||||
|
g_queryInfo.superQueryInfo.subscribeRestart,
|
||||||
|
tmpFile);
|
||||||
|
if (NULL == tsub[subSeq]) {
|
||||||
|
taos_close(pThreadInfo->taos);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6663,9 +6719,8 @@ static void *superSubscribe(void *sarg) {
|
||||||
for (uint64_t i = pThreadInfo->start_table_from;
|
for (uint64_t i = pThreadInfo->start_table_from;
|
||||||
i <= pThreadInfo->end_table_to; i++) {
|
i <= pThreadInfo->end_table_to; i++) {
|
||||||
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
|
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
|
||||||
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
|
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
|
||||||
taos_unsubscribe(tsub[subSeq],
|
taos_unsubscribe(tsub[subSeq], 0);
|
||||||
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6713,14 +6768,22 @@ static void *specifiedSubscribe(void *sarg) {
|
||||||
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
|
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
|
||||||
}
|
}
|
||||||
tsub[i] = subscribeImpl(pThreadInfo->taos,
|
tsub[i] = subscribeImpl(pThreadInfo->taos,
|
||||||
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
|
g_queryInfo.specifiedQueryInfo.sql[i], topic,
|
||||||
|
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
||||||
|
tmpFile);
|
||||||
if (NULL == tsub[i]) {
|
if (NULL == tsub[i]) {
|
||||||
taos_close(pThreadInfo->taos);
|
taos_close(pThreadInfo->taos);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// start loop to consume result
|
// start loop to consume result
|
||||||
TAOS_RES* res = NULL;
|
TAOS_RES* res = NULL;
|
||||||
|
|
||||||
|
int consumed[MAX_QUERY_SQL_COUNT];
|
||||||
|
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
|
||||||
|
consumed[i] = 0;
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
||||||
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
|
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
|
||||||
|
@ -6736,14 +6799,32 @@ static void *specifiedSubscribe(void *sarg) {
|
||||||
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
|
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
|
||||||
appendResultToFile(res, tmpFile);
|
appendResultToFile(res, tmpFile);
|
||||||
}
|
}
|
||||||
|
consumed[i] ++;
|
||||||
|
|
||||||
|
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
|
||||||
|
&& (consumed[i] >=
|
||||||
|
g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) {
|
||||||
|
printf("keepProgress:%d, resub specified query: %d\n",
|
||||||
|
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i);
|
||||||
|
consumed[i] = 0;
|
||||||
|
taos_unsubscribe(tsub[i],
|
||||||
|
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||||
|
tsub[i] = subscribeImpl(pThreadInfo->taos,
|
||||||
|
g_queryInfo.specifiedQueryInfo.sql[i], topic,
|
||||||
|
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
||||||
|
tmpFile);
|
||||||
|
if (NULL == tsub[i]) {
|
||||||
|
taos_close(pThreadInfo->taos);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
|
|
||||||
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
|
||||||
taos_unsubscribe(tsub[i],
|
taos_unsubscribe(tsub[i], 0);
|
||||||
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_close(pThreadInfo->taos);
|
taos_close(pThreadInfo->taos);
|
||||||
|
|
Loading…
Reference in New Issue