Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

This commit is contained in:
Haojun Liao 2023-07-18 15:19:09 +08:00
commit d2c875e774
6 changed files with 305 additions and 174 deletions

View File

@ -382,6 +382,9 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask);
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem);
bool tInputQueueIsFull(const SStreamTask* pTask); bool tInputQueueIsFull(const SStreamTask* pTask);

View File

@ -904,8 +904,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
TSKEY ekey = ascScan ? win.ekey : win.skey; TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows = int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->binfo.inputTsOrder); pInfo->binfo.inputTsOrder);
// prev time window not interpolation yet. // prev time window not interpolation yet.
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
@ -932,7 +932,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos; int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->binfo.inputTsOrder); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
pInfo->binfo.inputTsOrder);
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
@ -944,8 +945,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows = forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->binfo.inputTsOrder); pInfo->binfo.inputTsOrder);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
// TODO: add to open window? how to close the open windows after input blocks exhausted? // TODO: add to open window? how to close the open windows after input blocks exhausted?
@ -1604,7 +1605,8 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
if (!pScanInfo->pUpdateInfo) { if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate); pScanInfo->pUpdateInfo =
pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate);
} }
pScanInfo->interval = pInfo->interval; pScanInfo->interval = pInfo->interval;
@ -1632,8 +1634,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
int32_t code = int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -1862,7 +1864,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
if (pStateNode->window.pExprs != NULL) { if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0; int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); int32_t code =
initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -2033,12 +2036,13 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
} }
} }
bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) { return pStore->streamStateCheck(pState, pKey); } bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) {
return pStore->streamStateCheck(pState, pKey);
}
int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId, int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset,
SAggSupporter* pAggSup, SStateStore* pStore) { SAggSupporter* pAggSup, SStateStore* pStore) {
SWinKey key = {.ts = win->skey, .groupId = groupId}; SWinKey key = {.ts = win->skey, .groupId = groupId};
char* value = NULL; char* value = NULL;
int32_t size = pAggSup->resultRowSize; int32_t size = pAggSup->resultRowSize;
@ -2056,7 +2060,8 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, SStateStore* pStore) { bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
SStateStore* pStore) {
if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId}; SWinKey key = {.ts = pWin->skey, .groupId = groupId};
if (!hasIntervalWindow(pState, &key, pStore)) { if (!hasIntervalWindow(pState, &key, pStore)) {
@ -2126,7 +2131,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
} }
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) { void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
int32_t numOfCh, SOperatorInfo* pOperator) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData; TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
@ -2347,7 +2353,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
} }
while (1) { while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_OP(pInfo)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_OP(pInfo)) ||
!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) { if (startPos < 0) {
break; break;
@ -2362,7 +2369,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
.groupId = groupId, .groupId = groupId,
}; };
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed && !chIds) { if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed &&
!chIds) {
SPullWindowInfo pull = { SPullWindowInfo pull = {
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request // add pull data request
@ -2586,8 +2594,6 @@ int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pO
while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) { while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen); void* key = taosHashGetKey(pIte, &keyLen);
tlen += encodeSWinKey(buf, key); tlen += encodeSWinKey(buf, key);
SRowBuffPos* pPos = *(void**)pIte;
tlen += encodeSRowBuffPos(buf, pPos);
} }
// 2.twAggSup // 2.twAggSup
@ -2647,10 +2653,10 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
buf = taosDecodeFixedI32(buf, &mapSize); buf = taosDecodeFixedI32(buf, &mapSize);
for (int32_t i = 0; i < mapSize; i++) { for (int32_t i = 0; i < mapSize; i++) {
SWinKey key = {0}; SWinKey key = {0};
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey));
buf = decodeSWinKey(buf, &key); buf = decodeSWinKey(buf, &key);
buf = decodeSRowBuffPos(buf, pPos); SRowBuffPos* pPos = NULL;
int32_t resSize = pInfo->aggSup.resultRowSize;
pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize);
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES);
} }
@ -2947,7 +2953,8 @@ void streamIntervalReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t resSize = sizeof(TSKEY); int32_t resSize = sizeof(TSKEY);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize);
} }
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
@ -2976,7 +2983,8 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) {
} }
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -3094,7 +3102,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
// for stream // for stream
void* buff = NULL; void* buff = NULL;
int32_t len = 0; int32_t len = 0;
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamIntervalDecodeOpState(buff, len, pOperator); doStreamIntervalDecodeOpState(buff, len, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);
@ -3181,13 +3190,15 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState; pScanInfo->pState = pAggSup->pState;
if (!pScanInfo->pUpdateInfo) { if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, pScanInfo->igCheckUpdate); pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
pScanInfo->igCheckUpdate);
} }
pScanInfo->twAggSup = *pTwSup; pScanInfo->twAggSup = *pTwSup;
} }
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, SStorageAPI* pApi) { SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, SStorageAPI* pApi) {
pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput); pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
pSup->gap = gap; pSup->gap = gap;
@ -3297,7 +3308,8 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) { int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
int32_t size = 0; int32_t size = 0;
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size); int32_t code =
pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -3410,7 +3422,8 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
setSessionWinOutputInfo(pStUpdated, pNextWin); setSessionWinOutputInfo(pStUpdated, pNextWin);
int32_t size = 0; int32_t size = 0;
pNextWin->sessionWin = pCurWin->sessionWin; pNextWin->sessionWin = pCurWin->sessionWin;
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); int32_t code =
pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pNextWin->pOutputBuf); taosMemoryFreeClear(pNextWin->pOutputBuf);
SET_SESSION_WIN_INVALID(*pNextWin); SET_SESSION_WIN_INVALID(*pNextWin);
@ -3460,7 +3473,8 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
} }
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
&pAggSup->stateStore);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3747,8 +3761,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
pGroupResInfo->pBuf = NULL; pGroupResInfo->pBuf = NULL;
} }
void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) {
SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// set output datablock version // set output datablock version
pBlock->info.version = pTaskInfo->version; pBlock->info.version = pTaskInfo->version;
@ -3916,6 +3929,7 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len);
taosMemoryFree(buf);
} }
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
@ -4067,7 +4081,9 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
} }
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) { if (downstream->fpSet.releaseStreamStateFn) {
@ -4138,7 +4154,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
} }
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap, code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI); pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pTaskInfo->storageAPI);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -4183,7 +4200,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
// for stream // for stream
void* buff = NULL; void* buff = NULL;
int32_t len = 0; int32_t len = 0;
int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); int32_t res =
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME,
strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamSessionDecodeOpState(buff, len, pOperator, true); doStreamSessionDecodeOpState(buff, len, pOperator, true);
taosMemoryFree(buff); taosMemoryFree(buff);
@ -4346,7 +4365,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} }
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY; int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle); SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle);
if (pOperator == NULL) { if (pOperator == NULL) {
@ -4444,9 +4464,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pCurWin->winInfo.sessionWin.groupId = groupId; pCurWin->winInfo.sessionWin.groupId = groupId;
pCurWin->winInfo.sessionWin.win.skey = ts; pCurWin->winInfo.sessionWin.win.skey = ts;
pCurWin->winInfo.sessionWin.win.ekey = ts; pCurWin->winInfo.sessionWin.win.ekey = ts;
int32_t code = int32_t code = pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize, pKeyData, pAggSup->stateKeySize, compareStateKey,
compareStateKey, &pCurWin->winInfo.pOutputBuf, &size); &pCurWin->winInfo.pOutputBuf, &size);
pCurWin->pStateKey = pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
@ -4456,7 +4476,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) { if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) {
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf,
&pAggSup->pSessionAPI->stateStore);
pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size); pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size);
} }
@ -4472,7 +4493,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
pNextWin->winInfo.pOutputBuf = NULL; pNextWin->winInfo.pOutputBuf = NULL;
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); SStreamStateCur* pCur =
pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin);
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0); code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_INVALID(pNextWin->winInfo); SET_SESSION_WIN_INVALID(pNextWin->winInfo);
@ -4800,7 +4822,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
void streamStateReleaseState(SOperatorInfo* pOperator) { void streamStateReleaseState(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) { if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream); downstream->fpSet.releaseStreamStateFn(downstream);
@ -4936,7 +4960,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
// for stream // for stream
void* buff = NULL; void* buff = NULL;
int32_t len = 0; int32_t len = 0;
int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len); int32_t res =
pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME,
strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamStateDecodeOpState(buff, len, pOperator, true); doStreamStateDecodeOpState(buff, len, pOperator, true);
taosMemoryFree(buff); taosMemoryFree(buff);
@ -5322,8 +5348,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
} }
TSKEY ekey = ascScan ? win.ekey : win.skey; TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows = int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->binfo.inputTsOrder); iaInfo->binfo.inputTsOrder);
ASSERT(forwardRows > 0); ASSERT(forwardRows > 0);
// prev time window not interpolation yet. // prev time window not interpolation yet.
@ -5353,8 +5379,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = forwardRows - 1 + startPos; int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->binfo.inputTsOrder); iaInfo->binfo.inputTsOrder);
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
@ -5368,8 +5394,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
} }
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows = forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->binfo.inputTsOrder); iaInfo->binfo.inputTsOrder);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
@ -5706,13 +5732,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
}; };
pInfo->twAggSup = (STimeWindowAggSupp){ pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType, .calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = getDeleteMark(pIntervalPhyNode) .deleteMark = getDeleteMark(pIntervalPhyNode)};
};
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
@ -5735,8 +5759,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
pInfo->pState, &pTaskInfo->storageAPI.functionStore); &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -5789,7 +5813,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
// for stream // for stream
void* buff = NULL; void* buff = NULL;
int32_t len = 0; int32_t len = 0;
int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME,
strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
doStreamIntervalDecodeOpState(buff, len, pOperator); doStreamIntervalDecodeOpState(buff, len, pOperator);
taosMemoryFree(buff); taosMemoryFree(buff);

