add checksum

This commit is contained in:
liuyao 2023-07-17 18:04:17 +08:00
parent 14b9d920ba
commit d45adeaa4e
2 changed files with 277 additions and 23 deletions

View File

@ -18,6 +18,7 @@
#include "functionMgt.h" #include "functionMgt.h"
#include "operator.h" #include "operator.h"
#include "querytask.h" #include "querytask.h"
#include "tchecksum.h"
#include "tcommon.h" #include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
@ -2567,12 +2568,14 @@ void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) {
return buf; return buf;
} }
int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) { int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
if (!pInfo) { if (!pInfo) {
return 0; return 0;
} }
void* pData = (buf == NULL) ? NULL : *buf;
// 1.pResultRowHashTable // 1.pResultRowHashTable
int32_t tlen = 0; int32_t tlen = 0;
int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable); int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable);
@ -2613,15 +2616,32 @@ int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) {
// 5.dataVersion // 5.dataVersion
tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); tlen += taosEncodeFixedI64(buf, pInfo->dataVersion);
// 6.checksum
if (buf) {
uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
tlen += taosEncodeFixedU32(buf, cksum);
} else {
tlen += sizeof(uint32_t);
}
return tlen; return tlen;
} }
void doStreamIntervalDecodeOpState(void* buf, SOperatorInfo* pOperator) { void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
if (!pInfo) { if (!pInfo) {
return ; return ;
} }
// 6.checksum
int32_t dataLen = len - sizeof(uint32_t);
void* pCksum = POINTER_SHIFT(buf, dataLen);
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
ASSERT(0); // debug
qError("stream interval state is invalid");
return;
}
// 1.pResultRowHashTable // 1.pResultRowHashTable
int32_t mapSize = 0; int32_t mapSize = 0;
buf = taosDecodeFixedI32(buf, &mapSize); buf = taosDecodeFixedI32(buf, &mapSize);
@ -2662,10 +2682,10 @@ void doStreamIntervalDecodeOpState(void* buf, SOperatorInfo* pOperator) {
void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t len = doStreamIntervalEncodeOpState(NULL, pOperator); int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf; void* pBuf = buf;
len = doStreamIntervalEncodeOpState(&pBuf, pOperator); len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf); taosMemoryFree(buf);
@ -2816,8 +2836,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock; return pBlock;
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamIntervalSaveCheckpoint(pOperator);
pAPI->stateStore.streamStateCommit(pInfo->pState); pAPI->stateStore.streamStateCommit(pInfo->pState);
doStreamIntervalSaveCheckpoint(pOperator);
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue; continue;
} else { } else {
@ -3075,7 +3095,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t len = 0; int32_t len = 0;
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamIntervalDecodeOpState(buff, pOperator); doStreamIntervalDecodeOpState(buff, len, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
} }
@ -3794,12 +3814,14 @@ void* decodeSResultWindowInfo(void *buf, SResultWindowInfo* key, int32_t outLen)
return buf; return buf;
} }
int32_t doStreamSessionEncodeOpState(void **buf, SOperatorInfo* pOperator) { int32_t doStreamSessionEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
if (!pInfo) { if (!pInfo) {
return 0; return 0;
} }
void* pData = (buf == NULL) ? NULL : *buf;
// 1.streamAggSup.pResultRows // 1.streamAggSup.pResultRows
int32_t tlen = 0; int32_t tlen = 0;
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
@ -3821,21 +3843,42 @@ int32_t doStreamSessionEncodeOpState(void **buf, SOperatorInfo* pOperator) {
tlen += taosEncodeFixedI32(buf, size); tlen += taosEncodeFixedI32(buf, size);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
tlen += doStreamSessionEncodeOpState(buf, pChOp); tlen += doStreamSessionEncodeOpState(buf, 0, pChOp, false);
} }
// 4.dataVersion // 4.dataVersion
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
// 5.checksum
if (isParent) {
if (buf) {
uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
tlen += taosEncodeFixedU32(buf, cksum);
} else {
tlen += sizeof(uint32_t);
}
}
return tlen; return tlen;
} }
void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) { void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
if (!pInfo) { if (!pInfo) {
return buf; return buf;
} }
// 5.checksum
if (isParent) {
int32_t dataLen = len - sizeof(uint32_t);
void* pCksum = POINTER_SHIFT(buf, dataLen);
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
ASSERT(0); // debug
qError("stream interval state is invalid");
return buf;
}
}
// 1.streamAggSup.pResultRows // 1.streamAggSup.pResultRows
int32_t mapSize = 0; int32_t mapSize = 0;
buf = taosDecodeFixedI32(buf, &mapSize); buf = taosDecodeFixedI32(buf, &mapSize);
@ -3856,7 +3899,7 @@ void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) {
ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); ASSERT(size <= taosArrayGetSize(pInfo->pChildren));
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
buf = doStreamSessionDecodeOpState(buf, pChOp); buf = doStreamSessionDecodeOpState(buf, 0, pChOp, false);
} }
// 4.dataVersion // 4.dataVersion
@ -3866,10 +3909,10 @@ void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) {
void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t len = doStreamSessionEncodeOpState(NULL, pOperator); int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len); void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf; void* pBuf = buf;
len = doStreamSessionEncodeOpState(&pBuf, pOperator); len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
} }
@ -3941,8 +3984,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock; return pBlock;
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamSessionSaveCheckpoint(pOperator);
pAggSup->stateStore.streamStateCommit(pAggSup->pState); pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamSessionSaveCheckpoint(pOperator);
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue; continue;
} else { } else {
@ -4141,7 +4184,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
int32_t len = 0; int32_t len = 0;
int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamSessionDecodeOpState(buff, pOperator); doStreamSessionDecodeOpState(buff, len, pOperator, true);
taosMemoryFree(buff); taosMemoryFree(buff);
} }
@ -4242,8 +4285,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock; return pBlock;
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamSessionSaveCheckpoint(pOperator);
pAggSup->stateStore.streamStateCommit(pAggSup->pState); pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamSessionSaveCheckpoint(pOperator);
continue; continue;
} else { } else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
@ -4541,12 +4584,14 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
} }
} }
int32_t doStreamStateEncodeOpState(void **buf, SOperatorInfo* pOperator) { int32_t doStreamStateEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
if (!pInfo) { if (!pInfo) {
return 0; return 0;
} }
void* pData = (buf == NULL) ? NULL : *buf;
// 1.streamAggSup.pResultRows // 1.streamAggSup.pResultRows
int32_t tlen = 0; int32_t tlen = 0;
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
@ -4568,21 +4613,42 @@ int32_t doStreamStateEncodeOpState(void **buf, SOperatorInfo* pOperator) {
tlen += taosEncodeFixedI32(buf, size); tlen += taosEncodeFixedI32(buf, size);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
tlen += doStreamSessionEncodeOpState(buf, pChOp); tlen += doStreamStateEncodeOpState(buf, 0, pChOp, false);
} }
// 4.dataVersion // 4.dataVersion
tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); tlen += taosEncodeFixedI32(buf, pInfo->dataVersion);
// 5.checksum
if (isParent) {
if (buf) {
uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
tlen += taosEncodeFixedU32(buf, cksum);
} else {
tlen += sizeof(uint32_t);
}
}
return tlen; return tlen;
} }
void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) { void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
if (!pInfo) { if (!pInfo) {
return buf; return buf;
} }
// 5.checksum
if (isParent) {
int32_t dataLen = len - sizeof(uint32_t);
void* pCksum = POINTER_SHIFT(buf, dataLen);
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
ASSERT(0); // debug
qError("stream interval state is invalid");
return buf;
}
}
// 1.streamAggSup.pResultRows // 1.streamAggSup.pResultRows
int32_t mapSize = 0; int32_t mapSize = 0;
buf = taosDecodeFixedI32(buf, &mapSize); buf = taosDecodeFixedI32(buf, &mapSize);
@ -4603,7 +4669,7 @@ void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) {
ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); ASSERT(size <= taosArrayGetSize(pInfo->pChildren));
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
buf = doStreamStateDecodeOpState(buf, pChOp); buf = doStreamStateDecodeOpState(buf, 0, pChOp, false);
} }
// 4.dataVersion // 4.dataVersion
@ -4611,6 +4677,16 @@ void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) {
return buf; return buf;
} }
void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true);
void* buf = taosMemoryCalloc(1, len);
void* pBuf = buf;
len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len);
}
static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
@ -4665,8 +4741,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock; return pBlock;
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamSessionSaveCheckpoint(pOperator);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
doStreamSessionSaveCheckpoint(pOperator);
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue; continue;
} else { } else {
@ -4861,7 +4937,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t len = 0; int32_t len = 0;
int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len); int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamStateDecodeOpState(buff, pOperator); doStreamStateDecodeOpState(buff, len, pOperator, true);
taosMemoryFree(buff); taosMemoryFree(buff);
} }
@ -5530,8 +5606,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock(pBlock, "single interval"); printDataBlock(pBlock, "single interval");
return pBlock; return pBlock;
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
doStreamIntervalSaveCheckpoint(pOperator);
pAPI->stateStore.streamStateCommit(pInfo->pState); pAPI->stateStore.streamStateCommit(pInfo->pState);
doStreamIntervalSaveCheckpoint(pOperator);
pInfo->reCkBlock = true; pInfo->reCkBlock = true;
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue; continue;
@ -5714,7 +5790,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t len = 0; int32_t len = 0;
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamIntervalDecodeOpState(buff, pOperator); doStreamIntervalDecodeOpState(buff, len, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
} }

View File

@ -0,0 +1,178 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 state_window(b);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
$loop_count = 0
loop0:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop0
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data02 != 3 then
print =====data02=$data02
goto loop0
endi
print waiting for checkpoint generation 1 ......
sleep 25000
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791213002,3,2,3,1.1);
$loop_count = 0
loop1:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop1
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
if $data02 != 6 then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791233003,4,3,3,1.1);
$loop_count = 0
loop2:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop2
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop2
endi
if $data02 != 6 then
print =====data02=$data02
goto loop2
endi
# row 1
if $data11 != 1 then
print =====data11=$data11
goto loop2
endi
if $data12 != 4 then
print =====data12=$data12
goto loop2
endi
print step 2
print restart taosd 02 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791233004,5,3,3,1.1);
loop20:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop20
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop20
endi
if $data02 != 6 then
print =====data02=$data02
goto loop20
endi
# row 1
if $data11 != 2 then
print =====data11=$data11
goto loop20
endi
if $data12 != 9 then
print =====data12=$data12
goto loop20
endi
print end---------------------------------
system sh/exec.sh -n dnode1 -s stop -x SIGINT