subscribe: fix bug in multi-vnode subscription

This commit is contained in:
localvar 2020-01-18 14:43:29 +08:00
parent fa8baf8022
commit 35976735df
2 changed files with 18 additions and 14 deletions

View File

@ -362,6 +362,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
} }
for (int retry = 0; retry < 3; retry++) { for (int retry = 0; retry < 3; retry++) {
tscRemoveFromSqlList(pSql);
if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) {
tscTrace("begin meter synchronization"); tscTrace("begin meter synchronization");
char* sqlstr = pSql->sqlstr; char* sqlstr = pSql->sqlstr;
@ -380,6 +382,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->thandle = NULL; pSql->thandle = NULL;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pSql->cmd.type = type; pSql->cmd.type = type;
tscGetMeterMetaInfo(&pSql->cmd, 0)->vnodeIndex = 0;
} }
tscDoQuery(pSql); tscDoQuery(pSql);

View File

@ -49,7 +49,7 @@ void check_row_count(int line, TAOS_RES* res, int expected) {
void run_test(TAOS* taos) { void run_test(TAOS* taos) {
taos_query(taos, "drop database test;"); taos_query(taos, "drop database if exists test;");
usleep(100000); usleep(100000);
taos_query(taos, "create database test tables 5;"); taos_query(taos, "create database test tables 5;");
@ -86,21 +86,26 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.001', 0, 'UK');"); taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('london', 0) values('2020-01-01 00:03:00.001', 0, 'UK');"); taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 2); check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.002', 0, 'china');"); taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
// keep progress information and restart subscription // keep progress information and restart subscription
taos_unsubscribe(tsub, 1); taos_unsubscribe(tsub, 1);
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 22); check_row_count(__LINE__, res, 24);
// keep progress information and continue previous subscription // keep progress information and continue previous subscription
taos_unsubscribe(tsub, 1); taos_unsubscribe(tsub, 1);
@ -112,27 +117,22 @@ void run_test(TAOS* taos) {
taos_unsubscribe(tsub, 0); taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 22); check_row_count(__LINE__, res, 24);
// single meter subscription // single meter subscription
taos_unsubscribe(tsub, 0); taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0); tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 4); check_row_count(__LINE__, res, 5);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.001', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.002', 0, 'china');");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:04:00.000', 0, 'china');");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
taos_unsubscribe(tsub, 0); taos_unsubscribe(tsub, 0);
} }