This commit is contained in:
54liuyao 2024-08-16 10:54:52 +08:00
parent e0178f2871
commit d59eb19c37
10 changed files with 1439 additions and 91 deletions

View File

@ -454,6 +454,27 @@ typedef struct SSteamOpBasicInfo {
bool updateOperatorInfo; bool updateOperatorInfo;
} SSteamOpBasicInfo; } SSteamOpBasicInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
TSKEY prevOriginKey;
SResultRowData cur;
SResultRowData next;
TSKEY nextOriginKey;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
SExprSupp notFillExprSup;
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
SStorageAPI* pAPI;
STimeWindow winRange;
} SStreamFillSupporter;
typedef struct SStreamScanInfo { typedef struct SStreamScanInfo {
SSteamOpBasicInfo basic; SSteamOpBasicInfo basic;
SExprInfo* pPseudoExpr; SExprInfo* pPseudoExpr;
@ -494,6 +515,7 @@ typedef struct SStreamScanInfo {
STimeWindow updateWin; STimeWindow updateWin;
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes; SSDataBlock* pUpdateDataRes;
SStreamFillSupporter* pFillSup;
// status for tmq // status for tmq
SNodeList* pGroupTags; SNodeList* pGroupTags;
SNode* pTagCond; SNode* pTagCond;
@ -775,27 +797,6 @@ typedef struct SStreamPartitionOperatorInfo {
SSDataBlock* pCreateTbRes; SSDataBlock* pCreateTbRes;
} SStreamPartitionOperatorInfo; } SStreamPartitionOperatorInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
TSKEY prevOriginKey;
SResultRowData cur;
SResultRowData next;
TSKEY nextOriginKey;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
SExprSupp notFillExprSup;
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
SStorageAPI* pAPI;
STimeWindow winRange;
} SStreamFillSupporter;
typedef struct SStreamFillOperatorInfo { typedef struct SStreamFillOperatorInfo {
SSteamOpBasicInfo basic; SSteamOpBasicInfo basic;
SStreamFillSupporter* pFillSup; SStreamFillSupporter* pFillSup;

View File

@ -27,6 +27,21 @@ extern "C" {
#define FILL_POS_MID 2 #define FILL_POS_MID 2
#define FILL_POS_END 3 #define FILL_POS_END 3
#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN)
#define HAS_ROW_DATA(pRowData) (pRowData && pRowData->key != INT64_MIN)
typedef struct SSliceRowData {
TSKEY key;
SResultCellData pRowVal[];
} SSliceRowData;
typedef struct SSlicePoint {
SWinKey key;
SSliceRowData* pLeftRow;
SSliceRowData* pRightRow;
SRowBuffPos* pResPos;
} SSlicePoint;
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
@ -56,6 +71,7 @@ void destroySPoint(void* ptr);
void destroyStreamFillInfo(SStreamFillInfo* pFillInfo); void destroyStreamFillInfo(SStreamFillInfo* pFillInfo);
int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes); int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes);
void resetStreamFillSup(SStreamFillSupporter* pFillSup); void resetStreamFillSup(SStreamFillSupporter* pFillSup);
void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup);
int winPosCmprImpl(const void* pKey1, const void* pKey2); int winPosCmprImpl(const void* pKey1, const void* pKey2);

View File

@ -2184,8 +2184,38 @@ _end:
return code; return code;
} }
int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TSKEY start, TSKEY end, static int32_t setDelRangeEndKey(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, SWinKey* pEndKey, STimeWindow* pScanRange, bool* pRes) {
int64_t groupId, STimeWindow* pScanRange, STimeWindow* pDelRange) { int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSlicePoint nextPoint = {.key.groupId = pEndKey->groupId};
int32_t vLen = 0;
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, pEndKey, &nextPoint.key, (void**)&nextPoint.pResPos, &vLen, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode == TSDB_CODE_SUCCESS) {
setPointBuff(&nextPoint, pFillSup);
if (HAS_ROW_DATA(nextPoint.pLeftRow) && pEndKey->ts < nextPoint.pLeftRow->key) {
pScanRange->ekey = nextPoint.pLeftRow->key;
*pRes = true;
} else if (pEndKey->ts < nextPoint.pRightRow->key) {
pScanRange->ekey = nextPoint.pRightRow->key;
*pRes = true;
} else {
*pEndKey = nextPoint.key;
pScanRange->ekey = TMAX(nextPoint.pRightRow->key, nextPoint.key.ts);
*pRes = false;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, SInterval* pInterval, TSKEY start, TSKEY end,
int64_t groupId, STimeWindow* pScanRange) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int32_t winCode = TSDB_CODE_SUCCESS; int32_t winCode = TSDB_CODE_SUCCESS;
@ -2193,28 +2223,36 @@ int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval,
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow sWin = getActiveTimeWindow(NULL, &dumyInfo, start, pInterval, TSDB_ORDER_ASC); STimeWindow sWin = getActiveTimeWindow(NULL, &dumyInfo, start, pInterval, TSDB_ORDER_ASC);
SWinKey startKey = {.groupId = groupId, .ts = sWin.skey}; SWinKey startKey = {.groupId = groupId, .ts = sWin.skey};
pDelRange->skey = sWin.skey;
sWin = getActiveTimeWindow(NULL, &dumyInfo, end, pInterval, TSDB_ORDER_ASC); sWin = getActiveTimeWindow(NULL, &dumyInfo, end, pInterval, TSDB_ORDER_ASC);
SWinKey endKey = {.groupId = groupId, .ts = sWin.ekey}; SWinKey endKey = {.groupId = groupId, .ts = sWin.ekey};
pDelRange->ekey = sWin.ekey;
SWinKey preKey = {.groupId = groupId}; SSlicePoint prevPoint = {.key.groupId = groupId};
code = pAggSup->stateStore.streamStateFillGetPrev(pAggSup->pState, &startKey, &preKey, NULL, NULL, &winCode); SSlicePoint nextPoint = {.key.groupId = groupId};
int32_t vLen = 0;
code = pAggSup->stateStore.streamStateFillGetPrev(pAggSup->pState, &startKey, &prevPoint.key, (void**)&prevPoint.pResPos, &vLen, &winCode);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (winCode == TSDB_CODE_SUCCESS) { if (winCode == TSDB_CODE_SUCCESS) {
pScanRange->skey = preKey.ts; setPointBuff(&prevPoint, pFillSup);
if (HAS_ROW_DATA(prevPoint.pRightRow)) {
pScanRange->skey = prevPoint.pRightRow->key;
} else {
pScanRange->skey = prevPoint.pLeftRow->key;
}
} else { } else {
pScanRange->skey = startKey.ts; pScanRange->skey = startKey.ts;
} }
SWinKey nextKey = {.groupId = groupId}; bool res = false;
code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &endKey, &nextKey, NULL, NULL, &winCode); SWinKey curKey = endKey;
code = setDelRangeEndKey(pAggSup, pFillSup, &curKey, pScanRange, &res);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (winCode == TSDB_CODE_SUCCESS) { if (res == false) {
pScanRange->ekey = nextKey.ts; code = setDelRangeEndKey(pAggSup, pFillSup, &curKey, pScanRange, &res);
} else { QUERY_CHECK_CODE(code, lino, _end);
pScanRange->ekey = endKey.ts; }
if (res == false) {
pScanRange->ekey = TMAX(endKey.ts, pScanRange->ekey);
} }
_end: _end:
@ -2277,10 +2315,9 @@ static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* p
} }
STimeWindow scanRange = {0}; STimeWindow scanRange = {0};
STimeWindow delRange = {0};
ASSERT(mode == STREAM_DELETE_RESULT || mode == STREAM_DELETE_DATA); ASSERT(mode == STREAM_DELETE_RESULT || mode == STREAM_DELETE_DATA);
code = getTimeSliceWinRange(pInfo->windowSup.pStreamAggSup, &pInfo->interval, startData[i], endData[i], groupId, code = getTimeSliceWinRange(pInfo->windowSup.pStreamAggSup, pInfo->pFillSup, &pInfo->interval, startData[i], endData[i], groupId,
&scanRange, &delRange); &scanRange);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pDestStartCol, i, (const char*)&scanRange.skey, false); code = colDataSetVal(pDestStartCol, i, (const char*)&scanRange.skey, false);
@ -2293,10 +2330,10 @@ static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* p
code = colDataSetVal(pDestGpCol, i, (const char*)&groupId, false); code = colDataSetVal(pDestGpCol, i, (const char*)&groupId, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pDestCalStartTsCol, i, (const char*)&delRange.skey, false); code = colDataSetVal(pDestCalStartTsCol, i, (const char*)&scanRange.skey, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&delRange.ekey, false); code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&scanRange.ekey, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
pDestBlock->info.rows++; pDestBlock->info.rows++;
@ -4330,6 +4367,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->pState = pTaskInfo->streamInfo.pState; pInfo->pState = pTaskInfo->streamInfo.pState;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
pInfo->pFillSup = NULL;
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);

