opt checkpoint

This commit is contained in:
Yihao Deng 2024-05-16 20:16:59 +08:00
parent 2359e0e61b
commit a7b434075b
3 changed files with 75 additions and 18 deletions

View File

@ -247,6 +247,7 @@ int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap); int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);

View File

@ -146,6 +146,9 @@ static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter,
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp);
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t copyFiles(const char* src, const char* dst); int32_t copyFiles(const char* src, const char* dst);
uint32_t nextPow2(uint32_t x); uint32_t nextPow2(uint32_t x);
@ -1017,7 +1020,7 @@ int chkpIdComp(const void* a, const void* b) {
return x < y ? -1 : 1; return x < y ? -1 : 1;
} }
int32_t chkpLoadInfo(STaskDbWrapper* pBackend) { int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
int32_t code = 0; int32_t code = 0;
char* pChkpDir = taosMemoryCalloc(1, 256); char* pChkpDir = taosMemoryCalloc(1, 256);
@ -1087,7 +1090,7 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
goto _ERROR; goto _ERROR;
} }
rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err); rocksdb_checkpoint_create(cp, path, 0, &err);
if (err != NULL) { if (err != NULL) {
stError("failed to do checkpoint at:%s, reason:%s", path, err); stError("failed to do checkpoint at:%s, reason:%s", path, err);
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
@ -1152,9 +1155,12 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter; STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
taskDbAddRef(pTaskDb); taskDbAddRef(pTaskDb);
code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId); int64_t chkpId = pTaskDb->chkpId;
code = taskDbDoCheckpoint(pTaskDb, chkpId);
taskDbRemoveRef(pTaskDb); taskDbRemoveRef(pTaskDb);
taskDbRefChkp(pTaskDb, pTaskDb->chkpId);
SStreamTask* pTask = pTaskDb->pTask; SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId, SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId, .taskId = pTask->id.taskId,
@ -1167,6 +1173,28 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
return code; return code;
} }
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
if (pSnapInfo == NULL) return 0;
SStreamMeta* pMeta = arg;
int32_t code = 0;
taosThreadMutexLock(&pMeta->backendMutex);
char buf[128] = {0};
for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i);
sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId);
STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf));
if (pTaskDb == NULL) {
stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId);
continue;
}
memset(buf, 0, sizeof(buf));
taskDbUnRefChkp(pTaskDb, pSnap->chkpId);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
return 0;
}
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) { int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
// if (arg == NULL) return 0; // if (arg == NULL) return 0;
@ -1946,13 +1974,32 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
pTaskDb->chkpId = -1; pTaskDb->chkpId = -1;
pTaskDb->chkpCap = 4; pTaskDb->chkpCap = 4;
pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t)); pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
chkpLoadInfo(pTaskDb); taskDbLoadChkpInfo(pTaskDb);
pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t)); pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));
taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL); taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL);
} }
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
taosArrayPush(pTaskDb->chkpInUse, &chkp);
taosArraySort(pTaskDb->chkpInUse, chkpIdComp);
taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
}
void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) {
int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i);
if (*p == chkp) {
taosArrayRemove(pTaskDb->chkpInUse, i);
break;
}
}
taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
}
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) { void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
taosArrayDestroy(pTaskDb->chkpSaved); taosArrayDestroy(pTaskDb->chkpSaved);
taosArrayDestroy(pTaskDb->chkpInUse); taosArrayDestroy(pTaskDb->chkpInUse);
@ -2179,15 +2226,18 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
return code; return code;
} }
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
int32_t code = -1;
STaskDbWrapper* pDb = arg; STaskDbWrapper* pDb = arg;
ECHECKPOINT_BACKUP_TYPE utype = type; ECHECKPOINT_BACKUP_TYPE utype = type;
taskDbRefChkp(pDb, chkpId);
if (utype == DATA_UPLOAD_RSYNC) { if (utype == DATA_UPLOAD_RSYNC) {
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
} else if (utype == DATA_UPLOAD_S3) { } else if (utype == DATA_UPLOAD_S3) {
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
} }
return -1; taskDbUnRefChkp(pDb, chkpId);
return code;
} }
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) { int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {

View File

@ -75,6 +75,7 @@ struct SStreamSnapHandle {
char* metaPath; char* metaPath;
SArray* pDbSnapSet; SArray* pDbSnapSet;
SArray* pSnapInfoSet;
int32_t currIdx; int32_t currIdx;
int8_t delFlag; // 0 : not del, 1: del int8_t delFlag; // 0 : not del, 1: del
}; };
@ -140,7 +141,9 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
return taosOpenFile(fullname, opt); return taosOpenFile(fullname, opt);
} }
int32_t streamTaskDbGetSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); }
int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) { if (qDebugFlag & DEBUG_DEBUG) {
@ -291,29 +294,24 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
} }
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
// impl later // impl later
SArray* pSnapSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamTaskDbGetSnapInfo(pMeta, path, pSnapSet); int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
if (code != 0) { if (code != 0) {
return -1; return -1;
} }
SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i);
SBackendSnapFile2 snapFile = {0}; SBackendSnapFile2 snapFile = {0};
code = streamBackendSnapInitFile(path, pSnap, &snapFile); code = streamBackendSnapInitFile(path, pSnap, &snapFile);
ASSERT(code == 0); ASSERT(code == 0);
taosArrayPush(pDbSnapSet, &snapFile); taosArrayPush(pDbSnapSet, &snapFile);
} }
for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
taosArrayDestroy(pSnapSet);
pHandle->pDbSnapSet = pDbSnapSet; pHandle->pDbSnapSet = pDbSnapSet;
pHandle->pSnapInfoSet = pSnapInfoSet;
pHandle->currIdx = 0; pHandle->currIdx = 0;
return 0; return 0;
@ -333,6 +331,14 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
} }
taosArrayDestroy(handle->pDbSnapSet); taosArrayDestroy(handle->pDbSnapSet);
} }
streamDestroyTasdDbSnapInfo(handle->handle, handle->pSnapInfoSet);
if (handle->pSnapInfoSet) {
for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i);
taosMemoryFree(pSnap->dbPrefixPath);
}
taosArrayDestroy(handle->pSnapInfoSet);
}
taosMemoryFree(handle->metaPath); taosMemoryFree(handle->metaPath);
return; return;
} }