Merge pull request #28942 from taosdata/fix/TD-33020

enh(stream):optimize stream force window close
This commit is contained in:
Shengliang Guan 2024-11-29 17:12:34 +08:00 committed by GitHub
commit 82e0e8a873
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 714 additions and 206 deletions

View File

@ -93,7 +93,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen); const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen);
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen); SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen);
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId); int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId);
void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateClear(SStreamFileState* pFileState);
void sessionWinStateCleanup(void* pBuff); void sessionWinStateCleanup(void* pBuff);

View File

@ -475,6 +475,7 @@ typedef struct SStreamFillSupporter {
STimeWindow winRange; STimeWindow winRange;
int32_t pkColBytes; int32_t pkColBytes;
__compar_fn_t comparePkColFn; __compar_fn_t comparePkColFn;
int32_t* pOffsetInfo;
} SStreamFillSupporter; } SStreamFillSupporter;
typedef struct SStreamScanInfo { typedef struct SStreamScanInfo {
@ -822,10 +823,11 @@ typedef struct SStreamFillOperatorInfo {
int32_t primaryTsCol; int32_t primaryTsCol;
int32_t primarySrcSlotId; int32_t primarySrcSlotId;
SStreamFillInfo* pFillInfo; SStreamFillInfo* pFillInfo;
SStreamAggSupporter* pStreamAggSup;
SArray* pCloseTs; SArray* pCloseTs;
SArray* pUpdated; SArray* pUpdated;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SStreamState* pState;
SStateStore stateStore;
} SStreamFillOperatorInfo; } SStreamFillOperatorInfo;
typedef struct SStreamTimeSliceOperatorInfo { typedef struct SStreamTimeSliceOperatorInfo {
@ -884,6 +886,7 @@ typedef struct SStreamIntervalSliceOperatorInfo {
struct SOperatorInfo* pOperator; struct SOperatorInfo* pOperator;
bool hasFill; bool hasFill;
bool hasInterpoFunc; bool hasInterpoFunc;
int32_t* pOffsetInfo;
} SStreamIntervalSliceOperatorInfo; } SStreamIntervalSliceOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)

View File

@ -150,7 +150,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo); int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo);
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo); int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo);
int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo); int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pInfo);

View File

@ -90,19 +90,21 @@ int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap);
int winPosCmprImpl(const void* pKey1, const void* pKey2); int winPosCmprImpl(const void* pKey1, const void* pKey2);
void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index); SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo);
int32_t getDownstreamRes(struct SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol); int32_t getDownstreamRes(struct SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol);
void destroyFlusedppPos(void* ppRes); void destroyFlusedppPos(void* ppRes);
void doBuildStreamIntervalResult(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, void doBuildStreamIntervalResult(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock,
SGroupResInfo* pGroupResInfo); SGroupResInfo* pGroupResInfo);
void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal,
int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol); int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo);
int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, bool ignoreNull); int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, bool ignoreNull);
int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode, int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
struct SOperatorInfo** ppOptInfo); struct SOperatorInfo** ppOptInfo);
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated); int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated);
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes);
TSKEY compareTs(void* pKey);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -119,7 +119,8 @@ typedef struct SStreamFillInfo {
int32_t delIndex; int32_t delIndex;
uint64_t curGroupId; uint64_t curGroupId;
bool hasNext; bool hasNext;
SResultRowData* pNonFillRow; SResultRowData* pNonFillRow;
void* pTempBuff;
} SStreamFillInfo; } SStreamFillInfo;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);

View File

@ -614,7 +614,7 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr); code = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, &pOptr); code = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo, pHandle, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr); code = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo, &pOptr);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {

View File

@ -100,6 +100,8 @@ void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
taosMemoryFree(pFillSup->next.pRowVal); taosMemoryFree(pFillSup->next.pRowVal);
taosMemoryFree(pFillSup->nextNext.pRowVal); taosMemoryFree(pFillSup->nextNext.pRowVal);
taosMemoryFree(pFillSup->pOffsetInfo);
taosMemoryFree(pFillSup); taosMemoryFree(pFillSup);
} }
@ -129,6 +131,7 @@ void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
pFillInfo->pLinearInfo = NULL; pFillInfo->pLinearInfo = NULL;
taosArrayDestroy(pFillInfo->delRanges); taosArrayDestroy(pFillInfo->delRanges);
taosMemoryFreeClear(pFillInfo->pTempBuff);
taosMemoryFree(pFillInfo); taosMemoryFree(pFillInfo);
} }
@ -148,6 +151,14 @@ static void destroyStreamFillOperatorInfo(void* param) {
clearGroupResInfo(&pInfo->groupResInfo); clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->pCloseTs); taosArrayDestroy(pInfo->pCloseTs);
if (pInfo->stateStore.streamFileStateDestroy != NULL) {
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
}
if (pInfo->pState != NULL) {
taosMemoryFreeClear(pInfo->pState);
}
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }
@ -1157,14 +1168,19 @@ _end:
return code; return code;
} }
static void resetForceFillWindow(SResultRowData* pRowData) {
pRowData->key = INT64_MIN;
pRowData->pRowVal = NULL;
}
void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup,
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState; SStreamFillOperatorInfo* pInfo = pOperator->info;
bool res = false; bool res = false;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
if (pBlock->info.id.groupId == 0) { if (pBlock->info.id.groupId == 0) {
@ -1172,25 +1188,30 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter*
} else if (pBlock->info.id.groupId != pKey->groupId) { } else if (pBlock->info.id.groupId != pKey->groupId) {
break; break;
} }
void* val = NULL;
int32_t len = 0; SRowBuffPos* pValPos = NULL;
int32_t winCode = pAPI->stateStore.streamStateFillGet(pOperator->pTaskInfo->streamInfo.pState, pKey, (void**)&val, &len, NULL); int32_t len = 0;
int32_t winCode = TSDB_CODE_SUCCESS;
code = pInfo->stateStore.streamStateFillGet(pInfo->pState, pKey, (void**)&pValPos, &len, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode); qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode);
if (winCode == TSDB_CODE_SUCCESS) { if (winCode == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = pKey->ts; pFillSup->cur.key = pKey->ts;
pFillSup->cur.pRowVal = val; pFillSup->cur.pRowVal = pValPos->pRowBuff;
code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
resetFillWindow(&pFillSup->cur); resetForceFillWindow(&pFillSup->cur);
releaseOutputBuf(pInfo->pState, pValPos, &pInfo->stateStore);
} else { } else {
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey); SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId};
SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId}; SRowBuffPos* prePos = NULL;
void* preVal = NULL; int32_t preVLen = 0;
int32_t preVLen = 0; code = pInfo->stateStore.streamStateFillGetPrev(pInfo->pState, pKey, &preKey,
winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen); (void**)&prePos, &preVLen, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode == TSDB_CODE_SUCCESS) { if (winCode == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = pKey->ts; pFillSup->cur.key = pKey->ts;
pFillSup->cur.pRowVal = preVal; pFillSup->cur.pRowVal = prePos->pRowBuff;
if (pFillInfo->type == TSDB_FILL_PREV) { if (pFillInfo->type == TSDB_FILL_PREV) {
code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -1200,9 +1221,9 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter*
code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res); code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
resetFillWindow(&pFillSup->cur); resetForceFillWindow(&pFillSup->cur);
} }
pAPI->stateStore.streamStateFreeCur(pCur); releaseOutputBuf(pInfo->pState, prePos, &pInfo->stateStore);
} }
} }
@ -1247,6 +1268,45 @@ _end:
return code; return code;
} }
static void keepResultInStateBuf(SStreamFillOperatorInfo* pInfo, uint64_t groupId, SResultRowData* pRow) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SWinKey key = {.groupId = groupId, .ts = pRow->key};
int32_t curVLen = 0;
SRowBuffPos* pStatePos = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
code = pInfo->stateStore.streamStateFillAddIfNotExist(pInfo->pState, &key, (void**)&pStatePos,
&curVLen, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
memcpy(pStatePos->pRowBuff, pRow->pRowVal, pInfo->pFillSup->rowSize);
qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
}
int32_t keepBlockRowInStateBuf(SStreamFillOperatorInfo* pInfo, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
int32_t rowId, uint64_t groupId, int32_t rowSize) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
TSKEY ts = tsCol[rowId];
pFillInfo->nextRowKey = ts;
TAOS_MEMSET(pFillInfo->pTempBuff, 0, rowSize);
SResultRowData tmpNextRow = {.key = ts, .pRowVal = pFillInfo->pTempBuff};
transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
keepResultInStateBuf(pInfo, groupId, &tmpNextRow);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// force window close impl // force window close impl
static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1257,11 +1317,10 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
SStreamFillInfo* pFillInfo = pInfo->pFillInfo; SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
SSDataBlock* pBlock = pInfo->pSrcBlock; SSDataBlock* pBlock = pInfo->pSrcBlock;
uint64_t groupId = pBlock->info.id.groupId; uint64_t groupId = pBlock->info.id.groupId;
SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData; TSKEY* tsCol = (TSKEY*)pTsCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++){ for (int32_t i = 0; i < pBlock->info.rows; i++){
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); code = keepBlockRowInStateBuf(pInfo, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
int32_t size = taosArrayGetSize(pInfo->pCloseTs); int32_t size = taosArrayGetSize(pInfo->pCloseTs);
@ -1281,7 +1340,7 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
} }
} }
} }
code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0); code = pInfo->stateStore.streamStateGroupPut(pInfo->pState, groupId, NULL, 0);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
_end: _end:
@ -1291,13 +1350,13 @@ _end:
return code; return code;
} }
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) { int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int64_t groupId = 0; int64_t groupId = 0;
SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState); SStreamStateCur* pCur = pStateStore->streamStateGroupGetCur(pState);
while (1) { while (1) {
int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL); int32_t winCode = pStateStore->streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
if (winCode != TSDB_CODE_SUCCESS) { if (winCode != TSDB_CODE_SUCCESS) {
break; break;
} }
@ -1305,14 +1364,14 @@ int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdat
void* pPushRes = taosArrayPush(pUpdated, &key); void* pPushRes = taosArrayPush(pUpdated, &key);
QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno); QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
pAggSup->stateStore.streamStateGroupCurNext(pCur); pStateStore->streamStateGroupCurNext(pCur);
} }
pAggSup->stateStore.streamStateFreeCur(pCur); pStateStore->streamStateFreeCur(pCur);
pCur = NULL; pCur = NULL;
_end: _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pAggSup->stateStore.streamStateFreeCur(pCur); pStateStore->streamStateFreeCur(pCur);
pCur = NULL; pCur = NULL;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
} }
@ -1345,7 +1404,8 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
(*ppRes) = resBlock; (*ppRes) = resBlock;
goto _end; goto _end;
} }
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
pInfo->stateStore.streamStateClearExpiredState(pInfo->pState);
setStreamOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL; (*ppRes) = NULL;
goto _end; goto _end;
@ -1372,7 +1432,11 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = -1; pInfo->srcRowIndex = -1;
} break; } break;
case STREAM_CHECKPOINT: case STREAM_CHECKPOINT: {
pInfo->stateStore.streamStateCommit(pInfo->pState);
(*ppRes) = pBlock;
goto _end;
} break;
case STREAM_CREATE_CHILD_TABLE: { case STREAM_CREATE_CHILD_TABLE: {
(*ppRes) = pBlock; (*ppRes) = pBlock;
goto _end; goto _end;
@ -1393,7 +1457,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated); code = buildAllResultKey(&pInfo->stateStore, pInfo->pState, ts, pInfo->pUpdated);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
taosArrayClear(pInfo->pCloseTs); taosArrayClear(pInfo->pCloseTs);
@ -1412,7 +1476,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if ((*ppRes) == NULL) { if ((*ppRes) == NULL) {
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); pInfo->stateStore.streamStateClearExpiredState(pInfo->pState);
setStreamOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
} }
@ -1573,10 +1637,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pResRow->key = INT64_MIN; pFillInfo->pResRow->key = INT64_MIN;
pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
if (!pFillInfo->pResRow->pRowVal) { QUERY_CHECK_NULL(pFillInfo->pResRow->pRowVal, code, lino, _end, terrno);
code = terrno;
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
@ -1590,6 +1651,21 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pCell->type = pColData->info.type; pCell->type = pColData->info.type;
} }
int32_t numOfResCol = taosArrayGetSize(pRes->pDataBlock);
if (numOfResCol < pFillSup->numOfAllCols) {
int32_t* pTmpBuf = (int32_t*)taosMemoryRealloc(pFillSup->pOffsetInfo, pFillSup->numOfAllCols * sizeof(int32_t));
QUERY_CHECK_NULL(pTmpBuf, code, lino, _end, terrno);
pFillSup->pOffsetInfo = pTmpBuf;
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, numOfResCol - 1);
int32_t preLength = pFillSup->pOffsetInfo[numOfResCol - 1] + pCell->bytes + sizeof(SResultCellData);
for (int32_t i = numOfResCol; i < pFillSup->numOfAllCols; i++) {
pFillSup->pOffsetInfo[i] = preLength;
pCell = getResultCell(pFillInfo->pResRow, i);
preLength += pCell->bytes + sizeof(SResultCellData);
}
}
pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData)); pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData));
QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno); QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno);
pFillInfo->pNonFillRow->key = INT64_MIN; pFillInfo->pNonFillRow->key = INT64_MIN;
@ -1607,6 +1683,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->delIndex = 0; pFillInfo->delIndex = 0;
pFillInfo->curGroupId = 0; pFillInfo->curGroupId = 0;
pFillInfo->hasNext = false; pFillInfo->hasNext = false;
pFillInfo->pTempBuff = taosMemoryCalloc(1, pFillSup->rowSize);
return pFillInfo; return pFillInfo;
_end: _end:
@ -1650,21 +1727,18 @@ static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo*
} }
} }
int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, SStreamAggSupporter** ppAggSup) { int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
if (IS_NORMAL_INTERVAL_OP(downstream)) { if (IS_NORMAL_INTERVAL_OP(downstream)) {
SStreamIntervalOperatorInfo* pInfo = downstream->info; SStreamIntervalOperatorInfo* pInfo = downstream->info;
*triggerType = pInfo->twAggSup.calTrigger; *triggerType = pInfo->twAggSup.calTrigger;
*pInterval = pInfo->interval; *pInterval = pInfo->interval;
(*ppAggSup) = NULL;
} else if (IS_CONTINUE_INTERVAL_OP(downstream)) { } else if (IS_CONTINUE_INTERVAL_OP(downstream)) {
SStreamIntervalSliceOperatorInfo* pInfo = downstream->info; SStreamIntervalSliceOperatorInfo* pInfo = downstream->info;
*triggerType = pInfo->twAggSup.calTrigger; *triggerType = pInfo->twAggSup.calTrigger;
*pInterval = pInfo->interval; *pInterval = pInfo->interval;
pInfo->hasFill = true; pInfo->hasFill = true;
(*ppAggSup) = &pInfo->streamAggSup;
pInfo->streamAggSup.stateStore.streamStateSetFillInfo(pInfo->streamAggSup.pState);
} else { } else {
code = TSDB_CODE_STREAM_INTERNAL_ERROR; code = TSDB_CODE_STREAM_INTERNAL_ERROR;
} }
@ -1677,8 +1751,31 @@ _end:
return code; return code;
} }
int32_t initFillOperatorStateBuff(SStreamFillOperatorInfo* pInfo, SStreamState* pState, SStateStore* pStore,
SReadHandle* pHandle, const char* taskIdStr, SStorageAPI* pApi) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
pInfo->stateStore = *pStore;
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
QUERY_CHECK_NULL(pInfo->pState, code, lino, _end, terrno);
*(pInfo->pState) = *pState;
pInfo->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsCol);
code = pInfo->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->pFillSup->rowSize, 0, compareTs,
pInfo->pState, INT64_MAX, taskIdStr, pHandle->checkpointId,
STREAM_STATE_BUFF_HASH_SORT, &pInfo->pState->pFileState);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo); QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1704,7 +1801,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
int8_t triggerType = 0; int8_t triggerType = 0;
SInterval interval = {0}; SInterval interval = {0};
code = getDownStreamInfo(downstream, &triggerType, &interval, &pInfo->pStreamAggSup); code = getDownStreamInfo(downstream, &triggerType, &interval);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI, pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
@ -1759,9 +1856,13 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
pTaskInfo); pTaskInfo);
if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
code = initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
QUERY_CHECK_CODE(code, lino, _error);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo, pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
} else { } else {
pInfo->pState = NULL;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo, pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
} }

