diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 406544b306..a8af72d36a 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -389,7 +389,6 @@ typedef struct SpecifiedQueryInfo_S { char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; int resubAfterConsume[MAX_QUERY_SQL_COUNT]; - int endAfterConsume[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; char topic[MAX_QUERY_SQL_COUNT][32]; int consumed[MAX_QUERY_SQL_COUNT]; @@ -412,7 +411,6 @@ typedef struct SuperQueryInfo_S { char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; int resubAfterConsume; - int endAfterConsume; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; char* childTblName; @@ -4378,17 +4376,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); - cJSON* endAfterConsume = - cJSON_GetObjectItem(specifiedQuery, "endAfterConsume"); - if (endAfterConsume - && endAfterConsume->type == cJSON_Number) { - g_queryInfo.specifiedQueryInfo.endAfterConsume[j] - = endAfterConsume->valueint; - } else if (!endAfterConsume) { - // default value is -1, which mean infinite loop - g_queryInfo.specifiedQueryInfo.endAfterConsume[j] = -1; - } - cJSON* resubAfterConsume = cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); if (resubAfterConsume @@ -4396,8 +4383,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = resubAfterConsume->valueint; } else if (!resubAfterConsume) { - // default value is -1, which mean do not resub - g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1; + //printf("failed to read json, subscribe interval no found\n"); + //goto PARSE_OVER; + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1; } cJSON *result = cJSON_GetObjectItem(sql, "result"); @@ -4541,26 +4529,16 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; } - cJSON* superEndAfterConsume = - cJSON_GetObjectItem(superQuery, "endAfterConsume"); - if (superEndAfterConsume - && superEndAfterConsume->type == cJSON_Number) { - g_queryInfo.superQueryInfo.endAfterConsume = - superEndAfterConsume->valueint; - } else if (!superEndAfterConsume) { - // default value is -1, which mean do not resub - g_queryInfo.superQueryInfo.endAfterConsume = -1; - } - cJSON* superResubAfterConsume = - cJSON_GetObjectItem(superQuery, "endAfterConsume"); + cJSON_GetObjectItem(superQuery, "resubAfterConsume"); if (superResubAfterConsume && superResubAfterConsume->type == cJSON_Number) { - g_queryInfo.superQueryInfo.endAfterConsume = + g_queryInfo.superQueryInfo.resubAfterConsume = superResubAfterConsume->valueint; } else if (!superResubAfterConsume) { - // default value is -1, which mean do not resub - g_queryInfo.superQueryInfo.endAfterConsume = -1; + //printf("failed to read json, subscribe interval no found\n"); + ////goto PARSE_OVER; + g_queryInfo.superQueryInfo.resubAfterConsume = 1; } // supert table sqls @@ -7284,10 +7262,7 @@ static void *superSubscribe(void *sarg) { uint64_t st = 0, et = 0; - while ((g_queryInfo.superQueryInfo.endAfterConsume == -1) - || (g_queryInfo.superQueryInfo.endAfterConsume < - consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from])) { - + while(1) { for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { tsubSeq = i - pThreadInfo->start_table_from; @@ -7316,7 +7291,7 @@ static void *superSubscribe(void *sarg) { } consumed[tsubSeq] ++; - if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1) + if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) && (consumed[tsubSeq] >= g_queryInfo.superQueryInfo.resubAfterConsume)) { printf("keepProgress:%d, resub super table query: %"PRIu64"\n", @@ -7398,10 +7373,7 @@ static void *specifiedSubscribe(void *sarg) { // start loop to consume result g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; - while((g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq] == -1) - || (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] < - g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) { - + while(1) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { continue; } @@ -7420,7 +7392,7 @@ static void *specifiedSubscribe(void *sarg) { } g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; - if ((g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq] != -1) + if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) && (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] >= g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { printf("keepProgress:%d, resub specified query: %"PRIu64"\n",