From f264ffdcd645dac5f3de780d8501362af0f0cd1d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 11 Oct 2023 20:12:07 +0800 Subject: [PATCH] refactor backend --- include/libs/stream/tstream.h | 2 +- source/libs/stream/inc/streamBackendRocksdb.h | 12 +- source/libs/stream/src/streamBackendRocksdb.c | 190 +++++++++--------- source/libs/stream/src/streamMeta.c | 10 +- source/libs/stream/src/streamTask.c | 4 +- 5 files changed, 109 insertions(+), 109 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 859d7cea54..260609e697 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -425,7 +425,7 @@ typedef struct SStreamMeta { SRWLatch chkpDirLock; int32_t pauseTaskNum; - // SHashObj* pTaskBackend; + // SHashObj* pTaskDb; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 9aa616e190..cea948e2f1 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -74,7 +74,7 @@ typedef struct { int32_t chkpCap; TdThreadRwlock chkpDirLock; -} STaskBackendWrapper; +} STaskDbWrapper; void* streamBackendInit(const char* path, int64_t chkpId); void streamBackendCleanup(void* arg); @@ -85,12 +85,12 @@ SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); -STaskBackendWrapper* taskBackendOpen(char* path, char* key); -void taskBackendDestroy(void* pBackend); -int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId); +STaskDbWrapper* taskBackendOpen(char* path, char* key); +void taskDbDestroy(void* pBackend); +int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId); -void* taskBackendAddRef(void* pTaskBackend); -void taskBackendRemoveRef(void* pTaskBackend); +void* taskDbAddRef(void* pTaskDb); +void taskDbRemoveRef(void* pTaskDb); int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 917bc0f796..d5450711a2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -716,7 +716,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { * replication is finished */ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { - STaskBackendWrapper* pBackend = arg; + STaskDbWrapper* pBackend = arg; taosThreadRwlockWrlock(&pBackend->chkpDirLock); @@ -863,7 +863,7 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* // return nCf; } -int32_t chkpGetAllDbCfHandle2(STaskBackendWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) { +int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) { SArray* pHandle = taosArrayInit(8, POINTER_BYTES); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (pBackend->pCf[i]) { @@ -956,12 +956,12 @@ int32_t taskBackendBuildSnap(void* arg, int64_t chkpId) { int32_t code = 0; while (pIter) { - STaskBackendWrapper* pBackend = *(STaskBackendWrapper**)pIter; - taskBackendAddRef(pBackend); + STaskDbWrapper* pBackend = *(STaskDbWrapper**)pIter; + taskDbAddRef(pBackend); - code = taskBackendDoCheckpoint((STaskBackendWrapper*)pBackend, chkpId); + code = taskBackendDoCheckpoint((STaskDbWrapper*)pBackend, chkpId); - taskBackendRemoveRef(pBackend); + taskDbRemoveRef(pBackend); pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); } return 0; @@ -1036,10 +1036,10 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { 0 */ int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId) { - STaskBackendWrapper* pBackend = arg; - int64_t st = taosGetTimestampMs(); - int32_t code = -1; - int64_t refId = pBackend->refId; + STaskDbWrapper* pBackend = arg; + int64_t st = taosGetTimestampMs(); + int32_t code = -1; + int64_t refId = pBackend->refId; if (taosAcquireRef(taskBackendWrapperId, refId) == NULL) { return -1; @@ -1549,7 +1549,7 @@ int32_t getCfIdx(const char* cfName) { } return idx; } -int32_t taskBackendOpenCfs(STaskBackendWrapper* pTask, char* path, char** pCfNames, int32_t nCf) { +int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) { int32_t code = -1; char* err = NULL; @@ -1583,17 +1583,17 @@ _EXIT: taosMemoryFree(cfHandle); return code; } -void* taskBackendAddRef(void* pTaskBackend) { - STaskBackendWrapper* pBackend = pTaskBackend; +void* taskDbAddRef(void* pTaskDb) { + STaskDbWrapper* pBackend = pTaskDb; return taosAcquireRef(taskBackendWrapperId, pBackend->refId); } -void taskBackendRemoveRef(void* pTaskBackend) { - STaskBackendWrapper* pBackend = pTaskBackend; +void taskDbRemoveRef(void* pTaskDb) { + STaskDbWrapper* pBackend = pTaskDb; taosReleaseRef(taskBackendWrapperId, pBackend->refId); } -// void taskBackendDestroy(STaskBackendWrapper* wrapper); +// void taskDbDestroy(STaskDbWrapper* wrapper); -void taskBackendInitDBOpt(STaskBackendWrapper* pTaskBackend) { +void taskDbInitDBOpt(STaskDbWrapper* pTaskDb) { rocksdb_env_t* env = rocksdb_create_default_env(); rocksdb_cache_t* cache = rocksdb_cache_create_lru(256); @@ -1609,25 +1609,25 @@ void taskBackendInitDBOpt(STaskBackendWrapper* pTaskBackend) { rocksdb_options_set_write_buffer_size(opts, 32 << 20); rocksdb_options_set_atomic_flush(opts, 1); - pTaskBackend->dbOpt = opts; - pTaskBackend->env = env; - pTaskBackend->cache = cache; - pTaskBackend->filterFactory = rocksdb_compactionfilterfactory_create( + pTaskDb->dbOpt = opts; + pTaskDb->env = env; + pTaskDb->cache = cache; + pTaskDb->filterFactory = rocksdb_compactionfilterfactory_create( NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); - rocksdb_options_set_compaction_filter_factory(pTaskBackend->dbOpt, pTaskBackend->filterFactory); - pTaskBackend->readOpt = rocksdb_readoptions_create(); - pTaskBackend->writeOpt = rocksdb_writeoptions_create(); + rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory); + pTaskDb->readOpt = rocksdb_readoptions_create(); + pTaskDb->writeOpt = rocksdb_writeoptions_create(); size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); - pTaskBackend->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); - pTaskBackend->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam)); - pTaskBackend->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); - pTaskBackend->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*)); + pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam)); + pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); + pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*)); for (int i = 0; i < nCf; i++) { - rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskBackend->dbOpt); + rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskDb->dbOpt); rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); - rocksdb_block_based_options_set_block_cache(tableOpt, pTaskBackend->cache); + rocksdb_block_based_options_set_block_cache(tableOpt, pTaskDb->cache); rocksdb_block_based_options_set_partition_filters(tableOpt, 1); rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); @@ -1641,13 +1641,13 @@ void taskBackendInitDBOpt(STaskBackendWrapper* pTaskBackend) { rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); - pTaskBackend->pCompares[i] = compare; - pTaskBackend->pCfOpts[i] = opt; - pTaskBackend->pCfParams[i].tableOpt = tableOpt; + pTaskDb->pCompares[i] = compare; + pTaskDb->pCfOpts[i] = opt; + pTaskDb->pCfParams[i].tableOpt = tableOpt; } return; } -void taskBackendInitChkpOpt(STaskBackendWrapper* pBackend) { +void taskDbInitChkpOpt(STaskDbWrapper* pBackend) { pBackend->chkpId = -1; pBackend->chkpCap = 4; pBackend->chkpSaved = taosArrayInit(4, sizeof(int64_t)); @@ -1656,13 +1656,13 @@ void taskBackendInitChkpOpt(STaskBackendWrapper* pBackend) { taosThreadRwlockInit(&pBackend->chkpDirLock, NULL); } -void taskBackendDestroyChkpOpt(STaskBackendWrapper* pBackend) { +void taskDbDestroyChkpOpt(STaskDbWrapper* pBackend) { taosArrayDestroy(pBackend->chkpSaved); taosArrayDestroy(pBackend->chkpInUse); taosThreadRwlockDestroy(&pBackend->chkpDirLock); } -int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) { +int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) { int32_t code = 0; char* statePath = taosMemoryCalloc(1, strlen(path) + 128); @@ -1692,41 +1692,41 @@ int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char* *stateFullPath = statePath; return 0; } -STaskBackendWrapper* taskBackendOpen(char* path, char* key) { +STaskDbWrapper* taskBackendOpen(char* path, char* key) { char* statePath = NULL; char* err = NULL; char* dbPath = NULL; char** cfNames = NULL; size_t nCf = 0; - if (taskBackendBuildFullPath(path, key, &dbPath, &statePath) != 0) { + if (taskDbBuildFullPath(path, key, &dbPath, &statePath) != 0) { return NULL; } - STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper)); - pTaskBackend->idstr = taosStrdup(key); - pTaskBackend->path = statePath; - taosThreadMutexInit(&pTaskBackend->mutex, NULL); - taskBackendInitChkpOpt(pTaskBackend); - taskBackendInitDBOpt(pTaskBackend); + STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper)); + pTaskDb->idstr = taosStrdup(key); + pTaskDb->path = statePath; + taosThreadMutexInit(&pTaskDb->mutex, NULL); + taskDbInitChkpOpt(pTaskDb); + taskDbInitDBOpt(pTaskDb); statePath = NULL; - cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); + cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); if (nCf == 0) { // pre create db - pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], dbPath, &err); - rocksdb_close(pTaskBackend->db); + pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); + rocksdb_close(pTaskDb->db); if (cfNames != NULL) { rocksdb_list_column_families_destroy(cfNames, nCf); } taosMemoryFree(err); - cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); + cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); ASSERT(err != NULL); } - if (taskBackendOpenCfs(pTaskBackend, dbPath, cfNames, nCf) != 0) { + if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) { goto _EXIT; } @@ -1734,13 +1734,13 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) { rocksdb_list_column_families_destroy(cfNames, nCf); } - qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskBackend); + qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb); taosMemoryFree(dbPath); - return pTaskBackend; + return pTaskDb; _EXIT: - taskBackendDestroy(pTaskBackend); + taskDbDestroy(pTaskDb); if (err) taosMemoryFree(err); if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); @@ -1749,8 +1749,8 @@ _EXIT: return NULL; } -void taskBackendDestroy(void* pBackend) { - STaskBackendWrapper* wrapper = pBackend; +void taskDbDestroy(void* pBackend) { + STaskDbWrapper* wrapper = pBackend; if (wrapper == NULL) return; @@ -1790,7 +1790,7 @@ void taskBackendDestroy(void* pBackend) { if (wrapper->db) rocksdb_close(wrapper->db); - taskBackendDestroyChkpOpt(pBackend); + taskDbDestroyChkpOpt(pBackend); taosMemoryFree(wrapper->idstr); taosMemoryFree(wrapper->path); @@ -1799,7 +1799,7 @@ void taskBackendDestroy(void* pBackend) { return; } -int32_t taskBackendOpenCfByKey(STaskBackendWrapper* pBackend, const char* key) { +int32_t taskDbOpenCfByKey(STaskDbWrapper* pBackend, const char* key) { int32_t code = 0; char* err = NULL; int8_t idx = getCfIdx(key); @@ -1817,7 +1817,7 @@ int32_t taskBackendOpenCfByKey(STaskBackendWrapper* pBackend, const char* key) { pBackend->pCf[idx] = cf; return code; } -int32_t copyDataAt(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) { +int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) { int32_t WRITE_BATCH = 1024; char* err = NULL; int code = 0; @@ -1866,22 +1866,22 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) { int32_t code = 0; - STaskBackendWrapper* pTaskBackend = taskBackendOpen(path, key); - RocksdbCfInst* pSrcBackend = pCfInst; + STaskDbWrapper* pTaskDb = taskBackendOpen(path, key); + RocksdbCfInst* pSrcBackend = pCfInst; for (int i = 0; i < nCf; i++) { rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i]; if (pSrcCf == NULL) continue; - code = taskBackendOpenCfByKey(pTaskBackend, ginitDict[i].key); + code = taskDbOpenCfByKey(pTaskDb, ginitDict[i].key); if (code != 0) goto _EXIT; - code = copyDataAt(pSrcBackend, pTaskBackend, i); + code = copyDataAt(pSrcBackend, pTaskDb, i); if (code != 0) goto _EXIT; } _EXIT: - taskBackendDestroy(pTaskBackend); + taskDbDestroy(pTaskDb); return code; } @@ -2140,7 +2140,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { } } if (pState != NULL && idx != -1) { - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; rocksdb_column_family_handle_t* cf = NULL; taosThreadMutexLock(&wrapper->mutex); @@ -2179,7 +2179,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe *readOpt = rocksdb_readoptions_create(); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->db); rocksdb_readoptions_set_snapshot(*readOpt, *snapshot); @@ -2200,8 +2200,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe code = -1; \ break; \ } \ - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - char toString[128] = {0}; \ + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ @@ -2231,8 +2231,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe code = -1; \ break; \ } \ - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - char toString[128] = {0}; \ + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ @@ -2274,8 +2274,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe code = -1; \ break; \ } \ - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - char toString[128] = {0}; \ + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ @@ -2314,7 +2314,7 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; char sKeyStr[128] = {0}; char eKeyStr[128] = {0}; @@ -2437,7 +2437,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin if (pCur == NULL) { return NULL; } - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; pCur->number = pState->number; pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, @@ -2493,7 +2493,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK if (pCur == NULL) return NULL; pCur->number = pState->number; - pCur->db = ((STaskBackendWrapper*)pState->pTdbState->pOwner->pBackend)->db; + pCur->db = ((STaskDbWrapper*)pState->pTdbState->pOwner->pBackend)->db; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -2514,7 +2514,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; @@ -2604,8 +2604,8 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } @@ -2645,8 +2645,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } @@ -2683,8 +2683,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyNext_rocksdb"); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } @@ -2785,8 +2785,8 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; if (pCur == NULL) return NULL; @@ -2845,8 +2845,8 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (!pCur) { return NULL; } @@ -2883,8 +2883,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const } SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyPrev_rocksdb"); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } @@ -2921,8 +2921,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const } int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { qDebug("streamStateSessionGetKeyByRange_rocksdb"); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return -1; } @@ -3153,7 +3153,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co int code = 0; char* err = NULL; - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; rocksdb_snapshot_t* snapshot = NULL; rocksdb_readoptions_t* readopts = NULL; rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); @@ -3192,8 +3192,8 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co return code; } void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot, @@ -3244,7 +3244,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; int i = streamStateGetCfIdx(pState, cfKeyName); if (i < 0) { @@ -3277,7 +3277,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); @@ -3293,8 +3293,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb return 0; } int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { - char* err = NULL; - STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; + char* err = NULL; + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err); if (err != NULL) { qError("streamState failed to write batch, err:%s", err); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c398254548..8d855d3513 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -53,7 +53,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); - taskBackendWrapperId = taosOpenRef(64, taskBackendDestroy); + taskBackendWrapperId = taosOpenRef(64, taskDbDestroy); streamMetaId = taosOpenRef(64, streamMetaCloseImpl); @@ -226,8 +226,8 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) taosThreadMutexLock(&pMeta->backendMutex); void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key)); if (ppBackend != NULL && *ppBackend != NULL) { - taskBackendAddRef(*ppBackend); - *ref = ((STaskBackendWrapper*)*ppBackend)->refId; + taskDbAddRef(*ppBackend); + *ref = ((STaskDbWrapper*)*ppBackend)->refId; taosThreadMutexUnlock(&pMeta->backendMutex); return *ppBackend; } @@ -761,9 +761,9 @@ int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) { // void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); // while (pIter) { - // STaskBackendWrapper* taskBackend = *(STaskBackendWrapper**)pIter; + // STaskDbWrapper* taskBackend = *(STaskDbWrapper**)pIter; // if (taskBackend != NULL) { - // taskBackendRemoveRef(taskBackend); + // taskDbRemoveRef(taskBackend); // } // pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); // } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 152b426628..c0f9bc65ff 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -355,8 +355,8 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pUpstreamInfoList = NULL; } if (pTask->pBackend) { - taskBackendRemoveRef(pTask->pBackend); - + taskDbRemoveRef(pTask->pBackend); + pTask->pBackend = NULL; } taosThreadMutexDestroy(&pTask->lock);