From afc6023a8aafc146655416fe4550f9c359ff0afb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Apr 2023 06:32:33 +0000 Subject: [PATCH] merge rocksdb inst --- include/libs/stream/streamState.h | 1 + source/libs/stream/src/streamStateRocksdb.c | 229 +++++++++++--------- 2 files changed, 122 insertions(+), 108 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 301b7e7abc..350ce0dab5 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -56,6 +56,7 @@ typedef struct STdbState { void* env; SListNode* pComparNode; SBackendHandle* pBackendHandle; + char idstr[48]; TDB* db; TTB* pStateDb; diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index c92486522b..45a7b46249 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -324,7 +324,7 @@ int32_t streaValueIsStale(void* k, int64_t ts) { typedef struct { void* tableOpt; -} rocksdbCfParam; +} RocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; typedef int (*EncodeFunc)(void* key, char* buf); @@ -357,8 +357,7 @@ typedef struct { } SCfInit; -#define GEN_COLUMN_FAMILY_NAME(name, streamId, taskId, SUBFIX) \ - sprintf(name, "%d_%d_%s", (streamId), (taskId), (SUBFIX)); +#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUBFIX) sprintf(name, "%s_%s", idstr, (SUBFIX)); SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, @@ -384,10 +383,11 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId); SBackendHandle* handle = backend; + sprintf(pState->pTdbState->idstr, "%d-%d", pState->streamId, pState->taskId); char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); - rocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(rocksdbCfParam)); + RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); for (int i = 0; i < cfLen; i++) { cfOpt[i] = rocksdb_options_create(); @@ -414,12 +414,12 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); for (int i = 0; i < cfLen; i++) { char buf[64] = {0}; - GEN_COLUMN_FAMILY_NAME(buf, pState->streamId, pState->taskId, ginitDict[i].key); + GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key); cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err); if (err != NULL) { - qError("rocksdb create column family failed, reason:%s", err); - taosMemoryFree(err); - return -1; + qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); + taosMemoryFreeClear(err); + // return -1; } } @@ -428,47 +428,54 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); pState->pTdbState->readOpts = rocksdb_readoptions_create(); pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; - // pState->pTdbState->pCompare = pCompare; pState->pTdbState->dbOpt = handle->dbOpt; pState->pTdbState->param = param; pState->pTdbState->pBackendHandle = handle; SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); return 0; } void streamStateCloseBackend(SStreamState* pState, bool remove) { - char* status[] = {"remove", "drop"}; - qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 1 : 0], pState, pState->streamId, pState->taskId); + char* status[] = {"close", "drop"}; + qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId); if (pState->pTdbState->rocksdb == NULL) { return; } - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); - rocksdbCfParam* param = pState->pTdbState->param; + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); - char* err = NULL; - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - for (int i = 0; i < cfLen; i++) { - if (remove) { + char* err = NULL; + if (remove) { + for (int i = 0; i < cfLen; i++) { rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err); - } else { - rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err); + if (err != NULL) { + qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); + taosMemoryFreeClear(err); + } } + } else { + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + for (int i = 0; i < cfLen; i++) { + rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err); + if (err != NULL) { + qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); + taosMemoryFreeClear(err); + } + } + rocksdb_flushoptions_destroy(flushOpt); } - rocksdb_flushoptions_destroy(flushOpt); for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); } taosMemoryFreeClear(pState->pTdbState->pHandle); - for (int i = 0; i < cfLen; i++) { rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); - rocksdb_block_based_options_destroy(param[i].tableOpt); + rocksdb_block_based_options_destroy(((RocksdbCfParam*)pState->pTdbState->param)[i].tableOpt); } + if (remove) { streamBackendDelCompare(pState->pTdbState->pBackendHandle, pState->pTdbState->pComparNode); } @@ -518,101 +525,107 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); *readOpt = rOpt; - // rocksdb_readoptions_set_snapshot(rOpt, *snapshot); + rocksdb_readoptions_set_snapshot(rOpt, *snapshot); rocksdb_readoptions_set_fill_cache(rOpt, 0); return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)value, (size_t)vLen, &err); \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ - } \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)value, (size_t)vLen, &err); \ + if (err != NULL) { \ + taosMemoryFree(err); \ + qDebug("streamState str: %s failed to write to %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ + err); \ + code = -1; \ + } else { \ + qDebug("streamState str:%s succ to write to %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ + vLen); \ + } \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL) { \ - qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ - if (err != NULL) taosMemoryFree(err); \ - code = -1; \ - } else { \ - if (pVal != NULL) *pVal = val; \ - if (vLen != NULL) *vLen = len; \ - } \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL) { \ + qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ + funcname); \ + if (err != NULL) taosMemoryFree(err); \ + code = -1; \ + } else { \ + if (pVal != NULL) *pVal = val; \ + if (vLen != NULL) *vLen = len; \ + } \ + if (err != NULL) { \ + taosMemoryFree(err); \ + qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ + err); \ + code = -1; \ + } else { \ + if (code == 0) \ + qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \ + } \ } while (0); -#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ - if (err != NULL) { \ - qDebug("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \ - taosMemoryFree(err); \ - code = -1; \ - } else { \ - qDebug("streamState str: %s succ to del from %s", toString, funcname); \ - } \ +#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ + if (err != NULL) { \ + qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ + err); \ + taosMemoryFree(err); \ + code = -1; \ + } else { \ + qDebug("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \ + } \ } while (0); int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {