fix stream case error

This commit is contained in:
yihaoDeng 2023-10-26 18:27:09 +08:00
parent 4c03d372ef
commit 7e1e68d8f5
1 changed files with 11 additions and 11 deletions

View File

@ -2287,7 +2287,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
taosMemoryFree(err); \
code = -1; \
} else { \
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, ttlVLen, wrapper); \
} \
taosMemoryFree(ttlV); \
} while (0);
@ -2328,7 +2328,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
funcname); \
code = -1; \
} else { \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, wrapper); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
@ -2725,7 +2725,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2766,7 +2766,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2804,7 +2804,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -2907,7 +2907,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
if (pCur == NULL) return NULL;
@ -2968,7 +2968,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (!pCur) {
return NULL;
}
@ -3006,7 +3006,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyPrev_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
@ -3044,7 +3044,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
stDebug("streamStateSessionGetKeyByRange_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return -1;
}
@ -3314,7 +3314,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = createStreamStateCursor();
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
pCur->db = wrapper->db;
@ -3437,4 +3437,4 @@ uint32_t nextPow2(uint32_t x) {
x = x | (x >> 8);
x = x | (x >> 16);
return x + 1;
}
}