merge 3.0
This commit is contained in:
parent
7977490b81
commit
e00f79367d
|
@ -142,16 +142,17 @@ void streamBackendCleanup(void* arg) {
|
||||||
}
|
}
|
||||||
taosHashCleanup(pHandle->cfInst);
|
taosHashCleanup(pHandle->cfInst);
|
||||||
|
|
||||||
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
|
if (pHandle->db) {
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
|
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
|
||||||
rocksdb_flush(pHandle->db, flushOpt, &err);
|
rocksdb_flush(pHandle->db, flushOpt, &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qError("failed to flush db before streamBackend clean up, reason:%s", err);
|
qError("failed to flush db before streamBackend clean up, reason:%s", err);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
}
|
}
|
||||||
rocksdb_flushoptions_destroy(flushOpt);
|
rocksdb_flushoptions_destroy(flushOpt);
|
||||||
|
|
||||||
rocksdb_close(pHandle->db);
|
rocksdb_close(pHandle->db);
|
||||||
|
}
|
||||||
rocksdb_options_destroy(pHandle->dbOpt);
|
rocksdb_options_destroy(pHandle->dbOpt);
|
||||||
rocksdb_env_destroy(pHandle->env);
|
rocksdb_env_destroy(pHandle->env);
|
||||||
rocksdb_cache_destroy(pHandle->cache);
|
rocksdb_cache_destroy(pHandle->cache);
|
||||||
|
@ -650,7 +651,7 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c
|
||||||
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
||||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
rocksdb_column_family_handle_destroy(inst->pHandle[i]);
|
if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
rocksdb_writeoptions_destroy(inst->wOpt);
|
rocksdb_writeoptions_destroy(inst->wOpt);
|
||||||
|
@ -668,7 +669,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId, dummy = 0;
|
int32_t taskId, dummy = 0;
|
||||||
char suffix[64] = {0};
|
char suffix[64] = {0};
|
||||||
SHashObj* instTbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
||||||
|
|
||||||
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
||||||
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*));
|
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*));
|
||||||
|
@ -705,8 +705,12 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
qError("failed to open rocksdb cf, reason:%s", err);
|
qError("failed to open rocksdb cf, reason:%s", err);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
} else {
|
} else {
|
||||||
qDebug("succ to open rocksdb cf, reason");
|
qDebug("succ to open rocksdb cf");
|
||||||
}
|
}
|
||||||
|
// close default cf
|
||||||
|
rocksdb_column_family_handle_destroy(cfHandle[0]);
|
||||||
|
rocksdb_options_destroy(cfOpts[0]);
|
||||||
|
handle->db = db;
|
||||||
|
|
||||||
static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||||
for (int i = 0; i < nCf; i++) {
|
for (int i = 0; i < nCf; i++) {
|
||||||
|
@ -720,7 +724,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
int idx = streamGetInit(NULL, funcname);
|
int idx = streamGetInit(NULL, funcname);
|
||||||
|
|
||||||
RocksdbCfInst* inst = NULL;
|
RocksdbCfInst* inst = NULL;
|
||||||
RocksdbCfInst** pInst = taosHashGet(instTbl, idstr, strlen(idstr) + 1);
|
RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1);
|
||||||
if (pInst == NULL || *pInst == NULL) {
|
if (pInst == NULL || *pInst == NULL) {
|
||||||
inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
|
inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
|
||||||
inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
|
inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
|
||||||
|
@ -745,7 +749,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void** pIter = taosHashIterate(handle->cfInst, NULL);
|
void** pIter = taosHashIterate(handle->cfInst, NULL);
|
||||||
while (*pIter) {
|
while (pIter) {
|
||||||
RocksdbCfInst* inst = *pIter;
|
RocksdbCfInst* inst = *pIter;
|
||||||
|
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
|
@ -831,7 +835,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
|
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
|
||||||
pCompare[i] = compare;
|
pCompare[i] = compare;
|
||||||
}
|
}
|
||||||
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(1, cfLen * sizeof(rocksdb_column_family_handle_t*));
|
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
|
||||||
pState->pTdbState->rocksdb = handle->db;
|
pState->pTdbState->rocksdb = handle->db;
|
||||||
pState->pTdbState->pHandle = (void**)cfHandle;
|
pState->pTdbState->pHandle = (void**)cfHandle;
|
||||||
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
|
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
|
||||||
|
@ -873,7 +877,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
if (remove) {
|
if (remove) {
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
if (pState->pTdbState->pHandle[i] != NULL)
|
if (pState->pTdbState->pHandle[i] != NULL)
|
||||||
rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err);
|
rocksdb_drop_column_family(pState->pTdbState->rocksdb,
|
||||||
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[i], &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
|
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
|
@ -951,8 +956,9 @@ int streamGetInit(SStreamState* pState, const char* funcName) {
|
||||||
qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId,
|
qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId,
|
||||||
funcName, err);
|
funcName, err);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
}
|
} else {
|
||||||
pState->pTdbState->pHandle[idx] = cf;
|
pState->pTdbState->pHandle[idx] = cf;
|
||||||
|
}
|
||||||
taosThreadRwlockUnlock(&pState->pTdbState->rwLock);
|
taosThreadRwlockUnlock(&pState->pTdbState->rwLock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -982,7 +988,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
|
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
|
||||||
rocksdb_readoptions_set_fill_cache(rOpt, 0);
|
rocksdb_readoptions_set_fill_cache(rOpt, 0);
|
||||||
|
|
||||||
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
|
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt,
|
||||||
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
|
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
|
||||||
|
@ -999,7 +1006,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = \
|
||||||
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
|
||||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||||
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
||||||
char* ttlV = NULL; \
|
char* ttlV = NULL; \
|
||||||
|
@ -1029,7 +1037,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = \
|
||||||
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
|
||||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||||
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
|
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
|
||||||
size_t len = 0; \
|
size_t len = 0; \
|
||||||
|
@ -1076,7 +1085,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = \
|
||||||
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
|
||||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||||
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
||||||
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
|
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
|
||||||
|
|
Loading…
Reference in New Issue