session state recover

This commit is contained in:
liuyao 2023-10-13 11:32:35 +08:00
parent 9b3fa72fea
commit 429125be93
3 changed files with 25 additions and 5 deletions

View File

@ -79,6 +79,9 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen); int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen);
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos);
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen);
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId);
void sessionWinStateClear(SStreamFileState* pFileState); void sessionWinStateClear(SStreamFileState* pFileState);
void sessionWinStateCleanup(void* pBuff); void sessionWinStateCleanup(void* pBuff);

View File

@ -3590,7 +3590,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < num; i++) { for (int32_t i = 0; i < num; i++) {
SStateWindowInfo curInfo = {0}; SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0}; SStateWindowInfo nextInfo = {0};
SStateWindowInfo dummy = {0};
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey,
pSeKeyBuf[i].groupId, i); pSeKeyBuf[i].groupId, i);
getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo); getStateWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo, &nextInfo);

View File

@ -177,7 +177,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
// todo(liuyao) optimize // todo(liuyao) optimize
if (type == STREAM_STATE_BUFF_HASH) { if (type == STREAM_STATE_BUFF_HASH) {
recoverSnapshot(pFileState, checkpointId); recoverSnapshot(pFileState, checkpointId);
} else {
recoverSesssion(pFileState, checkpointId);
} }
return pFileState; return pFileState;
_error: _error:
@ -642,12 +645,24 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
} }
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
int code = TSDB_CODE_SUCCESS; int code = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark;
deleteExpiredCheckPoint(pFileState, mark);
}
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore); SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore);
if (pCur == NULL) { if (pCur == NULL) {
return -1; return -1;
} }
int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
while (code == TSDB_CODE_SUCCESS) { while (code == TSDB_CODE_SUCCESS) {
if (pFileState->curRowCount >= recoverNum) {
break;
}
void* pVal = NULL; void* pVal = NULL;
int32_t vlen = 0; int32_t vlen = 0;
SSessionKey key = {0}; SSessionKey key = {0};
@ -655,12 +670,14 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
if (code != 0) { if (code != 0) {
break; break;
} }
taosMemoryFree(pVal); SRowBuffPos* pPos = createSessionWinBuff(pFileState, &key, pVal, &vlen);
putSessionWinResultBuff(pFileState, pPos);
code = streamStateSessionCurPrev_rocksdb(pCur); code = streamStateSessionCurPrev_rocksdb(pCur);
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return code; return code;
} }
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) { if (pFileState->maxTs != INT64_MIN) {
@ -674,11 +691,12 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
if (pCur == NULL) { if (pCur == NULL) {
return -1; return -1;
} }
int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
while (code == TSDB_CODE_SUCCESS) { while (code == TSDB_CODE_SUCCESS) {
if (pFileState->curRowCount == pFileState->maxRowCount) { if (pFileState->curRowCount >= recoverNum) {
break; break;
} }
void* pVal = NULL; void* pVal = NULL;
int32_t vlen = 0; int32_t vlen = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);