diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index d313acc61d..d053a52832 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -166,10 +166,12 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key); int32_t streamStateClear_rocksdb(SStreamState* pState); void streamStateCurNext_rocksdb(SStreamStateCur* pCur); int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); -int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateGetGroupKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, + const void** pVal, int32_t* pVLen); int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); void streamStateCurPrev_rocksdb(SStreamStateCur* pCur); -int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, + int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key); @@ -217,15 +219,14 @@ int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* p // partag cf int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); -void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur); -int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen); +void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t groupId, SStreamStateCur* pCur); +int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, + int32_t* pVLen); // parname cf int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal); -void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); - // default cf int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c14de8766b..11d2385d1c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1699,9 +1699,6 @@ void streamBackendDelCompare(void* backend, void* arg) { taosMemoryFree(node); } } -#ifdef BUILD_NO_CALL -void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } -#endif void destroyRocksdbCfInst(RocksdbCfInst* inst) { int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); if (inst->pHandle) { @@ -3025,117 +3022,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t taosMemoryFree(cfOpts); return 0; } -#ifdef BUILD_NO_CALL -int streamStateOpenBackend(void* backend, SStreamState* pState) { - taosAcquireRef(streamBackendId, pState->streamBackendRid); - SBackendWrapper* handle = backend; - SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); - streamMutexLock(&handle->cfMutex); - RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); - if (ppInst != NULL && *ppInst != NULL) { - RocksdbCfInst* inst = *ppInst; - pBackendCfWrapper->rocksdb = inst->db; - pBackendCfWrapper->pHandle = (void**)inst->pHandle; - pBackendCfWrapper->writeOpts = inst->wOpt; - pBackendCfWrapper->readOpts = inst->rOpt; - pBackendCfWrapper->cfOpts = (void**)(inst->cfOpt); - pBackendCfWrapper->dbOpt = handle->dbOpt; - pBackendCfWrapper->param = inst->param; - pBackendCfWrapper->pBackend = handle; - pBackendCfWrapper->pComparNode = inst->pCompareNode; - streamMutexUnlock(&handle->cfMutex); - pBackendCfWrapper->backendId = pState->streamBackendRid; - memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); - - int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); - pState->pTdbState->backendCfWrapperId = id; - pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper; - stInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr); - - inst->pHandle = NULL; - inst->cfOpt = NULL; - inst->param = NULL; - - inst->wOpt = NULL; - inst->rOpt = NULL; - return 0; - } - streamMutexUnlock(&handle->cfMutex); - - char* err = NULL; - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); - - RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); - const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); - for (int i = 0; i < cfLen; i++) { - cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt); - // refactor later - rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); - rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); - rocksdb_block_based_options_set_partition_filters(tableOpt, 1); - - rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); - rocksdb_block_based_options_set_filter_policy(tableOpt, filter); - - rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); - - param[i].tableOpt = tableOpt; - }; - - rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); - for (int i = 0; i < cfLen; i++) { - SCfInit* cf = &ginitDict[i]; - - rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->destroyCmp, cf->cmpKey, cf->cmpName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); - pCompare[i] = compare; - } - rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); - pBackendCfWrapper->rocksdb = handle->db; - pBackendCfWrapper->pHandle = (void**)cfHandle; - pBackendCfWrapper->writeOpts = rocksdb_writeoptions_create(); - pBackendCfWrapper->readOpts = rocksdb_readoptions_create(); - pBackendCfWrapper->cfOpts = (void**)cfOpt; - pBackendCfWrapper->dbOpt = handle->dbOpt; - pBackendCfWrapper->param = param; - pBackendCfWrapper->pBackend = handle; - pBackendCfWrapper->backendId = pState->streamBackendRid; - taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); - SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; - pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); - memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); - - int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); - pState->pTdbState->backendCfWrapperId = id; - pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper; - stInfo("succ to open state %p on backendWrapper %p %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr); - return 0; -} - -void streamStateCloseBackend(SStreamState* pState, bool remove) { - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SBackendWrapper* pHandle = wrapper->pBackend; - - stInfo("start to close state on backend: %p", pHandle); - - streamMutexLock(&pHandle->cfMutex); - RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1); - if (ppInst != NULL && *ppInst != NULL) { - RocksdbCfInst* inst = *ppInst; - taosMemoryFree(inst); - taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); - } - streamMutexUnlock(&pHandle->cfMutex); - - char* status[] = {"close", "drop"}; - stInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, - wrapper->idstr); - wrapper->remove |= remove; // update by other pState - taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId); -} -#endif void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; for (int i = 0; i < comp->numOfComp; i++) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 794fc346bf..a69ee5448d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -120,7 +120,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i SStreamTask* pStreamTask = pTask; pState->streamId = streamId; pState->taskId = taskId; - TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId)); + TAOS_UNUSED(tsnprintf(pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr), "0x%" PRIx64 "-0x%x", + pState->streamId, pState->taskId)); code = streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr); QUERY_CHECK_CODE(code, lino, _end); @@ -527,7 +528,6 @@ _end: void streamStateDestroy(SStreamState* pState, bool remove) { streamFileStateDestroy(pState->pFileState); - // streamStateDestroy_rocksdb(pState, remove); tSimpleHashCleanup(pState->parNameMap); // do nothong taosMemoryFreeClear(pState->pTdbState); @@ -572,7 +572,8 @@ int32_t streamStateCountWinAddIfNotExist(SStreamState* pState, SSessionKey* pKey return getCountWinResultBuff(pState->pFileState, pKey, winCount, ppVal, pVLen, pWinCode); } -int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { +int32_t streamStateCountWinAdd(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, + int32_t* pVLen) { return createCountWinResultBuff(pState->pFileState, pKey, winCount, pVal, pVLen); } @@ -593,9 +594,7 @@ SStreamStateCur* streamStateGroupGetCur(SStreamState* pState) { return pCur; } -void streamStateGroupCurNext(SStreamStateCur* pCur) { - streamFileStateGroupCurNext(pCur); -} +void streamStateGroupCurNext(SStreamStateCur* pCur) { streamFileStateGroupCurNext(pCur); } int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) { if (pVal != NULL) { @@ -604,13 +603,9 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen); } -void streamStateClearExpiredState(SStreamState* pState) { - clearExpiredState(pState->pFileState); -} +void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); } -void streamStateSetFillInfo(SStreamState* pState) { - setFillInfo(pState->pFileState); -} +void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); } int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) {