Revert "[TD-4533]<fix>: taosdemo resub if resubAfterConsume != -1 (#6243)" (#6289)

This reverts commit 0c4075e09f.
This commit is contained in:
Shuduo Sang 2021-05-29 09:12:19 +08:00 committed by GitHub
parent 5694f03f1a
commit 5456f3a009
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 40 deletions

View File

@ -389,7 +389,6 @@ typedef struct SpecifiedQueryInfo_S {
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT]; int resubAfterConsume[MAX_QUERY_SQL_COUNT];
int endAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char topic[MAX_QUERY_SQL_COUNT][32]; char topic[MAX_QUERY_SQL_COUNT][32];
int consumed[MAX_QUERY_SQL_COUNT]; int consumed[MAX_QUERY_SQL_COUNT];
@ -412,7 +411,6 @@ typedef struct SuperQueryInfo_S {
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume; int resubAfterConsume;
int endAfterConsume;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName; char* childTblName;
@ -4378,17 +4376,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
cJSON* endAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "endAfterConsume");
if (endAfterConsume
&& endAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.endAfterConsume[j]
= endAfterConsume->valueint;
} else if (!endAfterConsume) {
// default value is -1, which mean infinite loop
g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1;
}
cJSON* resubAfterConsume = cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if (resubAfterConsume if (resubAfterConsume
@ -4396,8 +4383,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint; = resubAfterConsume->valueint;
} else if (!resubAfterConsume) { } else if (!resubAfterConsume) {
// default value is -1, which mean do not resub //printf("failed to read json, subscribe interval no found\n");
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; //goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1;
} }
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
@ -4541,26 +4529,16 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} }
cJSON* superEndAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume");
if (superEndAfterConsume
&& superEndAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume =
superEndAfterConsume->valueint;
} else if (!superEndAfterConsume) {
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.endAfterConsume = -1;
}
cJSON* superResubAfterConsume = cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "endAfterConsume"); cJSON_GetObjectItem(superQuery, "resubAfterConsume");
if (superResubAfterConsume if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) { && superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.endAfterConsume = g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint; superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) { } else if (!superResubAfterConsume) {
// default value is -1, which mean do not resub //printf("failed to read json, subscribe interval no found\n");
g_queryInfo.superQueryInfo.endAfterConsume = -1; ////goto PARSE_OVER;
g_queryInfo.superQueryInfo.resubAfterConsume = 1;
} }
// supert table sqls // supert table sqls
@ -7284,10 +7262,7 @@ static void *superSubscribe(void *sarg) {
uint64_t st = 0, et = 0; uint64_t st = 0, et = 0;
while ((g_queryInfo.superQueryInfo.endAfterConsume == -1) while(1) {
|| (g_queryInfo.superQueryInfo.endAfterConsume <
consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from])) {
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
tsubSeq = i - pThreadInfo->start_table_from; tsubSeq = i - pThreadInfo->start_table_from;
@ -7316,7 +7291,7 @@ static void *superSubscribe(void *sarg) {
} }
consumed[tsubSeq] ++; consumed[tsubSeq] ++;
if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1) if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[tsubSeq] >= && (consumed[tsubSeq] >=
g_queryInfo.superQueryInfo.resubAfterConsume)) { g_queryInfo.superQueryInfo.resubAfterConsume)) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n", printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
@ -7398,10 +7373,7 @@ static void *specifiedSubscribe(void *sarg) {
// start loop to consume result // start loop to consume result
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
while((g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq] == -1) while(1) {
|| (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] <
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue; continue;
} }
@ -7420,7 +7392,7 @@ static void *specifiedSubscribe(void *sarg) {
} }
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
if ((g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq] != -1) if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >= && (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %"PRIu64"\n", printf("keepProgress:%d, resub specified query: %"PRIu64"\n",