From b4bd6a4f1c3d72564f6a1dd308858d95a02506a3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 10 May 2023 10:49:10 +0000 Subject: [PATCH] refactor code --- source/libs/stream/src/streamBackendRocksdb.c | 123 ++++++++---------- 1 file changed, 52 insertions(+), 71 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index db4ec17b19..70aeed3cdd 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -649,17 +649,9 @@ const char* compactFilteFactoryName(void* arg) { void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { - // int64_t unixTime = taosGetTimestampMs(); if (streamStateValueIsStale((char*)val)) { return 1; } - // SStreamValue value; - // memset(&value, 0, sizeof(value)); - // streamValueDecode(&value, (char*)val); - // taosMemoryFree(value.data); - // if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) { - // return 1; - // } return 0; } const char* compactFilteName(void* arg) { return "stream_filte"; } @@ -703,7 +695,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { memcpy(cfNames[0], "default", strlen("default")); continue; } - qError("cf name %s", idstr); + qDebug("cf name %s", idstr); GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key); if (i % cfLen == 0) { @@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); } } - for (int i = 0; i < nSize * cfLen + 1; i++) { - qError("cf name %s", cfNames[i]); - } rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*)); RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*)); for (int i = 0; i < nSize * cfLen + 1; i++) { @@ -1012,53 +1001,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL) { \ - qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ - if (err != NULL) taosMemoryFree(err); \ - code = -1; \ - } else { \ - char * p = NULL, *end = NULL; \ - int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \ - if (len < 0) { \ - qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \ - code = -1; \ - } else { \ - qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \ - } \ - if (pVal != NULL) { \ - *pVal = p; \ - } else { \ - taosMemoryFree(p); \ - } \ - taosMemoryFree(val); \ - if (vLen != NULL) *vLen = len; \ - } \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL) { \ + if (err == NULL) { \ + qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ + funcname); \ + } else { \ + qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ + err); \ + taosMemoryFreeClear(err); \ + } \ + code = -1; \ + } else { \ + char* p = NULL; \ + int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (len < 0) { \ + qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ + funcname); \ + code = -1; \ + } else { \ + qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ + len); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = len; \ + } \ + if (code == 0) \ + qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -1133,10 +1120,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, // eLen); if (err != NULL) { - qWarn( - "failed to delete range cf(state) err: %s, " - "start: %s, end:%s", - err, toStringStart, toStringEnd); + qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); taosMemoryFree(err); } @@ -1588,20 +1572,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) { return -1; } - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + size_t klen, vlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen); winKeyDecode(&winKey, keyStr); - size_t vlen = 0; const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - char* dst = NULL; - int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst); + // char* dst = NULL; + int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal); if (len < 0) { return -1; } - - if (pVal != NULL) *pVal = (char*)dst; - if (pVLen != NULL) *pVLen = vlen; + if (pVLen != NULL) *pVLen = len; *pKey = winKey; return 0;