fix checkpoint error

This commit is contained in:
yihaoDeng 2023-08-19 12:25:22 +08:00
parent 1dad0b007d
commit 52788aca1c
1 changed files with 13 additions and 9 deletions

View File

@ -826,6 +826,11 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
} }
qDebug("stream backend:%p start to do checkpoint at:%s, %d ", pHandle, checkpointDir, nCf); qDebug("stream backend:%p start to do checkpoint at:%s, %d ", pHandle, checkpointDir, nCf);
if (taosIsDir(checkpointDir)) {
qInfo("stream rm exist checkpoint%s", checkpointDir);
taosRemoveFile(checkpointDir);
}
if (pHandle->db != NULL) { if (pHandle->db != NULL) {
char* err = NULL; char* err = NULL;
@ -841,19 +846,18 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err); rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err);
if (cp == NULL || err != NULL) { if (cp == NULL || err != NULL) {
qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, checkpointDir, err);
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
code = -1; code = -1;
goto _ERROR; goto _ERROR;
} }
rocksdb_checkpoint_create(cp, checkpointDir, 64 << 20, &err); rocksdb_checkpoint_create(cp, checkpointDir, 64 << 20, &err);
if (err != NULL) { if (err != NULL) {
qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, checkpointDir, err);
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
} else { } else {
code = 0; code = 0;
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, path, qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, checkpointDir,
taosGetTimestampMs() - st); taosGetTimestampMs() - st);
} }
rocksdb_checkpoint_object_destroy(cp); rocksdb_checkpoint_object_destroy(cp);
@ -863,9 +867,11 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
int64_t id = *(int64_t*)taosArrayGet(refs, i); int64_t id = *(int64_t*)taosArrayGet(refs, i);
taosReleaseRef(streamBackendCfWrapperId, id); taosReleaseRef(streamBackendCfWrapperId, id);
} }
taosWLockLatch(&pMeta->chkpDirLock); if (code == 0) {
taosArrayPush(pMeta->chkpSaved, &checkpointId); taosWLockLatch(&pMeta->chkpDirLock);
taosWUnLockLatch(&pMeta->chkpDirLock); taosArrayPush(pMeta->chkpSaved, &checkpointId);
taosWUnLockLatch(&pMeta->chkpDirLock);
}
taosArrayDestroy(refs); taosArrayDestroy(refs);
taosMemoryFree(ppCf); taosMemoryFree(ppCf);
@ -874,8 +880,6 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
_ERROR: _ERROR:
taosReleaseRef(streamBackendId, backendRid); taosReleaseRef(streamBackendId, backendRid);
taosArrayDestroy(refs);
taosMemoryFree(ppCf);
return code; return code;
} }