merge rocksdb inst

This commit is contained in:
yihaoDeng 2023-04-28 06:32:33 +00:00
parent c4f36d32ae
commit afc6023a8a
2 changed files with 122 additions and 108 deletions

View File

@ -56,6 +56,7 @@ typedef struct STdbState {
void* env;
SListNode* pComparNode;
SBackendHandle* pBackendHandle;
char idstr[48];
TDB* db;
TTB* pStateDb;

View File

@ -324,7 +324,7 @@ int32_t streaValueIsStale(void* k, int64_t ts) {
typedef struct {
void* tableOpt;
} rocksdbCfParam;
} RocksdbCfParam;
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
typedef int (*EncodeFunc)(void* key, char* buf);
@ -357,8 +357,7 @@ typedef struct {
} SCfInit;
#define GEN_COLUMN_FAMILY_NAME(name, streamId, taskId, SUBFIX) \
sprintf(name, "%d_%d_%s", (streamId), (taskId), (SUBFIX));
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUBFIX) sprintf(name, "%s_%s", idstr, (SUBFIX));
SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
@ -384,10 +383,11 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId);
SBackendHandle* handle = backend;
sprintf(pState->pTdbState->idstr, "%d-%d", pState->streamId, pState->taskId);
char* err = NULL;
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
rocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(rocksdbCfParam));
RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
for (int i = 0; i < cfLen; i++) {
cfOpt[i] = rocksdb_options_create();
@ -414,12 +414,12 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < cfLen; i++) {
char buf[64] = {0};
GEN_COLUMN_FAMILY_NAME(buf, pState->streamId, pState->taskId, ginitDict[i].key);
GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key);
cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err);
if (err != NULL) {
qError("rocksdb create column family failed, reason:%s", err);
taosMemoryFree(err);
return -1;
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
// return -1;
}
}
@ -428,47 +428,54 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
pState->pTdbState->readOpts = rocksdb_readoptions_create();
pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt;
// pState->pTdbState->pCompare = pCompare;
pState->pTdbState->dbOpt = handle->dbOpt;
pState->pTdbState->param = param;
pState->pTdbState->pBackendHandle = handle;
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
return 0;
}
void streamStateCloseBackend(SStreamState* pState, bool remove) {
char* status[] = {"remove", "drop"};
qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 1 : 0], pState, pState->streamId, pState->taskId);
char* status[] = {"close", "drop"};
qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId);
if (pState->pTdbState->rocksdb == NULL) {
return;
}
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
rocksdbCfParam* param = pState->pTdbState->param;
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
for (int i = 0; i < cfLen; i++) {
if (remove) {
char* err = NULL;
if (remove) {
for (int i = 0; i < cfLen; i++) {
rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err);
} else {
rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err);
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
}
}
} else {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
for (int i = 0; i < cfLen; i++) {
rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err);
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
}
}
rocksdb_flushoptions_destroy(flushOpt);
}
rocksdb_flushoptions_destroy(flushOpt);
for (int i = 0; i < cfLen; i++) {
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
}
taosMemoryFreeClear(pState->pTdbState->pHandle);
for (int i = 0; i < cfLen; i++) {
rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
rocksdb_block_based_options_destroy(param[i].tableOpt);
rocksdb_block_based_options_destroy(((RocksdbCfParam*)pState->pTdbState->param)[i].tableOpt);
}
if (remove) {
streamBackendDelCompare(pState->pTdbState->pBackendHandle, pState->pTdbState->pComparNode);
}
@ -518,101 +525,107 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
*readOpt = rOpt;
// rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0);
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)value, (size_t)vLen, &err); \
if (err != NULL) { \
taosMemoryFree(err); \
qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
} \
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)value, (size_t)vLen, &err); \
if (err != NULL) { \
taosMemoryFree(err); \
qDebug("streamState str: %s failed to write to %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
err); \
code = -1; \
} else { \
qDebug("streamState str:%s succ to write to %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
vLen); \
} \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
if (err != NULL) taosMemoryFree(err); \
code = -1; \
} else { \
if (pVal != NULL) *pVal = val; \
if (vLen != NULL) *vLen = len; \
} \
if (err != NULL) { \
taosMemoryFree(err); \
qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
funcname); \
if (err != NULL) taosMemoryFree(err); \
code = -1; \
} else { \
if (pVal != NULL) *pVal = val; \
if (vLen != NULL) *vLen = len; \
} \
if (err != NULL) { \
taosMemoryFree(err); \
qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
err); \
code = -1; \
} else { \
if (code == 0) \
qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
qDebug("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
qDebug("streamState str: %s succ to del from %s", toString, funcname); \
} \
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
err); \
taosMemoryFree(err); \
code = -1; \
} else { \
qDebug("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} \
} while (0);
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {