From 4c94fbb44f223eba2c5781b703fb9b4a56923475 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 11 Oct 2023 20:26:19 +0800 Subject: [PATCH] refactor backend --- source/libs/stream/inc/streamBackendRocksdb.h | 6 +- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 68 +++++++++---------- source/libs/stream/src/streamMeta.c | 8 +-- source/libs/stream/src/streamSnapshot.c | 2 +- source/libs/stream/src/streamTask.c | 2 +- 6 files changed, 44 insertions(+), 44 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index cea948e2f1..0f97bda4c5 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -85,9 +85,9 @@ SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); -STaskDbWrapper* taskBackendOpen(char* path, char* key); +STaskDbWrapper* taskDbOpen(char* path, char* key); void taskDbDestroy(void* pBackend); -int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId); +int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); void* taskDbAddRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb); @@ -182,7 +182,7 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst); int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); -int32_t taskBackendBuildSnap(void* arg, int64_t chkpId); +int32_t taskDbBuildSnap(void* arg, int64_t chkpId); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index f200c714ab..a5e7531f7e 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -48,7 +48,7 @@ typedef struct { extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; -extern int32_t taskBackendWrapperId; +extern int32_t taskDbWrapperId; const char* streamGetBlockTypeStr(int32_t type); void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d5450711a2..78f67d06d5 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -950,18 +950,18 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI return 0; } -int32_t taskBackendBuildSnap(void* arg, int64_t chkpId) { +int32_t taskDbBuildSnap(void* arg, int64_t chkpId) { SStreamMeta* pMeta = arg; void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); int32_t code = 0; while (pIter) { - STaskDbWrapper* pBackend = *(STaskDbWrapper**)pIter; - taskDbAddRef(pBackend); + STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter; + taskDbAddRef(pTaskDb); - code = taskBackendDoCheckpoint((STaskDbWrapper*)pBackend, chkpId); + code = taskDbDoCheckpoint((STaskDbWrapper*)pTaskDb, chkpId); - taskDbRemoveRef(pBackend); + taskDbRemoveRef(pTaskDb); pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); } return 0; @@ -1035,48 +1035,48 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { /* 0 */ -int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId) { - STaskDbWrapper* pBackend = arg; +int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { + STaskDbWrapper* pTaskDb = arg; int64_t st = taosGetTimestampMs(); int32_t code = -1; - int64_t refId = pBackend->refId; + int64_t refId = pTaskDb->refId; - if (taosAcquireRef(taskBackendWrapperId, refId) == NULL) { + if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { return -1; } char* pChkpDir = NULL; char* pChkpIdDir = NULL; - if (chkpPreBuildDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { + if (chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { code = -1; goto _EXIT; } // Get all cf and acquire cfWrappter rocksdb_column_family_handle_t** ppCf = NULL; - int32_t nCf = chkpGetAllDbCfHandle2(pBackend, &ppCf); - qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pBackend, pChkpIdDir, nCf); + int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf); + qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf); - if ((code = chkpPreFlushDb(pBackend->db, ppCf, nCf)) == 0) { - if ((code = chkpDoDbCheckpoint(pBackend->db, pChkpIdDir)) != 0) { - qError("stream backend:%p failed to do checkpoint at:%s", pBackend, pChkpIdDir); + if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) { + if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { + qError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); } else { - qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pBackend, pChkpIdDir, + qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, taosGetTimestampMs() - st); } } else { - qError("stream backend:%p failed to flush db at:%s", pBackend, pChkpIdDir); + qError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir); } - code = chkpMayDelObsolete(pBackend, chkpId, pChkpDir); + code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir); _EXIT: taosMemoryFree(pChkpDir); taosMemoryFree(pChkpIdDir); - taosReleaseRef(taskBackendWrapperId, refId); + taosReleaseRef(taskDbWrapperId, refId); return code; } -int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); } +int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskDbDoCheckpoint(arg, chkpId); } SListNode* streamBackendAddCompare(void* backend, void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)backend; @@ -1585,11 +1585,11 @@ _EXIT: } void* taskDbAddRef(void* pTaskDb) { STaskDbWrapper* pBackend = pTaskDb; - return taosAcquireRef(taskBackendWrapperId, pBackend->refId); + return taosAcquireRef(taskDbWrapperId, pBackend->refId); } void taskDbRemoveRef(void* pTaskDb) { STaskDbWrapper* pBackend = pTaskDb; - taosReleaseRef(taskBackendWrapperId, pBackend->refId); + taosReleaseRef(taskDbWrapperId, pBackend->refId); } // void taskDbDestroy(STaskDbWrapper* wrapper); @@ -1647,19 +1647,19 @@ void taskDbInitDBOpt(STaskDbWrapper* pTaskDb) { } return; } -void taskDbInitChkpOpt(STaskDbWrapper* pBackend) { - pBackend->chkpId = -1; - pBackend->chkpCap = 4; - pBackend->chkpSaved = taosArrayInit(4, sizeof(int64_t)); - pBackend->chkpInUse = taosArrayInit(4, sizeof(int64_t)); +void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) { + pTaskDb->chkpId = -1; + pTaskDb->chkpCap = 4; + pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t)); + pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t)); - taosThreadRwlockInit(&pBackend->chkpDirLock, NULL); + taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL); } -void taskDbDestroyChkpOpt(STaskDbWrapper* pBackend) { - taosArrayDestroy(pBackend->chkpSaved); - taosArrayDestroy(pBackend->chkpInUse); - taosThreadRwlockDestroy(&pBackend->chkpDirLock); +void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) { + taosArrayDestroy(pTaskDb->chkpSaved); + taosArrayDestroy(pTaskDb->chkpInUse); + taosThreadRwlockDestroy(&pTaskDb->chkpDirLock); } int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) { @@ -1692,7 +1692,7 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta *stateFullPath = statePath; return 0; } -STaskDbWrapper* taskBackendOpen(char* path, char* key) { +STaskDbWrapper* taskDbOpen(char* path, char* key) { char* statePath = NULL; char* err = NULL; char* dbPath = NULL; @@ -1866,7 +1866,7 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) { int32_t code = 0; - STaskDbWrapper* pTaskDb = taskBackendOpen(path, key); + STaskDbWrapper* pTaskDb = taskDbOpen(path, key); RocksdbCfInst* pSrcBackend = pCfInst; for (int i = 0; i < nCf; i++) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8d855d3513..f4c8021ac6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -30,7 +30,7 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; int32_t streamMetaId = 0; -int32_t taskBackendWrapperId = 0; +int32_t taskDbWrapperId = 0; static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); static void metaHbToMnode(void* param, void* tmrId); @@ -53,7 +53,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); - taskBackendWrapperId = taosOpenRef(64, taskDbDestroy); + taskDbWrapperId = taosOpenRef(64, taskDbDestroy); streamMetaId = taosOpenRef(64, streamMetaCloseImpl); @@ -232,13 +232,13 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) return *ppBackend; } - void* pBackend = taskBackendOpen(pMeta->path, key); + void* pBackend = taskDbOpen(pMeta->path, key); if (pBackend == NULL) { taosThreadMutexUnlock(&pMeta->backendMutex); return NULL; } - *ref = taosAddRef(taskBackendWrapperId, pBackend); + *ref = taosAddRef(taskDbWrapperId, pBackend); taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexUnlock(&pMeta->backendMutex); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 25c228e97d..d33f018a2c 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -137,7 +137,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return taskBackendBuildSnap(arg, chkpId); } +int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return taskDbBuildSnap(arg, chkpId); } void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c0f9bc65ff..5b4b0c309d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -392,7 +392,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); - // pTask->pBackend = taskBackendOpen(pMeta->path, (char*)pTask->id.idStr); + // pTask->pBackend = taskDbOpen(pMeta->path, (char*)pTask->id.idStr); return TSDB_CODE_SUCCESS; }