Merge pull request #28854 from taosdata/fix/TD-32942

fix(stream):fix issue for streamscan
This commit is contained in:
Shengliang Guan 2024-11-21 14:47:02 +08:00 committed by GitHub
commit 6b4d966bf8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 93 additions and 15 deletions

View File

@ -3410,6 +3410,8 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t*
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey);
*pLen = len; *pLen = len;
_end: _end:
@ -3475,21 +3477,20 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
goto _end; goto _end;
} }
if (pInfo->pUpdateInfo != NULL) { void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); if (!pUpInfo) {
if (!pUpInfo) { lino = __LINE__;
lino = __LINE__; goto _end;
goto _end; }
} code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo);
code = pInfo->stateStore.updateInfoDeserialize(pDeCoder, pUpInfo); if (code == TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_SUCCESS) { pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->pUpdateInfo = pUpInfo;
pInfo->pUpdateInfo = pUpInfo; qDebug("%s line:%d. stream scan updateinfo deserialize success", __func__, __LINE__);
} else { } else {
taosMemoryFree(pUpInfo); taosMemoryFree(pUpInfo);
lino = __LINE__; code = TSDB_CODE_SUCCESS;
goto _end; qDebug("%s line:%d. stream scan did not have updateinfo", __func__, __LINE__);
}
} }
if (tDecodeIsEnd(pDeCoder)) { if (tDecodeIsEnd(pDeCoder)) {
@ -3509,6 +3510,7 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
lino = __LINE__; lino = __LINE__;
goto _end; goto _end;
} }
qDebug("%s last scan range %d. %" PRId64 ",%" PRId64, __func__, __LINE__, pInfo->lastScanRange.skey, pInfo->lastScanRange.ekey);
_end: _end:
if (pDeCoder != NULL) { if (pDeCoder != NULL) {

View File

@ -445,6 +445,11 @@ int32_t updateInfoSerialize(SEncoder* pEncoder, const SUpdateInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
if (!pInfo) { if (!pInfo) {
if (tEncodeI32(pEncoder, -1) < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
uDebug("%s line:%d. it did not have updateinfo", __func__, __LINE__);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -550,6 +555,10 @@ int32_t updateInfoDeserialize(SDecoder* pDeCoder, SUpdateInfo* pInfo) {
int32_t size = 0; int32_t size = 0;
if (tDecodeI32(pDeCoder, &size) < 0) return -1; if (tDecodeI32(pDeCoder, &size) < 0) return -1;
if (size < 0) {
return -1;
}
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno); QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno);

View File

@ -0,0 +1,67 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s);
run tsim/stream/checkTaskStatus.sim
sleep 70000
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3000a,1,1,1);
$loop_count = 0
loop0:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02
if $rows == 0 then
goto loop0
endi
print select * from streamt2;
sql select * from streamt2;
print $data00 $data01 $data02
if $rows == 0 then
goto loop0
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT