diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 5a802447bd..20d25dbceb 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -96,7 +96,7 @@ int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) { int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { int32_t code = 0; const void* pKey = NULL; - const void* pVal = NULL; + void* pVal = NULL; int32_t kLen = 0; int32_t vLen = 0; SDecoder decoder; @@ -110,9 +110,15 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { NextTbl: except = 0; for (;;) { - if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { + const void* tVal = NULL; + int32_t tLen = 0; + if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &tVal, &tLen)) { except = 1; break; + } else { + pVal = taosMemoryCalloc(1, tLen); + memcpy(pVal, tVal, tLen); + vLen = tLen; } tdbTbcMoveToNext(pReader->pCur); break; @@ -144,6 +150,7 @@ NextTbl: pHdr->type = pPair->type; pHdr->size = vLen; memcpy(pHdr->data, pVal, vLen); + taosMemoryFree(pVal); tqDebug("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5ada8e05a6..618f3c52e7 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1727,17 +1727,17 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe rocksdb_readoptions_t** readOpt) { int idx = streamStateGetCfIdx(pState, cfKeyName); - rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); - *readOpt = rOpt; + *readOpt = rocksdb_readoptions_create(); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb); - rocksdb_readoptions_set_snapshot(rOpt, *snapshot); - rocksdb_readoptions_set_fill_cache(rOpt, 0); + rocksdb_readoptions_set_snapshot(*readOpt, *snapshot); + rocksdb_readoptions_set_fill_cache(*readOpt, 0); } - return rocksdb_create_iterator_cf(wrapper->rocksdb, rOpt, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]); + return rocksdb_create_iterator_cf(wrapper->rocksdb, *readOpt, + ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]); } #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ @@ -2806,8 +2806,9 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb char buf[128] = {0}; int32_t klen = ginitDict[i].enFunc((void*)key, buf); - char* ttlV = NULL; - int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV); + char* ttlV = NULL; + int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV); + rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[i].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); taosMemoryFree(ttlV);