Merge pull request #28151 from taosdata/fix/TD-32254

valid iter
This commit is contained in:
Hongze Cheng 2024-09-27 17:17:28 +08:00 committed by GitHub
commit 839f85d857
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 8 deletions

View File

@ -3657,6 +3657,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
if (pCur->iter == NULL) {
streamStateFreeCur(pCur);
return NULL;
}
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@ -3679,6 +3683,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey));
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);

View File

@ -99,8 +99,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
}
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path);
@ -170,12 +170,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
int32_t lino = 0;
void* pVal = NULL;
int32_t len = getRowStateRowSize(pState->pFileState);
int32_t tmpLen = len;
int32_t tmpLen = len;
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &tmpLen);
QUERY_CHECK_CODE(code, lino, _end);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
_end:
@ -189,12 +189,12 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa
int32_t lino = 0;
void* pVal = NULL;
int32_t len = getRowStateRowSize(pState->pFileState);
int32_t tmpLen = len;
int32_t tmpLen = len;
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &tmpLen);
QUERY_CHECK_CODE(code, lino, _end);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
streamStateReleaseBuf(pState, pVal, false);