single table subscribe is done

This commit is contained in:
localvar 2020-05-12 16:49:43 +08:00
parent a36937624e
commit 577cf92fdf
2 changed files with 56 additions and 31 deletions

View File

@ -4185,6 +4185,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo); pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) { } else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
if(pQInfo->groupInfo.numOfTables == 1) {
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
SGroupItem* pItem = taosArrayGet(pa, 0);
cond.twindow = pItem->info->win;
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo); pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
} }
} }
@ -4903,6 +4909,12 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo;
tidInfo.uid = pQuery->current->id.uid;
tidInfo.tid = pQuery->current->id.tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
} }
if (!isTSCompQuery(pQuery)) { if (!isTSCompQuery(pQuery)) {
@ -5195,7 +5207,6 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key); pTableIdInfo->key = htobe64(pTableIdInfo->key);
printf("createTableIdList: uid = %ld, key = %ld\n", pTableIdInfo->uid, pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo); taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} }
@ -5759,7 +5770,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
// not a problem at present because we only use their 1st int64_t field // not a problem at present because we only use their 1st int64_t field
STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id ); STableIdInfo* pTableId = taosArraySearch( pTableIdList, compareTableIdInfo, &id );
if (pTableId != NULL ) { if (pTableId != NULL ) {
printf("create QInfoImpl: %ld %ld\n", pTableId->uid, pTableId->key);
window.skey = pTableId->key; window.skey = pTableId->key;
} else { } else {
window.skey = INT64_MIN; window.skey = INT64_MIN;

View File

@ -56,32 +56,46 @@ void run_test(TAOS* taos) {
taos_query(taos, "drop database if exists test;"); taos_query(taos, "drop database if exists test;");
usleep(100000); usleep(100000);
taos_query(taos, "create database test tables 5;"); //taos_query(taos, "create database test tables 5;");
taos_query(taos, "create database test;");
usleep(100000); usleep(100000);
taos_query(taos, "use test;"); taos_query(taos, "use test;");
usleep(100000);
taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); usleep(100000);
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); taos_query(taos, "create table t0 using meters tags(0);");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); taos_query(taos, "create table t1 using meters tags(1);");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); taos_query(taos, "create table t2 using meters tags(2);");
taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); taos_query(taos, "create table t3 using meters tags(3);");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');"); taos_query(taos, "create table t4 using meters tags(4);");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');"); taos_query(taos, "create table t5 using meters tags(5);");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');"); taos_query(taos, "create table t6 using meters tags(6);");
taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');"); taos_query(taos, "create table t7 using meters tags(7);");
taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "create table t8 using meters tags(8);");
taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "create table t9 using meters tags(9);");
taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');");
taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');"); taos_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");
// super tables subscription // super tables subscription
usleep(1000000);
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub); TAOS_RES* res = taos_consume(tsub);
@ -90,23 +104,23 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');"); taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 2); check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');"); taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');"); taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 2); check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');"); taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
// keep progress information and restart subscription // keep progress information and restart subscription
taos_unsubscribe(tsub, 1); taos_unsubscribe(tsub, 1);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 24); check_row_count(__LINE__, res, 24);
@ -133,7 +147,7 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');"); taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
@ -197,7 +211,7 @@ int main(int argc, char *argv[]) {
// init TAOS // init TAOS
taos_init(); taos_init();
TAOS* taos = taos_connect(host, user, passwd, "test", 0); TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) { if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1); exit(1);
@ -209,6 +223,7 @@ int main(int argc, char *argv[]) {
exit(0); exit(0);
} }
taos_query(taos, "use test;");
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (async) { if (async) {
// create an asynchronized subscription, the callback function will be called every 1s // create an asynchronized subscription, the callback function will be called every 1s