This commit is contained in:
liuyao 2023-09-26 17:19:21 +08:00
parent ca2325f886
commit c95dc167b7
7 changed files with 287 additions and 38 deletions

View File

@ -32,7 +32,7 @@ typedef SList SStreamSnapshot;
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen); typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen, bool invalid);
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
typedef void* (*_state_buff_create_statekeyfn)(SRowBuffPos* pPos, int64_t num); typedef void* (*_state_buff_create_statekeyfn)(SRowBuffPos* pPos, int64_t num);
@ -53,6 +53,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
void putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
@ -74,7 +75,7 @@ int32_t getRowStateRowSize(SStreamFileState* pFileState);
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen); int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen); int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, bool invalid);
void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateClear(SStreamFileState* pFileState);
void sessionWinStateCleanup(void* pBuff); void sessionWinStateCleanup(void* pBuff);

View File

@ -3106,6 +3106,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
} }
} }
qDebug("===stream===set state cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
pCurWin->winInfo.sessionWin.win.ekey);
pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
SStreamStateCur* pCur = SStreamStateCur* pCur =
pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin);
@ -3124,6 +3127,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pNextWin->winInfo.isOutput = true; pNextWin->winInfo.isOutput = true;
} }
pAggSup->stateStore.streamStateFreeCur(pCur); pAggSup->stateStore.streamStateFreeCur(pCur);
qDebug("===stream===set state next win buff. skey:%" PRId64 ", endkey:%" PRId64, pNextWin->winInfo.sessionWin.win.skey,
pNextWin->winInfo.sessionWin.win.ekey);
} }
int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId, int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId,
@ -3202,9 +3207,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStateWindowInfo curWin = {0}; SStateWindowInfo curWin = {0};
SStateWindowInfo nextWin = {0}; SStateWindowInfo nextWin = {0};
setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin); setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin);
if (IS_VALID_SESSION_WIN(nextWin.winInfo)) {
releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore);
}
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
winRows = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, winRows = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
pAggSup->pResultRows, pSeUpdated, pStDeleted); pAggSup->pResultRows, pSeUpdated, pStDeleted);

View File

@ -89,6 +89,7 @@ static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray*
} }
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) { int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) {
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SArray* pWinStates = NULL; SArray* pWinStates = NULL;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
@ -105,6 +106,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
int32_t size = taosArrayGetSize(pWinStates); int32_t size = taosArrayGetSize(pWinStates);
if (size == 0) { if (size == 0) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey); (*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey);
code = TSDB_CODE_FAILED;
goto _end; goto _end;
} }
@ -138,29 +140,35 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) { if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
void* p = NULL; void* p = NULL;
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true; pNewPos->needFree = true;
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, p, *pVLen); memcpy(pNewPos->pRowBuff, p, *pVLen);
}
taosMemoryFree(p); taosMemoryFree(p);
(*pVal) = pNewPos; (*pVal) = pNewPos;
code = code_file;
goto _end; goto _end;
} else {
taosMemoryFree(p);
streamFileStateReleaseBuff(pFileState, pNewPos, false);
}
} }
} }
if (index == size - 1) { if (index == size - 1) {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey); (*pVal) = addNewSessionWindow(pFileState, pWinStates, pKey);
code = TSDB_CODE_FAILED;
goto _end; goto _end;
} }
(*pVal) = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1); (*pVal) = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1);
code = TSDB_CODE_FAILED;
_end: _end:
return (*pVal) != NULL ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; return code;
} }
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
@ -209,7 +217,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen) { int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen, bool invalid) {
SSHashObj* pSessionBuff = (SSHashObj*) pBuff; SSHashObj* pSessionBuff = (SSHashObj*) pBuff;
SSessionKey* pWinKey = (SSessionKey*) key; SSessionKey* pWinKey = (SSessionKey*) key;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
@ -221,8 +229,11 @@ int32_t deleteSessionWinStateBuff(void* pBuff, const void *key, size_t keyLen) {
TSKEY gap = 0; TSKEY gap = 0;
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
if (index >= 0) { if (index >= 0) {
SRowBuffPos** ppos = taosArrayGet(pWinStates, index); SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
if (inSessionWindow((*ppos)->pKey, pWinKey->win.skey, gap)) { if (inSessionWindow(pPos->pKey, pWinKey->win.skey, gap)) {
if (invalid) {
pPos->beFlushed = true;
}
taosArrayRemove(pWinStates, index); taosArrayRemove(pWinStates, index);
} }
} }
@ -484,6 +495,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) { if (inSessionWindow(pPos->pKey, startTs, gap) || fn(pKeyData, stateKey) == true) {
(*pVal) = pPos; (*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
*key = *pDestWinKey; *key = *pDestWinKey;
goto _end; goto _end;
} }
@ -495,27 +507,31 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) { if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) {
(*pVal) = pPos; (*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
*key = *pDestWinKey; *key = *pDestWinKey;
goto _end; goto _end;
} }
} }
if (index + 1 == 0) { if (index + 1 == 0) {
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) { if (!isDeteled(pFileState, endTs)) {
void* p = NULL; void* p = NULL;
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
pNewPos->needFree = true; pNewPos->needFree = true;
int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
if (code == TSDB_CODE_SUCCESS) { memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
memcpy(pNewPos->pRowBuff, p, *pVLen); memcpy(pNewPos->pRowBuff, p, *pVLen);
}
taosMemoryFree(p);
(*pVal) = pNewPos; (*pVal) = pNewPos;
taosMemoryFree(p);
code = code_file;
goto _end; goto _end;
} else {
taosMemoryFree(p);
streamFileStateReleaseBuff(pFileState, pNewPos, false);
}
} }
} }

