fix issue for streamscan
This commit is contained in:
parent
62e609c558
commit
53ae014354
|
@ -3410,6 +3410,8 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t*
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey);
|
||||
|
||||
*pLen = len;
|
||||
|
||||
_end:
|
||||
|
@ -3475,21 +3477,19 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
|||
goto _end;
|
||||
}
|
||||
|
||||
if (pInfo->pUpdateInfo != NULL) {
|
||||
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||
if (!pUpInfo) {
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
||||
pInfo->pUpdateInfo = pUpInfo;
|
||||
} else {
|
||||
taosMemoryFree(pUpInfo);
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||
if (!pUpInfo) {
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
||||
pInfo->pUpdateInfo = pUpInfo;
|
||||
} else {
|
||||
taosMemoryFree(pUpInfo);
|
||||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (tDecodeIsEnd(pDeCoder)) {
|
||||
|
@ -3509,6 +3509,7 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
|||
lino = __LINE__;
|
||||
goto _end;
|
||||
}
|
||||
qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey);
|
||||
|
||||
_end:
|
||||
if (pDeCoder != NULL) {
|
||||
|
|
|
@ -445,6 +445,7 @@ int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
if (!pInfo) {
|
||||
tEncodeI32(pEncoder, -1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -550,6 +551,10 @@ int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) {
|
|||
|
||||
int32_t size = 0;
|
||||
if (tDecodeI32(pDeCoder, &size) < 0) return -1;
|
||||
|
||||
if (size < 0) {
|
||||
return -1;
|
||||
}
|
||||
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
|
||||
QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno);
|
||||
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print step1
|
||||
print =============== create database
|
||||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s);
|
||||
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sleep 70000
|
||||
|
||||
|
||||
print restart taosd 01 ......
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(now + 3000a,1,1,1);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print select * from streamt1;
|
||||
sql select * from streamt1;
|
||||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
if $rows == 0 then
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
print select * from streamt2;
|
||||
sql select * from streamt2;
|
||||
|
||||
print $data00 $data01 $data02
|
||||
|
||||
if $rows == 0 then
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
print end
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue