check cursor for count window

This commit is contained in:
54liuyao 2024-06-13 15:35:26 +08:00
parent 8bbdfacca0
commit 367b3b153f
2 changed files with 8 additions and 5 deletions

View File

@ -516,24 +516,25 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
return pBuffCur; return pBuffCur;
} }
winCount = *((COUNT_TYPE*) ((char*)pVal + (resSize - sizeof(COUNT_TYPE)))); winCount = *((COUNT_TYPE*) ((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
taosMemoryFreeClear(pVal);
streamStateFreeCur(pBuffCur);
if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) { if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) {
streamStateFreeCur(pCur); streamStateCurNext(pFileStore, pCur);
return pBuffCur; return pCur;
} }
streamStateCurPrev(pFileStore, pCur); streamStateCurPrev(pFileStore, pCur);
while (1) { while (1) {
code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len); code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len);
if (code == TSDB_CODE_FAILED) { if (code == TSDB_CODE_FAILED) {
streamStateCurNext(pFileStore, pCur); streamStateCurNext(pFileStore, pCur);
streamStateFreeCur(pBuffCur);
return pCur; return pCur;
} }
winCount = *((COUNT_TYPE*) ((char*)pVal + (resSize - sizeof(COUNT_TYPE)))); winCount = *((COUNT_TYPE*) ((char*)pVal + (resSize - sizeof(COUNT_TYPE))));
taosMemoryFreeClear(pVal);
if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) { if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) {
streamStateCurPrev(pFileStore, pCur); streamStateCurPrev(pFileStore, pCur);
} else { } else {
streamStateCurNext(pFileStore, pCur); streamStateCurNext(pFileStore, pCur);
streamStateFreeCur(pBuffCur);
return pCur; return pCur;
} }
} }
@ -568,7 +569,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
void* pData = NULL; void* pData = NULL;
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen); code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen);
if (taosArrayGetSize(pWinStates) > 0 && if (taosArrayGetSize(pWinStates) > 0 &&
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) { (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) {
transformCursor(pCur->pStreamFileState, pCur); transformCursor(pCur->pStreamFileState, pCur);
SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex); SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
if (pVal) { if (pVal) {
@ -590,6 +591,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
} }
int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) {
qDebug("move cursor to next");
if (pCur && pCur->buffIndex >= 0) { if (pCur && pCur->buffIndex >= 0) {
pCur->buffIndex++; pCur->buffIndex++;
} else { } else {

View File

@ -654,6 +654,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
qDebug("move cursor to next");
return streamStateCurPrev_rocksdb(pCur); return streamStateCurPrev_rocksdb(pCur);
#else #else
if (!pCur) { if (!pCur) {