remove unused code
This commit is contained in:
parent
bc546e27bc
commit
ffed7a3c67
|
@ -166,10 +166,12 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key);
|
|||
int32_t streamStateClear_rocksdb(SStreamState* pState);
|
||||
void streamStateCurNext_rocksdb(SStreamStateCur* pCur);
|
||||
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
|
||||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey,
|
||||
const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||
void streamStateCurPrev_rocksdb(SStreamStateCur* pCur);
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
|
||||
int32_t* pVLen);
|
||||
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
|
@ -218,14 +220,13 @@ int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* p
|
|||
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
|
||||
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
|
||||
void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur);
|
||||
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal,
|
||||
int32_t* pVLen);
|
||||
|
||||
// parname cf
|
||||
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
|
||||
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
|
||||
|
||||
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
|
||||
|
||||
// default cf
|
||||
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
|
||||
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
|
||||
|
|
|
@ -1699,9 +1699,6 @@ void streamBackendDelCompare(void* backend, void* arg) {
|
|||
taosMemoryFree(node);
|
||||
}
|
||||
}
|
||||
#ifdef BUILD_NO_CALL
|
||||
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
|
||||
#endif
|
||||
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||
if (inst->pHandle) {
|
||||
|
@ -3025,117 +3022,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
taosMemoryFree(cfOpts);
|
||||
return 0;
|
||||
}
|
||||
#ifdef BUILD_NO_CALL
|
||||
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||
taosAcquireRef(streamBackendId, pState->streamBackendRid);
|
||||
SBackendWrapper* handle = backend;
|
||||
SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));
|
||||
|
||||
streamMutexLock(&handle->cfMutex);
|
||||
RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||
if (ppInst != NULL && *ppInst != NULL) {
|
||||
RocksdbCfInst* inst = *ppInst;
|
||||
pBackendCfWrapper->rocksdb = inst->db;
|
||||
pBackendCfWrapper->pHandle = (void**)inst->pHandle;
|
||||
pBackendCfWrapper->writeOpts = inst->wOpt;
|
||||
pBackendCfWrapper->readOpts = inst->rOpt;
|
||||
pBackendCfWrapper->cfOpts = (void**)(inst->cfOpt);
|
||||
pBackendCfWrapper->dbOpt = handle->dbOpt;
|
||||
pBackendCfWrapper->param = inst->param;
|
||||
pBackendCfWrapper->pBackend = handle;
|
||||
pBackendCfWrapper->pComparNode = inst->pCompareNode;
|
||||
streamMutexUnlock(&handle->cfMutex);
|
||||
pBackendCfWrapper->backendId = pState->streamBackendRid;
|
||||
memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
|
||||
|
||||
int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
|
||||
pState->pTdbState->backendCfWrapperId = id;
|
||||
pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper;
|
||||
stInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr);
|
||||
|
||||
inst->pHandle = NULL;
|
||||
inst->cfOpt = NULL;
|
||||
inst->param = NULL;
|
||||
|
||||
inst->wOpt = NULL;
|
||||
inst->rOpt = NULL;
|
||||
return 0;
|
||||
}
|
||||
streamMutexUnlock(&handle->cfMutex);
|
||||
|
||||
char* err = NULL;
|
||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||
|
||||
RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
|
||||
const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
|
||||
for (int i = 0; i < cfLen; i++) {
|
||||
cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt);
|
||||
// refactor later
|
||||
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
|
||||
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
|
||||
rocksdb_block_based_options_set_partition_filters(tableOpt, 1);
|
||||
|
||||
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
|
||||
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
|
||||
|
||||
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);
|
||||
|
||||
param[i].tableOpt = tableOpt;
|
||||
};
|
||||
|
||||
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
|
||||
for (int i = 0; i < cfLen; i++) {
|
||||
SCfInit* cf = &ginitDict[i];
|
||||
|
||||
rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->destroyCmp, cf->cmpKey, cf->cmpName);
|
||||
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
|
||||
pCompare[i] = compare;
|
||||
}
|
||||
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
|
||||
pBackendCfWrapper->rocksdb = handle->db;
|
||||
pBackendCfWrapper->pHandle = (void**)cfHandle;
|
||||
pBackendCfWrapper->writeOpts = rocksdb_writeoptions_create();
|
||||
pBackendCfWrapper->readOpts = rocksdb_readoptions_create();
|
||||
pBackendCfWrapper->cfOpts = (void**)cfOpt;
|
||||
pBackendCfWrapper->dbOpt = handle->dbOpt;
|
||||
pBackendCfWrapper->param = param;
|
||||
pBackendCfWrapper->pBackend = handle;
|
||||
pBackendCfWrapper->backendId = pState->streamBackendRid;
|
||||
taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL);
|
||||
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
|
||||
pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
|
||||
rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
|
||||
memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
|
||||
|
||||
int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
|
||||
pState->pTdbState->backendCfWrapperId = id;
|
||||
pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper;
|
||||
stInfo("succ to open state %p on backendWrapper %p %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
||||
SBackendWrapper* pHandle = wrapper->pBackend;
|
||||
|
||||
stInfo("start to close state on backend: %p", pHandle);
|
||||
|
||||
streamMutexLock(&pHandle->cfMutex);
|
||||
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||
if (ppInst != NULL && *ppInst != NULL) {
|
||||
RocksdbCfInst* inst = *ppInst;
|
||||
taosMemoryFree(inst);
|
||||
taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||
}
|
||||
streamMutexUnlock(&pHandle->cfMutex);
|
||||
|
||||
char* status[] = {"close", "drop"};
|
||||
stInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
|
||||
wrapper->idstr);
|
||||
wrapper->remove |= remove; // update by other pState
|
||||
taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId);
|
||||
}
|
||||
#endif
|
||||
void streamStateDestroyCompar(void* arg) {
|
||||
SCfComparator* comp = (SCfComparator*)arg;
|
||||
for (int i = 0; i < comp->numOfComp; i++) {
|
||||
|
|
|
@ -120,7 +120,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
|
|||
SStreamTask* pStreamTask = pTask;
|
||||
pState->streamId = streamId;
|
||||
pState->taskId = taskId;
|
||||
TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId));
|
||||
TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x",
|
||||
pState->streamId, pState->taskId));
|
||||
|
||||
code = streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -527,7 +528,6 @@ _end:
|
|||
|
||||
void streamStateDestroy(SStreamState* pState, bool remove) {
|
||||
streamFileStateDestroy(pState->pFileState);
|
||||
// streamStateDestroy_rocksdb(pState, remove);
|
||||
tSimpleHashCleanup(pState->parNameMap);
|
||||
// do nothong
|
||||
taosMemoryFreeClear(pState->pTdbState);
|
||||
|
@ -572,7 +572,8 @@ int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey
|
|||
return getCountWinResultBuff(pState->pFileState, pKey, winCount, ppVal, pVLen, pWinCode);
|
||||
}
|
||||
|
||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
|
||||
int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal,
|
||||
int32_t* pVLen) {
|
||||
return createCountWinResultBuff(pState->pFileState, pKey, winCount, pVal, pVLen);
|
||||
}
|
||||
|
||||
|
@ -593,9 +594,7 @@ SStreamStateCur* streamStateGroupGetCur(SStreamState* pState) {
|
|||
return pCur;
|
||||
}
|
||||
|
||||
void streamStateGroupCurNext(SStreamStateCur* pCur) {
|
||||
streamFileStateGroupCurNext(pCur);
|
||||
}
|
||||
void streamStateGroupCurNext(SStreamStateCur* pCur) { streamFileStateGroupCurNext(pCur); }
|
||||
|
||||
int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
|
||||
if (pVal != NULL) {
|
||||
|
@ -604,13 +603,9 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void**
|
|||
return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);
|
||||
}
|
||||
|
||||
void streamStateClearExpiredState(SStreamState* pState) {
|
||||
clearExpiredState(pState->pFileState);
|
||||
}
|
||||
void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); }
|
||||
|
||||
void streamStateSetFillInfo(SStreamState* pState) {
|
||||
setFillInfo(pState->pFileState);
|
||||
}
|
||||
void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); }
|
||||
|
||||
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
|
||||
int32_t* pWinCode) {
|
||||
|
|
Loading…
Reference in New Issue