diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 704bc9a2f2..bb6362ff73 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -247,6 +247,7 @@ int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t taskDbBuildSnap(void* arg, SArray* pSnap); +int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index db13da5f3b..2e65c9ef16 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -146,6 +146,9 @@ static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); +void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp); +void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp); + #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); int32_t copyFiles(const char* src, const char* dst); uint32_t nextPow2(uint32_t x); @@ -1010,6 +1013,50 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* } #endif +int chkpIdComp(const void* a, const void* b) { + int64_t x = *(int64_t*)a; + int64_t y = *(int64_t*)b; + if (x == y) return 0; + + return x < y ? -1 : 1; +} +int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { + int32_t code = 0; + char* pChkpDir = taosMemoryCalloc(1, 256); + + sprintf(pChkpDir, "%s%s%s", pBackend->path, TD_DIRSEP, "checkpoints"); + if (!taosIsDir(pChkpDir)) { + taosMemoryFree(pChkpDir); + return 0; + } + TdDirPtr pDir = taosOpenDir(pChkpDir); + if (pDir == NULL) { + taosMemoryFree(pChkpDir); + return 0; + } + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; + + if (taosDirEntryIsDir(de)) { + char checkpointPrefix[32] = {0}; + int64_t checkpointId = 0; + + int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId); + if (ret == 1) { + taosArrayPush(pBackend->chkpSaved, &checkpointId); + } + } else { + continue; + } + } + taosArraySort(pBackend->chkpSaved, chkpIdComp); + + taosMemoryFree(pChkpDir); + taosCloseDir(&pDir); + + return 0; +} int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) { SArray* pHandle = taosArrayInit(8, POINTER_BYTES); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { @@ -1043,8 +1090,7 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { taosMemoryFreeClear(err); goto _ERROR; } - - rocksdb_checkpoint_create(cp, path, 64 << 20, &err); + rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err); if (err != NULL) { stError("failed to do checkpoint at:%s, reason:%s", path, err); taosMemoryFreeClear(err); @@ -1056,7 +1102,6 @@ _ERROR: return code; } int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) { - if (nCf == 0) return 0; int code = 0; char* err = NULL; @@ -1109,9 +1154,12 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter; taskDbAddRef(pTaskDb); - code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId); + int64_t chkpId = pTaskDb->chkpId; + code = taskDbDoCheckpoint(pTaskDb, chkpId); taskDbRemoveRef(pTaskDb); + taskDbRefChkp(pTaskDb, pTaskDb->chkpId); + SStreamTask* pTask = pTaskDb->pTask; SStreamTaskSnap snap = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, @@ -1124,6 +1172,28 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { return code; } +int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) { + if (pSnapInfo == NULL) return 0; + SStreamMeta* pMeta = arg; + int32_t code = 0; + taosThreadMutexLock(&pMeta->backendMutex); + + char buf[128] = {0}; + for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) { + SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i); + sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId); + STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf)); + if (pTaskDb == NULL) { + stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId); + continue; + } + memset(buf, 0, sizeof(buf)); + + taskDbUnRefChkp(pTaskDb, pSnap->chkpId); + } + taosThreadMutexUnlock(&pMeta->backendMutex); + return 0; +} #ifdef BUILD_NO_CALL int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) { // if (arg == NULL) return 0; @@ -1192,20 +1262,23 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf); stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf); - if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) { - if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { - stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); - } else { - stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, - taosGetTimestampMs() - st); - } + int64_t written = atomic_load_64(&pTaskDb->dataWritten); + + if (written > 0) { + stDebug("stream backend:%p start to flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); + code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf); } else { - stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir); + stDebug("stream backend:%p not need flush db at:%s, data written:%" PRId64 "", pTaskDb, pChkpIdDir, written); + } + if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { + stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); + } else { + stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, + taosGetTimestampMs() - st); } code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir); - pTaskDb->dataWritten = 0; - + atomic_store_64(&pTaskDb->dataWritten, 0); pTaskDb->chkpId = chkpId; _EXIT: @@ -1846,7 +1919,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); // rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); - rocksdb_options_set_recycle_log_file_num(opts, 6); + // rocksdb_options_set_ecycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 3); rocksdb_options_set_info_log_level(opts, 1); rocksdb_options_set_db_write_buffer_size(opts, 256 << 20); @@ -1900,11 +1973,32 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) { pTaskDb->chkpId = -1; pTaskDb->chkpCap = 4; pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t)); + taskDbLoadChkpInfo(pTaskDb); + pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t)); taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL); } +void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { + taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); + taosArrayPush(pTaskDb->chkpInUse, &chkp); + taosArraySort(pTaskDb->chkpInUse, chkpIdComp); + taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); +} + +void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { + taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); + for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) { + int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i); + if (*p == chkp) { + taosArrayRemove(pTaskDb->chkpInUse, i); + break; + } + } + taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); +} + void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) { taosArrayDestroy(pTaskDb->chkpSaved); taosArrayDestroy(pTaskDb->chkpInUse); @@ -2131,15 +2225,18 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { + int32_t code = -1; STaskDbWrapper* pDb = arg; ECHECKPOINT_BACKUP_TYPE utype = type; + taskDbRefChkp(pDb, chkpId); if (utype == DATA_UPLOAD_RSYNC) { - return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); + code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path); } else if (utype == DATA_UPLOAD_S3) { - return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); + code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); } - return -1; + taskDbUnRefChkp(pDb, chkpId); + return code; } int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) { @@ -2563,7 +2660,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - wrapper->dataWritten += 1; \ + atomic_add_fetch_64(&wrapper->dataWritten, 1); \ char toString[128] = {0}; \ if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ @@ -2640,7 +2737,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - wrapper->dataWritten += 1; \ + atomic_add_fetch_64(&wrapper->dataWritten, 1); \ char toString[128] = {0}; \ if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ @@ -2681,7 +2778,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { stDebug("streamStateClear_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - wrapper->dataWritten += 1; + atomic_add_fetch_64(&wrapper->dataWritten, 1); char sKeyStr[128] = {0}; char eKeyStr[128] = {0}; @@ -3707,7 +3804,7 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - wrapper->dataWritten += 1; + atomic_add_fetch_64(&wrapper->dataWritten, 1); int i = streamStateGetCfIdx(pState, cfKeyName); if (i < 0) { @@ -3741,7 +3838,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - wrapper->dataWritten += 1; + + atomic_add_fetch_64(&wrapper->dataWritten, 1); rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); @@ -3760,7 +3858,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { char* err = NULL; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - wrapper->dataWritten += 1; + atomic_add_fetch_64(&wrapper->dataWritten, 1); rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err); if (err != NULL) { stError("streamState failed to write batch, err:%s", err); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 1800324cc8..eb67e61c4c 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -74,7 +74,9 @@ struct SStreamSnapHandle { int32_t currFileIdx; char* metaPath; + void* pMeta; SArray* pDbSnapSet; + SArray* pSnapInfoSet; int32_t currIdx; int8_t delFlag; // 0 : not del, 1: del }; @@ -140,7 +142,9 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamTaskDbGetSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } +int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } + +int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); } void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { @@ -291,30 +295,26 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { } int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { // impl later - SArray* pSnapSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); - int32_t code = streamTaskDbGetSnapInfo(pMeta, path, pSnapSet); + SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); + int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); if (code != 0) { return -1; } SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); - for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { - SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); + for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) { + SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i); SBackendSnapFile2 snapFile = {0}; code = streamBackendSnapInitFile(path, pSnap, &snapFile); ASSERT(code == 0); taosArrayPush(pDbSnapSet, &snapFile); } - for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { - SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); - taosMemoryFree(pSnap->dbPrefixPath); - } - taosArrayDestroy(pSnapSet); - pHandle->pDbSnapSet = pDbSnapSet; + pHandle->pSnapInfoSet = pSnapInfoSet; pHandle->currIdx = 0; + pHandle->pMeta = pMeta; return 0; _err: @@ -333,6 +333,14 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { } taosArrayDestroy(handle->pDbSnapSet); } + streamDestroyTasdDbSnapInfo(handle->pMeta, handle->pSnapInfoSet); + if (handle->pSnapInfoSet) { + for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) { + SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i); + taosMemoryFree(pSnap->dbPrefixPath); + } + taosArrayDestroy(handle->pSnapInfoSet); + } taosMemoryFree(handle->metaPath); return; }