refactor backend

This commit is contained in:
yihaoDeng 2023-10-11 20:12:07 +08:00
parent 83b52a4c1d
commit f264ffdcd6
5 changed files with 109 additions and 109 deletions

View File

@ -425,7 +425,7 @@ typedef struct SStreamMeta {
SRWLatch chkpDirLock;
int32_t pauseTaskNum;
// SHashObj* pTaskBackend;
// SHashObj* pTaskDb;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);

View File

@ -74,7 +74,7 @@ typedef struct {
int32_t chkpCap;
TdThreadRwlock chkpDirLock;
} STaskBackendWrapper;
} STaskDbWrapper;
void* streamBackendInit(const char* path, int64_t chkpId);
void streamBackendCleanup(void* arg);
@ -85,12 +85,12 @@ SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
STaskBackendWrapper* taskBackendOpen(char* path, char* key);
void taskBackendDestroy(void* pBackend);
int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId);
STaskDbWrapper* taskBackendOpen(char* path, char* key);
void taskDbDestroy(void* pBackend);
int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId);
void* taskBackendAddRef(void* pTaskBackend);
void taskBackendRemoveRef(void* pTaskBackend);
void* taskDbAddRef(void* pTaskDb);
void taskDbRemoveRef(void* pTaskDb);
int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove);

View File

