From 7e1e68d8f5b679f871999194c03b104d18c687da Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 26 Oct 2023 18:27:09 +0800 Subject: [PATCH] fix stream case error --- source/libs/stream/src/streamBackendRocksdb.c | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4add9d2912..f9b77b4e9a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2287,7 +2287,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe taosMemoryFree(err); \ code = -1; \ } else { \ - qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ + qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, ttlVLen, wrapper); \ } \ taosMemoryFree(ttlV); \ } while (0); @@ -2328,7 +2328,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe funcname); \ code = -1; \ } else { \ - qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \ + qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, wrapper); \ } \ taosMemoryFree(val); \ if (vLen != NULL) *vLen = tlen; \ @@ -2725,7 +2725,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta stDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -2766,7 +2766,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -2804,7 +2804,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -2907,7 +2907,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; if (pCur == NULL) return NULL; @@ -2968,7 +2968,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (!pCur) { return NULL; } @@ -3006,7 +3006,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyPrev_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return NULL; } @@ -3044,7 +3044,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { stDebug("streamStateSessionGetKeyByRange_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { return -1; } @@ -3314,7 +3314,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co return code; } void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = createStreamStateCursor(); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; pCur->db = wrapper->db; @@ -3437,4 +3437,4 @@ uint32_t nextPow2(uint32_t x) { x = x | (x >> 8); x = x | (x >> 16); return x + 1; -} +} \ No newline at end of file