View File

@ -74,6 +74,7 @@ void destroyStreamIntervalSliceOperatorInfo(void* param) {
blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pCheckpointRes); blockDataDestroy(pInfo->pCheckpointRes);
taosMemoryFreeClear(pInfo->pOffsetInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -163,7 +164,7 @@ _end:
} }
void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock, void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock,
int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type) { int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type, int32_t* pOffsetInfo) {
SqlFunctionCtx* pCtx = pSup->pCtx; SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t k = 0; k < pSup->numOfExprs; ++k) { for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) { if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
@ -175,7 +176,7 @@ void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId); SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId);
double prevVal = 0, curVal = 0, winVal = 0; double prevVal = 0, curVal = 0, winVal = 0;
SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId); SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId, pOffsetInfo);
GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData); GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData);
GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex)); GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
@ -278,7 +279,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp); doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp);
doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END); doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END, pInfo->pOffsetInfo);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
0, pBlock->info.rows, numOfOutput); 0, pBlock->info.rows, numOfOutput);
@ -294,7 +295,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) { if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START); doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START, pInfo->pOffsetInfo);
} }
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC); TSDB_ORDER_ASC);
@ -302,7 +303,7 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) { if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) {
int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false); int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
TSKEY endRowTs = tsCols[endRowId]; TSKEY endRowTs = tsCols[endRowId];
transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL); transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo);
} }
SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId}; SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId};
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
@ -359,9 +360,14 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
return code; return code;
} }
if (pInfo->hasFill == false) { if (pInfo->recvCkBlock) {
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); pInfo->recvCkBlock = false;
} printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
setStreamOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL; (*ppRes) = NULL;
return code; return code;
@ -392,8 +398,6 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
case STREAM_CHECKPOINT: { case STREAM_CHECKPOINT: {
pInfo->recvCkBlock = true; pInfo->recvCkBlock = true;
pAggSup->stateStore.streamStateCommit(pAggSup->pState); pAggSup->stateStore.streamStateCommit(pAggSup->pState);
// doStreamIntervalSliceSaveCheckpoint(pOperator);
pInfo->recvCkBlock = true;
code = copyDataBlock(pInfo->pCheckpointRes, pBlock); code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
continue; continue;
@ -451,9 +455,13 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if ((*ppRes) == NULL) { if ((*ppRes) == NULL) {
if (pInfo->hasFill == false) { if (pInfo->recvCkBlock) {
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); pInfo->recvCkBlock = false;
} printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
setStreamOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
} }
@ -596,6 +604,9 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN
code = getDownstreamRes(downstream, &pDownRes, &pPkCol); code = getDownstreamRes(downstream, &pDownRes, &pPkCol);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
code = initOffsetInfo(&pInfo->pOffsetInfo, pDownRes);
QUERY_CHECK_CODE(code, lino, _error);
int32_t keyBytes = sizeof(TSKEY); int32_t keyBytes = sizeof(TSKEY);
keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool); keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool);
if (pPkCol) { if (pPkCol) {

View File

@ -281,8 +281,34 @@ static int32_t initTimeSliceResultBuf(SStreamFillSupporter* pFillSup, SExprSupp*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs, SColumnInfo* pPkCol, int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes) {
SStreamFillSupporter** ppResFillSup) { int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t numOfCol = taosArrayGetSize(pRes->pDataBlock);
int32_t preLength = 0;
int32_t* pOffsetInfo = taosMemoryCalloc(numOfCol, sizeof(int32_t));
QUERY_CHECK_NULL(pOffsetInfo, code, lino, _end, lino);
for (int32_t i = 0; i < numOfCol; i++) {
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, i);
pOffsetInfo[i] = preLength;
int32_t bytes = 1;
if (pColInfo != NULL) {
bytes = pColInfo->info.bytes;
}
preLength += bytes + sizeof(SResultCellData);
}
(*ppOffset) = pOffsetInfo;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs,
SSDataBlock* pInputRes, SColumnInfo* pPkCol, SStreamFillSupporter** ppResFillSup) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
@ -320,6 +346,9 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE
pFillSup->comparePkColFn = NULL; pFillSup->comparePkColFn = NULL;
} }
code = initOffsetInfo(&pFillSup->pOffsetInfo, pInputRes);
QUERY_CHECK_CODE(code, lino, _end);
(*ppResFillSup) = pFillSup; (*ppResFillSup) = pFillSup;
_end: _end:
@ -359,17 +388,11 @@ _end:
} }
} }
SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) { SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo) {
if (!pRowVal) { if (!pRowVal) {
return NULL; return NULL;
} }
char* pData = (char*)pRowVal; return POINTER_SHIFT(pRowVal, pCellOffsetInfo[index]);
SResultCellData* pCell = pRowVal;
for (int32_t i = 0; i < index; i++) {
pData += (pCell->bytes + sizeof(SResultCellData));
pCell = (SResultCellData*)pData;
}
return pCell;
} }
static bool isGroupKeyFunc(SExprInfo* pExprInfo) { static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
@ -414,9 +437,9 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = NULL; SResultCellData* pCell = NULL;
if (IS_FILL_CONST_VALUE(pFillSup->type) && (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr)) ) { if (IS_FILL_CONST_VALUE(pFillSup->type) && (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr)) ) {
pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot); pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
} else { } else {
pCell = getSliceResultCell(pResRow->pRowVal, srcSlot); pCell = getSliceResultCell(pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
} }
code = setRowCell(pDstCol, pBlock->info.rows, pCell); code = setRowCell(pDstCol, pBlock->info.rows, pCell);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -475,7 +498,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} else if (isInterpFunc(pFillCol->pExpr)) { } else if (isInterpFunc(pFillCol->pExpr)) {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
colDataSetNULL(pDstCol, index); colDataSetNULL(pDstCol, index);
continue; continue;
@ -498,7 +521,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
destroySPoint(&cur); destroySPoint(&cur);
} else { } else {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
code = setRowCell(pDstCol, pBlock->info.rows, pCell); code = setRowCell(pDstCol, pBlock->info.rows, pCell);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
@ -956,8 +979,8 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo
if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) && if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) &&
!isIsfilledPseudoColumn(pFillCol->pExpr)) { !isIsfilledPseudoColumn(pFillCol->pExpr)) {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot); SResultCellData* pSrcCell = getSliceResultCell(pFillSup->cur.pRowVal, srcSlot, pFillSup->pOffsetInfo);
SResultCellData* pDestCell = getResultCell(pFillInfo->pNonFillRow, srcSlot); SResultCellData* pDestCell = getSliceResultCell(pFillInfo->pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
pDestCell->isNull = pSrcCell->isNull; pDestCell->isNull = pSrcCell->isNull;
if (!pDestCell->isNull) { if (!pDestCell->isNull) {
memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes); memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes);
@ -966,11 +989,11 @@ static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo
} }
} }
static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol, int32_t* pOffsetInfo) {
for (int32_t i = 0; i < numOfCol; i++) { for (int32_t i = 0; i < numOfCol; i++) {
if (isInterpFunc(pFillCol[i].pExpr)) { if (isInterpFunc(pFillCol[i].pExpr)) {
int32_t slotId = pFillCol[i].pExpr->base.pParam[0].pCol->slotId; int32_t slotId = pFillCol[i].pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pECell = getResultCell(pEndRow, slotId); SResultCellData* pECell = getSliceResultCell(pEndRow->pRowVal, slotId, pOffsetInfo);
SPoint* pPoint = taosArrayGet(pEndPoins, slotId); SPoint* pPoint = taosArrayGet(pEndPoins, slotId);
pPoint->key = pEndRow->key; pPoint->key = pEndRow->key;
memcpy(pPoint->val, pECell->pData, pECell->bytes); memcpy(pPoint->val, pECell->pData, pECell->bytes);
@ -1112,7 +1135,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
pFillSup->next.key = pFillSup->nextOriginKey; pFillSup->next.key = pFillSup->nextOriginKey;
copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols); pFillSup->numOfAllCols, pFillSup->pOffsetInfo);
pFillSup->prev.key = pFillSup->prevOriginKey; pFillSup->prev.key = pFillSup->prevOriginKey;
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
@ -1121,7 +1144,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols); pFillSup->numOfAllCols, pFillSup->pOffsetInfo);
pFillSup->prev.key = pFillSup->prevOriginKey; pFillSup->prev.key = pFillSup->prevOriginKey;
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
@ -1132,7 +1155,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
pFillSup->next.key = pFillSup->nextOriginKey; pFillSup->next.key = pFillSup->nextOriginKey;
copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols); pFillSup->numOfAllCols, pFillSup->pOffsetInfo);
pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pResRow = &pFillSup->cur;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} }
@ -1253,11 +1276,11 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, void* pPkVal, SStream
} }
void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal, void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal,
int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol) { int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SResultCellData* pCell = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, i); SResultCellData* pCell = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, i, pCellOffsetInfo);
if (!colDataIsNull_s(pColData, rowId)) { if (!colDataIsNull_s(pColData, rowId)) {
pCell->isNull = false; pCell->isNull = false;
pCell->type = pColData->info.type; pCell->type = pColData->info.type;
@ -1378,7 +1401,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
} }
right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type);
if (right) { if (right) {
transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap); pInfo->pDeletedMap);
@ -1397,7 +1420,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
} }
left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type); left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type);
if (left) { if (left) {
transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap, code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap,
needDel, pInfo->pDeletedMap); needDel, pInfo->pDeletedMap);
@ -1422,7 +1445,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
} }
right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type); right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type);
if (right) { if (right) {
transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo); transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo, pFillSup->pOffsetInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap); pInfo->pDeletedMap);
@ -1886,7 +1909,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
qDebug("===stream===build stream result, ts count:%d", size); qDebug("===stream===build stream result, ts count:%d", size);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i); TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
code = buildAllResultKey(&pInfo->streamAggSup, ts, pInfo->pUpdated); code = buildAllResultKey(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, ts, pInfo->pUpdated);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
qDebug("===stream===build stream result, res count:%ld", taosArrayGetSize(pInfo->pUpdated)); qDebug("===stream===build stream result, res count:%ld", taosArrayGetSize(pInfo->pUpdated));
@ -1951,7 +1974,7 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
continue; continue;
} }
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot); SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
SFillColInfo* pValueCol = pFillSup->pAllColInfo + valueIndex; SFillColInfo* pValueCol = pFillSup->pAllColInfo + valueIndex;
SVariant* pVar = &(pValueCol->fillVal); SVariant* pVar = &(pValueCol->fillVal);
if (pCell->type == TSDB_DATA_TYPE_FLOAT) { if (pCell->type == TSDB_DATA_TYPE_FLOAT) {
@ -1975,7 +1998,7 @@ static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
int32_t slotId = GET_DEST_SLOT_ID(pFillCol); int32_t slotId = GET_DEST_SLOT_ID(pFillCol);
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId); SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, slotId, pFillSup->pOffsetInfo);
pCell->isNull = true; pCell->isNull = true;
} }
} }
@ -2095,7 +2118,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillSup = NULL; pInfo->pFillSup = NULL;
code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pPkCol, &pInfo->pFillSup); code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pDownRes, pPkCol, &pInfo->pFillSup);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
int32_t ratio = 1; int32_t ratio = 1;

