diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f24186c673..ba12c47bc0 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -408,6 +408,7 @@ int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { int64_t tc = 0; int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); if (sz <= 0) { + taosWUnLockLatch(&pMeta->checkpointDirLock); return -1; } else { tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved); @@ -623,9 +624,9 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, return ret; } } -int streamStateValueIsStale(char* vv) { +int streamStateValueIsStale(char* v) { int64_t ts = 0; - taosDecodeFixedI64(vv, &ts); + taosDecodeFixedI64(v, &ts); return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0; } int iterValueIsStale(rocksdb_iterator_t* iter) { @@ -956,33 +957,23 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { SStreamValue key = {0}; char* p = value; if (streamStateValueIsStale(p)) { - if (dest != NULL) *dest = NULL; - return -1; + goto _EXCEPT; } p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) { - if (dest != NULL) *dest = NULL; qError("vlen: %d, read len: %d", vlen, key.len); - return -1; + goto _EXCEPT; } + if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len); - if (key.len == 0) { - key.data = NULL; - } else { - p = taosDecodeBinary(p, (void**)&(key.data), key.len); - } - - if (ttl != NULL) { - int64_t now = taosGetTimestampMs(); - *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; - } - if (dest != NULL) { - *dest = key.data; - } else { - taosMemoryFree(key.data); - } + if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs(); return key.len; + +_EXCEPT: + if (dest != NULL) *dest = NULL; + if (ttl != NULL) *ttl = 0; + return -1; } const char* compareDefaultName(void* arg) { @@ -1096,9 +1087,10 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t } else { qDebug("succ to open rocksdb cf"); } - // close default cf + // close default cf and destroy default cfOpts if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]); rocksdb_options_destroy(cfOpts[0]); + handle->db = db; static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -2354,9 +2346,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { } int32_t streamDefaultIterValid_rocksdb(void* iter) { SStreamStateCur* pCur = iter; - bool val = rocksdb_iter_valid(pCur->iter); - - return val ? 1 : 0; + return rocksdb_iter_valid(pCur->iter) ? 1 : 0; } void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { SStreamStateCur* pCur = iter; @@ -2372,13 +2362,16 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { } char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { SStreamStateCur* pCur = iter; - int32_t vlen = 0; - char* dst = NULL; - const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); - if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) { + char* ret = NULL; + + int32_t vlen = 0; + const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); + *len = decodeValueFunc((void*)val, vlen, NULL, &ret); + if (*len < 0) { return NULL; } - return dst; + + return ret; } // batch func void* streamStateCreateBatch() { @@ -2433,6 +2426,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb if (tmpBuf == NULL) { taosMemoryFree(ttlV); } + { char tbuf[256] = {0}; ginitDict[cfIdx].toStrFunc((void*)key, tbuf);