diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9bc4a91f1c..a9e593eaad 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -419,9 +419,9 @@ typedef struct SStreamMeta { int64_t rid; int64_t chkpId; + int32_t chkpCap; SArray* chkpSaved; SArray* chkpInUse; - int32_t chkpCap; SRWLatch chkpDirLock; int32_t pauseTaskNum; diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index f5c303b809..b79979df67 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -66,6 +66,13 @@ typedef struct { TdThreadMutex mutex; char* idstr; int64_t refId; + char* path; + + int64_t chkpId; + SArray* chkpSaved; + SArray* chkpInUse; + int32_t chkpCap; + TdThreadRwlock chkpDirLock; } STaskBackendWrapper; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 9e1a7ceeec..4edb166982 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -709,6 +709,60 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { taosArrayDestroy(chkpDel); return 0; } +/* + * checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--| + * chkpInUse: |--cp2--|--cp4--| + * chkpInUse is doing translation, cannot del until + * replication is finished + */ +int32_t chkpDelObsolete(void* arg, char* path) { + STaskBackendWrapper* pBackend = arg; + taosThreadRwlockWrlock(&pBackend->chkpDirLock); + + SArray* chkpDel = taosArrayInit(8, sizeof(int64_t)); + SArray* chkpDup = taosArrayInit(8, sizeof(int64_t)); + + int64_t firsId = 0; + if (taosArrayGetSize(pBackend->chkpInUse) >= 1) { + firsId = *(int64_t*)taosArrayGet(pBackend->chkpInUse, 0); + + for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) { + int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); + if (id >= firsId) { + taosArrayPush(chkpDup, &id); + } else { + taosArrayPush(chkpDel, &id); + } + } + } else { + int32_t sz = taosArrayGetSize(pBackend->chkpSaved); + int32_t dsz = sz - pBackend->chkpCap; // del size + + for (int i = 0; i < dsz; i++) { + int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); + taosArrayPush(chkpDel, &id); + } + for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { + int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); + taosArrayPush(chkpDup, &id); + } + } + taosArrayDestroy(pBackend->chkpSaved); + pBackend->chkpSaved = chkpDup; + + taosThreadRwlockUnlock(&pBackend->chkpDirLock); + + for (int i = 0; i < taosArrayGetSize(chkpDel); i++) { + int64_t id = *(int64_t*)taosArrayGet(chkpDel, i); + char tbuf[256] = {0}; + sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id); + if (taosIsDir(tbuf)) { + taosRemoveDir(tbuf); + } + } + taosArrayDestroy(chkpDel); + return 0; +} static int32_t compareCheckpoint(const void* a, const void* b) { int64_t x = *(int64_t*)a; @@ -805,6 +859,27 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* // *ppHandle = ppCf; // return nCf; } + +int32_t chkpGetAllDbCfHandle2(STaskBackendWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) { + SArray* pHandle = taosArrayInit(8, POINTER_BYTES); + for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { + if (pBackend->pCf[i]) { + rocksdb_column_family_handle_t* p = pBackend->pCf[i]; + taosArrayPush(pHandle, &p); + } + } + int32_t nCf = taosArrayGetSize(pHandle); + + rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + for (int i = 0; i < nCf; i++) { + ppCf[i] = taosArrayGetP(pHandle, i); + } + taosArrayDestroy(pHandle); + + *ppHandle = ppCf; + return nCf; +} + int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { int32_t code = -1; char* err = NULL; @@ -827,47 +902,45 @@ _ERROR: return code; } int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) { - return 0; - // int code = 0; - // char* err = NULL; + int code = 0; + char* err = NULL; - // rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - // rocksdb_flushoptions_set_wait(flushOpt, 1); + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flushoptions_set_wait(flushOpt, 1); - // rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err); - // if (err != NULL) { - // qError("failed to flush db before streamBackend clean up, reason:%s", err); - // taosMemoryFree(err); - // code = -1; - // } - // rocksdb_flushoptions_destroy(flushOpt); - // return code; + rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err); + if (err != NULL) { + qError("failed to flush db before streamBackend clean up, reason:%s", err); + taosMemoryFree(err); + code = -1; + } + rocksdb_flushoptions_destroy(flushOpt); + return code; } int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) { + int32_t code = 0; + char* pChkpDir = taosMemoryCalloc(1, 256); + char* pChkpIdDir = taosMemoryCalloc(1, 256); + + sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints"); + code = taosMulModeMkDir(pChkpDir, 0755, true); + if (code != 0) { + qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); + taosMemoryFree(pChkpDir); + taosMemoryFree(pChkpIdDir); + code = -1; + return code; + } + + sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId); + if (taosIsDir(pChkpIdDir)) { + qInfo("stream rm exist checkpoint%s", pChkpIdDir); + taosRemoveFile(pChkpIdDir); + } + *chkpDir = pChkpDir; + *chkpIdDir = pChkpIdDir; + return 0; - // int32_t code = 0; - // char* pChkpDir = taosMemoryCalloc(1, 256); - // char* pChkpIdDir = taosMemoryCalloc(1, 256); - - // sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints"); - // code = taosMulModeMkDir(pChkpDir, 0755, true); - // if (code != 0) { - // qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); - // taosMemoryFree(pChkpDir); - // taosMemoryFree(pChkpIdDir); - // code = -1; - // return code; - // } - - // sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId); - // if (taosIsDir(pChkpIdDir)) { - // qInfo("stream rm exist checkpoint%s", pChkpIdDir); - // taosRemoveFile(pChkpIdDir); - // } - // *chkpDir = pChkpDir; - // *chkpIdDir = pChkpIdDir; - - // return 0; } int32_t streamBackendTriggerChkp(void* arg, char* dst) { @@ -936,6 +1009,49 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { // taosWUnLockLatch(&pMeta->chkpDirLock); } +int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) { + STaskBackendWrapper* pBackend = arg; + int64_t st = taosGetTimestampMs(); + int32_t code = -1; + int64_t refId = pBackend->refId; + + if (taosAcquireRef(taskBackendWrapperId, refId) == NULL) { + return -1; + } + + char* pChkpDir = NULL; + char* pChkpIdDir = NULL; + if (chkpPreCheckDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { + goto _EXIT; + } + // Get all cf and acquire cfWrappter + rocksdb_column_family_handle_t** ppCf = NULL; + + int32_t nCf = chkpGetAllDbCfHandle2(pBackend, &ppCf); + qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pBackend, pChkpIdDir, nCf); + + if ((code = chkpPreFlushDb(pBackend->db, ppCf, nCf)) == 0) { + if ((code = chkpDoDbCheckpoint(pBackend->db, pChkpIdDir)) != 0) { + qError("stream backend:%p failed to do checkpoint at:%s", pBackend, pChkpIdDir); + } else { + qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pBackend, pChkpIdDir, + taosGetTimestampMs() - st); + } + } else { + qError("stream backend:%p failed to flush db at:%s", pBackend, pChkpIdDir); + } + + code = chkpDelObsolete(pBackend, pChkpDir); + taosReleaseRef(taskBackendWrapperId, refId); + + return code; + +_EXIT: + taosMemoryFree(pChkpDir); + taosMemoryFree(pChkpIdDir); + taosReleaseRef(taskBackendWrapperId, refId); + return -1; +} int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { return 0; // SStreamMeta* pMeta = arg; @@ -1514,7 +1630,7 @@ void taskBackendRemoveRef(void* pTaskBackend) { } // void taskBackendDestroy(STaskBackendWrapper* wrapper); -void taskBackendInitOpt(STaskBackendWrapper* pTaskBackend) { +void taskBackendInitDBOpt(STaskBackendWrapper* pTaskBackend) { rocksdb_env_t* env = rocksdb_create_default_env(); rocksdb_cache_t* cache = rocksdb_cache_create_lru(256); @@ -1568,10 +1684,26 @@ void taskBackendInitOpt(STaskBackendWrapper* pTaskBackend) { } return; } -int32_t taskBackendBuildFullPath(char* path, char* key, char** fullPath) { +void taskBackendInitChkpOpt(STaskBackendWrapper* pBackend) { + pBackend->chkpId = -1; + pBackend->chkpCap = 4; + pBackend->chkpSaved = taosArrayInit(4, sizeof(int64_t)); + pBackend->chkpInUse = taosArrayInit(4, sizeof(int64_t)); + + taosThreadRwlockInit(&pBackend->chkpDirLock, NULL); +} + +void taskBackendDestroyChkpOpt(STaskBackendWrapper* pBackend) { + taosArrayDestroy(pBackend->chkpSaved); + taosArrayDestroy(pBackend->chkpInUse); + taosThreadRwlockDestroy(&pBackend->chkpDirLock); +} + +int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char** taskFullPath) { int32_t code = 0; - char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); - sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key); + + char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); + sprintf(taskPath, "%s%s%s", path, TD_DIRSEP, key); if (!taosDirExist(taskPath)) { code = taosMulMkDir(taskPath); if (code != 0) { @@ -1580,36 +1712,53 @@ int32_t taskBackendBuildFullPath(char* path, char* key, char** fullPath) { return code; } } - *fullPath = taskPath; + + char* dbPath = taosMemoryCalloc(1, strlen(taskPath) + 128); + sprintf(dbPath, "%s%s%s", taskPath, TD_DIRSEP, "state"); + if (!taosDirExist(dbPath)) { + code = taosMulMkDir(dbPath); + if (code != 0) { + qError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code)); + taosMemoryFree(taskPath); + taosMemoryFree(dbPath); + return code; + } + } + + *dbFullPath = dbPath; + *taskFullPath = taskPath; return 0; } STaskBackendWrapper* taskBackendOpen(char* path, char* key) { - char* taskPath = NULL; - char* err = NULL; - + char* taskPath = NULL; + char* err = NULL; + char* dbPath = NULL; char** cfNames = NULL; size_t nCf = 0; - if (taskBackendBuildFullPath(path, key, &taskPath) != 0) { + if (taskBackendBuildFullPath(path, key, &dbPath, &taskPath) != 0) { return NULL; } STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper)); - taskBackendInitOpt(pTaskBackend); - cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err); + pTaskBackend->path = taskPath; + taskBackendInitChkpOpt(pTaskBackend); + taskBackendInitDBOpt(pTaskBackend); + + cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); if (nCf == 0) { - pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], taskPath, &err); + pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], dbPath, &err); rocksdb_close(pTaskBackend->db); if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf); taosMemoryFree(err); } - cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err); + cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); ASSERT(err != NULL); - if (taskBackendOpenCfs(pTaskBackend, taskPath, cfNames, nCf) != 0) { + if (taskBackendOpenCfs(pTaskBackend, dbPath, cfNames, nCf) != 0) { goto _EXIT; } @@ -1617,11 +1766,12 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) { rocksdb_list_column_families_destroy(cfNames, nCf); } - qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend); - taosMemoryFree(taskPath); + qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskBackend); + taosMemoryFree(dbPath); pTaskBackend->idstr = taosStrdup(key); taosThreadMutexInit(&pTaskBackend->mutex, NULL); + return pTaskBackend; _EXIT: @@ -1673,7 +1823,10 @@ void taskBackendDestroy(void* pBackend) { if (wrapper->db) rocksdb_close(wrapper->db); + taskBackendDestroyChkpOpt(pBackend); + taosMemoryFree(wrapper->idstr); + taosMemoryFree(wrapper->path); taosMemoryFree(wrapper); return;