View File

@ -29,23 +29,9 @@
#define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState" #define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState"
#define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint" #define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint"
#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN)
#define HAS_ROW_DATA(pRowData) (pRowData && pRowData->key != INT64_MIN)
#define IS_INVALID_WIN_KEY(ts) ((ts) == INT64_MIN) #define IS_INVALID_WIN_KEY(ts) ((ts) == INT64_MIN)
#define SET_WIN_KEY_INVALID(ts) ((ts) = INT64_MIN) #define SET_WIN_KEY_INVALID(ts) ((ts) = INT64_MIN)
typedef struct SSliceRowData {
TSKEY key;
SResultCellData pRowVal[];
} SSliceRowData;
typedef struct SSlicePoint {
SWinKey key;
SSliceRowData* pLeftRow;
SSliceRowData* pRightRow;
SRowBuffPos* pResPos;
} SSlicePoint;
int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) { int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) {
return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0); return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0);
} }
@ -628,7 +614,7 @@ static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock,
static void setResultRowData(SSliceRowData** ppRowData, void* pBuff) { (*ppRowData) = (SSliceRowData*)pBuff; } static void setResultRowData(SSliceRowData** ppRowData, void* pBuff) { (*ppRowData) = (SSliceRowData*)pBuff; }
static void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) { void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) {
if (pFillSup->type != TSDB_FILL_LINEAR) { if (pFillSup->type != TSDB_FILL_LINEAR) {
setResultRowData(&pPoint->pRightRow, pPoint->pResPos->pRowBuff); setResultRowData(&pPoint->pRightRow, pPoint->pResPos->pRowBuff);
pPoint->pLeftRow = pPoint->pRightRow; pPoint->pLeftRow = pPoint->pRightRow;
@ -1344,14 +1330,6 @@ static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRe
uint16_t opType = pOperator->operatorType; uint16_t opType = pOperator->operatorType;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
doBuildDeleteResultImpl(&pAggSup->stateStore, pAggSup->pState, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pDelRes;
goto _end;
}
doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
if (pInfo->pRes->info.rows != 0) { if (pInfo->pRes->info.rows != 0) {
printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
@ -1407,16 +1385,16 @@ static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* groupIds = (uint64_t*)pGroupCol->pData; uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsCalStarts = (TSKEY*)pCalStartCol->pData; TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* tsCalEnds = (TSKEY*)pCalEndCol->pData; TSKEY* tsEnds = (TSKEY*)pEndCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
TSKEY ts = tsStarts[i];
TSKEY endCalTs = tsEnds[i];
uint64_t groupId = groupIds[i];
SWinKey key = {.ts = ts, .groupId = groupId};
while (1) { while (1) {
TSKEY ts = tsCalStarts[i];
TSKEY endCalTs = tsCalEnds[i];
uint64_t groupId = groupIds[i];
SWinKey key = {.ts = ts, .groupId = groupId};
SWinKey nextKey = {.groupId = groupId}; SWinKey nextKey = {.groupId = groupId};
code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL, &winCode); code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL, &winCode);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -1424,8 +1402,6 @@ static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock
break; break;
} }
(void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey)); (void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey));
void* tmp = taosArrayPush(pDelWins, &key);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
pAggSup->stateStore.streamStateDel(pAggSup->pState, &key); pAggSup->stateStore.streamStateDel(pAggSup->pState, &key);
if (winCode != TSDB_CODE_SUCCESS) { if (winCode != TSDB_CODE_SUCCESS) {
@ -1526,9 +1502,15 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
setStreamOperatorState(&pInfo->basic, pBlock->info.type); setStreamOperatorState(&pInfo->basic, pBlock->info.type);
switch (pBlock->info.type) { switch (pBlock->info.type) {
case STREAM_DELETE_RESULT: { case STREAM_DELETE_RESULT:
case STREAM_DELETE_DATA: {
code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap, pInfo->pDelWins); code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap, pInfo->pDelWins);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
(*ppRes) = pInfo->pDelRes;
printDataBlock((*ppRes), getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
goto _end;
} break; } break;
case STREAM_NORMAL: case STREAM_NORMAL:
case STREAM_INVALID: { case STREAM_INVALID: {
@ -1560,9 +1542,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
doStreamTimeSliceImpl(pOperator, pBlock); doStreamTimeSliceImpl(pOperator, pBlock);
} }
if (!pInfo->destHasPrimaryKey) { if (pInfo->destHasPrimaryKey) {
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
} else {
copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins); copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
} }
@ -1665,6 +1645,44 @@ int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t initTimeSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic,
SStreamFillSupporter* pFillSup) {
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
pScanInfo->tsColIndex = tsColIndex;
}
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
code = initTimeSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pFillSup);
return code;
}
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->igCheckUpdate = true;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState;
if (!pScanInfo->pUpdateInfo) {
code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
&pScanInfo->pUpdateInfo);
QUERY_CHECK_CODE(code, lino, _end);
}
pScanInfo->twAggSup = *pTwSup;
pScanInfo->pFillSup = pFillSup;
pScanInfo->interval = pFillSup->interval;
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
return code;
}
int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** ppOptInfo) { SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1789,13 +1807,12 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState); setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState);
if (downstream) { if (downstream) {
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
SStreamScanInfo* pScanInfo = downstream->info; &pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup);
pScanInfo->igCheckUpdate = true; QUERY_CHECK_CODE(code, lino, _error);
}
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup,
&pInfo->basic);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
QUERY_CHECK_CODE(code, lino, _error);
} }
(*ppOptInfo) = pOperator; (*ppOptInfo) = pOperator;
return code; return code;

View File

@ -284,7 +284,6 @@ _end:
} }
void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey) { void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey) {
deleteRowBuff(pFileState, pKey, sizeof(SWinKey));
SSHashObj* pSearchBuff = getSearchBuff(pFileState); SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (!ppBuff) { if (!ppBuff) {

View File

@ -246,10 +246,6 @@ int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKe
} }
void streamStateFillDel(SStreamState* pState, const SWinKey* key) { void streamStateFillDel(SStreamState* pState, const SWinKey* key) {
if (pState->pFileState) {
deleteHashSortRowBuff(pState->pFileState, key);
return;
}
int32_t code = streamStateFillDel_rocksdb(pState, key); int32_t code = streamStateFillDel_rocksdb(pState, key);
qTrace("%s at line %d res %d", __func__, __LINE__, code); qTrace("%s at line %d res %d", __func__, __LINE__, code);
} }

View File

@ -569,10 +569,12 @@ int32_t addRowBuffIfNotExist(SStreamFileState* pFileState, void* pKey, int32_t k
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey)); pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen); SRowBuffPos** pos = tSimpleHashGet(pFileState->rowStateBuff, pKey, keyLen);
if (pos) { if (pos) {
*pVLen = pFileState->rowSize; if (pVal != NULL) {
*pVal = *pos; *pVLen = pFileState->rowSize;
(*pos)->beUsed = true; *pVal = *pos;
(*pos)->beFlushed = false; (*pos)->beUsed = true;
(*pos)->beFlushed = false;
}
goto _end; goto _end;
} }
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
@ -616,11 +618,17 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe
qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff); qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
qTrace("%s at line %d res:%d", __func__, __LINE__, code_file); qTrace("%s at line %d res:%d", __func__, __LINE__, code_file);
if (pFileState->searchBuff != NULL) {
deleteHashSortRowBuff(pFileState, pKey);
}
} }
int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
if (pFileState->searchBuff != NULL) {
deleteHashSortRowBuff(pFileState, pKey);
}
if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -0,0 +1,507 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 1 then
print ======data01=$data01
goto loop0
endi
if $data11 != 8 then
print ======data11=$data11
goto loop0
endi
if $data21 != 10 then
print ======data21=$data21
goto loop0
endi
if $data31 != 15 then
print ======data31=$data31
goto loop0
endi
if $data41 != 15 then
print ======data41=$data41
goto loop0
endi
print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != 1 then
print ======data01=$data01
goto loop1
endi
if $data11 != 8 then
print ======data11=$data11
goto loop1
endi
if $data21 != 8 then
print ======data21=$data21
goto loop1
endi
if $data31 != 8 then
print ======data31=$data31
goto loop1
endi
if $data41 != 8 then
print ======data41=$data41
goto loop1
endi
print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop2
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop2
endi
if $data11 != 8 then
print ======data11=$data11
goto loop2
endi
if $data21 != 8 then
print ======data21=$data21
goto loop2
endi
if $data31 != 8 then
print ======data31=$data31
goto loop2
endi
print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000;
sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(prev);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop3
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop3
endi
print step2
print =============== create database
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(next);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop4
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop4
endi
if $data11 != 8 then
print ======data11=$data11
goto loop4
endi
if $data21 != 10 then
print ======data21=$data21
goto loop4
endi
if $data31 != 4 then
print ======data31=$data31
goto loop4
endi
if $data41 != 4 then
print ======data41=$data41
goto loop4
endi
print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop5
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop5
endi
if $data11 != 8 then
print ======data11=$data11
goto loop5
endi
if $data21 != 4 then
print ======data21=$data21
goto loop5
endi
if $data31 != 4 then
print ======data31=$data31
goto loop5
endi
if $data41 != 4 then
print ======data41=$data41
goto loop5
endi
print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop6:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop6
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop6
endi
if $data11 != 4 then
print ======data11=$data11
goto loop6
endi
if $data21 != 4 then
print ======data21=$data21
goto loop6
endi
if $data31 != 4 then
print ======data31=$data31
goto loop6
endi
print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000;
sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(next);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop7:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop7
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop7
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,508 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != NULL then
print ======data01=$data01
goto loop0
endi
if $data11 != 8 then
print ======data11=$data11
goto loop0
endi
if $data21 != 10 then
print ======data21=$data21
goto loop0
endi
if $data31 != NULL then
print ======data31=$data31
goto loop0
endi
if $data41 != NULL then
print ======data41=$data41
goto loop0
endi
print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != NULL then
print ======data01=$data01
goto loop1
endi
if $data11 != 8 then
print ======data11=$data11
goto loop1
endi
if $data21 != NULL then
print ======data21=$data21
goto loop1
endi
if $data31 != NULL then
print ======data31=$data31
goto loop1
endi
if $data41 != NULL then
print ======data41=$data41
goto loop1
endi
print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop2
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop2
endi
if $data11 != NULL then
print ======data11=$data11
goto loop2
endi
if $data21 != NULL then
print ======data21=$data21
goto loop2
endi
if $data31 != NULL then
print ======data31=$data31
goto loop2
endi
print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000;
sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop3
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop3
endi
print step2
print =============== create database
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(value,100,200,300,400);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop4
endi
# row 0
if $data01 != 100 then
print ======data01=$data01
goto loop4
endi
if $data11 != 8 then
print ======data11=$data11
goto loop4
endi
if $data21 != 10 then
print ======data21=$data21
goto loop4
endi
if $data31 != 100 then
print ======data31=$data31
goto loop4
endi
if $data41 != 100 then
print ======data41=$data41
goto loop4
endi
print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop5
endi
# row 0
if $data01 != 100 then
print ======data01=$data01
goto loop5
endi
if $data11 != 8 then
print ======data11=$data11
goto loop5
endi
if $data21 != 100 then
print ======data21=$data21
goto loop5
endi
if $data31 != 100 then
print ======data31=$data31
goto loop5
endi
if $data41 != 100 then
print ======data41=$data41
goto loop5
endi
print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop6:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop6
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop6
endi
if $data11 != 100 then
print ======data11=$data11
goto loop6
endi
if $data21 != 100 then
print ======data21=$data21
goto loop6
endi
if $data31 != 100 then
print ======data31=$data31
goto loop6
endi
print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000;
sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(value,100,200,300,400);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop7:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop7
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop7
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,258 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(linear);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212001,1,1,1,1.0) (1648791214000,8,1,1,1.0) (1648791215000,10,1,1,1.0) (1648791215009,15,1,1,1.0) (1648791217001,4,1,1,1.0);
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linera);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 4 then
print ======data01=$data01
goto loop0
endi
if $data11 != 8 then
print ======data11=$data11
goto loop0
endi
if $data21 != 10 then
print ======data21=$data21
goto loop0
endi
if $data31 != 9 then
print ======data31=$data31
goto loop0
endi
if $data41 != 4 then
print ======data41=$data41
goto loop0
endi
print 1 sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
sql delete from t1 where ts >= 1648791215000 and ts <= 1648791216000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 5 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != 4 then
print ======data01=$data01
goto loop1
endi
if $data11 != 8 then
print ======data11=$data11
goto loop1
endi
if $data21 != 6 then
print ======data21=$data21
goto loop1
endi
if $data31 != 5 then
print ======data31=$data31
goto loop1
endi
if $data41 != 4 then
print ======data41=$data41
goto loop1
endi
print 2 sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
sql delete from t1 where ts >= 1648791212000 and ts <= 1648791213000;
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop2
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop2
endi
if $data11 != 6 then
print ======data11=$data11
goto loop2
endi
if $data21 != 5 then
print ======data21=$data21
goto loop2
endi
if $data31 != 4 then
print ======data31=$data31
goto loop2
endi
print 3 sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000;
sql delete from t1 where ts >= 1648791217000 and ts <= 1648791218000
print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear);
sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791212000, 1648791217001) every(1s) fill(linear);
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop3
endi
# row 0
if $data01 != 8 then
print ======data01=$data01
goto loop3
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT