refactor checkpoint
This commit is contained in:
parent
6fe649c458
commit
9ef69dd455
|
@ -871,14 +871,14 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
SArray* refs = taosArrayInit(16, sizeof(int64_t));
|
SArray* refs = taosArrayInit(16, sizeof(int64_t));
|
||||||
SArray* pCf = taosArrayInit(16, POINTER_BYTES);
|
|
||||||
|
|
||||||
rocksdb_column_family_handle_t** ppCf = NULL;
|
rocksdb_column_family_handle_t** ppCf = NULL;
|
||||||
|
|
||||||
char* pChkpDir = NULL;
|
char* pChkpDir = NULL;
|
||||||
char* pChkpIdDir = NULL;
|
char* pChkpIdDir = NULL;
|
||||||
if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) {
|
if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) {
|
||||||
goto _ERROR;
|
taosArrayDestroy(refs);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
||||||
|
@ -886,6 +886,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get all cf and acquire cfWrappter
|
||||||
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
|
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
|
||||||
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
|
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
|
||||||
|
|
||||||
|
@ -901,7 +902,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
||||||
} else {
|
} else {
|
||||||
qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
|
qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
|
||||||
}
|
}
|
||||||
|
// release all ref to cfWrapper;
|
||||||
for (int i = 0; i < taosArrayGetSize(refs); i++) {
|
for (int i = 0; i < taosArrayGetSize(refs); i++) {
|
||||||
int64_t id = *(int64_t*)taosArrayGet(refs, i);
|
int64_t id = *(int64_t*)taosArrayGet(refs, i);
|
||||||
taosReleaseRef(streamBackendCfWrapperId, id);
|
taosReleaseRef(streamBackendCfWrapperId, id);
|
||||||
|
|
Loading…
Reference in New Issue