Merge remote-tracking branch 'origin/develop' into feature/m2d
This commit is contained in:
commit
3a80b71bb6
|
@ -403,7 +403,7 @@ typedef struct SuperQueryInfo_S {
|
||||||
uint64_t sqlCount;
|
uint64_t sqlCount;
|
||||||
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
|
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
|
||||||
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
|
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
|
||||||
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
|
int resubAfterConsume;
|
||||||
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
|
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
|
||||||
|
|
||||||
char* childTblName;
|
char* childTblName;
|
||||||
|
@ -4495,6 +4495,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
||||||
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
|
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cJSON* superResubAfterConsume =
|
||||||
|
cJSON_GetObjectItem(superQuery, "resubAfterConsume");
|
||||||
|
if (superResubAfterConsume
|
||||||
|
&& superResubAfterConsume->type == cJSON_Number) {
|
||||||
|
g_queryInfo.superQueryInfo.resubAfterConsume =
|
||||||
|
superResubAfterConsume->valueint;
|
||||||
|
} else if (!superResubAfterConsume) {
|
||||||
|
//printf("failed to read json, subscribe interval no found\n");
|
||||||
|
////goto PARSE_OVER;
|
||||||
|
g_queryInfo.superQueryInfo.resubAfterConsume = 1;
|
||||||
|
}
|
||||||
|
|
||||||
// sqls
|
// sqls
|
||||||
cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls");
|
cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls");
|
||||||
if (!subsqls) {
|
if (!subsqls) {
|
||||||
|
@ -4526,18 +4538,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
|
||||||
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
|
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
|
||||||
MAX_QUERY_SQL_LENGTH);
|
MAX_QUERY_SQL_LENGTH);
|
||||||
|
|
||||||
cJSON* superResubAfterConsume =
|
|
||||||
cJSON_GetObjectItem(sql, "resubAfterConsume");
|
|
||||||
if (superResubAfterConsume
|
|
||||||
&& superResubAfterConsume->type == cJSON_Number) {
|
|
||||||
g_queryInfo.superQueryInfo.resubAfterConsume[j] =
|
|
||||||
superResubAfterConsume->valueint;
|
|
||||||
} else if (!superResubAfterConsume) {
|
|
||||||
//printf("failed to read json, subscribe interval no found\n");
|
|
||||||
//goto PARSE_OVER;
|
|
||||||
g_queryInfo.superQueryInfo.resubAfterConsume[j] = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON *result = cJSON_GetObjectItem(sql, "result");
|
cJSON *result = cJSON_GetObjectItem(sql, "result");
|
||||||
if (result != NULL && result->type == cJSON_String
|
if (result != NULL && result->type == cJSON_String
|
||||||
&& result->valuestring != NULL){
|
&& result->valuestring != NULL){
|
||||||
|
@ -6855,6 +6855,8 @@ static void *superSubscribe(void *sarg) {
|
||||||
}
|
}
|
||||||
TAOS_RES* res = NULL;
|
TAOS_RES* res = NULL;
|
||||||
|
|
||||||
|
uint64_t st = 0, et = 0;
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
for (uint64_t i = pThreadInfo->start_table_from;
|
for (uint64_t i = pThreadInfo->start_table_from;
|
||||||
i <= pThreadInfo->end_table_to; i++) {
|
i <= pThreadInfo->end_table_to; i++) {
|
||||||
|
@ -6863,7 +6865,12 @@ static void *superSubscribe(void *sarg) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
st = taosGetTimestampMs();
|
||||||
|
performancePrint("st: %"PRIu64" et: %"PRIu64" st-et: %"PRIu64"\n", st, et, (st - et));
|
||||||
res = taos_consume(tsub[tsubSeq]);
|
res = taos_consume(tsub[tsubSeq]);
|
||||||
|
et = taosGetTimestampMs();
|
||||||
|
performancePrint("st: %"PRIu64" et: %"PRIu64" delta: %"PRIu64"\n", st, et, (et - st));
|
||||||
|
|
||||||
if (res) {
|
if (res) {
|
||||||
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
||||||
sprintf(pThreadInfo->fp, "%s-%d",
|
sprintf(pThreadInfo->fp, "%s-%d",
|
||||||
|
@ -6881,7 +6888,7 @@ static void *superSubscribe(void *sarg) {
|
||||||
|
|
||||||
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
|
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
|
||||||
&& (consumed[tsubSeq] >=
|
&& (consumed[tsubSeq] >=
|
||||||
g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
|
g_queryInfo.superQueryInfo.resubAfterConsume)) {
|
||||||
printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
|
printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
|
||||||
g_queryInfo.superQueryInfo.subscribeKeepProgress,
|
g_queryInfo.superQueryInfo.subscribeKeepProgress,
|
||||||
pThreadInfo->querySeq);
|
pThreadInfo->querySeq);
|
||||||
|
|
Loading…
Reference in New Issue