diff --git a/cmake/cmake.define b/cmake/cmake.define index 0d5c21604a..f3caf49da3 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -123,8 +123,8 @@ ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ELSE () - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ENDIF () # disable all assert diff --git a/cmake/cmake.options b/cmake/cmake.options index 3bc5e202a5..b00ae14715 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -77,6 +77,12 @@ ELSEIF (TD_DARWIN_64) ENDIF () ENDIF () +option( + BUILD_GEOS + "If build geos on Windows" + OFF + ) + option( BUILD_SHARED_LIBS "" diff --git a/cmake/cmake.platform b/cmake/cmake.platform index cb09bf2085..f9faf7316c 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -56,9 +56,17 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin SET(TD_DARWIN TRUE) SET(OSTYPE "macOS") + execute_process(COMMAND geos-config --cflags OUTPUT_VARIABLE GEOS_CFLAGS) + execute_process(COMMAND geos-config --ldflags OUTPUT_VARIABLE GEOS_LDFLAGS) + string(SUBSTRING ${GEOS_CFLAGS} 2 -1 GEOS_CFLAGS) + string(REGEX REPLACE "\n" "" GEOS_CFLAGS ${GEOS_CFLAGS}) + string(SUBSTRING ${GEOS_LDFLAGS} 2 -1 GEOS_LDFLAGS) + string(REGEX REPLACE "\n" "" GEOS_LDFLAGS ${GEOS_LDFLAGS}) + MESSAGE("GEOS_CFLAGS "${GEOS_CFLAGS}) + MESSAGE("GEOS_LDFLAGS "${GEOS_LDFLAGS}) ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare") - INCLUDE_DIRECTORIES(/usr/local/include) - LINK_DIRECTORIES(/usr/local/lib) + INCLUDE_DIRECTORIES(${GEOS_CFLAGS}) + LINK_DIRECTORIES(${GEOS_LDFLAGS}) IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64") MESSAGE("Current system arch is arm64") diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index b3333dee99..59986a3b3c 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -231,11 +231,16 @@ if(${BUILD_WITH_ROCKSDB}) if(${TD_LINUX}) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result") endif(${TD_LINUX}) + MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS}) if(${TD_DARWIN}) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized") endif(${TD_DARWIN}) + if (${TD_DARWIN_ARM64}) + set(HAS_ARMV8_CRC true) + endif(${TD_DARWIN_ARM64}) + if (${TD_WINDOWS}) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4244 /wd4819") endif(${TD_WINDOWS}) @@ -248,7 +253,7 @@ if(${BUILD_WITH_ROCKSDB}) endif(${TD_DARWIN}) if(${TD_WINDOWS}) - option(WITH_JNI "" ON) + option(WITH_JNI "" OFF) endif(${TD_WINDOWS}) if(${TD_WINDOWS}) @@ -260,7 +265,7 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_FALLOCATE "" OFF) option(WITH_JEMALLOC "" OFF) option(WITH_GFLAGS "" OFF) - option(PORTABLE "" ON) + option(PORTABLE "" OFF) option(WITH_LIBURING "" OFF) option(FAIL_ON_WARNINGS OFF) @@ -268,8 +273,11 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_TOOLS "" OFF) option(WITH_LIBURING "" OFF) - + IF (TD_LINUX) + option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON) + ELSE() option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) + ENDIF() add_subdirectory(rocksdb EXCLUDE_FROM_ALL) target_include_directories( rocksdb diff --git a/docs/zh/12-taos-sql/02-database.md b/docs/zh/12-taos-sql/02-database.md index a2a0914120..b329413aa8 100644 --- a/docs/zh/12-taos-sql/02-database.md +++ b/docs/zh/12-taos-sql/02-database.md @@ -121,6 +121,8 @@ alter_database_option: { | WAL_LEVEL value | WAL_FSYNC_PERIOD value | KEEP value + | WAL_RETENTION_PERIOD value + | WAL_RETENTION_SIZE value } ``` diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 9ed47c94f8..4cfbf2c2fa 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -103,63 +103,6 @@ typedef struct SRowBuffPos { bool beUsed; } SRowBuffPos; -// int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); -// int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, -// SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); -// int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, -// SArray *pTableUids); -// void *tsdbCacherowsReaderClose(void *pReader); -// int32_t tsdbGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); - -// int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, -// SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly, SHashObj** pIgnoreTables); -// int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); -// void tsdbReaderSetId(STsdbReader *pReader, const char *idstr); -// void tsdbReaderClose(STsdbReader *pReader); -// int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); -// int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); -// void tsdbReleaseDataBlock(STsdbReader *pReader); -// SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); -// int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); -// int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); -// int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); -// void *tsdbGetIdx(void *pMeta); -// void *tsdbGetIvtIdx(void *pMeta); -// uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); -// void tsdbReaderSetCloseFlag(STsdbReader *pReader); -// int64_t tsdbGetLastTimestamp(void* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr); - -// int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); -// int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); -// int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); -// void *vnodeGetIdx(void *pVnode); -// void *vnodeGetIvtIdx(void *pVnode); - -// void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); -// void metaReaderReleaseLock(SMetaReader *pReader); -// void metaReaderClear(SMetaReader *pReader); -// int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); -// int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); -// int metaGetTableEntryByName(SMetaReader *pReader, const char *name); -// int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList); -// int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList); -// const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal); -// int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); -// -// int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName); -// int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid); -// int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); -// bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid); -// int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, -// bool *acquired); -// int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, -// int32_t payloadLen, double selectivityRatio); -// int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid); -// tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); -// int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); -// int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, -// int32_t payloadLen); - // tq typedef struct SMetaTableInfo { int64_t suid; @@ -201,64 +144,39 @@ typedef struct { // int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); // int32_t destroySnapContext(SSnapContext *ctx); +// clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ - -// typedef int32_t (*__store_reader_(STsdbReader *pReader, const void *pTableList, int32_t num); -// typedef void (*tsdbReaderSetId(STsdbReader *pReader, const char *idstr); -// typedef void (*tsdbReaderClose(STsdbReader *pReader); -// typedef int32_t (*tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); -// typedef int32_t (*tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); -// typedef void (*tsdbReleaseDataBlock(STsdbReader *pReader); -// typedef SSDataBlock * (*tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); -// typedef int32_t (*tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); -// typedef int32_t (*tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); -// typedef int64_t (*tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); -// typedef void * (*tsdbGetIdx(void *pMeta); -// typedef void * (*tsdbGetIvtIdx(void *pMeta); -// typedef uint64_t (*tsdbGetReaderMaxVersion(STsdbReader *pReader); -// typedef void (*tsdbReaderSetCloseFlag(STsdbReader *pReader); -// typedef int64_t (*tsdbGetLastTimestamp(void* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr); - -typedef int32_t (*__store_reader_open_fn_t)(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, - int32_t numOfTables, SSDataBlock *pResBlock, void **ppReader, - const char *idstr, bool countOnly, SHashObj **pIgnoreTables); - typedef struct TsdReader { - __store_reader_open_fn_t tsdReaderOpen; - void (*tsdReaderClose)(); - void (*tsdSetReaderTaskId)(void *pReader, const char *pId); - int32_t (*tsdSetQueryTableList)(); - int32_t (*tsdNextDataBlock)(); + int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, + SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, + SHashObj** pIgnoreTables); + void (*tsdReaderClose)(); + void (*tsdSetReaderTaskId)(void *pReader, const char *pId); + int32_t (*tsdSetQueryTableList)(); + int32_t (*tsdNextDataBlock)(); - int32_t (*tsdReaderRetrieveBlockSMAInfo)(); + int32_t (*tsdReaderRetrieveBlockSMAInfo)(); SSDataBlock *(*tsdReaderRetrieveDataBlock)(); - void (*tsdReaderReleaseDataBlock)(); + void (*tsdReaderReleaseDataBlock)(); - int32_t (*tsdReaderResetStatus)(); - int32_t (*tsdReaderGetDataBlockDistInfo)(); - int64_t (*tsdReaderGetNumOfInMemRows)(); - void (*tsdReaderNotifyClosing)(); + int32_t (*tsdReaderResetStatus)(); + int32_t (*tsdReaderGetDataBlockDistInfo)(); + int64_t (*tsdReaderGetNumOfInMemRows)(); + void (*tsdReaderNotifyClosing)(); } TsdReader; - -/** - * int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); -int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); -int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, - SArray *pTableUids); -void *tsdbCacherowsReaderClose(void *pReader); - */ typedef struct SStoreCacheReader { - int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); - void *(*closeReader)(void *pReader); - int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, - SArray *pTableUidList); - int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables); + int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, + SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); + void *(*closeReader)(void *pReader); + int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, + SArray *pTableUidList); + int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables); } SStoreCacheReader; +// clang-format on + /*------------------------------------------------------------------------------------------------------------------*/ /* void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); @@ -309,16 +227,10 @@ typedef struct SStoreTqReader { } SStoreTqReader; typedef struct SStoreSnapshotFn { - /* - int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); - SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); - int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); - int32_t destroySnapContext(SSnapContext *ctx); - */ - int32_t (*createSnapshot)(); - int32_t (*destroySnapshot)(); - SMetaTableInfo (*getMetaTableInfoFromSnapshot)(); - int32_t (*getTableInfoFromSnapshot)(); + int32_t (*createSnapshot)(SSnapContext *ctx, int64_t uid); + int32_t (*destroySnapshot)(SSnapContext *ctx); + SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx); + int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid); } SStoreSnapshotFn; /** @@ -327,12 +239,10 @@ void metaReaderReleaseLock(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader); int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); -int metaGetTableEntryByName(SMetaReader *pReader, const char *name); int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList); const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal); int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); -int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid); @@ -340,51 +250,40 @@ int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *k bool *acquired); int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, int32_t payloadLen, double selectivityRatio); -int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid); tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); -int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, - int32_t payloadLen); +int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); */ typedef struct SStoreMeta { - SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor - void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor - int32_t (*cursorNext)(); // metaTbCursorNext - int32_t (*cursorPrev)(); // metaTbCursorPrev + SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor + void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor + int32_t (*cursorNext)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorNext + int32_t (*cursorPrev)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorPrev - int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList); - int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList); + int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList); + int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList); const void *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it - int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid); - int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType); - int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName); - bool (*isTableExisted)(void *pVnode, tb_uid_t uid); + int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid); + int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType); + int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName); + bool (*isTableExisted)(void *pVnode, tb_uid_t uid); - /** - * int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, - int32_t payloadLen, double selectivityRatio); -int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid); -tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); -int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); -int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, - int32_t payloadLen); - */ - void (*getCachedTableList)(); - void (*putTableListIntoCache)(); + int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); + int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); - /** - * - */ - void *(*storeGetIndexInfo)(); - void *(*getInvertIndex)(void* pVnode); - int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] - int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList - void *storeGetVersionRange; - void *storeGetLastTimestamp; + int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes); + int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio); - int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema + void *(*storeGetIndexInfo)(); + void *(*getInvertIndex)(void* pVnode); + int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] + int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList + void *storeGetVersionRange; + void *storeGetLastTimestamp; + + int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema // db name, vgId, numOfTables, numOfSTables int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); diff --git a/include/libs/function/function.h b/include/libs/function/function.h index de2494ab3c..e015f4182e 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -21,8 +21,8 @@ extern "C" { #endif #include "tcommon.h" -#include "tvariant.h" #include "tsimplehash.h" +#include "tvariant.h" struct SqlFunctionCtx; struct SResultRowEntryInfo; @@ -77,7 +77,7 @@ enum { enum { MAIN_SCAN = 0x0u, REVERSE_SCAN = 0x1u, // todo remove it - PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan + PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan }; typedef struct SPoint1 { @@ -130,43 +130,44 @@ typedef struct SSerializeDataHandle { // incremental state storage typedef struct STdbState { - void* rocksdb; - void** pHandle; - void* writeOpts; - void* readOpts; - void** cfOpts; - void* dbOpt; - struct SStreamTask* pOwner; - void* param; - void* env; - SListNode* pComparNode; - void* pBackendHandle; + void *rocksdb; + void **pHandle; + void *writeOpts; + void *readOpts; + void **cfOpts; + void *dbOpt; + struct SStreamTask *pOwner; + void *param; + void *env; + SListNode *pComparNode; + void *pBackend; char idstr[64]; - void* compactFactory; + void *compactFactory; + TdThreadRwlock rwLock; - void* db; - void* pStateDb; - void* pFuncStateDb; - void* pFillStateDb; // todo refactor - void* pSessionStateDb; - void* pParNameDb; - void* pParTagDb; - void* txn; + void *db; + void *pStateDb; + void *pFuncStateDb; + void *pFillStateDb; // todo refactor + void *pSessionStateDb; + void *pParNameDb; + void *pParTagDb; + void *txn; } STdbState; typedef struct { - STdbState* pTdbState; - struct SStreamFileState* pFileState; - int32_t number; - SSHashObj* parNameMap; - int64_t checkPointId; - int32_t taskId; - int64_t streamId; + STdbState *pTdbState; + struct SStreamFileState *pFileState; + int32_t number; + SSHashObj *parNameMap; + int64_t checkPointId; + int32_t taskId; + int64_t streamId; } SStreamState; typedef struct SFunctionStateStore { - int32_t (*streamStateFuncPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); - int32_t (*streamStateFuncGet)(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen); + int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen); + int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen); } SFunctionStateStore; // sql function runtime context @@ -180,7 +181,7 @@ typedef struct SqlFunctionCtx { int16_t functionId; // function id char *pOutput; // final result output buffer, point to sdata->data // input parameter, e.g., top(k, 20), the number of results of top query is kept in param - SFunctParam *param; + SFunctParam *param; // corresponding output buffer for timestamp of each result, e.g., diff/csum SColumnInfoData *pTsOutput; int32_t numOfParams; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index ca85622d8a..7f9d20a9dd 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,7 +34,24 @@ extern "C" { // SListNode* streamBackendAddCompare(void* backend, void* arg); // void streamBackendDelCompare(void* backend, void* arg); -//typedef struct STdbState { +// <<<<<<< HEAD +// typedef struct STdbState { +// rocksdb_t* rocksdb; +// rocksdb_column_family_handle_t** pHandle; +// rocksdb_writeoptions_t* writeOpts; +// rocksdb_readoptions_t* readOpts; +// rocksdb_options_t** cfOpts; +// rocksdb_options_t* dbOpt; +// struct SStreamTask* pOwner; +// void* param; +// void* env; +// SListNode* pComparNode; +// void* pBackend; +// char idstr[64]; +// void* compactFactory; +// TdThreadRwlock rwLock; +// ======= +// typedef struct STdbState { // rocksdb_t* rocksdb; // rocksdb_column_family_handle_t** pHandle; // rocksdb_writeoptions_t* writeOpts; @@ -58,6 +75,7 @@ extern "C" { // TTB* pParTagDb; // TXN* txn; //} STdbState; +//>>>>>>> enh/dev3.0 SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState, bool remove); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8e7dd0bb0d..1d4bbf073e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -78,11 +78,11 @@ enum { TASK_TRIGGER_STATUS__ACTIVE, }; -enum { +typedef enum { TASK_LEVEL__SOURCE = 1, TASK_LEVEL__AGG, TASK_LEVEL__SINK, -}; +} ETASK_LEVEL; enum { TASK_OUTPUT__FIXED_DISPATCH = 1, @@ -284,13 +284,13 @@ struct SStreamTask { int16_t dispatchMsgType; SStreamStatus status; int32_t selfChildId; - int32_t nodeId; + int32_t nodeId; // vgroup id SEpSet epSet; SCheckpointInfo chkInfo; STaskExec exec; - - // fill history - int8_t fillHistory; + int8_t fillHistory; // fill history + int64_t ekey; // end ts key + int64_t endVer; // end version // children info SArray* childEpInfo; // SArray @@ -351,7 +351,7 @@ typedef struct SStreamMeta { int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); -SStreamTask* tNewStreamTask(int64_t streamId); +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 7d707f4cba..82b714e6eb 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -663,9 +663,10 @@ typedef struct { char targetDb[TSDB_DB_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN]; int64_t targetStbUid; - int32_t fixedSinkVgId; // 0 for shuffle + // fixedSinkVg is not applicable for encode and decode SVgObj fixedSinkVg; + int32_t fixedSinkVgId; // 0 for shuffle // transformation char* sql; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 99694ae4b8..73dbb243a1 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -986,20 +986,20 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { } int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); - if (numOfVnodes > 0 || pMObj != NULL || pSObj != NULL || pQObj != NULL) { - bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs()); - if (isonline && force) { - terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE; - mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(), - numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL); - goto _OVER; - } - if (!isonline && !force) { - terrno = TSDB_CODE_DNODE_OFFLINE; - mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(), - numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL); - goto _OVER; - } + bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs()); + + if (isonline && force) { + terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE; + mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(), + numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL); + goto _OVER; + } + + if (!isonline && !force) { + terrno = TSDB_CODE_DNODE_OFFLINE; + mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(), + numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL); + goto _OVER; } code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 39c771d111..64082536da 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -14,18 +14,8 @@ */ #include "mndScheduler.h" -#include "mndConsumer.h" #include "mndDb.h" -#include "mndDnode.h" -#include "mndMnode.h" -#include "mndShow.h" #include "mndSnode.h" -#include "mndStb.h" -#include "mndStream.h" -#include "mndSubscribe.h" -#include "mndTopic.h" -#include "mndTrans.h" -#include "mndUser.h" #include "mndVgroup.h" #include "parser.h" #include "tcompare.h" @@ -34,12 +24,8 @@ extern bool tsDeployOnSnode; -static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) { - int32_t childId = taosArrayGetSize(pArray); - pTask->selfChildId = childId; - taosArrayPush(pArray, &pTask); - return 0; -} +static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup); +static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { @@ -97,7 +83,7 @@ END: return terrno; } -int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { +int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { if (pStream->smaId != 0) { pTask->outputType = TASK_OUTPUT__SMA; pTask->smaSink.smaId = pStream->smaId; @@ -106,16 +92,23 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + if (pTask->tbSink.pSchemaWrapper == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } + return 0; } -int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { +#define SINK_NODE_LEVEL (0) + +int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { bool isShuffle = false; if (pStream->fixedSinkVgId == 0) { SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); if (pDb != NULL && pDb->cfg.numOfVgroups > 1) { + isShuffle = true; pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; @@ -127,47 +120,46 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream sdbRelease(pMnode->pSdb, pDb); } + SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); + int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList); + if (isShuffle) { memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(pVgs); - SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); - int32_t sinkLvSize = taosArrayGetSize(sinkLv); - for (int32_t i = 0; i < sz; i++) { + + int32_t numOfVgroups = taosArrayGetSize(pVgs); + for (int32_t i = 0; i < numOfVgroups; i++) { SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); - for (int32_t j = 0; j < sinkLvSize; j++) { - SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); - if (pLastLevelTask->nodeId == pVgInfo->vgId) { - pVgInfo->taskId = pLastLevelTask->id.taskId; + + for (int32_t j = 0; j < numOfSinkNodes; j++) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j); + if (pSinkTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pSinkTask->id.taskId; break; } } } } else { - pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - SArray* pArray = taosArrayGetP(pStream->tasks, 0); - // one sink only - SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId; - pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; - pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; + SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0); + setFixedDownstreamEpInfo(pTask, pOneSinkTask); } + return 0; } -int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { +int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t msgLen; + pTask->nodeId = pVgroup->vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); - plan->execNode.nodeId = pVgroup->vgId; + plan->execNode.nodeId = pTask->nodeId; plan->execNode.epSet = pTask->epSet; - if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } + return 0; } @@ -210,100 +202,121 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } +// create sink node for each vgroup. int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; - SArray* tasks = taosArrayGetP(pStream->tasks, 0); while (1) { SVgObj* pVgroup = NULL; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) { sdbRelease(pSdb, pVgroup); continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid); - if (pTask == NULL) { - sdbRelease(pSdb, pVgroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(tasks, pTask); - - pTask->nodeId = pVgroup->vgId; - pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); - - // type - pTask->taskLevel = TASK_LEVEL__SINK; - - // sink - if (pStream->smaId != 0) { - pTask->outputType = TASK_OUTPUT__SMA; - pTask->smaSink.smaId = pStream->smaId; - } else { - pTask->outputType = TASK_OUTPUT__TABLE; - pTask->tbSink.stbUid = pStream->targetStbUid; - memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); - pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); - if (pTask->tbSink.pSchemaWrapper == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } + mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup); sdbRelease(pSdb, pVgroup); } + return 0; } -int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { - SArray* tasks = taosArrayGetP(pStream->tasks, 0); - SStreamTask* pTask = tNewStreamTask(pStream->uid); +int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) { + SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); + + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(tasks, pTask); - pTask->nodeId = pStream->fixedSinkVgId; -#if 0 - SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); - if (pVgroup == NULL) { - return -1; - } + pTask->nodeId = vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); -#endif - pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); - - pTask->taskLevel = TASK_LEVEL__SINK; - - // sink - if (pStream->smaId != 0) { - pTask->outputType = TASK_OUTPUT__SMA; - pTask->smaSink.smaId = pStream->smaId; - } else { - pTask->outputType = TASK_OUTPUT__TABLE; - pTask->tbSink.stbUid = pStream->targetStbUid; - memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); - pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); - } - + mndSetSinkTaskInfo(pStream, pTask); return 0; } +static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) { + return 0; +} + +static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream, + SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory, + bool hasExtraSink) { + SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList); + if (pTask == NULL) { + return terrno; + } + + // sink or dispatch + if (hasExtraSink) { + mndAddDispatcherForInnerTask(pMnode, pStream, pTask); + } else { + mndSetSinkTaskInfo(pStream, pTask); + } + + if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { + return terrno; + } + + return TSDB_CODE_SUCCESS; +} + +static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { + SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); + if (pEpInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pEpInfo->childId = pTask->selfChildId; + pEpInfo->epSet = pTask->epSet; + pEpInfo->nodeId = pTask->nodeId; + pEpInfo->taskId = pTask->id.taskId; + + return pEpInfo; +} + +void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { + STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher; + pDispatcher->taskId = pTask->id.taskId; + pDispatcher->nodeId = pTask->nodeId; + pDispatcher->epSet = pTask->epSet; + + pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; + pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; +} + +int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) { + SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); + if (pEpInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if(pUpstream->childEpInfo == NULL) { + pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES); + } + + taosArrayPush(pUpstream->childEpInfo, &pEpInfo); + return TSDB_CODE_SUCCESS; +} + int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { - SSdb* pSdb = pMnode->pSdb; + SSdb* pSdb = pMnode->pSdb; + SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); - pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*)); + int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); + pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES); bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; @@ -313,13 +326,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - bool multiTarget = pDbObj->cfg.numOfVgroups > 1; + bool multiTarget = (pDbObj->cfg.numOfVgroups > 1); sdbRelease(pSdb, pDbObj); if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { - /*if (true) {*/ - SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pStream->tasks, &taskOneLevel); + // add extra sink hasExtraSink = true; if (pStream->fixedSinkVgId == 0) { @@ -328,19 +341,20 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } } else { - if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) { + if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) { // TODO free return -1; } } } + pStream->totalLevel = planTotLevel + hasExtraSink; if (planTotLevel > 1) { SStreamTask* pInnerTask; // inner level { - SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*)); + SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pStream->tasks, &taskInnerLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); @@ -350,25 +364,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - pInnerTask = tNewStreamTask(pStream->uid); + pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel); if (pInnerTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; qDestroyQueryPlan(pPlan); return -1; } - pInnerTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); - - pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); - - pInnerTask->taskLevel = TASK_LEVEL__AGG; - - // trigger - pInnerTask->triggerParam = pStream->triggerParam; - // dispatch - if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) { + if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) { qDestroyQueryPlan(pPlan); return -1; } @@ -377,7 +381,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); if (pSnode == NULL) { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) { + if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; @@ -392,17 +396,18 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } } else { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) { + if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } + sdbRelease(pSdb, pVgroup); } } // source level - SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*)); + SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pStream->tasks, &taskSourceLevel); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1); @@ -416,66 +421,52 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { while (1) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { sdbRelease(pSdb, pVgroup); continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid); + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } - pTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(taskSourceLevel, pTask); - pTask->triggerParam = 0; + // all the source tasks dispatch result to a single agg node. + setFixedDownstreamEpInfo(pTask, pInnerTask); - // source - pTask->taskLevel = TASK_LEVEL__SOURCE; - - // add fixed vg dispatch - pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; - - pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId; - pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId; - pTask->fixedEpDispatcher.epSet = pInnerTask->epSet; - - if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { + if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } - SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); - if (pEpInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - return -1; - } - pEpInfo->childId = pTask->selfChildId; - pEpInfo->epSet = pTask->epSet; - pEpInfo->nodeId = pTask->nodeId; - pEpInfo->taskId = pTask->id.taskId; - taosArrayPush(pInnerTask->childEpInfo, &pEpInfo); + int32_t code = appendToUpstream(pTask, pInnerTask); sdbRelease(pSdb, pVgroup); - } - } - if (planTotLevel == 1) { - SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); - taosArrayPush(pStream->tasks, &taskOneLevel); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + qDestroyQueryPlan(pPlan); + return -1; + } + } + } else if (planTotLevel == 1) { + // create exec stream task, since only one level, the exec task is also the source task + SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); + taosArrayPush(pStream->tasks, &pTaskList); SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); if (LIST_LENGTH(inner->pNodeList) != 1) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } + SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); if (plan->subplanType != SUBPLAN_TYPE_SCAN) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -486,42 +477,26 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { while (1) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { sdbRelease(pSdb, pVgroup); continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid); - if (pTask == NULL) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - return -1; - } - pTask->fillHistory = pStream->fillHistory; - mndAddTaskToTaskSet(taskOneLevel, pTask); - - // source - pTask->taskLevel = TASK_LEVEL__SOURCE; - - // trigger - pTask->triggerParam = pStream->triggerParam; - - // sink or dispatch - if (hasExtraSink) { - mndAddDispatcherToInnerTask(pMnode, pStream, pTask); - } else { - mndAddSinkToTask(pMnode, pStream, pTask); - } - - if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - return -1; - } + // new stream task + int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, plan, pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink); sdbRelease(pSdb, pVgroup); + + if (code != TSDB_CODE_SUCCESS) { + qDestroyQueryPlan(pPlan); + return -1; + } } } + qDestroyQueryPlan(pPlan); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0713150b48..39a1fa223f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -700,6 +700,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (pStream->sourceDbUid == streamObj.sourceDbUid) { ++numOfStream; } + sdbRelease(pMnode->pSdb, pStream); if (numOfStream > MND_STREAM_MAX_NUM) { mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); @@ -723,6 +724,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { pDb = NULL; goto _OVER; } + mndReleaseDb(pMnode, pDb); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 04fad2d78e..b18cb8e282 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -87,6 +87,28 @@ target_include_directories( PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" ) +IF (TD_LINUX) +target_link_libraries( + vnode + PUBLIC os + PUBLIC util + PUBLIC common + PUBLIC tfs + PUBLIC wal + PUBLIC qworker + PUBLIC sync + PUBLIC executor + PUBLIC scheduler + PUBLIC tdb + + # PUBLIC bdb + # PUBLIC scalar + PUBLIC rocksdb-shared + PUBLIC transport + PUBLIC stream + PUBLIC index +) +ELSE() target_link_libraries( vnode PUBLIC os @@ -107,6 +129,7 @@ target_link_libraries( PUBLIC stream PUBLIC index ) +ENDIF() IF (TD_GRANT) TARGET_LINK_LIBRARIES(vnode PUBLIC grant) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 72c8dce52f..fbe2f4809c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -100,18 +100,11 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); // meta -typedef struct SMeta SMeta; // todo: remove -typedef struct SMetaReader SMetaReader; -typedef struct SMetaEntry SMetaEntry; - -#define META_READER_NOLOCK 0x1 - void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI); void metaReaderReleaseLock(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader); int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); -int metaGetTableEntryByName(SMetaReader *pReader, const char *name); int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *uidList); int32_t metaGetTableTagsByUids(void* pVnode, int64_t suid, SArray *uidList); int32_t metaReadNext(SMetaReader *pReader); @@ -122,37 +115,18 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); bool metaIsTableExist(void* pVnode, tb_uid_t uid); -int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, +int32_t metaGetCachedTableUidList(void *pVnode, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, bool *acquired); -int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, +int32_t metaUidFilterCachePut(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, int32_t payloadLen, double selectivityRatio); -int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid); tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); -int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid); -int32_t metaGetCachedTbGroup(SMeta *pMeta, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList); -int32_t metaPutTbGroupToCache(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, +int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList); +int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, int32_t payloadLen); int64_t metaGetTbNum(SMeta *pMeta); -int64_t metaGetNtbNum(SMeta *pMeta); -//typedef struct { -// int64_t uid; -// int64_t ctbNum; -//} SMetaStbStats; -int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables); -#if 1 // refact APIs below (TODO) -typedef SVCreateTbReq STbCfg; -typedef SVCreateTSmaReq SSmaCfg; - -typedef struct SMTbCursor SMTbCursor; - -SMTbCursor *metaOpenTbCursor(void *pVnode); -void metaCloseTbCursor(SMTbCursor *pTbCur); -int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); -int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); - -#endif +int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables); // tsdb typedef struct STsdbReader STsdbReader; @@ -168,8 +142,8 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly, +int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, + SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, SHashObj **pIgnoreTables); int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); void tsdbReaderSetId(STsdbReader *pReader, const char *idstr); @@ -201,32 +175,11 @@ size_t tsdbCacheGetUsage(SVnode *pVnode); int32_t tsdbCacheGetElems(SVnode *pVnode); //// tq -//typedef struct SMetaTableInfo { -// int64_t suid; -// int64_t uid; -// SSchemaWrapper *schema; -// char tbName[TSDB_TABLE_NAME_LEN]; -//} SMetaTableInfo; - typedef struct SIdInfo { int64_t version; int32_t index; } SIdInfo; -//typedef struct SSnapContext { -// SMeta *pMeta; -// int64_t snapVersion; -// TBC *pCur; -// int64_t suid; -// int8_t subType; -// SHashObj *idVersion; -// SHashObj *suidInfo; -// SArray *idList; -// int32_t index; -// bool withMeta; -// bool queryMeta; // true-get meta, false-get data -//} SSnapContext; - typedef struct STqReader { SPackedData msg; SSubmitReq2 submit; @@ -345,59 +298,6 @@ struct SVnodeCfg { #define TABLE_ROLLUP_ON ((int8_t)0x1) #define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0) #define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON) -//struct SMetaEntry { -// int64_t version; -// int8_t type; -// int8_t flags; // TODO: need refactor? -// tb_uid_t uid; -// char *name; -// union { -// struct { -// SSchemaWrapper schemaRow; -// SSchemaWrapper schemaTag; -// SRSmaParam rsmaParam; -// } stbEntry; -// struct { -// int64_t ctime; -// int32_t ttlDays; -// int32_t commentLen; -// char *comment; -// tb_uid_t suid; -// uint8_t *pTags; -// } ctbEntry; -// struct { -// int64_t ctime; -// int32_t ttlDays; -// int32_t commentLen; -// char *comment; -// int32_t ncid; // next column id -// SSchemaWrapper schemaRow; -// } ntbEntry; -// struct { -// STSma *tsma; -// } smaEntry; -// }; -// -// uint8_t *pBuf; -//}; - -//struct SMetaReader { -// int32_t flags; -// SMeta *pMeta; -// SDecoder coder; -// SMetaEntry me; -// void *pBuf; -// int32_t szBuf; -//}; - -//struct SMTbCursor { -// TBC *pDbc; -// void *pKey; -// void *pVal; -// int32_t kLen; -// int32_t vLen; -// SMetaReader mr; -//}; #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3f79fe5fd1..db285dc124 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -103,6 +103,17 @@ struct SQueryNode { _query_reseek_func_t reseek; }; +#if 1 // refact APIs below (TODO) +typedef SVCreateTbReq STbCfg; +typedef SVCreateTSmaReq SSmaCfg; + +SMTbCursor *metaOpenTbCursor(void *pVnode); +void metaCloseTbCursor(SMTbCursor *pTbCur); +int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); +int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); + +#endif + void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size); void vnodeBufPoolFree(SVBufPool* pPool, void* p); @@ -143,6 +154,9 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaAlterCache(SMeta* pMeta, int32_t nPage); +int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid); +int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid); + int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); @@ -477,6 +491,7 @@ struct SCompactInfo { void initStorageAPI(SStorageAPI* pAPI); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 436ca1abd3..8749b3ac94 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -499,8 +499,9 @@ static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, ASSERT(keyLen == sizeof(uint64_t) * 2); } -int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, +int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) { + SMeta* pMeta = ((SVnode*)pVnode)->pMeta; int32_t vgId = TD_VID(pMeta->pVnode); // generate the composed key for LRU cache @@ -603,9 +604,10 @@ static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyL } // check both the payload size and selectivity ratio -int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, +int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio) { int32_t code = 0; + SMeta* pMeta = ((SVnode*)pVnode)->pMeta; int32_t vgId = TD_VID(pMeta->pVnode); if (selectivityRatio > tsSelectivityRatio) { @@ -702,7 +704,8 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) { return TSDB_CODE_SUCCESS; } -int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) { +int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) { + SMeta* pMeta = ((SVnode*)pVnode)->pMeta; int32_t vgId = TD_VID(pMeta->pVnode); // generate the composed key for LRU cache @@ -786,9 +789,10 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value) } -int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, +int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen) { int32_t code = 0; + SMeta* pMeta = ((SVnode*)pVnode)->pMeta; int32_t vgId = TD_VID(pMeta->pVnode); if (payloadLen > tsTagFilterResCacheSize) { diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 26182baf7b..726ed6a4c8 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -700,8 +700,6 @@ int64_t metaGetTimeSeriesNum(SMeta *pMeta) { return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries; } -int64_t metaGetNtbNum(SMeta *pMeta) { return pMeta->pVnode->config.vndStats.numOfNTables; } - typedef struct { SMeta *pMeta; TBC *pCur; diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index 3fb491848a..a49848f442 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -13,8 +13,10 @@ * along with this program. If not, see . */ +#include "vnodeInt.h" #include "meta.h" + static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ef77c9a7c4..6a97ea89b3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -128,6 +128,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { rocksdb_options_set_comparator(options, cmp); rocksdb_block_based_options_set_block_cache(tableoptions, cache); rocksdb_options_set_block_based_table_factory(options, tableoptions); + rocksdb_options_set_info_log_level(options, 2); // WARN_LEVEL // rocksdb_options_set_inplace_update_support(options, 1); // rocksdb_options_set_allow_concurrent_memtable_write(options, 0); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 268d17bacb..362934ec84 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -754,7 +754,7 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit return terrno; } -static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, +static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void** ppReader, int32_t capacity, SSDataBlock* pResBlock, const char* idstr) { int32_t code = 0; int8_t level = 0; @@ -4396,11 +4396,12 @@ static void freeSchemaFunc(void* param) { } // ====================================== EXPOSED APIs ====================================== -int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, - SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) { +int32_t tsdbReaderOpen(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, + SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) { STimeWindow window = pCond->twindows; + SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); - int32_t capacity = pVnode->config.tsdbCfg.maxRows; + int32_t capacity = pConf->tsdbCfg.maxRows; if (pResBlock != NULL) { blockDataEnsureCapacity(pResBlock, capacity); } @@ -4431,7 +4432,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } // here we only need one more row, so the capacity is set to be ONE. - code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, pResBlock, idstr); + code = tsdbReaderCreate(pVnode, pCond, (void**)&((STsdbReader*)pReader)->innerReader[0], 1, pResBlock, idstr); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -4445,7 +4446,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } pCond->order = order; - code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, pResBlock, idstr); + code = tsdbReaderCreate(pVnode, pCond, (void**)&((STsdbReader*)pReader)->innerReader[1], 1, pResBlock, idstr); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -4495,7 +4496,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL goto _err; } - pReader->status.pLDataIter = taosMemoryCalloc(pVnode->config.sttTrigger, sizeof(SLDataIter)); + pReader->status.pLDataIter = taosMemoryCalloc(pConf->sttTrigger, sizeof(SLDataIter)); if (pReader->status.pLDataIter == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -5546,7 +5547,7 @@ int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTabl int64_t key = INT64_MIN; for(int32_t i = 0; i < numOfTables; ++i) { - int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, &pReader, pIdStr, false, NULL); + int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, (void**)&pReader, pIdStr, false, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 1cc985f7df..526b9b4e2d 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -31,17 +31,18 @@ static void initSnapshotFn(SStoreSnapshotFn* pSnapshot); void initStorageAPI(SStorageAPI* pAPI) { initTsdbReaderAPI(&pAPI->tsdReader); initMetadataAPI(&pAPI->metaFn); - initTqAPI(&pAPI->tqReaderFn); initStateStoreAPI(&pAPI->stateStore); initMetaReaderAPI(&pAPI->metaReaderFn); initMetaFilterAPI(&pAPI->metaFilter); + initTqAPI(&pAPI->tqReaderFn); initFunctionStateStore(&pAPI->functionStore); initCacheFn(&pAPI->cacheFn); initSnapshotFn(&pAPI->snapshotFn); } void initTsdbReaderAPI(TsdReader* pReader) { - pReader->tsdReaderOpen = (__store_reader_open_fn_t)tsdbReaderOpen; + pReader->tsdReaderOpen = (int32_t(*)(void*, SQueryTableDataCond*, void*, int32_t, SSDataBlock*, void**, const char*, + bool, SHashObj**))tsdbReaderOpen; pReader->tsdReaderClose = tsdbReaderClose; pReader->tsdNextDataBlock = tsdbNextDataBlock; @@ -87,6 +88,12 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor pMeta->storeGetTableList = vnodeGetTableList; + + pMeta->getCachedTableList = metaGetCachedTableUidList; + pMeta->putCachedTableList = metaUidFilterCachePut; + + pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup; + pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache; } void initTqAPI(SStoreTqReader* pTq) { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f4531729c4..c7677878bf 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -495,7 +495,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf genTbGroupDigest((SNode*)listNode, digest, &context); nodesFree(listNode); - pAPI->metaFn.getCachedTableList(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), &tableList); + pAPI->metaFn.metaGetCachedTbGroup(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), &tableList); if (tableList) { taosArrayDestroy(pTableListInfo->pTableList); pTableListInfo->pTableList = tableList; @@ -632,7 +632,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf if (tsTagFilterCache) { tableList = taosArrayDup(pTableListInfo->pTableList, NULL); - pAPI->metaFn.putTableListIntoCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo)); + pAPI->metaFn.metaPutTbGroupToCache(pVnode, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo)); } // int64_t st2 = taosGetTimestampUs(); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3984e9a792..2d2f377041 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3446,7 +3446,7 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName)); pRes->info.id.groupId = groupId; - int64_t numOfTables = 0;//metaGetNtbNum(pInfo->readHandle.vnode); + int64_t numOfTables = 0; pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables); if (numOfTables != 0) { diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index 89b82ef9ba..fa6c709c8f 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -6,13 +6,23 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) + if(${BUILD_WITH_ROCKSDB}) + IF (TD_LINUX) + target_link_libraries( + stream + PUBLIC rocksdb-shared tdb + PRIVATE os util transport qcom executor wal index + ) + ELSE() target_link_libraries( stream PUBLIC rocksdb tdb PRIVATE os util transport qcom executor wal index ) + ENDIF() + target_include_directories( stream PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 0015fea10f..f7638a42ae 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -22,6 +22,9 @@ typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; +typedef struct { + void* tableOpt; +} RocksdbCfParam; typedef struct { rocksdb_t* db; rocksdb_column_family_handle_t** pHandle; @@ -29,12 +32,13 @@ typedef struct { rocksdb_readoptions_t* rOpt; rocksdb_options_t** cfOpt; rocksdb_options_t* dbOpt; - void* param; - void* pBackendHandle; + RocksdbCfParam* param; + void* pBackend; SListNode* pCompareNode; + rocksdb_comparator_t** pCompares; } RocksdbCfInst; -int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids); +int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); @@ -46,9 +50,6 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c char** newval, size_t* newvlen, unsigned char* value_changed); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); -typedef struct { - void* tableOpt; -} RocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; typedef int (*EncodeFunc)(void* key, char* buf); @@ -80,16 +81,16 @@ void* streamBackendInit(const char* path) { rocksdb_env_set_low_priority_background_threads(env, 4); rocksdb_env_set_high_priority_background_threads(env, 2); - rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); + rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20); rocksdb_options_t* opts = rocksdb_options_create(); rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_options_set_write_buffer_size(opts, 128 << 20); + rocksdb_options_set_write_buffer_size(opts, 48 << 20); rocksdb_options_set_max_total_wal_size(opts, 128 << 20); rocksdb_options_set_recycle_log_file_num(opts, 6); - rocksdb_options_set_max_write_buffer_number(opts, 3); + rocksdb_options_set_max_write_buffer_number(opts, 2); rocksdb_options_set_info_log_level(opts, 0); pHandle->env = env; @@ -114,25 +115,7 @@ void* streamBackendInit(const char* path) { /* list all cf and get prefix */ - int64_t streamId; - int32_t taskId, dummpy = 0; - SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - for (size_t i = 0; i < nCf; i++) { - char* cf = cfs[i]; - char suffix[64] = {0}; - if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, suffix)) { - char idstr[128] = {0}; - sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId); - // qError("make cf name %s", idstr); - if (taosHashGet(tbl, idstr, strlen(idstr) + 1) == NULL) { - taosHashPut(tbl, idstr, strlen(idstr) + 1, &dummpy, sizeof(dummpy)); - } - } else { - continue; - } - } - streamStateOpenBackendCf(pHandle, (char*)path, tbl); - taosHashCleanup(tbl); + streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf); } rocksdb_list_column_families_destroy(cfs, nCf); @@ -159,16 +142,17 @@ void streamBackendCleanup(void* arg) { } taosHashCleanup(pHandle->cfInst); - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - char* err = NULL; - rocksdb_flush(pHandle->db, flushOpt, &err); - if (err != NULL) { - qError("failed to flush db before streamBackend clean up, reason:%s", err); - taosMemoryFree(err); + if (pHandle->db) { + char* err = NULL; + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flush(pHandle->db, flushOpt, &err); + if (err != NULL) { + qError("failed to flush db before streamBackend clean up, reason:%s", err); + taosMemoryFree(err); + } + rocksdb_flushoptions_destroy(flushOpt); + rocksdb_close(pHandle->db); } - rocksdb_flushoptions_destroy(flushOpt); - - rocksdb_close(pHandle->db); rocksdb_options_destroy(pHandle->dbOpt); rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); @@ -209,12 +193,13 @@ void streamBackendDelCompare(void* backend, void* arg) { } void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); -int streamGetInit(const char* funcName); +int streamGetInit(SStreamState* pState, const char* funcName); // |key|-----value------| // |key|ttl|len|userData| -static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, void** snapshot, void** readOpt); +static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, + rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int ret = memcmp(aBuf, bBuf, aLen); @@ -666,7 +651,7 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c void destroyRocksdbCfInst(RocksdbCfInst* inst) { int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); for (int i = 0; i < cfLen; i++) { - rocksdb_column_family_handle_destroy(inst->pHandle[i]); + if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]); } rocksdb_writeoptions_destroy(inst->wOpt); @@ -674,118 +659,130 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) { rocksdb_readoptions_destroy(inst->rOpt); taosMemoryFree(inst->cfOpt); - taosMemoryFree(inst->param); taosMemoryFreeClear(inst->param); taosMemoryFree(inst); } -int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { +int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) { SBackendHandle* handle = backend; char* err = NULL; - size_t nSize = taosHashGetSize(ids); - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + int64_t streamId; + int32_t taskId, dummy = 0; + char suffix[64] = {0}; - char** cfNames = taosMemoryCalloc(nSize * cfLen + 1, sizeof(char*)); - void* pIter = taosHashIterate(ids, NULL); - size_t keyLen = 0; - char* idstr = taosHashGetKey(pIter, &keyLen); - for (int i = 0; i < nSize * cfLen + 1; i++) { - cfNames[i] = (char*)taosMemoryCalloc(1, 128); - if (i == 0) { - memcpy(cfNames[0], "default", strlen("default")); - continue; - } + rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); + RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*)); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**)); + rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); - GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key); - if (i % cfLen == 0) { - pIter = taosHashIterate(ids, pIter); - if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); - } - } - rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*)); - RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*)); - for (int i = 0; i < nSize * cfLen + 1; i++) { + for (int i = 0; i < nCf; i++) { + char* cf = cfs[i]; + char funcname[64] = {0}; cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt); - if (i == 0) { - continue; + if (i == 0) continue; + if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { + rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); + rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + + rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); + rocksdb_block_based_options_set_filter_policy(tableOpt, filter); + + rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt); + params[i].tableOpt = tableOpt; + + int idx = streamGetInit(NULL, funcname); + SCfInit* cfPara = &ginitDict[idx]; + + rocksdb_comparator_t* compare = + rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare); + pCompare[i] = compare; } - // refactor later - rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); - rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); - - rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); - rocksdb_block_based_options_set_filter_policy(tableOpt, filter); - - rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt); - params[i].tableOpt = tableOpt; - }; - - rocksdb_comparator_t** pCompare = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_comparator_t**)); - for (int i = 0; i < nSize * cfLen + 1; i++) { - if (i == 0) { - continue; - } - SCfInit* cf = &ginitDict[(i - 1) % cfLen]; - - rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare); - pCompare[i] = compare; } - rocksdb_column_family_handle_t** cfHandle = - taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_column_family_handle_t*)); - rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nSize * cfLen + 1, (const char* const*)cfNames, + rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nCf, (const char* const*)cfs, (const rocksdb_options_t* const*)cfOpts, cfHandle, &err); if (err != NULL) { qError("failed to open rocksdb cf, reason:%s", err); taosMemoryFree(err); } else { - qDebug("succ to open rocksdb cf, reason:%s", err); - } - - pIter = taosHashIterate(ids, NULL); - idstr = taosHashGetKey(pIter, &keyLen); - for (int i = 0; i < nSize; i++) { - RocksdbCfInst* inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst)); - rocksdb_column_family_handle_t** subCf = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); - rocksdb_comparator_t** subCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); - RocksdbCfParam* subParam = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); - rocksdb_options_t** subOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); - for (int j = 0; j < cfLen; j++) { - subCf[j] = cfHandle[i * cfLen + j + 1]; - subCompare[j] = pCompare[i * cfLen + j + 1]; - subParam[j] = params[i * cfLen + j + 1]; - subOpt[j] = cfOpts[i * cfLen + j + 1]; - } - inst->db = db; - inst->pHandle = subCf; - inst->wOpt = rocksdb_writeoptions_create(); - inst->rOpt = rocksdb_readoptions_create(); - inst->cfOpt = (rocksdb_options_t**)subOpt; - inst->dbOpt = handle->dbOpt; - inst->param = subParam; - inst->pBackendHandle = handle; - handle->db = db; - SCfComparator compare = {.comp = subCompare, .numOfComp = cfLen}; - inst->pCompareNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); - - taosHashPut(handle->cfInst, idstr, keyLen, &inst, sizeof(void*)); - - pIter = taosHashIterate(ids, pIter); - if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); + qDebug("succ to open rocksdb cf"); } + // close default cf rocksdb_column_family_handle_destroy(cfHandle[0]); rocksdb_options_destroy(cfOpts[0]); + handle->db = db; - for (int i = 0; i < nSize * cfLen + 1; i++) { - taosMemoryFree(cfNames[i]); + static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + for (int i = 0; i < nCf; i++) { + char* cf = cfs[i]; + if (i == 0) continue; + char funcname[64] = {0}; + if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { + char idstr[128] = {0}; + sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId); + + int idx = streamGetInit(NULL, funcname); + + RocksdbCfInst* inst = NULL; + RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1); + if (pInst == NULL || *pInst == NULL) { + inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst)); + inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); + inst->cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); + inst->wOpt = rocksdb_writeoptions_create(); + inst->rOpt = rocksdb_readoptions_create(); + inst->param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); + inst->pBackend = handle; + inst->db = db; + inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); + + inst->dbOpt = handle->dbOpt; + rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); + } else { + inst = *pInst; + } + inst->cfOpt[idx] = cfOpts[i]; + inst->pCompares[idx] = pCompare[i]; + memcpy(&(inst->param[idx]), &(params[i]), sizeof(RocksdbCfParam)); + inst->pHandle[idx] = cfHandle[i]; + } } - taosMemoryFree(cfNames); + void** pIter = taosHashIterate(handle->cfInst, NULL); + while (pIter) { + RocksdbCfInst* inst = *pIter; + + for (int i = 0; i < cfLen; i++) { + if (inst->cfOpt[i] == NULL) { + rocksdb_options_t* opt = rocksdb_options_create_copy(handle->dbOpt); + rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); + rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + + rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); + rocksdb_block_based_options_set_filter_policy(tableOpt, filter); + + rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt); + + SCfInit* cfPara = &ginitDict[i]; + + rocksdb_comparator_t* compare = + rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); + + inst->pCompares[i] = compare; + inst->cfOpt[i] = opt; + inst->param[i].tableOpt = tableOpt; + } + } + SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen}; + inst->pCompareNode = streamBackendAddCompare(handle, &compare); + pIter = taosHashIterate(handle->cfInst, pIter); + } + taosMemoryFree(cfHandle); taosMemoryFree(pCompare); taosMemoryFree(params); taosMemoryFree(cfOpts); - return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { @@ -801,15 +798,14 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { pState->pTdbState->pHandle = (void**)inst->pHandle; pState->pTdbState->writeOpts = inst->wOpt; pState->pTdbState->readOpts = inst->rOpt; - pState->pTdbState->cfOpts = (void**)inst->cfOpt; + pState->pTdbState->cfOpts = (void**)(inst->cfOpt); pState->pTdbState->dbOpt = handle->dbOpt; pState->pTdbState->param = inst->param; - pState->pTdbState->pBackendHandle = handle; + pState->pTdbState->pBackend = handle; pState->pTdbState->pComparNode = inst->pCompareNode; taosThreadMutexUnlock(&handle->cfMutex); return 0; } - taosThreadMutexUnlock(&handle->cfMutex); char* err = NULL; @@ -839,25 +835,17 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); pCompare[i] = compare; } - rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); - for (int i = 0; i < cfLen; i++) { - char buf[128] = {0}; - GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key); - cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err); - if (err != NULL) { - qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); - taosMemoryFreeClear(err); - } - } + rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); pState->pTdbState->rocksdb = handle->db; pState->pTdbState->pHandle = (void**)cfHandle; pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); pState->pTdbState->readOpts = rocksdb_readoptions_create(); - pState->pTdbState->cfOpts = (void**)(rocksdb_options_t**)cfOpt; + pState->pTdbState->cfOpts = (void**)cfOpt; pState->pTdbState->dbOpt = handle->dbOpt; pState->pTdbState->param = param; - pState->pTdbState->pBackendHandle = handle; + pState->pTdbState->pBackend = handle; + taosThreadRwlockInit(&pState->pTdbState->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); @@ -866,7 +854,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { } void streamStateCloseBackend(SStreamState* pState, bool remove) { - SBackendHandle* pHandle = pState->pTdbState->pBackendHandle; + SBackendHandle* pHandle = pState->pTdbState->pBackend; taosThreadMutexLock(&pHandle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { @@ -888,7 +876,9 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { char* err = NULL; if (remove) { for (int i = 0; i < cfLen; i++) { - rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err); + if (pState->pTdbState->pHandle[i] != NULL) + rocksdb_drop_column_family(pState->pTdbState->rocksdb, + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[i], &err); if (err != NULL) { qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); @@ -897,7 +887,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { } else { rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); for (int i = 0; i < cfLen; i++) { - rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err); + if (pState->pTdbState->pHandle[i] != NULL) + rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err); if (err != NULL) { qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); @@ -907,7 +898,9 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { } for (int i = 0; i < cfLen; i++) { - rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); + if (pState->pTdbState->pHandle[i] != NULL) { + rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); + } } taosMemoryFreeClear(pState->pTdbState->pHandle); for (int i = 0; i < cfLen; i++) { @@ -916,7 +909,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { } if (remove) { - streamBackendDelCompare(pState->pTdbState->pBackendHandle, pState->pTdbState->pComparNode); + streamBackendDelCompare(pState->pTdbState->pBackend, pState->pTdbState->pComparNode); } rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts); pState->pTdbState->writeOpts = NULL; @@ -925,24 +918,52 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { pState->pTdbState->readOpts = NULL; taosMemoryFreeClear(pState->pTdbState->cfOpts); taosMemoryFreeClear(pState->pTdbState->param); + + taosThreadRwlockDestroy(&pState->pTdbState->rwLock); pState->pTdbState->rocksdb = NULL; } void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; for (int i = 0; i < comp->numOfComp; i++) { - rocksdb_comparator_destroy(comp->comp[i]); + if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]); } taosMemoryFree(comp->comp); } -int streamGetInit(const char* funcName) { +int streamGetInit(SStreamState* pState, const char* funcName) { + int idx = -1; size_t len = strlen(funcName); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) { - return i; + idx = i; + break; } } - return -1; + if (pState != NULL && idx != -1) { + rocksdb_column_family_handle_t* cf = NULL; + taosThreadRwlockRdlock(&pState->pTdbState->rwLock); + cf = pState->pTdbState->pHandle[idx]; + taosThreadRwlockUnlock(&pState->pTdbState->rwLock); + if (cf == NULL) { + char buf[128] = {0}; + GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key); + char* err = NULL; + + taosThreadRwlockWrlock(&pState->pTdbState->rwLock); + cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err); + if (err != NULL) { + idx = -1; + qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId, + funcName, err); + taosMemoryFree(err); + } else { + pState->pTdbState->pHandle[idx] = cf; + } + taosThreadRwlockUnlock(&pState->pTdbState->rwLock); + } + } + + return idx; } bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) { rocksdb_iter_seek(iter, buf, len); @@ -954,8 +975,9 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len } return true; } -rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, void** snapshot, void** readOpt) { - int idx = streamGetInit(cfName); +rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, + rocksdb_readoptions_t** readOpt) { + int idx = streamGetInit(pState, cfName); if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); @@ -966,7 +988,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_set_snapshot(rOpt, *snapshot); rocksdb_readoptions_set_fill_cache(rOpt, 0); - return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); + return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]); } #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ @@ -974,7 +997,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(funcname); \ + int i = streamGetInit(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ @@ -983,11 +1006,12 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_column_family_handle_t* pHandle = \ + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ if (err != NULL) { \ taosMemoryFree(err); \ @@ -1004,7 +1028,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(funcname); \ + int i = streamGetInit(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ @@ -1013,11 +1037,12 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + rocksdb_column_family_handle_t* pHandle = \ + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ if (val == NULL) { \ if (err == NULL) { \ qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ @@ -1051,7 +1076,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(funcname); \ + int i = streamGetInit(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ code = -1; \ @@ -1060,9 +1085,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + rocksdb_column_family_handle_t* pHandle = \ + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ if (err != NULL) { \ qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ @@ -1113,8 +1139,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { } char* err = NULL; - rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], - sKeyStr, sLen, eKeyStr, eLen, &err); + if (pState->pTdbState->pHandle[1] != NULL) { + rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], + sKeyStr, sLen, eKeyStr, eLen, &err); + } // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, // eLen); if (err != NULL) { @@ -1214,7 +1242,8 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin } pCur->number = pState->number; pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; @@ -1254,7 +1283,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_prev(pCur->iter); @@ -1276,7 +1306,8 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* if (pCur == NULL) return NULL; pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; @@ -1368,7 +1399,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta } pCur->number = pState->number; pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; @@ -1407,7 +1439,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta return NULL; } pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; char buf[128] = {0}; @@ -1443,7 +1476,8 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con return NULL; } pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; @@ -1535,7 +1569,8 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK if (pCur == NULL) return NULL; pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -1594,7 +1629,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const } pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -1629,7 +1665,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const } pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -1664,7 +1701,8 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes } pCur->number = pState->number; pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; @@ -1895,7 +1933,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co rocksdb_snapshot_t* snapshot = NULL; rocksdb_readoptions_t* readopts = NULL; - rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", (void**)&snapshot, (void**)&readopts); + rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); if (pIter == NULL) { return -1; } @@ -1934,7 +1972,8 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); return pCur; } int32_t streamDefaultIterValid_rocksdb(void* iter) { @@ -1965,7 +2004,6 @@ char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { } return dst; } - // batch func void* streamStateCreateBatch() { rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create(); @@ -1980,7 +2018,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { - int i = streamGetInit(cfName); + int i = streamGetInit(pState, cfName); if (i < 0) { qError("streamState failed to put to cf name:%s", cfName); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8a03896978..a0caffd41f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -17,14 +17,25 @@ #include "tstream.h" #include "wal.h" -SStreamTask* tNewStreamTask(int64_t streamId) { +static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) { + int32_t childId = taosArrayGetSize(pArray); + pTask->selfChildId = childId; + taosArrayPush(pArray, &pTask); + return 0; +} + +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } pTask->id.taskId = tGenIdPI32(); pTask->id.streamId = streamId; + pTask->taskLevel = taskLevel; + pTask->fillHistory = fillHistory; + pTask->triggerParam = triggerParam; char buf[128] = {0}; sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); @@ -34,6 +45,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) { pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + mndAddToTaskset(pTaskList, pTask); return pTask; } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index fa8b5d33b7..acbf0bb0d3 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -166,7 +166,7 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { char* t = NULL; if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) { void* payload = GET_PAYLOAD_DATA(pg); - t = doCompressData(payload, pBuf->pageSize, &size, pBuf); + t = doCompressData(payload, pBuf->pageSize + sizeof(SFilePage), &size, pBuf); if (size < 0) { uError("failed to compress data when flushing data to disk, %s", pBuf->id); terrno = TSDB_CODE_INVALID_PARA; diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 2e307771b7..0bf06e6f44 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -75,4 +75,12 @@ target_link_libraries(rbtreeTest os util gtest_main) add_test( NAME rbtreeTest COMMAND rbtreeTest -) \ No newline at end of file +) + +# pageBufferTest +add_executable(pageBufferTest "pageBufferTest.cpp") +target_link_libraries(pageBufferTest os util gtest_main) +add_test( + NAME pageBufferTest + COMMAND pageBufferTest +) diff --git a/source/util/test/pageBufferTest.cpp b/source/util/test/pageBufferTest.cpp index 00ed804930..50d3656ccd 100644 --- a/source/util/test/pageBufferTest.cpp +++ b/source/util/test/pageBufferTest.cpp @@ -157,6 +157,68 @@ void recyclePageTest() { destroyDiskbasedBuf(pBuf); } + +int saveDataToPage(SFilePage* pPg, const char* data, uint32_t len) { + memcpy(pPg->data + pPg->num, data, len); + pPg->num += len; + setBufPageDirty(pPg, true); + return 0; +} + +bool checkBufVarData(SFilePage* pPg, const char* varData, uint32_t varLen) { + const char* start = pPg->data + sizeof(SFilePage); + for (uint32_t i = 0; i < (pPg->num - sizeof(SFilePage)) / varLen; ++i) { + if (0 != strncmp(start + 6 * i + 3, varData, varLen - 3)) { + using namespace std; + cout << "pos: " << sizeof(SFilePage) + 6 * i + 3 << " should be " << varData << " but is: " << start + 6 * i + 3 + << endl; + return false; + } + } + return true; +} + +// SPageInfo.pData: | sizeof(void*) 8 bytes | sizeof(SFilePage) 4 bytes| 4096 bytes | +// ^ +// | +// SFilePage: flush to disk from here +void testFlushAndReadBackBuffer() { + SDiskbasedBuf* pBuf = NULL; + uint32_t totalLen = 4096; + auto code = createDiskbasedBuf(&pBuf, totalLen, totalLen * 2, "1", TD_TMP_DIR_PATH); + int32_t pageId = -1; + auto* pPg = (SFilePage*)getNewBufPage(pBuf, &pageId); + ASSERT_TRUE(pPg != nullptr); + pPg->num = sizeof(SFilePage); + + // save data into page + uint32_t len = 6; // sizeof(SFilePage) + 6 * 682 = 4096 + // nullbitmap(1) + len(2) + AA\0(3) + char* rowData = (char*)taosMemoryCalloc(1, len); + *(uint16_t*)(rowData + 2) = (uint16_t)2; + rowData[3] = 'A'; + rowData[4] = 'A'; + + while (pPg->num + len <= getBufPageSize(pBuf)) { + saveDataToPage(pPg, rowData, len); + } + ASSERT_EQ(pPg->num, totalLen); + ASSERT_TRUE(checkBufVarData(pPg, rowData + 3, len)); + releaseBufPage(pBuf, pPg); + + // flush to disk + int32_t newPgId = -1; + pPg = (SFilePage*)getNewBufPage(pBuf, &newPgId); + releaseBufPage(pBuf, pPg); + pPg = (SFilePage*)getNewBufPage(pBuf, &newPgId); + releaseBufPage(pBuf, pPg); + + // reload it from disk + pPg = (SFilePage*)getBufPage(pBuf, pageId); + ASSERT_TRUE(checkBufVarData(pPg, rowData + 3, len)); + destroyDiskbasedBuf(pBuf); +} + } // namespace TEST(testCase, resultBufferTest) { @@ -164,6 +226,7 @@ TEST(testCase, resultBufferTest) { simpleTest(); writeDownTest(); recyclePageTest(); + testFlushAndReadBackBuffer(); } -#pragma GCC diagnostic pop \ No newline at end of file +#pragma GCC diagnostic pop diff --git a/tests/script/tsim/dnode/drop_dnode_has_mnode.sim b/tests/script/tsim/dnode/drop_dnode_has_mnode.sim index 054f978607..8a74367726 100644 --- a/tests/script/tsim/dnode/drop_dnode_has_mnode.sim +++ b/tests/script/tsim/dnode/drop_dnode_has_mnode.sim @@ -35,7 +35,7 @@ endi print =============== step2 drop dnode 3 sql_error drop dnode 1 -sql drop dnode 3 +sql drop dnode 3 force sql select * from information_schema.ins_dnodes print ===> $data00 $data01 $data02 $data03 $data04 $data05 diff --git a/tests/script/tsim/dnode/offline_reason.sim b/tests/script/tsim/dnode/offline_reason.sim index 8c4d8b47f7..23f015392b 100644 --- a/tests/script/tsim/dnode/offline_reason.sim +++ b/tests/script/tsim/dnode/offline_reason.sim @@ -57,7 +57,7 @@ if $data(2)[7] != @status msg timeout@ then endi print ========== step4 -sql drop dnode 2 +sql drop dnode 2 force sql select * from information_schema.ins_dnodes if $rows != 1 then return -1 diff --git a/tests/script/tsim/valgrind/checkError1.sim b/tests/script/tsim/valgrind/checkError1.sim index 5f82d2d935..debe633f06 100644 --- a/tests/script/tsim/valgrind/checkError1.sim +++ b/tests/script/tsim/valgrind/checkError1.sim @@ -20,7 +20,7 @@ sql_error alter user u2 sysinfo 0 print =============== step2 create drop dnode sql create dnode $hostname port 7200 sql create dnode $hostname port 7300 -sql drop dnode 3 +sql drop dnode 3 force sql alter dnode 1 'debugflag 131' print =============== step3: select * from information_schema.ins_dnodes