@ -716,7 +716,7 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
* replication is finished
*/
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
STaskBackendWrapper* pBackend = arg;
STaskDbWrapper* pBackend = arg;
taosThreadRwlockWrlock(&pBackend->chkpDirLock);
@ -863,7 +863,7 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
// return nCf;
}
int32_t chkpGetAllDbCfHandle2(STaskBackendWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (pBackend->pCf[i]) {
@ -956,12 +956,12 @@ int32_t taskBackendBuildSnap(void* arg, int64_t chkpId) {
int32_t code = 0;
while (pIter) {
STaskBackendWrapper* pBackend = *(STaskBackendWrapper**)pIter;
taskBackendAddRef(pBackend);
STaskDbWrapper* pBackend = *(STaskDbWrapper**)pIter;
taskDbAddRef(pBackend);
code = taskBackendDoCheckpoint((STaskBackendWrapper*)pBackend, chkpId);
code = taskBackendDoCheckpoint((STaskDbWrapper*)pBackend, chkpId);
taskBackendRemoveRef(pBackend);
taskDbRemoveRef(pBackend);
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
}
return 0;
@ -1036,10 +1036,10 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
0
*/
int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId) {
STaskBackendWrapper* pBackend = arg;
int64_t st = taosGetTimestampMs();
int32_t code = -1;
int64_t refId = pBackend->refId;
STaskDbWrapper* pBackend = arg;
int64_t st = taosGetTimestampMs();
int32_t code = -1;
int64_t refId = pBackend->refId;
if (taosAcquireRef(taskBackendWrapperId, refId) == NULL) {
return -1;
@ -1549,7 +1549,7 @@ int32_t getCfIdx(const char* cfName) {
}
return idx;
}
int32_t taskBackendOpenCfs(STaskBackendWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
int32_t taskDbOpenCfs(STaskDbWrapper* pTask, char* path, char** pCfNames, int32_t nCf) {
int32_t code = -1;
char* err = NULL;
@ -1583,17 +1583,17 @@ _EXIT:
taosMemoryFree(cfHandle);
return code;
}
void* taskBackendAddRef(void* pTaskBackend) {
STaskBackendWrapper* pBackend = pTaskBackend;
void* taskDbAddRef(void* pTaskDb) {
STaskDbWrapper* pBackend = pTaskDb;
return taosAcquireRef(taskBackendWrapperId, pBackend->refId);
}
void taskBackendRemoveRef(void* pTaskBackend) {
STaskBackendWrapper* pBackend = pTaskBackend;
void taskDbRemoveRef(void* pTaskDb) {
STaskDbWrapper* pBackend = pTaskDb;
taosReleaseRef(taskBackendWrapperId, pBackend->refId);
}
// void taskBackendDestroy(STaskBackendWrapper* wrapper);
// void taskDbDestroy(STaskDbWrapper* wrapper);
void taskBackendInitDBOpt(STaskBackendWrapper* pTaskBackend) {
void taskDbInitDBOpt(STaskDbWrapper* pTaskDb) {
rocksdb_env_t* env = rocksdb_create_default_env();
rocksdb_cache_t* cache = rocksdb_cache_create_lru(256);
@ -1609,25 +1609,25 @@ void taskBackendInitDBOpt(STaskBackendWrapper* pTaskBackend) {
rocksdb_options_set_write_buffer_size(opts, 32 << 20);
rocksdb_options_set_atomic_flush(opts, 1);
pTaskBackend->dbOpt = opts;
pTaskBackend->env = env;
pTaskBackend->cache = cache;
pTaskBackend->filterFactory = rocksdb_compactionfilterfactory_create(
pTaskDb->dbOpt = opts;
pTaskDb->env = env;
pTaskDb->cache = cache;
pTaskDb->filterFactory = rocksdb_compactionfilterfactory_create(
NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName);
rocksdb_options_set_compaction_filter_factory(pTaskBackend->dbOpt, pTaskBackend->filterFactory);
pTaskBackend->readOpt = rocksdb_readoptions_create();
pTaskBackend->writeOpt = rocksdb_writeoptions_create();
rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory);
pTaskDb->readOpt = rocksdb_readoptions_create();
pTaskDb->writeOpt = rocksdb_writeoptions_create();
size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
pTaskBackend->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
pTaskBackend->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
pTaskBackend->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
pTaskBackend->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
pTaskDb->pCfParams = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
pTaskDb->pCfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
pTaskDb->pCompares = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t*));
for (int i = 0; i < nCf; i++) {
rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskBackend->dbOpt);
rocksdb_options_t* opt = rocksdb_options_create_copy(pTaskDb->dbOpt);
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(tableOpt, pTaskBackend->cache);
rocksdb_block_based_options_set_block_cache(tableOpt, pTaskDb->cache);
rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
@ -1641,13 +1641,13 @@ void taskBackendInitDBOpt(STaskBackendWrapper* pTaskBackend) {
rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
pTaskBackend->pCompares[i] = compare;
pTaskBackend->pCfOpts[i] = opt;
pTaskBackend->pCfParams[i].tableOpt = tableOpt;
pTaskDb->pCompares[i] = compare;
pTaskDb->pCfOpts[i] = opt;
pTaskDb->pCfParams[i].tableOpt = tableOpt;
}
return;
}
void taskBackendInitChkpOpt(STaskBackendWrapper* pBackend) {
void taskDbInitChkpOpt(STaskDbWrapper* pBackend) {
pBackend->chkpId = -1;
pBackend->chkpCap = 4;
pBackend->chkpSaved = taosArrayInit(4, sizeof(int64_t));
@ -1656,13 +1656,13 @@ void taskBackendInitChkpOpt(STaskBackendWrapper* pBackend) {
taosThreadRwlockInit(&pBackend->chkpDirLock, NULL);
}
void taskBackendDestroyChkpOpt(STaskBackendWrapper* pBackend) {
void taskDbDestroyChkpOpt(STaskDbWrapper* pBackend) {
taosArrayDestroy(pBackend->chkpSaved);
taosArrayDestroy(pBackend->chkpInUse);
taosThreadRwlockDestroy(&pBackend->chkpDirLock);
}
int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
int32_t code = 0;
char* statePath = taosMemoryCalloc(1, strlen(path) + 128);
@ -1692,41 +1692,41 @@ int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char*
*stateFullPath = statePath;
return 0;
}
STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
STaskDbWrapper* taskBackendOpen(char* path, char* key) {
char* statePath = NULL;
char* err = NULL;
char* dbPath = NULL;
char** cfNames = NULL;
size_t nCf = 0;
if (taskBackendBuildFullPath(path, key, &dbPath, &statePath) != 0) {
if (taskDbBuildFullPath(path, key, &dbPath, &statePath) != 0) {
return NULL;
}
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper));
pTaskBackend->idstr = taosStrdup(key);
pTaskBackend->path = statePath;
taosThreadMutexInit(&pTaskBackend->mutex, NULL);
taskBackendInitChkpOpt(pTaskBackend);
taskBackendInitDBOpt(pTaskBackend);
STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper));
pTaskDb->idstr = taosStrdup(key);
pTaskDb->path = statePath;
taosThreadMutexInit(&pTaskDb->mutex, NULL);
taskDbInitChkpOpt(pTaskDb);
taskDbInitDBOpt(pTaskDb);
statePath = NULL;
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err);
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
if (nCf == 0) {
// pre create db
pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], dbPath, &err);
rocksdb_close(pTaskBackend->db);
pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
rocksdb_close(pTaskDb->db);
if (cfNames != NULL) {
rocksdb_list_column_families_destroy(cfNames, nCf);
}
taosMemoryFree(err);
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err);
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
ASSERT(err != NULL);
}
if (taskBackendOpenCfs(pTaskBackend, dbPath, cfNames, nCf) != 0) {
if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) {
goto _EXIT;
}
@ -1734,13 +1734,13 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
rocksdb_list_column_families_destroy(cfNames, nCf);
}
qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskBackend);
qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb);
taosMemoryFree(dbPath);
return pTaskBackend;
return pTaskDb;
_EXIT:
taskBackendDestroy(pTaskBackend);
taskDbDestroy(pTaskDb);
if (err) taosMemoryFree(err);
if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
@ -1749,8 +1749,8 @@ _EXIT:
return NULL;
}
void taskBackendDestroy(void* pBackend) {
STaskBackendWrapper* wrapper = pBackend;
void taskDbDestroy(void* pBackend) {
STaskDbWrapper* wrapper = pBackend;
if (wrapper == NULL) return;
@ -1790,7 +1790,7 @@ void taskBackendDestroy(void* pBackend) {
if (wrapper->db) rocksdb_close(wrapper->db);
taskBackendDestroyChkpOpt(pBackend);
taskDbDestroyChkpOpt(pBackend);
taosMemoryFree(wrapper->idstr);
taosMemoryFree(wrapper->path);
@ -1799,7 +1799,7 @@ void taskBackendDestroy(void* pBackend) {
return;
}
int32_t taskBackendOpenCfByKey(STaskBackendWrapper* pBackend, const char* key) {
int32_t taskDbOpenCfByKey(STaskDbWrapper* pBackend, const char* key) {
int32_t code = 0;
char* err = NULL;
int8_t idx = getCfIdx(key);
@ -1817,7 +1817,7 @@ int32_t taskBackendOpenCfByKey(STaskBackendWrapper* pBackend, const char* key) {
pBackend->pCf[idx] = cf;
return code;
}
int32_t copyDataAt(RocksdbCfInst* pSrc, STaskBackendWrapper* pDst, int8_t i) {
int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) {
int32_t WRITE_BATCH = 1024;
char* err = NULL;
int code = 0;
@ -1866,22 +1866,22 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) {
int32_t code = 0;
STaskBackendWrapper* pTaskBackend = taskBackendOpen(path, key);
RocksdbCfInst* pSrcBackend = pCfInst;
STaskDbWrapper* pTaskDb = taskBackendOpen(path, key);
RocksdbCfInst* pSrcBackend = pCfInst;
for (int i = 0; i < nCf; i++) {
rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i];
if (pSrcCf == NULL) continue;
code = taskBackendOpenCfByKey(pTaskBackend, ginitDict[i].key);
code = taskDbOpenCfByKey(pTaskDb, ginitDict[i].key);
if (code != 0) goto _EXIT;
code = copyDataAt(pSrcBackend, pTaskBackend, i);
code = copyDataAt(pSrcBackend, pTaskDb, i);
if (code != 0) goto _EXIT;
}
_EXIT:
taskBackendDestroy(pTaskBackend);
taskDbDestroy(pTaskDb);
return code;
}
@ -2140,7 +2140,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
}
}
if (pState != NULL && idx != -1) {
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_column_family_handle_t* cf = NULL;
taosThreadMutexLock(&wrapper->mutex);
@ -2179,7 +2179,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
*readOpt = rocksdb_readoptions_create();
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->db);
rocksdb_readoptions_set_snapshot(*readOpt, *snapshot);
@ -2200,8 +2200,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
code = -1; \
break; \
} \
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
@ -2231,8 +2231,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
code = -1; \
break; \
} \
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
@ -2274,8 +2274,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
code = -1; \
break; \
} \
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
@ -2314,7 +2314,7 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb");
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
@ -2437,7 +2437,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
if (pCur == NULL) {
return NULL;
}
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
pCur->number = pState->number;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
@ -2493,7 +2493,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
if (pCur == NULL) return NULL;
pCur->number = pState->number;
pCur->db = ((STaskBackendWrapper*)pState->pTdbState->pOwner->pBackend)->db;
pCur->db = ((STaskDbWrapper*)pState->pTdbState->pOwner->pBackend)->db;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
@ -2514,7 +2514,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb");
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
@ -2604,8 +2604,8 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
@ -2645,8 +2645,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
}
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
@ -2683,8 +2683,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb");
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
@ -2785,8 +2785,8 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
if (pCur == NULL) return NULL;
@ -2845,8 +2845,8 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb");
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (!pCur) {
return NULL;
}
@ -2883,8 +2883,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
}
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyPrev_rocksdb");
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
@ -2921,8 +2921,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
}
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qDebug("streamStateSessionGetKeyByRange_rocksdb");
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return -1;
}
@ -3153,7 +3153,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
int code = 0;
char* err = NULL;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
@ -3192,8 +3192,8 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
return code;
}
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
@ -3244,7 +3244,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) {
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
int i = streamStateGetCfIdx(pState, cfKeyName);
if (i < 0) {
@ -3277,7 +3277,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
@ -3293,8 +3293,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
return 0;
}
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL;
STaskBackendWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
char* err = NULL;
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
rocksdb_write(wrapper->db, wrapper->writeOpt, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) {
qError("streamState failed to write batch, err:%s", err);

View File

@ -53,7 +53,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
taskBackendWrapperId = taosOpenRef(64, taskBackendDestroy);
taskBackendWrapperId = taosOpenRef(64, taskDbDestroy);
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
@ -226,8 +226,8 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref)
taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key));
if (ppBackend != NULL && *ppBackend != NULL) {
taskBackendAddRef(*ppBackend);
*ref = ((STaskBackendWrapper*)*ppBackend)->refId;
taskDbAddRef(*ppBackend);
*ref = ((STaskDbWrapper*)*ppBackend)->refId;
taosThreadMutexUnlock(&pMeta->backendMutex);
return *ppBackend;
}
@ -761,9 +761,9 @@ int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) {
// void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
// while (pIter) {
// STaskBackendWrapper* taskBackend = *(STaskBackendWrapper**)pIter;
// STaskDbWrapper* taskBackend = *(STaskDbWrapper**)pIter;
// if (taskBackend != NULL) {
// taskBackendRemoveRef(taskBackend);
// taskDbRemoveRef(taskBackend);
// }
// pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
// }

View File

@ -355,8 +355,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pUpstreamInfoList = NULL;
}
if (pTask->pBackend) {
taskBackendRemoveRef(pTask->pBackend);
taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL;
}
taosThreadMutexDestroy(&pTask->lock);