add session iter

This commit is contained in:
yihaoDeng 2023-10-10 15:12:21 +08:00
parent 825341d90a
commit b36163a197
4 changed files with 41 additions and 14 deletions

View File

@ -65,7 +65,7 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
@ -85,11 +85,13 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState); SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState);
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen); int32_t* pVLen);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState); int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,

View File

@ -1962,7 +1962,7 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
memset(*pVal, 0, size); memset(*pVal, 0, size);
return 0; return 0;
} }
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb"); qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1; if (!pCur) return -1;
@ -2190,10 +2190,6 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = createStreamStateCursor(); SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number; pCur->number = pState->number;
pCur->db = wrapper->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
@ -2215,6 +2211,15 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey); STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey);
return pCur; return pCur;
} }
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter);
return 0;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");

View File

@ -670,7 +670,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
return streamStateCurPrev_rocksdb(pState, pCur); return streamStateCurPrev_rocksdb(pCur);
#else #else
if (!pCur) { if (!pCur) {
return -1; return -1;
@ -714,7 +714,7 @@ void streamStateFreeVal(void* val) {
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRowBuffPos* pos = (SRowBuffPos*)value; SRowBuffPos* pos = (SRowBuffPos*)value;
if (pos->needFree) { if (pos->needFree) {
if (isFlushedState(pState->pFileState, key->win.ekey, 0)) { if (isFlushedState(pState->pFileState, key->win.ekey, 0)) {
@ -725,7 +725,7 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
streamStateReleaseBuf(pState, pos, true); streamStateReleaseBuf(pState, pos, true);
putFreeBuff(pState->pFileState, pos); putFreeBuff(pState->pFileState, pos);
qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code); key->win.ekey, key->groupId, code);
} else { } else {
code = putSessionWinResultBuff(pState->pFileState, value); code = putSessionWinResultBuff(pState->pFileState, value);
} }

View File

@ -641,6 +641,26 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
return code; return code;
} }
int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
int code = TSDB_CODE_SUCCESS;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileState->pFileStore);
if (pCur == NULL) {
return -1;
}
while (code == TSDB_CODE_SUCCESS) {
void* pVal = NULL;
int32_t vlen = 0;
SSessionKey key = {0};
code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &vlen);
if (code != 0) {
break;
}
taosMemoryFree(pVal);
code = streamStateSessionCurPrev_rocksdb(pCur);
}
streamStateFreeCur(pCur);
return code;
}
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) { if (pFileState->maxTs != INT64_MIN) {
@ -660,9 +680,9 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
break; break;
} }
void* pVal = NULL; void* pVal = NULL;
int32_t pVLen = 0; int32_t vlen = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &pVLen); code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
SListNode* pNode = tdListPopTail(pFileState->usedBuffs); SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
@ -670,8 +690,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
taosMemoryFreeClear(pVal); taosMemoryFreeClear(pVal);
break; break;
} }
ASSERT(pVLen == pFileState->rowSize); ASSERT(vlen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, pVLen); memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal); taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true; pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
@ -679,7 +699,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
break; break;
} }
code = streamStateCurPrev_rocksdb(pFileState->pFileStore, pCur); code = streamStateCurPrev_rocksdb(pCur);
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);