refactor checkpoint

This commit is contained in:
yihaoDeng 2023-08-19 16:10:03 +08:00
parent 52788aca1c
commit 1c5346b13c
1 changed files with 100 additions and 59 deletions

View File

@ -420,7 +420,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
/*param@1: checkpointId dir
param@2: state
copy checkpointdir's file to state dir
copy pChkpIdDir's file to state dir
opt to set hard link to previous file
*/
char* state = taosMemoryCalloc(1, strlen(path) + 32);
@ -770,8 +770,9 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
return 0;
}
int32_t streamBackendGetAllCfHandle(SStreamMeta* pMeta, SArray* pHandle, SArray* refs) {
void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) {
SArray* pHandle = taosArrayInit(16, POINTER_BYTES);
void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
while (pIter) {
int64_t id = *(int64_t*)pIter;
@ -790,6 +791,77 @@ int32_t streamBackendGetAllCfHandle(SStreamMeta* pMeta, SArray* pHandle, SArray*
taosArrayPush(refs, &id);
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
}
int32_t nCf = taosArrayGetSize(pHandle);
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
ppCf[i] = taosArrayGetP(pHandle, i);
}
taosArrayDestroy(pHandle);
*ppHandle = ppCf;
return nCf;
}
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
int32_t code = -1;
char* err = NULL;
rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(db, &err);
if (cp == NULL || err != NULL) {
qError("failed to do checkpoint at:%s, reason:%s", path, err);
taosMemoryFreeClear(err);
goto _ERROR;
}
rocksdb_checkpoint_create(cp, path, 64 << 20, &err);
if (err != NULL) {
qError("failed to do checkpoint at:%s, reason:%s", path, err);
taosMemoryFreeClear(err);
} else {
code = 0;
}
_ERROR:
rocksdb_checkpoint_object_destroy(cp);
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
int code = -1;
char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err);
if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err);
}
code = 0;
rocksdb_flushoptions_destroy(flushOpt);
return code;
}
int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
int32_t code = 0;
char* pChkpDir = taosMemoryCalloc(1, 256);
char* pChkpIdDir = taosMemoryCalloc(1, 256);
sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints");
code = taosMulModeMkDir(pChkpDir, 0755);
if (code != 0) {
qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
code = -1;
return code;
}
sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId);
if (taosIsDir(pChkpIdDir)) {
qInfo("stream rm exist checkpoint%s", pChkpIdDir);
taosRemoveFile(pChkpIdDir);
}
*chkpDir = pChkpDir;
*chkpIdDir = pChkpIdDir;
return 0;
}
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
@ -801,66 +873,33 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
SArray* refs = taosArrayInit(16, sizeof(int64_t));
SArray* pCf = taosArrayInit(16, POINTER_BYTES);
char path[256] = {0};
sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints");
code = taosMulModeMkDir(path, 0755);
if (code != 0) {
qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
return code;
}
rocksdb_column_family_handle_t** ppCf = NULL;
char checkpointDir[256] = {0};
snprintf(checkpointDir, tListLen(checkpointDir), "%s%scheckpoint%" PRId64, path, TD_DIRSEP, checkpointId);
char* pChkpDir = NULL;
char* pChkpIdDir = NULL;
if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) {
goto _ERROR;
}
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL) {
return -1;
if (pHandle == NULL || pHandle->db == NULL) {
goto _ERROR;
}
streamBackendGetAllCfHandle(pMeta, pCf, refs);
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
int32_t nCf = taosArrayGetSize(pCf);
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
ppCf[i] = taosArrayGetP(pCf, i);
}
qDebug("stream backend:%p start to do checkpoint at:%s, %d ", pHandle, checkpointDir, nCf);
if (taosIsDir(checkpointDir)) {
qInfo("stream rm exist checkpoint%s", checkpointDir);
taosRemoveFile(checkpointDir);
}
if (pHandle->db != NULL) {
char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
rocksdb_flush_cfs(pHandle->db, flushOpt, ppCf, nCf, &err);
if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err);
}
rocksdb_flushoptions_destroy(flushOpt);
rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err);
if (cp == NULL || err != NULL) {
qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, checkpointDir, err);
taosMemoryFreeClear(err);
code = -1;
goto _ERROR;
}
rocksdb_checkpoint_create(cp, checkpointDir, 64 << 20, &err);
if (err != NULL) {
qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, checkpointDir, err);
taosMemoryFreeClear(err);
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
if (code == 0) {
code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir);
if (code != 0) {
qError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir);
} else {
code = 0;
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, checkpointDir,
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir,
taosGetTimestampMs() - st);
}
rocksdb_checkpoint_object_destroy(cp);
} else {
qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
}
for (int i = 0; i < taosArrayGetSize(refs); i++) {
@ -871,15 +910,17 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->chkpSaved, &checkpointId);
taosWUnLockLatch(&pMeta->chkpDirLock);
// delete obsolte checkpoint
delObsoleteCheckpoint(arg, pChkpDir);
}
taosArrayDestroy(refs);
taosMemoryFree(ppCf);
delObsoleteCheckpoint(arg, path);
_ERROR:
taosReleaseRef(streamBackendId, backendRid);
taosArrayDestroy(refs);
taosMemoryFree(ppCf);
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
return code;
}