add ci
This commit is contained in:
parent
0d983e28ef
commit
82e18e32b0
|
@ -52,7 +52,6 @@ int32_t streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuf
|
||||||
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||||
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
|
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
|
||||||
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
|
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
|
||||||
void releaseRowBuffPos(SRowBuffPos* pBuff);
|
|
||||||
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
|
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
|
||||||
|
|
||||||
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
|
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
|
||||||
|
@ -67,7 +66,7 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
|
||||||
void* getRowStateBuff(SStreamFileState* pFileState);
|
void* getRowStateBuff(SStreamFileState* pFileState);
|
||||||
void* getStateFileStore(SStreamFileState* pFileState);
|
void* getStateFileStore(SStreamFileState* pFileState);
|
||||||
bool isDeteled(SStreamFileState* pFileState, TSKEY ts);
|
bool isDeteled(SStreamFileState* pFileState, TSKEY ts);
|
||||||
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts);
|
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap);
|
||||||
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState);
|
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState);
|
||||||
int32_t getRowStateRowSize(SStreamFileState* pFileState);
|
int32_t getRowStateRowSize(SStreamFileState* pFileState);
|
||||||
|
|
||||||
|
|
|
@ -1735,9 +1735,23 @@ void saveDeleteRes(SSHashObj* pStDelete, SSessionKey key) {
|
||||||
tSimpleHashPut(pStDelete, &key, sizeof(SSessionKey), NULL, 0);
|
tSimpleHashPut(pStDelete, &key, sizeof(SSessionKey), NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) {
|
int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
|
||||||
|
pAPI->streamStateReleaseBuf(pState, pPos, false);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
|
||||||
|
pAPI->streamStateReleaseBuf(pState, pPos, true);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey key) {
|
||||||
key.win.ekey = key.win.skey;
|
key.win.ekey = key.win.skey;
|
||||||
|
void* pVal = tSimpleHashGet(pHashMap, &key, sizeof(SSessionKey));
|
||||||
|
if (pVal) {
|
||||||
|
releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore);
|
||||||
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
|
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
|
||||||
|
}
|
||||||
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
|
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1746,7 +1760,7 @@ static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
|
||||||
pHashKey->win.ekey = pKey->win.skey;
|
pHashKey->win.ekey = pKey->win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
|
static void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) {
|
||||||
if (tSimpleHashGetSize(pHashMap) == 0) {
|
if (tSimpleHashGetSize(pHashMap) == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1760,7 +1774,25 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
|
static void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins) {
|
||||||
|
if (tSimpleHashGetSize(pHashMap) == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int32_t size = taosArrayGetSize(pWins);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SSessionKey* pWin = taosArrayGet(pWins, i);
|
||||||
|
if (!pWin) continue;
|
||||||
|
SSessionKey key = {0};
|
||||||
|
getSessionHashKey(pWin, &key);
|
||||||
|
void* pVal = tSimpleHashGet(pHashMap, &key, sizeof(SSessionKey));
|
||||||
|
if (pVal) {
|
||||||
|
releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore);
|
||||||
|
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
|
||||||
int32_t rows, int32_t start, int64_t gap, SSHashObj* pResultRows, SSHashObj* pStUpdated,
|
int32_t rows, int32_t start, int64_t gap, SSHashObj* pResultRows, SSHashObj* pStUpdated,
|
||||||
SSHashObj* pStDeleted) {
|
SSHashObj* pStDeleted) {
|
||||||
for (int32_t i = start; i < rows; ++i) {
|
for (int32_t i = start; i < rows; ++i) {
|
||||||
|
@ -1771,7 +1803,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS
|
||||||
if (pStDeleted && pWinInfo->isOutput) {
|
if (pStDeleted && pWinInfo->isOutput) {
|
||||||
saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
|
saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
|
||||||
}
|
}
|
||||||
removeSessionResult(pStUpdated, pResultRows, pWinInfo->sessionWin);
|
removeSessionResult(pAggSup, pStUpdated, pResultRows, pWinInfo->sessionWin);
|
||||||
pWinInfo->sessionWin.win.skey = pStartTs[i];
|
pWinInfo->sessionWin.win.skey = pStartTs[i];
|
||||||
}
|
}
|
||||||
pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]);
|
pWinInfo->sessionWin.win.ekey = TMAX(pWinInfo->sessionWin.win.ekey, pStartTs[i]);
|
||||||
|
@ -1824,11 +1856,6 @@ static int32_t setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
|
|
||||||
pAPI->streamStateReleaseBuf(pState, pPos, false);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
|
void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
|
||||||
SResultWindowInfo* pNextWin) {
|
SResultWindowInfo* pNextWin) {
|
||||||
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin);
|
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->sessionWin);
|
||||||
|
@ -1879,7 +1906,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
|
||||||
if (winInfo.isOutput && pStDeleted) {
|
if (winInfo.isOutput && pStDeleted) {
|
||||||
saveDeleteRes(pStDeleted, winInfo.sessionWin);
|
saveDeleteRes(pStDeleted, winInfo.sessionWin);
|
||||||
}
|
}
|
||||||
removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin);
|
removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, winInfo.sessionWin);
|
||||||
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
|
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
|
||||||
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
|
releaseOutputBuf(pAggSup->pState, winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
|
||||||
winNum++;
|
winNum++;
|
||||||
|
@ -1955,7 +1982,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
setSessionWinOutputInfo(pStUpdated, &winInfo);
|
setSessionWinOutputInfo(pStUpdated, &winInfo);
|
||||||
winRows = updateSessionWindowInfo(&winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
|
winRows = updateSessionWindowInfo(pAggSup, &winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
|
||||||
pAggSup->pResultRows, pStUpdated, pStDeleted);
|
pAggSup->pResultRows, pStUpdated, pStDeleted);
|
||||||
|
|
||||||
int64_t winDelta = 0;
|
int64_t winDelta = 0;
|
||||||
|
@ -2009,8 +2036,10 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
|
static inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) {
|
||||||
SSessionKey* pWin1 = (SSessionKey*)pKey1;
|
SResultWindowInfo* pWinInfo1 = (SResultWindowInfo*)pKey1;
|
||||||
SSessionKey* pWin2 = (SSessionKey*)pKey2;
|
SResultWindowInfo* pWinInfo2 = (SResultWindowInfo*)pKey2;
|
||||||
|
SSessionKey* pWin1 = &pWinInfo1->sessionWin;
|
||||||
|
SSessionKey* pWin2 = &pWinInfo2->sessionWin;
|
||||||
|
|
||||||
if (pWin1->groupId > pWin2->groupId) {
|
if (pWin1->groupId > pWin2->groupId) {
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -2210,27 +2239,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
|
|
||||||
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
|
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
|
||||||
SRowBuffPos* pPos = *(SRowBuffPos**) taosArrayGet(pGroupResInfo->pRows, i);
|
SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
|
||||||
|
SRowBuffPos* pPos = pWinInfo->pStatePos;
|
||||||
SResultRow* pRow = NULL;
|
SResultRow* pRow = NULL;
|
||||||
int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
|
|
||||||
SSessionKey* pKey = (SSessionKey*) pPos->pKey;
|
SSessionKey* pKey = (SSessionKey*) pPos->pKey;
|
||||||
|
|
||||||
if (code == -1) {
|
|
||||||
// for history
|
|
||||||
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "",
|
|
||||||
pKey->win.skey, pKey->win.ekey, pKey->groupId);
|
|
||||||
pGroupResInfo->index += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
|
||||||
// no results, continue to check the next one
|
|
||||||
if (pRow->numOfRows == 0) {
|
|
||||||
pGroupResInfo->index += 1;
|
|
||||||
releaseOutputBuf(pState, pPos, &pAPI->stateStore);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pBlock->info.id.groupId == 0) {
|
if (pBlock->info.id.groupId == 0) {
|
||||||
pBlock->info.id.groupId = pKey->groupId;
|
pBlock->info.id.groupId = pKey->groupId;
|
||||||
|
|
||||||
|
@ -2245,17 +2258,31 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
|
||||||
} else {
|
} else {
|
||||||
// current value belongs to different group, it can't be packed into one datablock
|
// current value belongs to different group, it can't be packed into one datablock
|
||||||
if (pBlock->info.id.groupId != pKey->groupId) {
|
if (pBlock->info.id.groupId != pKey->groupId) {
|
||||||
releaseOutputBuf(pState, pPos, &pAPI->stateStore);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
|
||||||
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
ASSERT(pBlock->info.rows > 0);
|
ASSERT(pBlock->info.rows > 0);
|
||||||
releaseOutputBuf(pState, pPos, &pAPI->stateStore);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code == -1) {
|
||||||
|
// for history
|
||||||
|
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "",
|
||||||
|
pKey->win.skey, pKey->win.ekey, pKey->groupId);
|
||||||
|
pGroupResInfo->index += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
||||||
|
// no results, continue to check the next one
|
||||||
|
if (pRow->numOfRows == 0) {
|
||||||
|
pGroupResInfo->index += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
pGroupResInfo->index += 1;
|
pGroupResInfo->index += 1;
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfExprs; ++j) {
|
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||||
|
@ -2283,7 +2310,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
|
||||||
|
|
||||||
pBlock->info.dataLoad = 1;
|
pBlock->info.dataLoad = 1;
|
||||||
pBlock->info.rows += pRow->numOfRows;
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
releaseOutputBuf(pState, pPos, &pAPI->stateStore);
|
|
||||||
}
|
}
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2332,15 +2358,16 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SRowBuffPos* pPos = taosArrayGetP(pAllWins, size - 1);
|
SResultWindowInfo* pWinInfo = taosArrayGet(pAllWins, size - 1);
|
||||||
SSessionKey* pSeKey = pPos->pKey;
|
SSessionKey* pSeKey = pWinInfo->pStatePos->pKey;
|
||||||
taosArrayPush(pMaxWins, pSeKey);
|
taosArrayPush(pMaxWins, pSeKey);
|
||||||
if (pSeKey->groupId == 0) {
|
if (pSeKey->groupId == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
uint64_t preGpId = pSeKey->groupId;
|
uint64_t preGpId = pSeKey->groupId;
|
||||||
for (int32_t i = size - 2; i >= 0; i--) {
|
for (int32_t i = size - 2; i >= 0; i--) {
|
||||||
pSeKey = taosArrayGet(pAllWins, i);
|
pWinInfo = taosArrayGet(pAllWins, i);
|
||||||
|
pSeKey = pWinInfo->pStatePos->pKey;
|
||||||
if (preGpId != pSeKey->groupId) {
|
if (preGpId != pSeKey->groupId) {
|
||||||
taosArrayPush(pMaxWins, pSeKey);
|
taosArrayPush(pMaxWins, pSeKey);
|
||||||
preGpId = pSeKey->groupId;
|
preGpId = pSeKey->groupId;
|
||||||
|
@ -2499,7 +2526,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (!pInfo->pUpdated) {
|
if (!pInfo->pUpdated) {
|
||||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
|
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
if (!pInfo->pStUpdated) {
|
if (!pInfo->pStUpdated) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
@ -2517,7 +2544,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||||
// gap must be 0
|
// gap must be 0
|
||||||
doDeleteTimeWindows(pAggSup, pBlock, pWins);
|
doDeleteTimeWindows(pAggSup, pBlock, pWins);
|
||||||
removeSessionResults(pInfo->pStUpdated, pWins);
|
removeSessionResults(pAggSup, pInfo->pStUpdated, pWins);
|
||||||
if (IS_FINAL_SESSION_OP(pOperator)) {
|
if (IS_FINAL_SESSION_OP(pOperator)) {
|
||||||
int32_t childIndex = getChildIndex(pBlock);
|
int32_t childIndex = getChildIndex(pBlock);
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||||
|
@ -2576,7 +2603,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
|
closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
|
||||||
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
|
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
|
||||||
copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
|
copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
|
||||||
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
|
||||||
if (pInfo->isHistoryOp) {
|
if (pInfo->isHistoryOp) {
|
||||||
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
}
|
}
|
||||||
|
@ -2857,7 +2884,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (!pInfo->pUpdated) {
|
if (!pInfo->pUpdated) {
|
||||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
|
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
if (!pInfo->pStUpdated) {
|
if (!pInfo->pStUpdated) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
@ -2875,8 +2902,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
// gap must be 0
|
// gap must be 0
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||||
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
|
doDeleteTimeWindows(pAggSup, pBlock, pWins);
|
||||||
removeSessionResults(pInfo->pStUpdated, pWins);
|
removeSessionResults(pAggSup, pInfo->pStUpdated, pWins);
|
||||||
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
|
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
|
||||||
taosArrayDestroy(pWins);
|
taosArrayDestroy(pWins);
|
||||||
pInfo->clearState = true;
|
pInfo->clearState = true;
|
||||||
|
@ -2908,7 +2935,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
|
pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||||
|
|
||||||
copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
|
copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
|
||||||
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
|
||||||
|
|
||||||
if(pInfo->isHistoryOp) {
|
if(pInfo->isHistoryOp) {
|
||||||
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
|
@ -3099,7 +3126,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
||||||
pAggSup->stateStore.streamStateFreeCur(pCur);
|
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId,
|
int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId,
|
||||||
SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
|
SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
|
||||||
SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
|
SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
|
||||||
*allEqual = true;
|
*allEqual = true;
|
||||||
|
@ -3122,7 +3149,7 @@ int32_t updateStateWindowInfo(SStateWindowInfo* pWinInfo, SStateWindowInfo* pNex
|
||||||
if (pSeDeleted && pWinInfo->winInfo.isOutput) {
|
if (pSeDeleted && pWinInfo->winInfo.isOutput) {
|
||||||
saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
|
saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
|
||||||
}
|
}
|
||||||
removeSessionResult(pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin);
|
removeSessionResult(pAggSup, pSeUpdated, pResultRows, pWinInfo->winInfo.sessionWin);
|
||||||
pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
|
pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
|
||||||
}
|
}
|
||||||
pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
|
pWinInfo->winInfo.sessionWin.win.ekey = TMAX(pWinInfo->winInfo.sessionWin.win.ekey, pTs[i]);
|
||||||
|
@ -3179,7 +3206,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore);
|
releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore);
|
||||||
}
|
}
|
||||||
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
|
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
|
||||||
winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
|
winRows = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
|
||||||
pAggSup->pResultRows, pSeUpdated, pStDeleted);
|
pAggSup->pResultRows, pSeUpdated, pStDeleted);
|
||||||
if (!allEqual) {
|
if (!allEqual) {
|
||||||
uint64_t uid = 0;
|
uint64_t uid = 0;
|
||||||
|
@ -3356,7 +3383,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (!pInfo->pUpdated) {
|
if (!pInfo->pUpdated) {
|
||||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SSessionKey));
|
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
}
|
}
|
||||||
if (!pInfo->pSeUpdated) {
|
if (!pInfo->pSeUpdated) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
@ -3373,7 +3400,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
pBlock->info.type == STREAM_CLEAR) {
|
pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||||
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
|
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, pWins);
|
||||||
removeSessionResults(pInfo->pSeUpdated, pWins);
|
removeSessionResults(&pInfo->streamAggSup, pInfo->pSeUpdated, pWins);
|
||||||
copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
|
copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
|
||||||
taosArrayDestroy(pWins);
|
taosArrayDestroy(pWins);
|
||||||
continue;
|
continue;
|
||||||
|
@ -3405,7 +3432,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
|
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
|
||||||
copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
|
copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
|
||||||
removeSessionResults(pInfo->pSeDeleted, pInfo->pUpdated);
|
removeSessionDeleteResults(pInfo->pSeDeleted, pInfo->pUpdated);
|
||||||
|
|
||||||
if (pInfo->isHistoryOp) {
|
if (pInfo->isHistoryOp) {
|
||||||
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
|
@ -3461,7 +3488,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
|
||||||
pNextWin->sessionWin.groupId);
|
pNextWin->sessionWin.groupId);
|
||||||
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
|
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
|
||||||
}
|
}
|
||||||
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
|
removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
|
||||||
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
|
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
|
||||||
releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore);
|
releaseOutputBuf(pAggSup->pState, pNextWin->pStatePos, &pAggSup->pSessionAPI->stateStore);
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,6 +117,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
|
||||||
if (inSessionWindow(pPos->pKey, startTs, gap)) {
|
if (inSessionWindow(pPos->pKey, startTs, gap)) {
|
||||||
(*pVal) = pPos;
|
(*pVal) = pPos;
|
||||||
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
|
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
|
||||||
|
pPos->beUsed = true;
|
||||||
*pKey = *pDestWinKey;
|
*pKey = *pDestWinKey;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -127,17 +128,19 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
|
||||||
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) {
|
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) {
|
||||||
(*pVal) = pPos;
|
(*pVal) = pPos;
|
||||||
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
|
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
|
||||||
|
pPos->beUsed = true;
|
||||||
*pKey = *pDestWinKey;
|
*pKey = *pDestWinKey;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index + 1 == 0) {
|
if (index + 1 == 0) {
|
||||||
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) {
|
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
|
||||||
void* p = NULL;
|
void* p = NULL;
|
||||||
void* pFileStore = getStateFileStore(pFileState);
|
void* pFileStore = getStateFileStore(pFileState);
|
||||||
int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
|
int32_t code = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
|
||||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||||
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
pNewPos->needFree = true;
|
pNewPos->needFree = true;
|
||||||
|
|
||||||
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code);
|
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code);
|
||||||
|
@ -193,6 +196,7 @@ _end:
|
||||||
|
|
||||||
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
|
||||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||||
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
pNewPos->needFree = true;
|
pNewPos->needFree = true;
|
||||||
void* pBuff = NULL;
|
void* pBuff = NULL;
|
||||||
int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
|
int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
|
||||||
|
@ -387,6 +391,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
} else if (code == TSDB_CODE_SUCCESS && pVal) {
|
} else if (code == TSDB_CODE_SUCCESS && pVal) {
|
||||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
|
SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
|
||||||
|
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||||
pNewPos->needFree = true;
|
pNewPos->needFree = true;
|
||||||
memcpy(pNewPos->pRowBuff, pData, *pVLen);
|
memcpy(pNewPos->pRowBuff, pData, *pVLen);
|
||||||
(*pVal) = pNewPos;
|
(*pVal) = pNewPos;
|
||||||
|
@ -496,11 +501,12 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index + 1 == 0) {
|
if (index + 1 == 0) {
|
||||||
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs)) {
|
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
|
||||||
void* p = NULL;
|
void* p = NULL;
|
||||||
void* pFileStore = getStateFileStore(pFileState);
|
void* pFileStore = getStateFileStore(pFileState);
|
||||||
int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
|
int32_t code = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
|
||||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||||
|
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
|
||||||
pNewPos->needFree = true;
|
pNewPos->needFree = true;
|
||||||
|
|
||||||
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code);
|
qDebug("===stream===get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code);
|
||||||
|
|
|
@ -351,7 +351,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
|
||||||
|
|
||||||
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
|
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
|
||||||
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
|
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
|
||||||
releaseRowBuffPos(pos);
|
streamFileStateReleaseBuff(pState->pFileState, pos, false);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -717,7 +717,10 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SRowBuffPos* pos = (SRowBuffPos*)value;
|
SRowBuffPos* pos = (SRowBuffPos*)value;
|
||||||
if (pos->needFree) {
|
if (pos->needFree) {
|
||||||
if (isFlushedState(pState->pFileState, key->win.ekey)) {
|
if (isFlushedState(pState->pFileState, key->win.ekey, 0)) {
|
||||||
|
if (!pos->pRowBuff) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
|
code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
|
||||||
streamStateReleaseBuf(pState, pos, true);
|
streamStateReleaseBuf(pState, pos, true);
|
||||||
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
|
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
|
||||||
|
|
|
@ -239,6 +239,27 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) {
|
||||||
|
uint64_t i = 0;
|
||||||
|
SListIter iter = {0};
|
||||||
|
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
|
||||||
|
|
||||||
|
SListNode* pNode = NULL;
|
||||||
|
while ((pNode = tdListNext(&iter)) != NULL && i < max) {
|
||||||
|
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
|
||||||
|
if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0)) {
|
||||||
|
tdListAppend(pFlushList, &pPos);
|
||||||
|
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
|
||||||
|
pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pPos->pKey, pFileState->keyLen);
|
||||||
|
tdListPopNode(pFileState->usedBuffs, pNode);
|
||||||
|
taosMemoryFreeClear(pNode);
|
||||||
|
if (pPos->pRowBuff) {
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void streamFileStateClear(SStreamFileState* pFileState) {
|
void streamFileStateClear(SStreamFileState* pFileState) {
|
||||||
pFileState->flushMark = INT64_MIN;
|
pFileState->flushMark = INT64_MIN;
|
||||||
pFileState->maxTs = INT64_MIN;
|
pFileState->maxTs = INT64_MIN;
|
||||||
|
@ -283,11 +304,14 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
|
||||||
|
|
||||||
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
|
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
|
||||||
num = TMAX(num, FLUSH_NUM);
|
num = TMAX(num, FLUSH_NUM);
|
||||||
|
clearFlushedRowBuff(pFileState, pFlushList, num);
|
||||||
|
if (isListEmpty(pFlushList)) {
|
||||||
popUsedBuffs(pFileState, pFlushList, num, false);
|
popUsedBuffs(pFileState, pFlushList, num, false);
|
||||||
|
|
||||||
if (isListEmpty(pFlushList)) {
|
if (isListEmpty(pFlushList)) {
|
||||||
popUsedBuffs(pFileState, pFlushList, num, true);
|
popUsedBuffs(pFileState, pFlushList, num, true);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
flushSnapshot(pFileState, pFlushList, false);
|
flushSnapshot(pFileState, pFlushList, false);
|
||||||
|
|
||||||
|
@ -383,7 +407,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
|
||||||
memcpy(pNewPos->pKey, pKey, keyLen);
|
memcpy(pNewPos->pKey, pKey, keyLen);
|
||||||
|
|
||||||
TSKEY ts = pFileState->getTs(pKey);
|
TSKEY ts = pFileState->getTs(pKey);
|
||||||
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts)) {
|
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
void* p = NULL;
|
void* p = NULL;
|
||||||
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len);
|
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len);
|
||||||
|
@ -450,8 +474,6 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
|
|
||||||
|
|
||||||
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
|
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
|
||||||
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
|
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
|
||||||
: pFileState->maxTs - pFileState->deleteMark;
|
: pFileState->maxTs - pFileState->deleteMark;
|
||||||
|
@ -663,8 +685,8 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
|
||||||
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
|
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts) {
|
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) {
|
||||||
return ts <= pFileState->flushMark;
|
return ts <= (pFileState->flushMark + gap);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
|
int32_t getRowStateRowSize(SStreamFileState* pFileState) {
|
||||||
|
|
|
@ -11,7 +11,7 @@ sql connect
|
||||||
sql create database test vgroups 1;
|
sql create database test vgroups 1;
|
||||||
sql use test;
|
sql use test;
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
sql create stream streams1 trigger at_once into streamt as select _wstart, count(*) c1 from t1 interval(1s);
|
sql create stream streams0 trigger at_once ignore expired 0 ignore update 0 into streamt as select _wstart, count(*) c1 from t1 interval(1s);
|
||||||
|
|
||||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
||||||
sql insert into t1 values(1648791212001,2,2,3,1.1);
|
sql insert into t1 values(1648791212001,2,2,3,1.1);
|
||||||
|
@ -77,7 +77,7 @@ sql create database test2 vgroups 10;
|
||||||
sql use test2;
|
sql use test2;
|
||||||
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create stream streams2 trigger at_once ignore expired 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s);
|
sql create stream streams2 trigger at_once ignore expired 0 ignore update 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s);
|
||||||
|
|
||||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
||||||
sql insert into t1 values(1648791212001,2,2,3,1.1);
|
sql insert into t1 values(1648791212001,2,2,3,1.1);
|
||||||
|
@ -137,4 +137,183 @@ if $rows != 29 then
|
||||||
goto loop3
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step2=============
|
||||||
|
|
||||||
|
sql create database test1 vgroups 1;
|
||||||
|
sql use test1;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql create stream streams1 trigger at_once ignore expired 0 ignore update 1 into streamt1 as select _wstart, count(*) c1 from t1 session(ts, 1s);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791215000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791217000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791219000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791221000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791223000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791225000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791227000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791229000,1,2,3,1.0);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791231000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791233000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791235000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791237000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791239000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791241000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791243000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791245000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791247000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791249000,1,2,3,1.0);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791251000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791253000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791255000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791257000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791259000,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791261000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791263000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791265000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791267000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791269000,1,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
if $rows != 30 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791211001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791215001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791217001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791219001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791221001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791223001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791225001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791227001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791229001,1,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop5:
|
||||||
|
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
if $rows != 30 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data91 != 2 then
|
||||||
|
print =====data91=$data91
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791231001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791233001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791235001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791237001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791239001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791241001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791243001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791245001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791247001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791249001,1,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop6:
|
||||||
|
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
if $rows != 30 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data[10][1] != 2 then
|
||||||
|
print =====data[10][1]=$data[10][1]
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data[19][1] != 2 then
|
||||||
|
print =====data[19][1]=$data[19][1]
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791251001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791253001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791255001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791257001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791259001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791261001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791263001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791265001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791267001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791269001,1,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop7:
|
||||||
|
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 select * from streamt1;
|
||||||
|
sql select * from streamt1;
|
||||||
|
|
||||||
|
if $rows != 30 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data[20][1] != 2 then
|
||||||
|
print =====[20][1]=$[20][1]
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data[29][1] != 2 then
|
||||||
|
print =====[29][1]=$[29][1]
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue