fix:get dest table version
This commit is contained in:
parent
fa8d9a62f2
commit
5acf665b7e
|
@ -983,11 +983,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->tbSink.vnode = pTq->pVnode;
|
pTask->tbSink.vnode = pTq->pVnode;
|
||||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
||||||
|
|
||||||
/*A(pTask->tbSink.pSchemaWrapper);*/
|
int32_t version = 1;
|
||||||
/*A(pTask->tbSink.pSchemaWrapper->pSchema);*/
|
SMetaInfo info = {0};
|
||||||
|
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
version = info.skmVer;
|
||||||
|
}
|
||||||
|
|
||||||
pTask->tbSink.pTSchema =
|
pTask->tbSink.pTSchema =
|
||||||
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
|
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version);
|
||||||
ASSERT(pTask->tbSink.pTSchema);
|
ASSERT(pTask->tbSink.pTSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
|
||||||
|
print ===== step1
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print ===== step2
|
||||||
|
|
||||||
|
sql create database test vgroups 4;
|
||||||
|
sql use test;
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2 from st interval(1s) ;
|
||||||
|
sql insert into t1 values(1648791211000,1,2,3);
|
||||||
|
sql insert into t1 values(1648791212000,2,2,3);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop0:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print rows=$rows
|
||||||
|
goto loop0
|
||||||
|
endi
|
||||||
|
|
||||||
|
print drop stream streams1
|
||||||
|
sql drop stream streams1;
|
||||||
|
|
||||||
|
print alter table streamt1 add column c3 double
|
||||||
|
sql alter table streamt1 add column c3 double;
|
||||||
|
|
||||||
|
print create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ;
|
||||||
|
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ;
|
||||||
|
|
||||||
|
sql insert into t2 values(1648791213000,1,2,3);
|
||||||
|
sql insert into t1 values(1648791214000,1,2,3);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop1:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 2 select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
if $rows != 4 then
|
||||||
|
print rows=$rows
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ======over
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
Loading…
Reference in New Issue