refactor checkpoint
This commit is contained in:
parent
126c7fa6a5
commit
b1c50e23b6
|
@ -1043,8 +1043,7 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err);
|
||||||
rocksdb_checkpoint_create(cp, path, 64 << 20, &err);
|
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
stError("failed to do checkpoint at:%s, reason:%s", path, err);
|
stError("failed to do checkpoint at:%s, reason:%s", path, err);
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
|
@ -1192,20 +1191,23 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
|
||||||
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
|
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
|
||||||
stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
|
stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
|
||||||
|
|
||||||
if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) {
|
int64_t written = atomic_load_64(&pTaskDb->dataWritten);
|
||||||
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
|
|
||||||
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
|
if (written > 0) {
|
||||||
} else {
|
stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
|
||||||
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
|
code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf);
|
||||||
taosGetTimestampMs() - st);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir);
|
stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written);
|
||||||
|
}
|
||||||
|
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
|
||||||
|
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
|
||||||
|
} else {
|
||||||
|
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
|
||||||
|
taosGetTimestampMs() - st);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
|
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
|
||||||
pTaskDb->dataWritten = 0;
|
atomic_store_64(&pTaskDb->dataWritten, 0);
|
||||||
|
|
||||||
pTaskDb->chkpId = chkpId;
|
pTaskDb->chkpId = chkpId;
|
||||||
|
|
||||||
_EXIT:
|
_EXIT:
|
||||||
|
@ -1588,7 +1590,7 @@ int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest) {
|
||||||
if (*dest == NULL) {
|
if (*dest == NULL) {
|
||||||
size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
|
size_t size = sizeof(key.unixTimestamp) + sizeof(key.len) + sizeof(key.rawLen) + sizeof(key.compress) + key.len;
|
||||||
char* p = taosMemoryCalloc(1, size);
|
char* p = taosMemoryCalloc(1, size);
|
||||||
char* buf = p;
|
char* buf = p;
|
||||||
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key.len);
|
len += taosEncodeFixedI32((void**)&buf, key.len);
|
||||||
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
|
len += taosEncodeFixedI32((void**)&buf, key.rawLen);
|
||||||
|
@ -1846,7 +1848,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
|
||||||
rocksdb_options_set_create_if_missing(opts, 1);
|
rocksdb_options_set_create_if_missing(opts, 1);
|
||||||
rocksdb_options_set_create_missing_column_families(opts, 1);
|
rocksdb_options_set_create_missing_column_families(opts, 1);
|
||||||
// rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
|
// rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
|
||||||
rocksdb_options_set_recycle_log_file_num(opts, 6);
|
// rocksdb_options_set_ecycle_log_file_num(opts, 6);
|
||||||
rocksdb_options_set_max_write_buffer_number(opts, 3);
|
rocksdb_options_set_max_write_buffer_number(opts, 3);
|
||||||
rocksdb_options_set_info_log_level(opts, 1);
|
rocksdb_options_set_info_log_level(opts, 1);
|
||||||
rocksdb_options_set_db_write_buffer_size(opts, 256 << 20);
|
rocksdb_options_set_db_write_buffer_size(opts, 256 << 20);
|
||||||
|
@ -2563,7 +2565,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
wrapper->dataWritten += 1; \
|
atomic_add_fetch_64(&wrapper->dataWritten, 1); \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
|
@ -2640,7 +2642,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
wrapper->dataWritten += 1; \
|
atomic_add_fetch_64(&wrapper->dataWritten, 1); \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
|
@ -2681,7 +2683,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
|
||||||
stDebug("streamStateClear_rocksdb");
|
stDebug("streamStateClear_rocksdb");
|
||||||
|
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
wrapper->dataWritten += 1;
|
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
|
|
||||||
char sKeyStr[128] = {0};
|
char sKeyStr[128] = {0};
|
||||||
char eKeyStr[128] = {0};
|
char eKeyStr[128] = {0};
|
||||||
|
@ -3705,7 +3707,7 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock
|
||||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
|
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
|
||||||
void* val, int32_t vlen, int64_t ttl) {
|
void* val, int32_t vlen, int64_t ttl) {
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
wrapper->dataWritten += 1;
|
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
|
|
||||||
int i = streamStateGetCfIdx(pState, cfKeyName);
|
int i = streamStateGetCfIdx(pState, cfKeyName);
|
||||||
if (i < 0) {
|
if (i < 0) {
|
||||||
|
@ -3739,7 +3741,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
|
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
|
||||||
|
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
wrapper->dataWritten += 1;
|
|
||||||
|
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
|
|
||||||
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
|
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
|
||||||
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
||||||
|
@ -3758,7 +3761,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
wrapper->dataWritten += 1;
|
atomic_add_fetch_64(&wrapper->dataWritten, 1);
|
||||||
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
|
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
stError("streamState failed to write batch, err:%s", err);
|
stError("streamState failed to write batch, err:%s", err);
|
||||||
|
|
Loading…
Reference in New Issue