View File

@ -723,6 +723,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
} }
code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen); code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
streamStateReleaseBuf(pState, pos, true); streamStateReleaseBuf(pState, pos, true);
putFreeBuff(pState->pFileState, pos);
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code); key->win.ekey, key->groupId, code);
} else { } else {

View File

@ -57,7 +57,13 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo; typedef SRowBuffPos SRowBuffInfo;
int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) { int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen, bool invalid) {
if (invalid) {
SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
if (pos) {
(*pos)->beFlushed = true;
}
}
return tSimpleHashRemove(pBuff, pKey, keyLen); return tSimpleHashRemove(pBuff, pKey, keyLen);
} }
@ -230,7 +236,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
putFreeBuff(pFileState, pPos); putFreeBuff(pFileState, pPos);
if (!all) { if (!all) {
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false);
} }
destroyRowBuffPos(pPos); destroyRowBuffPos(pPos);
tdListPopNode(pFileState->usedBuffs, pNode); tdListPopNode(pFileState->usedBuffs, pNode);
@ -247,10 +253,10 @@ void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushLi
SListNode* pNode = NULL; SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL && i < max) { while ((pNode = tdListNext(&iter)) != NULL && i < max) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) { if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) {
tdListAppend(pFlushList, &pPos); tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false);
tdListPopNode(pFileState->usedBuffs, pNode); tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode); taosMemoryFreeClear(pNode);
if (pPos->pRowBuff) { if (pPos->pRowBuff) {
@ -284,7 +290,7 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
if (pPos->beUsed == used) { if (pPos->beUsed == used) {
tdListAppend(pFlushList, &pPos); tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen); pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen, false);
tdListPopNode(pFileState->usedBuffs, pNode); tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode); taosMemoryFreeClear(pNode);
if (pPos->pRowBuff) { if (pPos->pRowBuff) {
@ -427,7 +433,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
} }
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen, true);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -462,7 +468,9 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
recoverSessionRowBuff(pFileState, pPos); recoverSessionRowBuff(pFileState, pPos);
(*pVal) = pPos->pRowBuff; (*pVal) = pPos->pRowBuff;
if (!pPos->needFree) {
tdListPrepend(pFileState->usedBuffs, &pPos); tdListPrepend(pFileState->usedBuffs, &pPos);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -509,12 +517,12 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
void* batch = streamStateCreateBatch(); void* batch = streamStateCreateBatch();
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0); if (pPos->beFlushed || !pPos->pRowBuff) {
if (pPos->beFlushed) {
continue; continue;
} }
pPos->beFlushed = true; pPos->beFlushed = true;
qDebug("===stream===flushed start:%" PRId64 ", end:%" PRId64 , ((SSessionKey*)pPos->pKey)->win.skey, ((SSessionKey*)pPos->pKey)->win.ekey);
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
streamStateClearBatch(batch); streamStateClearBatch(batch);

View File

@ -8,6 +8,8 @@ sleep 500
sql connect sql connect
print step1=============
sql create database test vgroups 1; sql create database test vgroups 1;
sql use test; sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
@ -71,7 +73,7 @@ if $rows != 29 then
goto loop1 goto loop1
endi endi
print step2=============
sql create database test2 vgroups 10; sql create database test2 vgroups 10;
sql use test2; sql use test2;
@ -137,12 +139,12 @@ if $rows != 29 then
goto loop3 goto loop3
endi endi
print step2============= print step3=============
sql create database test1 vgroups 1; sql create database test1 vgroups 1;
sql use test1; sql use test1;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once ignore expired 0 ignore update 1 into streamt1 as select _wstart, count(*) c1 from t1 session(ts, 1s); sql create stream streams1 trigger at_once ignore expired 0 ignore update 0 into streamt1 as select _wstart, count(*) c1 from t1 session(ts, 1s);
sql insert into t1 values(1648791211000,1,2,3,1.0); sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,1,2,3,1.1); sql insert into t1 values(1648791213000,1,2,3,1.1);

View File

@ -0,0 +1,217 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 135
system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
print step1=============
sql create database test3 vgroups 1;
sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 into streamt3 as select _wstart, count(*) c1 from t1 state_window(a);
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,2,3,1.1);
sql insert into t1 values(1648791215000,3,2,3,1.1);
sql insert into t1 values(1648791217000,4,2,3,1.1);
sql insert into t1 values(1648791219000,5,2,3,1.1);
sql insert into t1 values(1648791221000,6,2,3,1.0);
sql insert into t1 values(1648791223000,7,2,3,1.0);
sql insert into t1 values(1648791225000,8,2,3,1.0);
sql insert into t1 values(1648791227000,9,2,3,1.0);
sql insert into t1 values(1648791229000,10,2,3,1.0);
sql insert into t1 values(1648791231000,11,2,3,1.0);
sql insert into t1 values(1648791233000,12,2,3,1.1);
sql insert into t1 values(1648791235000,13,2,3,1.1);
sql insert into t1 values(1648791237000,14,2,3,1.1);
sql insert into t1 values(1648791239000,15,2,3,1.1);
sql insert into t1 values(1648791241000,16,2,3,1.0);
sql insert into t1 values(1648791243000,17,2,3,1.0);
sql insert into t1 values(1648791245000,18,2,3,1.0);
sql insert into t1 values(1648791247000,19,2,3,1.0);
sql insert into t1 values(1648791249000,20,2,3,1.0);
sql insert into t1 values(1648791251000,21,2,3,1.0);
sql insert into t1 values(1648791253000,22,2,3,1.1);
sql insert into t1 values(1648791255000,23,2,3,1.1);
sql insert into t1 values(1648791257000,24,2,3,1.1);
sql insert into t1 values(1648791259000,25,2,3,1.1);
sql insert into t1 values(1648791261000,26,2,3,1.0);
sql insert into t1 values(1648791263000,27,2,3,1.0);
sql insert into t1 values(1648791265000,28,2,3,1.0);
sql insert into t1 values(1648791267000,29,2,3,1.0);
sql insert into t1 values(1648791269000,30,2,3,1.0);
$loop_count = 0
loop8:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop8
endi
sql insert into t1 values(1648791211001,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
sql insert into t1 values(1648791215001,3,2,3,1.1);
sql insert into t1 values(1648791217001,4,2,3,1.1);
sql insert into t1 values(1648791219001,5,2,3,1.1);
sql insert into t1 values(1648791221001,6,2,3,1.0);
sql insert into t1 values(1648791223001,7,2,3,1.0);
sql insert into t1 values(1648791225001,8,2,3,1.0);
sql insert into t1 values(1648791227001,9,2,3,1.0);
sql insert into t1 values(1648791229001,10,2,3,1.0);
$loop_count = 0
loop9:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop9
endi
if $data01 != 2 then
print =====data01=$data01
goto loop9
endi
if $data91 != 2 then
print =====data91=$data91
goto loop9
endi
sql insert into t1 values(1648791231001,11,2,3,1.0);
sql insert into t1 values(1648791233001,12,2,3,1.1);
sql insert into t1 values(1648791235001,13,2,3,1.1);
sql insert into t1 values(1648791237001,14,2,3,1.1);
sql insert into t1 values(1648791239001,15,2,3,1.1);
sql insert into t1 values(1648791241001,16,2,3,1.0);
sql insert into t1 values(1648791243001,17,2,3,1.0);
sql insert into t1 values(1648791245001,18,2,3,1.0);
sql insert into t1 values(1648791247001,19,2,3,1.0);
sql insert into t1 values(1648791249001,20,2,3,1.0);
$loop_count = 0
loop10:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop10
endi
if $data[10][1] != 2 then
print =====data[10][1]=$data[10][1]
goto loop10
endi
if $data[19][1] != 2 then
print =====data[19][1]=$data[19][1]
goto loop10
endi
sql insert into t1 values(1648791251001,21,2,3,1.0);
sql insert into t1 values(1648791253001,22,2,3,1.1);
sql insert into t1 values(1648791255001,23,2,3,1.1);
sql insert into t1 values(1648791257001,24,2,3,1.1);
#///////////////////////
$loop_count = 0
loop11:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop11
endi
if $data[20][1] != 2 then
print =====[20][1]=$[20][1]
goto loop11
endi
#///////////////////////
sql insert into t1 values(1648791259001,25,2,3,1.1);
sql insert into t1 values(1648791261001,26,2,3,1.0);
sql insert into t1 values(1648791263001,27,2,3,1.0);
sql insert into t1 values(1648791265001,28,2,3,1.0);
sql insert into t1 values(1648791267001,29,2,3,1.0);
sql insert into t1 values(1648791269001,30,2,3,1.0);
$loop_count = 0
loop11:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3;
sql select * from streamt3;
if $rows != 30 then
print =====rows=$rows
goto loop11
endi
if $data[20][1] != 2 then
print =====[20][1]=$[20][1]
goto loop11
endi
if $data[29][1] != 2 then
print =====[29][1]=$[29][1]
goto loop11
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT