From f0f2215f49b7a3703dc74b8d593cceef40230e53 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 17 Jul 2023 06:21:45 +0000 Subject: [PATCH] add checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 67 ++++++++++--------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 98055fbeea..20899b2922 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1340,35 +1340,36 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChk return rocksdb_create_iterator_cf(wrapper->rocksdb, rOpt, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->rocksdb; \ - rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ - if (err != NULL) { \ - qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - taosMemoryFree(err); \ - code = -1; \ - } else { \ - qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \ - } \ - taosMemoryFree(ttlV); \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->rocksdb; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + qError("streamState str: %s failed to write to %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + taosMemoryFree(err); \ + code = -1; \ + } else { \ + qTrace("streamState str:%s succ to write to %s_%s, rowValLen:%d, ttlValLen:%d", toString, wrapper->idstr, \ + funcname, vLen, ttlVLen); \ + } \ + taosMemoryFree(ttlV); \ } while (0); #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ @@ -2404,6 +2405,12 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* pChkptFileName, ro rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[i].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); taosMemoryFree(ttlV); + + { + char tbuf[256] = {0}; + ginitDict[i].toStrFunc((void*)key, tbuf); + qDebug("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[i].key); + } return 0; } @@ -2424,7 +2431,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb { char tbuf[256] = {0}; ginitDict[cfIdx].toStrFunc((void*)key, tbuf); - qDebug("streamState str: %s succ to write to %s", tbuf, ginitDict[cfIdx].key); + qDebug("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key); } return 0; }