View File

@ -1834,7 +1834,7 @@ int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption) {
return deleteMark; return deleteMark;
} }
static TSKEY compareTs(void* pKey) { TSKEY compareTs(void* pKey) {
SWinKey* pWinKey = (SWinKey*)pKey; SWinKey* pWinKey = (SWinKey*)pKey;
return pWinKey->ts; return pWinKey->ts;
} }

View File

@ -3094,7 +3094,7 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf
qptCtx.result.code = createFillOperatorInfo(NULL, (SFillPhysiNode*)pNode, pTaskInfo, ppOperaotr); qptCtx.result.code = createFillOperatorInfo(NULL, (SFillPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, ppOperaotr); qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
qptCtx.result.code = createSessionAggOperatorInfo(NULL, (SSessionWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr); qptCtx.result.code = createSessionAggOperatorInfo(NULL, (SSessionWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr);

View File

@ -4130,7 +4130,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) { SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) {
SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX}; SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX};
return streamStateFillSeekKeyNext_rocksdb(pState, &key); return streamStateFillSeekKeyPrev_rocksdb(pState, &key);
} }
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL

View File

@ -23,6 +23,33 @@
#define NUM_OF_CACHE_WIN 64 #define NUM_OF_CACHE_WIN 64
#define MAX_NUM_OF_CACHE_WIN 128 #define MAX_NUM_OF_CACHE_WIN 128
int32_t recoverSearchBuff(SStreamFileState* pFileState, SArray* pWinStates, uint64_t groupId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SWinKey start = {.groupId = groupId, .ts = INT64_MAX};
void* pState = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
SWinKey tmpKey = {.groupId = groupId};
int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0);
if (tmpRes != TSDB_CODE_SUCCESS) {
break;
}
void* tmp = taosArrayPush(pWinStates, &tmpKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
streamStateCurPrev_rocksdb(pCur);
}
taosArraySort(pWinStates, winKeyCmprImpl);
streamStateFreeCur(pCur);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) { int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -38,22 +65,8 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
// recover // recover
if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
TSKEY ts = getFlushMark(pFileState); code = recoverSearchBuff(pFileState, pWinStates, pKey->groupId);
SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; QUERY_CHECK_CODE(code, lino, _end);
void* pState = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
SWinKey tmpKey = {.groupId = pKey->groupId};
int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0);
if (tmpRes != TSDB_CODE_SUCCESS) {
break;
}
void* tmp = taosArrayPush(pWinStates, &tmpKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
streamStateCurPrev_rocksdb(pCur);
}
taosArraySort(pWinStates, winKeyCmprImpl);
streamStateFreeCur(pCur);
} }
code = addSearchItem(pFileState, pWinStates, pKey); code = addSearchItem(pFileState, pWinStates, pKey);
@ -203,29 +216,17 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
SArray* pWinStates = NULL; SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState); SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void* pState = getStateFileStore(pFileState); void* pState = getStateFileStore(pFileState);
void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); // void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff); code = addArrayBuffIfNotExist(pSearchBuff, pKey->groupId, &pWinStates);
} else { QUERY_CHECK_CODE(code, lino, _end);
qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); // recover
void* tmpVal = NULL; if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
int32_t len = 0; code = recoverSearchBuff(pFileState, pWinStates, pKey->groupId);
(*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len); QUERY_CHECK_CODE(code, lino, _end);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
streamStateFreeCur(pCur);
return code;
} }
int32_t size = taosArrayGetSize(pWinStates); int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index >= 0) { if (index >= 0) {

View File

@ -259,7 +259,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize,
if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) { if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
code = recoverSnapshot(pFileState, checkpointId); code = recoverSnapshot(pFileState, checkpointId);
} else if (type == STREAM_STATE_BUFF_SORT) { } else if (type == STREAM_STATE_BUFF_SORT) {
code = recoverSesssion(pFileState, checkpointId); code = recoverSession(pFileState, checkpointId);
} else if (type == STREAM_STATE_BUFF_HASH_SORT) { } else if (type == STREAM_STATE_BUFF_HASH_SORT) {
code = recoverFillSnapshot(pFileState, checkpointId); code = recoverFillSnapshot(pFileState, checkpointId);
} }
@ -914,7 +914,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
return code; return code;
} }
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
int32_t winRes = TSDB_CODE_SUCCESS; int32_t winRes = TSDB_CODE_SUCCESS;
@ -991,6 +991,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
} }
winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen); winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__);
if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
SListNode* pNode = tdListPopTail(pFileState->usedBuffs); SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
@ -1007,6 +1008,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
memcpy(pNewPos->pRowBuff, pVal, vlen); memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal); taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true; pNewPos->beFlushed = true;
qDebug("===stream=== read checkpoint state from disc. %s", __func__);
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
@ -1077,6 +1079,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t vlen = 0; int32_t vlen = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__);
if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) { if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
SListNode* pNode = tdListPopTail(pFileState->usedBuffs); SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
@ -1085,9 +1088,17 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
break; break;
} }
if (vlen != pFileState->rowSize) {
qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
taosMemoryFreeClear(pVal);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, pVal, vlen); memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal); taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true; pNewPos->beFlushed = true;
qDebug("===stream=== read checkpoint state from disc. %s", __func__);
winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (winRes != TSDB_CODE_SUCCESS) { if (winRes != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
@ -1232,11 +1243,6 @@ void clearExpiredState(SStreamFileState* pFileState) {
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file); qTrace("clear expired file, ts:%" PRId64 ". %s at line %d res:%d", pKey->ts, __func__, __LINE__, code_file);
} }
if (pFileState->hasFillCatch == false) {
int32_t code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey);
qTrace("force clear expired file, ts:%" PRId64 ". %s at line %d res %d", pKey->ts, __func__, __LINE__, code_file);
}
} }
taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL); taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);
} }

