add ci and fix issue

This commit is contained in:
54liuyao 2024-09-25 10:29:55 +08:00
parent 2d56c8f058
commit cc3ec61089
3 changed files with 153 additions and 11 deletions

View File

@ -757,6 +757,8 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS
code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key,
(void**)&pPrevPoint->pResPos, &preVLen, &tmpRes);
QUERY_CHECK_CODE(code, lino, _end);
qDebug("===stream=== set stream interp resutl prev buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", pPrevPoint->key.ts, pPrevPoint->key.groupId, tmpRes);
if (tmpRes == TSDB_CODE_SUCCESS) {
QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
setPointBuff(pPrevPoint, pFillSup);

View File

@ -233,10 +233,11 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void* pState = getStateFileStore(pFileState);
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
qTrace("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
@ -257,7 +258,15 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
}
int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index == -1 || index == 0) {
if (index >= 0) {
SWinKey* pCurKey = taosArrayGet(pWinStates, index);
if (winKeyCmprImpl(pCurKey, pKey) == 0) {
index--;
} else {
qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
}
}
if (index == -1) {
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
@ -276,15 +285,7 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
streamStateFreeCur(pCur);
return code;
} else {
SWinKey* pPrevKey = NULL;
SWinKey* pCurKey = taosArrayGet(pWinStates, index);
if (winKeyCmprImpl(pCurKey, pKey) == 0) {
pPrevKey = taosArrayGet(pWinStates, index - 1);
} else {
pPrevKey = taosArrayGet(pWinStates, index);
qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
}
SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
*pResKey = *pPrevKey;
return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
}

View File

@ -0,0 +1,139 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a) as a, interp(b) as b, now from t1 every(2s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now,1,1,1,1.1) (now + 10s,2,2,2,2.1) (now + 20s,3,3,3,3.1);
print sql select * from t1;
sql select * from t1;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop0:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select * from streamt where a == 1;
sql select * from streamt where a == 1;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop0
endi
$loop_count = 0
loop1:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select * from streamt where a == 2;
sql select * from streamt where a == 2;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop1
endi
$loop_count = 0
loop2:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select * from streamt where a == 3;
sql select * from streamt where a == 3;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop2
endi
sleep 4000
$loop_count = 0
loop3:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select * from streamt where a == 3;
sql select * from streamt where a == 3;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 5 then
print ======rows=$rows
goto loop3
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT