Merge branch 'fix/newCheckpoint' into enh/triggerCheckPoint2
This commit is contained in:
commit
039befa3d6
|
@ -310,6 +310,7 @@ struct SStreamTask {
|
||||||
SStreamId streamTaskId;
|
SStreamId streamTaskId;
|
||||||
SArray* pUpstreamInfoList; // SArray<SStreamChildEpInfo*>, // children info
|
SArray* pUpstreamInfoList; // SArray<SStreamChildEpInfo*>, // children info
|
||||||
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
|
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
|
||||||
|
TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ
|
||||||
|
|
||||||
// output
|
// output
|
||||||
union {
|
union {
|
||||||
|
@ -569,6 +570,7 @@ int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask);
|
||||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||||
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
|
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
|
||||||
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
|
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);
|
||||||
|
int32_t streamTaskGetInputQItems(const SStreamTask* pTask);
|
||||||
|
|
||||||
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
|
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
|
||||||
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
|
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
|
||||||
|
@ -619,7 +621,6 @@ int32_t streamTaskReloadState(SStreamTask* pTask);
|
||||||
int32_t streamAlignTransferState(SStreamTask* pTask);
|
int32_t streamAlignTransferState(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
|
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
|
||||||
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
pTask->pMsgCb = &pSnode->msgCb;
|
pTask->pMsgCb = &pSnode->msgCb;
|
||||||
pTask->pMeta = pSnode->pMeta;
|
pTask->pMeta = pSnode->pMeta;
|
||||||
|
taosThreadMutexInit(&pTask->lock, NULL);
|
||||||
|
|
||||||
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
|
|
|
@ -928,16 +928,20 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
|
|
||||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
|
taosThreadMutexInit(&pTask->lock, NULL);
|
||||||
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
|
|
||||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
|
|
||||||
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
|
|
||||||
|
|
||||||
if (pTask->chkInfo.checkpointId != 0) {
|
if (pTask->chkInfo.checkpointId != 0) {
|
||||||
|
// checkpoint ver is the kept version, handled data should be the next version.
|
||||||
|
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
|
||||||
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
||||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
|
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
|
||||||
|
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
|
||||||
|
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
|
||||||
|
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -268,17 +268,25 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue);
|
int32_t numOfItems = streamTaskGetInputQItems(pTask);
|
||||||
|
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
SStreamQueueItem* pItem = NULL;
|
SStreamQueueItem* pItem = NULL;
|
||||||
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
|
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
|
||||||
|
|
||||||
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
|
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
|
||||||
|
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pItem != NULL) {
|
if (pItem != NULL) {
|
||||||
noDataInWal = false;
|
noDataInWal = false;
|
||||||
code = tAppendDataToInputQueue(pTask, pItem);
|
code = tAppendDataToInputQueue(pTask, pItem);
|
||||||
|
@ -292,7 +300,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((code == TSDB_CODE_SUCCESS) || (numOfItemsInQ > 0)) {
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
if ((code == TSDB_CODE_SUCCESS) || (numOfItems > 0)) {
|
||||||
code = streamSchedExec(pTask);
|
code = streamSchedExec(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
|
|
@ -2594,8 +2594,6 @@ int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pO
|
||||||
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
|
||||||
void* key = taosHashGetKey(pIte, &keyLen);
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
tlen += encodeSWinKey(buf, key);
|
tlen += encodeSWinKey(buf, key);
|
||||||
SRowBuffPos* pPos = *(void**)pIte;
|
|
||||||
tlen += encodeSRowBuffPos(buf, pPos);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2.twAggSup
|
// 2.twAggSup
|
||||||
|
@ -2655,10 +2653,10 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SWinKey key = {0};
|
SWinKey key = {0};
|
||||||
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
|
|
||||||
pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey));
|
|
||||||
buf = decodeSWinKey(buf, &key);
|
buf = decodeSWinKey(buf, &key);
|
||||||
buf = decodeSRowBuffPos(buf, pPos);
|
SRowBuffPos* pPos = NULL;
|
||||||
|
int32_t resSize = pInfo->aggSup.resultRowSize;
|
||||||
|
pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize);
|
||||||
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES);
|
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3818,14 +3816,13 @@ void* decodeSSessionKey(void* buf, SSessionKey* key) {
|
||||||
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) {
|
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedBool(buf, key->isOutput);
|
tlen += taosEncodeFixedBool(buf, key->isOutput);
|
||||||
tlen += taosEncodeBinary(buf, key->pOutputBuf, outLen);
|
|
||||||
tlen += encodeSSessionKey(buf, &key->sessionWin);
|
tlen += encodeSSessionKey(buf, &key->sessionWin);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
||||||
buf = taosDecodeBinary(buf, &key->pOutputBuf, outLen);
|
key->pOutputBuf = NULL;
|
||||||
buf = decodeSSessionKey(buf, &key->sessionWin);
|
buf = decodeSSessionKey(buf, &key->sessionWin);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,13 +133,18 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
|
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__CK;
|
pTask->status.taskStatus = TASK_STATUS__CK;
|
||||||
pTask->checkpointingId = pReq->checkpointId;
|
pTask->checkpointingId = pReq->checkpointId;
|
||||||
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
|
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
|
||||||
|
|
||||||
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
|
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
|
||||||
// inputQ, to make sure all blocks with less version have been handled by this task already.
|
// inputQ, to make sure all blocks with less version have been handled by this task already.
|
||||||
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
|
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
||||||
|
@ -180,15 +185,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
}
|
}
|
||||||
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
||||||
// todo: sink node needs alignment??
|
|
||||||
/* ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__CK_READY;
|
|
||||||
|
|
||||||
// update the child Id for downstream tasks
|
|
||||||
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
|
|
||||||
qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id);
|
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
|
||||||
} else {*/
|
|
||||||
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
|
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
|
||||||
|
|
||||||
// update the child Id for downstream tasks
|
// update the child Id for downstream tasks
|
||||||
|
@ -204,8 +200,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (taskLevel == TASK_LEVEL__SINK) {
|
if (taskLevel == TASK_LEVEL__SINK) {
|
||||||
pTask->status.taskStatus = TASK_STATUS__CK_READY;
|
pTask->status.taskStatus = TASK_STATUS__CK_READY;
|
||||||
qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",
|
qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",
|
||||||
|
|
|
@ -295,7 +295,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static int32_t getNumOfItemsInputQ(const SStreamTask* pTask) {
|
int32_t streamTaskGetInputQItems(const SStreamTask* pTask) {
|
||||||
int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue);
|
int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue);
|
||||||
int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall);
|
int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall);
|
||||||
|
|
||||||
|
|
|
@ -273,6 +273,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
||||||
|
taosThreadMutexDestroy(&pTask->lock);
|
||||||
|
|
||||||
if (pTask->id.idStr != NULL) {
|
if (pTask->id.idStr != NULL) {
|
||||||
taosMemoryFree((void*)pTask->id.idStr);
|
taosMemoryFree((void*)pTask->id.idStr);
|
||||||
|
|
|
@ -14,6 +14,7 @@ sql use test;
|
||||||
|
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
|
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
|
||||||
|
sql create stream streams1 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
sql insert into t1 values(1648791213001,2,2,3,1.1);
|
sql insert into t1 values(1648791213001,2,2,3,1.1);
|
||||||
|
|
||||||
|
@ -45,6 +46,23 @@ if $data02 != 3 then
|
||||||
goto loop0
|
goto loop0
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop01:
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 0 then
|
||||||
|
print =====rows=$rows expect 1
|
||||||
|
goto loop01
|
||||||
|
endi
|
||||||
|
|
||||||
print waiting for checkpoint generation 1 ......
|
print waiting for checkpoint generation 1 ......
|
||||||
|
|
||||||
sleep 25000
|
sleep 25000
|
||||||
|
@ -126,6 +144,36 @@ if $data12 != 4 then
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop3:
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
print select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows expect 2
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 6 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
print step 2
|
print step 2
|
||||||
|
|
||||||
print restart taosd 02 ......
|
print restart taosd 02 ......
|
||||||
|
@ -136,7 +184,7 @@ system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
sql insert into t1 values(1648791223004,5,2,3,1.1);
|
sql insert into t1 values(1648791223004,5,2,3,1.1);
|
||||||
|
|
||||||
loop20:
|
loop4:
|
||||||
sleep 1000
|
sleep 1000
|
||||||
|
|
||||||
sql select * from streamt;
|
sql select * from streamt;
|
||||||
|
@ -148,29 +196,58 @@ endi
|
||||||
|
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
print =====rows=$rows expect 2
|
print =====rows=$rows expect 2
|
||||||
goto loop20
|
goto loop4
|
||||||
endi
|
endi
|
||||||
|
|
||||||
# row 0
|
# row 0
|
||||||
if $data01 != 3 then
|
if $data01 != 3 then
|
||||||
print =====data01=$data01
|
print =====data01=$data01
|
||||||
goto loop20
|
goto loop4
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data02 != 6 then
|
if $data02 != 6 then
|
||||||
print =====data02=$data02
|
print =====data02=$data02
|
||||||
goto loop20
|
goto loop4
|
||||||
endi
|
endi
|
||||||
|
|
||||||
# row 1
|
# row 1
|
||||||
if $data11 != 2 then
|
if $data11 != 2 then
|
||||||
print =====data11=$data11
|
print =====data11=$data11
|
||||||
goto loop20
|
goto loop4
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data12 != 9 then
|
if $data12 != 9 then
|
||||||
print =====data12=$data12
|
print =====data12=$data12
|
||||||
goto loop20
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop5:
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
print select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows expect 2
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 3 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 6 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop5
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print end---------------------------------
|
print end---------------------------------
|
||||||
|
|
Loading…
Reference in New Issue