View File

@ -438,18 +438,14 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) {
tdbTbcMoveToFirst(pCur); tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SCheckpointInfo info;
if (pTask == NULL) {
goto _err;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeStreamTask(&decoder, pTask); if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
continue;
}
tDecoderClear(&decoder); tDecoderClear(&decoder);
chkpId = TMAX(chkpId, pTask->chkInfo.checkpointId); chkpId = TMAX(chkpId, info.checkpointId);
taosMemoryFree(pTask); // fix mem leak later
} }
_err: _err:

View File

@ -219,6 +219,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path
} }
// const char* path = NULL; // const char* path = NULL;
if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) {
taosMemoryFree(pReader);
return -1; return -1;
} }

View File

@ -132,6 +132,35 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
int64_t skip64;
int8_t skip8;
int32_t skip32;
int16_t skip16;
SEpSet epSet;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &skip64) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI16(pDecoder, &skip16) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI8(pDecoder, &skip8) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeI32(pDecoder, &skip32) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;

View File

@ -14,6 +14,7 @@ sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s); sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
sql create stream streams1 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1); sql insert into t1 values(1648791213001,2,2,3,1.1);
@ -45,6 +46,23 @@ if $data02 != 3 then
goto loop0 goto loop0
endi endi
$loop_count = 0
loop01:
sleep 1000
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 0 then
print =====rows=$rows expect 1
goto loop01
endi
print waiting for checkpoint generation 1 ...... print waiting for checkpoint generation 1 ......
sleep 25000 sleep 25000
@ -126,6 +144,36 @@ if $data12 != 4 then
goto loop2 goto loop2
endi endi
$loop_count = 0
loop3:
sleep 1000
print select * from streamt1;
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 2
goto loop3
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop3
endi
if $data02 != 6 then
print =====data02=$data02
goto loop3
endi
print step 2 print step 2
print restart taosd 02 ...... print restart taosd 02 ......
@ -136,7 +184,7 @@ system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791223004,5,2,3,1.1); sql insert into t1 values(1648791223004,5,2,3,1.1);
loop20: loop4:
sleep 1000 sleep 1000
sql select * from streamt; sql select * from streamt;
@ -148,29 +196,58 @@ endi
if $rows != 2 then if $rows != 2 then
print =====rows=$rows expect 2 print =====rows=$rows expect 2
goto loop20 goto loop4
endi endi
# row 0 # row 0
if $data01 != 3 then if $data01 != 3 then
print =====data01=$data01 print =====data01=$data01
goto loop20 goto loop4
endi endi
if $data02 != 6 then if $data02 != 6 then
print =====data02=$data02 print =====data02=$data02
goto loop20 goto loop4
endi endi
# row 1 # row 1
if $data11 != 2 then if $data11 != 2 then
print =====data11=$data11 print =====data11=$data11
goto loop20 goto loop4
endi endi
if $data12 != 9 then if $data12 != 9 then
print =====data12=$data12 print =====data12=$data12
goto loop20 goto loop4
endi
$loop_count = 0
loop5:
sleep 1000
print select * from streamt1;
sql select * from streamt1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 2
goto loop5
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop5
endi
if $data02 != 6 then
print =====data02=$data02
goto loop5
endi endi
print end--------------------------------- print end---------------------------------