Hotfix/sangshuduo/td 4238 taosdemo async subscribe (#6170)

* [TD-4238]<fix>: taosdemo async subscribe.

* [TD-4238]<fix>: taosdemo async subscribe

subsribe sql command do not use aggregation functions.

* [TD-4238]<fix>: taosdemo async subscribe.

interval.

* [TD-4238]<fix>: taosdemo async subscribe.

fix super table sub result file.

* fix specified sub sync mode.

Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
Shuduo Sang 2021-05-20 14:01:32 +08:00 committed by GitHub
parent 78b8473e03
commit 4697cdfdaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 4 deletions

View File

@ -6682,13 +6682,18 @@ static void *superSubscribe(void *sarg) {
continue; continue;
} }
taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[i]); res = taos_consume(tsub[i]);
if (res) { if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d", sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp);
}
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp); appendResultToFile(res, pThreadInfo->fp);
} }
consumed[i] ++; consumed[i] ++;
@ -6783,9 +6788,15 @@ static void *specifiedSubscribe(void *sarg) {
continue; continue;
} }
taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub); res = taos_consume(tsub);
if (res) { if (res) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp);
}
consumed ++; consumed ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed >= && (consumed >=