View File

@ -1,67 +0,0 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s);
run tsim/stream/checkTaskStatus.sim
sleep 70000
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3000a,1,1,1);
$loop_count = 0
loop0:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02
if $rows == 0 then
goto loop0
endi
print select * from streamt2;
sql select * from streamt2;
print $data00 $data01 $data02
if $rows == 0 then
goto loop0
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -304,6 +304,253 @@ if $rows != 1 then
return -1 return -1
endi endi
print step3
print =============== create database
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1234567890t1 using st tags(1,1,1);
sql create table t1234567890t2 using st tags(2,2,2);
sql create stable streamt9(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int);
sql create stable streamt10(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int);
sql create stable streamt11(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int);
sql create stream streams9 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt9 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1, interp(b) from st partition by tbname as ta, b as cc every(2s) fill(value, 100000,200000);
sql create stream streams10 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _wstart, twa(a), sum(b),max(c) from st partition by tbname as ta, b as cc interval(2s) fill(NULL);
sql create stream streams11 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt11 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _wstart, count(a),avg(c),min(b) from st partition by tbname as ta, b as cc interval(2s);
run tsim/stream/checkTaskStatus.sim
sql insert into t1234567890t1 values(now + 3s,100000,1,1);
$loop_count = 0
loop9:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,ta, * from streamt9;
sql select cc,ta, * from streamt9;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop9
endi
if $data00 != 1 then
return -1
endi
if $data01 != @t12@ then
return -1
endi
if $data03 != @100000@ then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 64 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt9";
sql select * from information_schema.ins_tables where stable_name = "streamt9";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt9" and table_name like "tbn-t1234567890t1_1%";
sql select * from information_schema.ins_tables where stable_name = "streamt9" and table_name like "tbn-t1234567890t1_1%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
$loop_count = 0
loop10:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,ta, * from streamt10;
sql select cc,ta, * from streamt10;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop10
endi
if $data00 != 1 then
return -1
endi
if $data01 != @t12@ then
return -1
endi
if $data03 != @100000.000@ then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt10";
sql select * from information_schema.ins_tables where stable_name = "streamt10";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt10" and table_name like "tbn-t1234567890t1_2%";
sql select * from information_schema.ins_tables where stable_name = "streamt10" and table_name like "tbn-t1234567890t1_2%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
$loop_count = 0
loop11:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,ta,* from streamt11;
sql select cc,ta,* from streamt11;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 1 then
print ======rows=$rows
goto loop11
endi
if $data00 != 1 then
return -1
endi
if $data01 != @t12@ then
return -1
endi
if $data03 != @1@ then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt11";
sql select * from information_schema.ins_tables where stable_name = "streamt11";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt11" and table_name like "tbn-t1234567890t1_3%";
sql select * from information_schema.ins_tables where stable_name = "streamt11" and table_name like "tbn-t1234567890t1_3%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print end print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,180 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60
system sh/cfg.sh -n dnode1 -c ratioOfVnodeStreamThreads -v 4
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a),max(b) from st partition by tbname interval(5s);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a), max(b) from st interval(5s);
sql create stream streams3 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt3 as select _wstart, count(a), twa(b) from st partition by tbname interval(5s) fill(prev);
sql create stream streams4 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt4 as select _irowts, interp(a), interp(b) from st partition by tbname every(5s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3000a,1,1,1);
$loop_count = 0
loop0:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt3;
sql select * from streamt3;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows == 0 then
goto loop0
endi
print select * from streamt4;
sql select * from streamt4;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows == 0 then
goto loop0
endi
sleep 70000
$loop_count = 0
loop0_1:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print sql select * from information_schema.ins_stream_tasks where checkpoint_time is null;
sql select * from information_schema.ins_stream_tasks where checkpoint_time is null;
sleep 10000
if $rows > 0 then
print wait checkpoint.rows = $rows
goto loop0_1
endi
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
run tsim/stream/checkTaskStatus.sim
print select * from streamt3;
sql select * from streamt3;
$streamt3_rows = $rows
print =====streamt3_rows=$streamt3_rows
print select * from streamt4;
sql select * from streamt4;
$streamt4_rows = $rows
print =====streamt4_rows=$streamt4_rows
$loop_count = 0
loop1:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt3;
sql select * from streamt3;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows <= $streamt3_rows then
print =====rows=$rows
print =====streamt3_rows=$streamt3_rows
goto loop1
endi
print select * from streamt4;
sql select * from streamt4;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows <= $streamt4_rows then
print =====rows=$rows
print =====streamt4_rows=$streamt4_rows
goto loop1
endi
sql insert into t1 values(now + 3000a,10,10,10);
$loop_count = 0
loop2:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt1 order by 1;
sql select * from streamt1 order by 1;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $data12 != 10 then
goto loop2
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT