diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index f11420edfb..6b96fcaa26 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -629,7 +629,8 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo code = -1; } else { *key = resKey; - *pVal = tmp; + *pVal = taosMemoryMalloc(*pVLen); + memcpy(*pVal, tmp, *pVLen); } } streamStateFreeCur(pCur); @@ -707,14 +708,14 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); - streamStateSessionDel(pState, key); + streamStateSessionDel_rocksdb(pState, key); goto _end; } void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { memcpy(tmp, *pVal, valSize); - streamStateSessionDel(pState, key); + streamStateSessionDel_rocksdb(pState, key); goto _end; } @@ -746,7 +747,7 @@ _end: return res; } -int32_t streamStateSessionClear(SStreamState* pState) { +int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); @@ -756,7 +757,7 @@ int32_t streamStateSessionClear(SStreamState* pState) { int32_t size = 0; int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); if (code == 0 && size > 0) { - memset(buf, 0, size); + // memset(buf, 0, size); streamStateSessionPut_rocksdb(pState, &delKey, buf, size); } else { break;