diff --git a/docs/en/14-reference/01-components/01-taosd.md b/docs/en/14-reference/01-components/01-taosd.md index 1b7f63510b..c86b631df4 100644 --- a/docs/en/14-reference/01-components/01-taosd.md +++ b/docs/en/14-reference/01-components/01-taosd.md @@ -190,7 +190,8 @@ The effective value of charset is UTF-8. |Parameter Name |Supported Version |Dynamic Modification|Description| |-----------------------|-------------------------|--------------------|------------| |supportVnodes | |Supported, effective immediately |Maximum number of vnodes supported by a dnode, range 0-4096, default value is twice the number of CPU cores + 5| -|numOfCommitThreads | |Supported, effective after restart|Maximum number of commit threads, range 0-1024, default value 4| +|numOfCommitThreads | |Supported, effective after restart|Maximum number of commit threads, range 1-1024, default value 4| +|numOfCompactThreads | |Supported, effective after restart|Maximum number of commit threads, range 1-16, default value 2| |numOfMnodeReadThreads | |Supported, effective after restart|Number of Read threads for mnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4)| |numOfVnodeQueryThreads | |Supported, effective after restart|Number of Query threads for vnode, range 0-1024, default value is twice the number of CPU cores (not exceeding 16)| |numOfVnodeFetchThreads | |Supported, effective after restart|Number of Fetch threads for vnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4)| diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 0a4160a00b..c08581fdc1 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -185,7 +185,8 @@ charset 的有效值是 UTF-8。 |参数名称|支持版本|动态修改|参数含义| |--------------------------|----------|-------------------------|-| |supportVnodes | |支持动态修改 立即生效 |dnode 支持的最大 vnode 数目,取值范围 0-4096,默认值 CPU 核数的 2 倍 + 5| -|numOfCommitThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 0-1024,默认值为 4| +|numOfCommitThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 1-1024,默认值为 4| +|numOfCompactThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 1-16,默认值为 2| |numOfMnodeReadThreads | |支持动态修改 重启生效 |mnode 的 Read 线程数目,取值范围 0-1024,默认值为 CPU 核数的四分之一(不超过 4)| |numOfVnodeQueryThreads | |支持动态修改 重启生效 |vnode 的 Query 线程数目,取值范围 0-1024,默认值为 CPU 核数的两倍(不超过 16)| |numOfVnodeFetchThreads | |支持动态修改 重启生效 |vnode 的 Fetch 线程数目,取值范围 0-1024,默认值为 CPU 核数的四分之一(不超过 4)| diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 0800b3ea50..6beb7c8860 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -112,9 +112,8 @@ extern int32_t tsNumOfSnodeWriteThreads; extern int64_t tsQueueMemoryAllowed; extern int32_t tsRetentionSpeedLimitMB; -extern const char *tsAlterCompactTaskKeywords; -extern int32_t tsNumOfCompactThreads; -extern int32_t tsNumOfRetentionThreads; +extern int32_t tsNumOfCompactThreads; +extern int32_t tsNumOfRetentionThreads; // sync raft extern int32_t tsElectInterval; diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index a2be706dc0..7a4401827c 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -389,7 +389,6 @@ typedef struct SStateStore { int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); - void (*streamStateSetFillInfo)(SStreamState* pState); void (*streamStateClearExpiredState)(SStreamState* pState); int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, @@ -455,7 +454,6 @@ typedef struct SStateStore { int32_t (*streamStateBegin)(SStreamState* pState); void (*streamStateCommit)(SStreamState* pState); void (*streamStateDestroy)(SStreamState* pState, bool remove); - int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts); void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst); } SStateStore; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 2179547352..b4e0087b1a 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,7 +34,6 @@ void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); void streamStateCommit(SStreamState* pState); void streamStateDestroy(SStreamState* pState, bool remove); -int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark); int32_t streamStateDelTaskDb(SStreamState* pState); int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); @@ -108,7 +107,6 @@ int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, con int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); // twa -void streamStateSetFillInfo(SStreamState* pState); void streamStateClearExpiredState(SStreamState* pState); void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index f1f5b00e38..f47c308e18 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -67,7 +67,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId); -int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState); void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts); diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index a6a3c63a50..709d053414 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -148,7 +148,7 @@ int32_t tfsMkdirRecur(STfs *pTfs, const char *rname); * @return int32_t 0 for success, -1 for failure. */ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId); - +#if 0 /** * @brief check directories exist in tfs. * @@ -158,7 +158,7 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId); * @return true for exist, false for not exist. */ bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId); - +#endif /** * @brief Remove directory at all levels in tfs. * @@ -241,7 +241,7 @@ void tfsBasename(const STfsFile *pFile, char *dest); * @param dest The buffer where dirname will be saved. */ void tfsDirname(const STfsFile *pFile, char *dest); - +#if 0 /** * @brief Get the absolute file name of rname. * @@ -251,7 +251,7 @@ void tfsDirname(const STfsFile *pFile, char *dest); * @param aname absolute file name */ void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname); - +#endif /** * @brief Remove file in tfs. * diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 35ab328815..68d75f9897 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -66,9 +66,9 @@ int32_t s3Begin() { void s3End() { S3_deinitialize(); } int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ } - +#if 0 static int32_t s3ListBucket(char const *bucketname); - +#endif static void s3DumpCfgByEp(int8_t epIndex) { // clang-format off (void)fprintf(stdout, @@ -291,7 +291,7 @@ static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) { TAOS_RETURN(code); } - +#if 0 static int32_t s3ListBucket(char const *bucketname) { int32_t code = 0; @@ -312,7 +312,7 @@ static int32_t s3ListBucket(char const *bucketname) { TAOS_RETURN(code); } - +#endif typedef struct growbuffer { // The total number of bytes, and the start byte int size; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 355a34d05b..d6d3e3a443 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -102,9 +102,8 @@ int32_t tsMaxStreamBackendCache = 128; // M int32_t tsPQSortMemThreshold = 16; // M int32_t tsRetentionSpeedLimitMB = 0; // unlimited -const char *tsAlterCompactTaskKeywords = "max_compact_tasks"; -int32_t tsNumOfCompactThreads = 2; -int32_t tsNumOfRetentionThreads = 1; +int32_t tsNumOfCompactThreads = 2; +int32_t tsNumOfRetentionThreads = 1; // sync raft int32_t tsElectInterval = 25 * 1000; @@ -744,8 +743,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); - - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); + + TAOS_CHECK_RETURN( + cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -794,9 +794,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); - tsNumOfCompactThreads = tsNumOfCommitThreads; - tsNumOfCompactThreads = TRANGE(tsNumOfCompactThreads, 2, 4); - tsNumOfSupportVnodes = tsNumOfCores * 2 + 5; tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2); @@ -841,7 +838,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL)); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "maxCompactConcurrency", tsNumOfCompactThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCompactThreads", tsNumOfCompactThreads, 1, 16, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL) != 0); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "memPoolFullFunc", tsMemPoolFullFunc, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL) != 0); @@ -1038,10 +1035,8 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } - pItem = cfgGetItem(pCfg, "maxCompactConcurrency"); + pItem = cfgGetItem(pCfg, "numOfCompactThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfCompactThreads = numOfCores / 2; - tsNumOfCompactThreads = TRANGE(tsNumOfCompactThreads, 2, 4); pItem->i32 = tsNumOfCompactThreads; pItem->stype = stype; } @@ -1546,7 +1541,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfCommitThreads"); tsNumOfCommitThreads = pItem->i32; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "maxCompactConcurrency"); + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfCompactThreads"); tsNumOfCompactThreads = pItem->i32; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "retentionSpeedLimitMB"); @@ -2349,6 +2344,8 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, TAOS_RETURN(code); } +extern void tsdbAlterNumCompactThreads(); + static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = -1; @@ -2399,6 +2396,17 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { goto _exit; } + if (strcasecmp(name, "numOfCompactThreads") == 0) { +#ifdef TD_ENTERPRISE + tsNumOfCompactThreads = pItem->i32; + code = TSDB_CODE_SUCCESS; + // tsdbAlterNumCompactThreads(); +#else + code = TSDB_CODE_INVALID_CFG; +#endif + goto _exit; + } + { // 'bool/int32_t/int64_t/float/double' variables with general modification function static OptionNameAndVar debugOptions[] = { {"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag}, diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index bb12612273..31afb7377e 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -40,6 +40,46 @@ add_test( COMMAND dataformatTest ) +# cosCpTest.cpp +add_executable(cosCpTest "") +target_sources( + cosCpTest + PRIVATE + "cosCpTest.cpp" +) +target_link_libraries(cosCpTest gtest gtest_main util common) +target_include_directories( + cosCpTest + PUBLIC "${TD_SOURCE_DIR}/include/common" + PUBLIC "${TD_SOURCE_DIR}/include/util" +) +add_test( + NAME cosCpTest + COMMAND cosCpTest +) + +if(TD_LINUX) + +# cosTest.cpp +add_executable(cosTest "") +target_sources( + cosTest + PRIVATE + "cosTest.cpp" +) +target_link_libraries(cosTest gtest gtest_main util common) +target_include_directories( + cosTest + PUBLIC "${TD_SOURCE_DIR}/include/common" + PUBLIC "${TD_SOURCE_DIR}/include/util" +) +add_test( + NAME cosTest + COMMAND cosTest +) + +endif() + if (${TD_LINUX}) # tmsg test add_executable(tmsgTest "") @@ -60,4 +100,4 @@ if (${TD_LINUX}) add_custom_command(TARGET tmsgTest POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different ${MSG_TBL_FILE} $ ) -endif () \ No newline at end of file +endif () diff --git a/source/common/test/cosCpTest.cpp b/source/common/test/cosCpTest.cpp new file mode 100644 index 0000000000..fc16daa8cc --- /dev/null +++ b/source/common/test/cosCpTest.cpp @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} + +TEST(testCase, cpOpenCloseRemove) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpBuild) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size); + + EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpLoad) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + if (taosCheckExistFile(file_cp_path)) { + EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true); + + EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD); + EXPECT_EQ(cp.md5, std::string("")); + EXPECT_EQ(cp.thefile, nullptr); + EXPECT_EQ(std::string(cp.file_path), "./afile"); + EXPECT_EQ(cp.file_size, 1024); + EXPECT_EQ(cp.file_last_modified, 20241220141705); + EXPECT_EQ(cp.file_md5, std::string("")); + EXPECT_EQ(cp.object_name, std::string("")); + EXPECT_EQ(cp.object_size, 0); + EXPECT_EQ(cp.object_last_modified, std::string("")); + EXPECT_EQ(cp.object_etag, std::string("")); + EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx")); + + EXPECT_EQ(cp.part_num, 1); + EXPECT_EQ(cp.part_size, 8388608); + EXPECT_EQ(cp.parts[0].index, 0); + EXPECT_EQ(cp.parts[0].offset, 0); + EXPECT_EQ(cp.parts[0].size, 1024); + EXPECT_EQ(cp.parts[0].completed, 0); + EXPECT_EQ(cp.parts[0].etag, std::string("")); + EXPECT_EQ(cp.parts[0].crc64, 0); + } + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpBuildUpdate) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + int seq = 1; + char *etags[1] = {"etags-1-xxx"}; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size); + + cos_cp_update(&cp, cp.parts[seq - 1].index, etags[seq - 1], 0); + + EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS); + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cpLoadUpdate) { + int32_t code = 0, lino = 0; + + int64_t contentLength = 1024; + const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3; + int totalSeq = (contentLength + chunk_size - 1) / chunk_size; + const int max_part_num = 10000; + if (totalSeq > max_part_num) { + chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num; + totalSeq = (contentLength + chunk_size - 1) / chunk_size; + } + SCheckpoint cp; + char const *file = "./afile"; + char file_cp_path[TSDB_FILENAME_LEN]; + int64_t lmtime = 20241220141705; + char const *upload_id = "upload-id-xxx"; + + (void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file); + (void)memset(&cp, 0, sizeof(cp)); + + cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart)); + if (!cp.parts) { + TAOS_CHECK_EXIT(terrno); + } + + if (taosCheckExistFile(file_cp_path)) { + EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS); + + EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true); + + EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD); + EXPECT_EQ(cp.md5, std::string("")); + EXPECT_EQ(cp.thefile, nullptr); + EXPECT_EQ(std::string(cp.file_path), "./afile"); + EXPECT_EQ(cp.file_size, 1024); + EXPECT_EQ(cp.file_last_modified, 20241220141705); + EXPECT_EQ(cp.file_md5, std::string("")); + EXPECT_EQ(cp.object_name, std::string("")); + EXPECT_EQ(cp.object_size, 0); + EXPECT_EQ(cp.object_last_modified, std::string("")); + EXPECT_EQ(cp.object_etag, std::string("")); + EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx")); + + EXPECT_EQ(cp.part_num, 1); + EXPECT_EQ(cp.part_size, 8388608); + EXPECT_EQ(cp.parts[0].index, 0); + EXPECT_EQ(cp.parts[0].offset, 0); + EXPECT_EQ(cp.parts[0].size, 1024); + EXPECT_EQ(cp.parts[0].completed, 1); + EXPECT_EQ(cp.parts[0].etag, std::string("etags-1-xxx")); + EXPECT_EQ(cp.parts[0].crc64, 0); + } + + if (cp.thefile) { + EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS); + } + if (cp.parts) { + taosMemoryFree(cp.parts); + } + + EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} diff --git a/source/common/test/cosTest.cpp b/source/common/test/cosTest.cpp new file mode 100644 index 0000000000..5a6aee52d9 --- /dev/null +++ b/source/common/test/cosTest.cpp @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} + +int32_t cosInitEnv() { + int32_t code = 0; + bool isBlob = false; + + extern int8_t tsS3Ablob; + extern char tsS3Hostname[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; + extern char tsS3BucketName[TSDB_FQDN_LEN]; + + tsS3Ablob = isBlob; + /* + const char *hostname = "endpoint/.blob.core.windows.net"; + const char *accessKeyId = ""; + const char *accessKeySecret = ""; + const char *bucketName = ""; + */ + + // const char *hostname = "http://192.168.1.52:9000"; + // const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel"; + // const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX"; + // const char *bucketName = "test-bucket"; + const char *hostname = "192.168.1.52:9000"; + const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel"; + const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX"; + const char *bucketName = "ci-bucket19"; + + tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN); + tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN); + + // setup s3 env + extern int8_t tsS3EpNum; + extern int8_t tsS3Https[TSDB_MAX_EP_NUM]; + + tsS3EpNum = 1; + tsS3Https[0] = false; + + tstrncpy(tsTempDir, "/tmp/", PATH_MAX); + + tsS3Enabled = true; + + return code; +} + +TEST(testCase, cosCpPutError) { + int32_t code = 0, lino = 0; + + char const *objectName = "testObject"; + + EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS); + EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS); + +#if defined(USE_S3) + EXPECT_EQ(s3Size(objectName), -1); +#else + EXPECT_EQ(s3Size(objectName), 0); +#endif + + s3EvictCache("", 0); + + s3End(); + + return; + +_exit: + std::cout << "code: " << code << std::endl; +} + +TEST(testCase, cosCpPut) { + int32_t code = 0, lino = 0; + + int8_t with_cp = 0; + char *data = nullptr; + + const long objectSize = 65 * 1024 * 1024; + char const *objectName = "cosut.bin"; + const char object_name[] = "cosut.bin"; + + EXPECT_EQ(std::string(object_name), objectName); + + EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS); + EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS); + + { + data = (char *)taosMemoryCalloc(1, objectSize); + if (!data) { + TAOS_CHECK_EXIT(terrno); + } + + for (int i = 0; i < objectSize / 2; ++i) { + data[i * 2 + 1] = 1; + } + + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, objectSize); + GTEST_ASSERT_EQ(n, objectSize); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = s3PutObjectFromFile2(path, objectName, with_cp); + GTEST_ASSERT_EQ(code, 0); + + with_cp = 1; + code = s3PutObjectFromFile2(path, objectName, with_cp); + GTEST_ASSERT_EQ(code, 0); + +#if defined(USE_S3) + EXPECT_EQ(s3Size(objectName), objectSize); +#else + EXPECT_EQ(s3Size(objectName), 0); +#endif + + s3End(); + s3EvictCache("", 0); + + taosMemoryFree(data); + + EXPECT_EQ(taosRemoveFile(path), TSDB_CODE_SUCCESS); + } + + return; + +_exit: + if (data) { + taosMemoryFree(data); + s3End(); + } + + std::cout << "code: " << code << std::endl; +} diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 8139e4aa98..9ed4ee83c4 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -475,27 +475,6 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -extern void tsdbAlterNumCompactThreads(); -static int32_t dmAlterMaxCompactTask(const char *value) { - int32_t max_compact_tasks; - char *endptr = NULL; - - max_compact_tasks = taosStr2Int32(value, &endptr, 10); - if (endptr == value || endptr[0] != '\0') { - return TSDB_CODE_INVALID_MSG; - } - - if (max_compact_tasks != tsNumOfCompactThreads) { - dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks); - tsNumOfCompactThreads = max_compact_tasks; -#ifdef TD_ENTERPRISE - (void)tsdbAlterNumCompactThreads(); -#endif - } - - return TSDB_CODE_SUCCESS; -} - int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t code = 0; SDCfgDnodeReq cfgReq = {0}; @@ -509,10 +488,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs); } - if (strncmp(cfgReq.config, tsAlterCompactTaskKeywords, strlen(tsAlterCompactTaskKeywords) + 1) == 0) { - return dmAlterMaxCompactTask(cfgReq.value); - } - dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value); code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true); diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 4fe4333534..68dc981338 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -68,7 +68,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; - pStore->streamStateSetFillInfo = streamStateSetFillInfo; pStore->streamStateClearExpiredState = streamStateClearExpiredState; pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; @@ -117,7 +116,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; pStore->streamStateDestroy = streamStateDestroy; - pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; pStore->streamStateCopyBackend = streamStateCopyBackend; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index df6fb17730..41e6c6c2c5 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -191,7 +191,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur; pStore->streamStateGetKVByCur = streamStateGetKVByCur; - pStore->streamStateSetFillInfo = streamStateSetFillInfo; pStore->streamStateClearExpiredState = streamStateClearExpiredState; pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist; @@ -243,7 +242,6 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; pStore->streamStateDestroy = streamStateDestroy; - pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateReloadInfo = streamStateReloadInfo; pStore->streamStateCopyBackend = streamStateCopyBackend; } diff --git a/source/libs/azure/src/td_block_blob_client.cpp b/source/libs/azure/src/td_block_blob_client.cpp index 33ac774d0c..ba2ac14551 100644 --- a/source/libs/azure/src/td_block_blob_client.cpp +++ b/source/libs/azure/src/td_block_blob_client.cpp @@ -38,6 +38,8 @@ TDBlockBlobClient TDBlockBlobClient::CreateFromConnectionString(const std::strin return newClient; } +TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {} +#if 0 TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr credential, const BlobClientOptions& options) : BlobClient(blobUrl, std::move(credential), options) {} @@ -50,8 +52,6 @@ TDBlockBlobClient::TDBlockBlobClient(const std::string& TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options) : BlobClient(blobUrl, options) {} -TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {} - TDBlockBlobClient TDBlockBlobClient::WithSnapshot(const std::string& snapshot) const { TDBlockBlobClient newClient(*this); if (snapshot.empty()) { @@ -74,47 +74,6 @@ TDBlockBlobClient TDBlockBlobClient::WithVersionId(const std::string& versionId) return newClient; } -Azure::Response TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content, - const UploadBlockBlobOptions& options, - const Azure::Core::Context& context) const { - _detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions; - if (options.TransactionalContentHash.HasValue()) { - if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) { - protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value; - } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) { - protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value; - } - } - protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType; - protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding; - protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage; - protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value; - protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition; - protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl; - protocolLayerOptions.Metadata = std::map(options.Metadata.begin(), options.Metadata.end()); - protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags); - protocolLayerOptions.Tier = options.AccessTier; - protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId; - protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince; - protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince; - protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch; - protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch; - protocolLayerOptions.IfTags = options.AccessConditions.TagConditions; - if (m_customerProvidedKey.HasValue()) { - protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key; - protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash; - protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString(); - } - protocolLayerOptions.EncryptionScope = m_encryptionScope; - if (options.ImmutabilityPolicy.HasValue()) { - protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn; - protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode; - } - protocolLayerOptions.LegalHold = options.HasLegalHold; - - return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); -} - Azure::Response TDBlockBlobClient::UploadFrom( const uint8_t* buffer, size_t bufferSize, const UploadBlockBlobFromOptions& options, const Azure::Core::Context& context) const { @@ -270,6 +229,47 @@ Azure::Response TDBlockBlobClient::UploadFrom return Azure::Response(std::move(result), std::move(commitBlockListResponse.RawResponse)); } +#endif +Azure::Response TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content, + const UploadBlockBlobOptions& options, + const Azure::Core::Context& context) const { + _detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions; + if (options.TransactionalContentHash.HasValue()) { + if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) { + protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value; + } else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) { + protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value; + } + } + protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType; + protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding; + protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage; + protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value; + protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition; + protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl; + protocolLayerOptions.Metadata = std::map(options.Metadata.begin(), options.Metadata.end()); + protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags); + protocolLayerOptions.Tier = options.AccessTier; + protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId; + protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince; + protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince; + protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch; + protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch; + protocolLayerOptions.IfTags = options.AccessConditions.TagConditions; + if (m_customerProvidedKey.HasValue()) { + protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key; + protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash; + protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString(); + } + protocolLayerOptions.EncryptionScope = m_encryptionScope; + if (options.ImmutabilityPolicy.HasValue()) { + protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn; + protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode; + } + protocolLayerOptions.LegalHold = options.HasLegalHold; + + return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); +} Azure::Response TDBlockBlobClient::UploadFrom( const std::string& fileName, int64_t offset, int64_t size, const UploadBlockBlobFromOptions& options, @@ -349,7 +349,7 @@ Azure::Response TDBlockBlobClient::UploadFrom return Azure::Response(std::move(result), std::move(commitBlockListResponse.RawResponse)); } - +#if 0 Azure::Response TDBlockBlobClient::UploadFromUri( const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options, const Azure::Core::Context& context) const { @@ -396,7 +396,7 @@ Azure::Response TDBlockBlobClient::UploadF return _detail::BlockBlobClient::UploadFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#endif Azure::Response TDBlockBlobClient::StageBlock(const std::string& blockId, Azure::Core::IO::BodyStream& content, const StageBlockOptions& options, @@ -419,7 +419,7 @@ Azure::Response TDBlockBlobClient::StageBlock(const st protocolLayerOptions.EncryptionScope = m_encryptionScope; return _detail::BlockBlobClient::StageBlock(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context); } - +#if 0 Azure::Response TDBlockBlobClient::StageBlockFromUri( const std::string& blockId, const std::string& sourceUri, const StageBlockFromUriOptions& options, const Azure::Core::Context& context) const { @@ -457,7 +457,7 @@ Azure::Response TDBlockBlobClient::StageBlockFr return _detail::BlockBlobClient::StageBlockFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#endif Azure::Response TDBlockBlobClient::CommitBlockList( const std::vector& blockIds, const CommitBlockListOptions& options, const Azure::Core::Context& context) const { @@ -492,7 +492,7 @@ Azure::Response TDBlockBlobClient::CommitBlockLis return _detail::BlockBlobClient::CommitBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, context); } - +#if 0 Azure::Response TDBlockBlobClient::GetBlockList(const GetBlockListOptions& options, const Azure::Core::Context& context) const { _detail::BlockBlobClient::GetBlockBlobBlockListOptions protocolLayerOptions; @@ -502,6 +502,7 @@ Azure::Response TDBlockBlobClient::GetBlockList(cons return _detail::BlockBlobClient::GetBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, _internal::WithReplicaStatus(context)); } +#endif /* Azure::Response TDBlockBlobClient::Query(const std::string& querySqlExpression, const QueryBlobOptions& options, diff --git a/source/libs/azure/test/azExceptionTest.cpp b/source/libs/azure/test/azExceptionTest.cpp new file mode 100644 index 0000000000..a83bb4d8f2 --- /dev/null +++ b/source/libs/azure/test/azExceptionTest.cpp @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +// clang-format off +#include "td_block_blob_client.hpp" +#include "az.h" +// clang-format on + +using namespace Azure::Storage; +using namespace Azure::Storage::Blobs; + +extern int8_t tsS3Enabled; +extern char tsS3BucketName[TSDB_FQDN_LEN]; + +static int32_t azInitEnv() { + int32_t code = 0; + + extern int8_t tsS3EpNum; + + extern char tsS3Hostname[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; + extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; + + /* TCS parameter format + tsS3Hostname[0] = "/.blob.core.windows.net"; + tsS3AccessKeyId[0] = ""; + tsS3AccessKeySecret[0] = ""; + tsS3BucketName = ""; + */ + + const char *hostname = "/.blob.core.windows.net"; + const char *accessKeyId = ""; + const char *accessKeySecret = ""; + const char *bucketName = ""; + + if (hostname[0] != '<') { + tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN); + tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN); + tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN); + } else { + const char *accountId = getenv("ablob_account_id"); + if (!accountId) { + return -1; + } + + const char *accountSecret = getenv("ablob_account_secret"); + if (!accountSecret) { + return -1; + } + + const char *containerName = getenv("ablob_container"); + if (!containerName) { + return -1; + } + + TAOS_STRCPY(&tsS3Hostname[0][0], accountId); + TAOS_STRCAT(&tsS3Hostname[0][0], ".blob.core.windows.net"); + TAOS_STRCPY(&tsS3AccessKeyId[0][0], accountId); + TAOS_STRCPY(&tsS3AccessKeySecret[0][0], accountSecret); + TAOS_STRCPY(tsS3BucketName, containerName); + } + + tstrncpy(tsTempDir, "/tmp/", PATH_MAX); + + tsS3Enabled = true; + + return code; +} + +// TEST(AzTest, DISABLED_InterfaceTest) { +TEST(AzETest, InterfaceTest) { + int code = 0; + bool check = false; + bool withcp = false; + + code = azInitEnv(); + if (code) { + std::cout << "ablob env init failed with: " << code << std::endl; + return; + } + + GTEST_ASSERT_EQ(code, 0); + GTEST_ASSERT_EQ(tsS3Enabled, 1); + + code = azBegin(); + GTEST_ASSERT_EQ(code, 0); + + code = azCheckCfg(); + GTEST_ASSERT_EQ(code, 0); + const int size = 4096; + char data[size] = {0}; + for (int i = 0; i < size / 2; ++i) { + data[i * 2 + 1] = 1; + } + + const char object_name[] = "azut.bin"; + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = azPutObjectFromFileOffset(path, object_name, 0, size); + GTEST_ASSERT_EQ(code, 0); + + uint8_t *pBlock = NULL; + code = azGetObjectBlock(object_name, 0, size, check, &pBlock); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(pBlock[i * 2], 0); + GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1); + } + + taosMemoryFree(pBlock); + + code = azGetObjectToFile(object_name, path_download); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + azDeleteObjectsByPrefix(object_name); + // list object to check + + code = azPutObjectFromFile2(path, object_name, withcp); + GTEST_ASSERT_EQ(code, 0); + + code = azGetObjectsByPrefix(object_name, tsTempDir); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + TDBlockBlobClient blobClient = + TDBlockBlobClient::CreateFromConnectionString(std::getenv("ablob_cs"), std::string(tsS3BucketName), object_name); + + const char *object_name_arr[] = {object_name}; + code = azDeleteObjects(object_name_arr, 1); + GTEST_ASSERT_EQ(code, 0); + + azEnd(); +} diff --git a/source/libs/azure/test/azTest.cpp b/source/libs/azure/test/azTest.cpp index 0459cb5f6a..b57b7ca884 100644 --- a/source/libs/azure/test/azTest.cpp +++ b/source/libs/azure/test/azTest.cpp @@ -199,3 +199,132 @@ TEST(AzTest, InterfaceTest) { azEnd(); } + +// TEST(AzTest, DISABLED_InterfaceTestBig) { +TEST(AzTest, InterfaceTestBig) { + int code = 0; + bool check = false; + bool withcp = false; + + code = azInitEnv(); + if (code) { + std::cout << "ablob env init failed with: " << code << std::endl; + return; + } + + GTEST_ASSERT_EQ(code, 0); + GTEST_ASSERT_EQ(tsS3Enabled, 1); + + code = azBegin(); + GTEST_ASSERT_EQ(code, 0); + + code = azCheckCfg(); + GTEST_ASSERT_EQ(code, 0); + const int size = 256 * 1024 * 1024 + 1; + char *data = (char *)taosMemoryCalloc(1, size); + if (!data) { + std::cout << "code: " << code << "terrno: " << terrno << std::endl; + + return; + } + + for (int i = 0; i < size / 2; ++i) { + data[i * 2 + 1] = 1; + } + + const char object_name[] = "azut.bin"; + char path[PATH_MAX] = {0}; + char path_download[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name); + } + + tstrncpy(path_download, path, strlen(path) + 1); + tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1); + + TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH); + GTEST_ASSERT_NE(fp, nullptr); + + int n = taosWriteFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + code = azPutObjectFromFileOffset(path, object_name, 0, size); + GTEST_ASSERT_EQ(code, 0); + + uint8_t *pBlock = NULL; + code = azGetObjectBlock(object_name, 0, size, check, &pBlock); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(pBlock[i * 2], 0); + GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1); + } + + taosMemoryFree(pBlock); + + code = azGetObjectToFile(object_name, path_download); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + azDeleteObjectsByPrefix(object_name); + // list object to check + + code = azPutObjectFromFile2(path, object_name, withcp); + GTEST_ASSERT_EQ(code, 0); + + code = azGetObjectsByPrefix(object_name, tsTempDir); + GTEST_ASSERT_EQ(code, 0); + + { + TdFilePtr fp = taosOpenFile(path, TD_FILE_READ); + GTEST_ASSERT_NE(fp, nullptr); + + (void)memset(data, 0, size); + + int64_t n = taosReadFile(fp, data, size); + GTEST_ASSERT_EQ(n, size); + + code = taosCloseFile(&fp); + GTEST_ASSERT_EQ(code, 0); + + for (int i = 0; i < size / 2; ++i) { + GTEST_ASSERT_EQ(data[i * 2], 0); + GTEST_ASSERT_EQ(data[i * 2 + 1], 1); + } + } + + const char *object_name_arr[] = {object_name}; + code = azDeleteObjects(object_name_arr, 1); + GTEST_ASSERT_EQ(code, 0); + + taosMemoryFree(data); + + azEnd(); +} diff --git a/source/libs/executor/src/anomalywindowoperator.c b/source/libs/executor/src/anomalywindowoperator.c index dd1a52022e..eb72edb964 100644 --- a/source/libs/executor/src/anomalywindowoperator.c +++ b/source/libs/executor/src/anomalywindowoperator.c @@ -61,9 +61,13 @@ static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock* int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + size_t keyBufSize = 0; + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + const char* id = GET_TASKID(pTaskInfo); - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode; @@ -74,13 +78,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p } if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) { - qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt); + qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt); code = TSDB_CODE_ANA_ALGO_NOT_FOUND; goto _error; } if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) { - qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName); + qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName); code = TSDB_CODE_ANA_ALGO_NOT_LOAD; goto _error; } @@ -94,20 +98,18 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p SExprInfo* pScalarExprInfo = NULL; code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); } - size_t keyBufSize = 0; - int32_t num = 0; - SExprInfo* pExprInfo = NULL; code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num); QUERY_CHECK_CODE(code, lino, _error); initResultSizeInfo(&pOperator->resultInfo, 4096); - code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id, pTaskInfo->streamInfo.pState, + &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc); @@ -124,27 +126,19 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p pInfo->anomalyCol = extractColumnFromColumnNode(pColNode); pInfo->anomalyKey.type = pInfo->anomalyCol.type; pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes; + pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes); - if (pInfo->anomalyKey.pData == NULL) { - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno) int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes; pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize); - if (pInfo->anomalySup.pResultRow == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno) + pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*)); - if (pInfo->anomalySup.blocks == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno) + pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow)); - if (pInfo->anomalySup.windows == NULL) { - code = terrno; - goto _error; - } + QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno) code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); @@ -162,18 +156,21 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p *pOptrInfo = pOperator; - qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl, + qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl, pInfo->anomalyOpt); return TSDB_CODE_SUCCESS; _error: + qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt, + tstrerror(code)); + if (pInfo != NULL) { anomalyDestroyOperatorInfo(pInfo); } destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code); + return code; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 13ae220116..3f08db0e98 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1441,7 +1441,7 @@ static int32_t doSetUserTableMetaInfo(SStoreMetaReader* pMetaReaderFn, SStoreMet SMetaReader mr1 = {0}; pMetaReaderFn->initReader(&mr1, pVnode, META_READER_NOLOCK, pMetaFn); - + int64_t suid = pMReader->me.ctbEntry.suid; code = pMetaReaderFn->getTableEntryByUid(&mr1, suid); if (code != TSDB_CODE_SUCCESS) { @@ -1752,7 +1752,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn); - + uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid); if (code != TSDB_CODE_SUCCESS) { @@ -2284,7 +2284,7 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) { // db_name pColInfoData = taosArrayGet(p->pDataBlock, index++); QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno); - code = colDataSetVal(pColInfoData, numOfRows, db, false); + code = colDataSetVal(pColInfoData, numOfRows, dbname, false); QUERY_CHECK_CODE(code, lino, _end); // vgroup_id diff --git a/source/libs/index/inc/indexFstRegex.h b/source/libs/index/inc/indexFstRegex.h index 2814b5dc16..a6954afab6 100644 --- a/source/libs/index/inc/indexFstRegex.h +++ b/source/libs/index/inc/indexFstRegex.h @@ -28,7 +28,7 @@ extern "C" { #endif -typedef enum { MATCH, JUMP, SPLIT, RANGE } InstType; +typedef enum { INS_MATCH, INS_JUMP, INS_SPLIT, INS_RANGE } InstType; typedef struct MatchValue { #ifdef WINDOWS diff --git a/source/libs/index/src/indexFstDfa.c b/source/libs/index/src/indexFstDfa.c index 3b0014f16a..fa7dbb5f1f 100644 --- a/source/libs/index/src/indexFstDfa.c +++ b/source/libs/index/src/indexFstDfa.c @@ -159,14 +159,14 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r if (false == sparSetGet(set, i, &ip)) continue; Inst *inst = taosArrayGet(builder->dfa->insts, ip); - if (inst->ty == JUMP || inst->ty == SPLIT) { + if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) { continue; - } else if (inst->ty == RANGE) { + } else if (inst->ty == INS_RANGE) { if (taosArrayPush(tinsts, &ip) == NULL) { code = terrno; goto _exception; } - } else if (inst->ty == MATCH) { + } else if (inst->ty == INS_MATCH) { isMatch = true; if (taosArrayPush(tinsts, &ip) == NULL) { code = terrno; @@ -234,11 +234,11 @@ void dfaAdd(FstDfa *dfa, FstSparseSet *set, uint32_t ip) { } bool succ = sparSetAdd(set, ip, NULL); Inst *inst = taosArrayGet(dfa->insts, ip); - if (inst->ty == MATCH || inst->ty == RANGE) { + if (inst->ty == INS_MATCH || inst->ty == INS_RANGE) { // do nothing - } else if (inst->ty == JUMP) { + } else if (inst->ty == INS_JUMP) { dfaAdd(dfa, set, inst->jv.step); - } else if (inst->ty == SPLIT) { + } else if (inst->ty == INS_SPLIT) { dfaAdd(dfa, set, inst->sv.len1); dfaAdd(dfa, set, inst->sv.len2); } @@ -253,11 +253,11 @@ bool dfaRun(FstDfa *dfa, FstSparseSet *from, FstSparseSet *to, uint8_t byte) { if (false == sparSetGet(from, i, &ip)) continue; Inst *inst = taosArrayGet(dfa->insts, ip); - if (inst->ty == JUMP || inst->ty == SPLIT) { + if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) { continue; - } else if (inst->ty == MATCH) { + } else if (inst->ty == INS_MATCH) { isMatch = true; - } else if (inst->ty == RANGE) { + } else if (inst->ty == INS_RANGE) { if (inst->rv.start <= byte && byte <= inst->rv.end) { dfaAdd(dfa, to, ip + 1); } diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc index 299b62b6fb..5b22b51172 100644 --- a/source/libs/index/test/utilUT.cc +++ b/source/libs/index/test/utilUT.cc @@ -17,6 +17,7 @@ #include "tglobal.h" #include "tskiplist.h" #include "tutil.h" +#include "indexFstDfa.h" class UtilEnv : public ::testing::Test { protected: @@ -41,6 +42,29 @@ class UtilEnv : public ::testing::Test { SArray *rslt; }; +class UtilComm : public ::testing::Test { + protected: + virtual void SetUp() { + // src = (SArray *)taosArrayInit(2, sizeof(void *)); + // for (int i = 0; i < 3; i++) { + // SArray *m = taosArrayInit(10, sizeof(uint64_t)); + // taosArrayPush(src, &m); + // } + + // rslt = (SArray *)taosArrayInit(10, sizeof(uint64_t)); + } + virtual void TearDown() { + // for (int i = 0; i < taosArrayGetSize(src); i++) { + // SArray *m = (SArray *)taosArrayGetP(src, i); + // taosArrayDestroy(m); + // } + // taosArrayDestroy(src); + } + // SArray *src; + // SArray *rslt; + +}; + static void clearSourceArray(SArray *p) { for (int i = 0; i < taosArrayGetSize(p); i++) { SArray *m = (SArray *)taosArrayGetP(p, i); @@ -369,3 +393,35 @@ TEST_F(UtilEnv, testDictComm) { EXPECT_EQ(COMMON_INPUTS[v], i); } } + +TEST_F(UtilComm, testCompress) { + for (int32_t i = 0; i < 6; i++) { + _cache_range_compare cmpFunc = idxGetCompare((RangeType)(i)); + //char[32]a = 0, b = 1; + char a[32] = {0}; + char b[32] = {1}; + for (int32_t j = 0; j < TSDB_DATA_TYPE_MAX; j++) { + cmpFunc(a, b, j); + } + } +} +TEST_F(UtilComm, testfstDfa) { + { + FstDfaBuilder *builder = dfaBuilderCreate(NULL); + ASSERT_TRUE(builder != NULL); + dfaBuilderDestroy(builder); + } + { + SArray *pInst = taosArrayInit(32, sizeof(uint8_t)); + for (int32_t i = 0; i < 26; i++) { + uint8_t v = 'a' + i; + taosArrayPush(pInst, &v); + } + FstDfaBuilder *builder = dfaBuilderCreate(pInst); + FstDfa *dfa = dfaBuilderBuild(builder); + dfaBuilderDestroy(builder); + } +} + + + diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 342bd6d66e..9f411c4296 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10086,7 +10086,6 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt const char* validConfigs[] = { "encrypt_key", - tsAlterCompactTaskKeywords, }; if (0 == strncasecmp(cfgReq.config, validConfigs[0], strlen(validConfigs[0]) + 1)) { int32_t klen = strlen(cfgReq.value); @@ -10097,28 +10096,6 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN); } code = buildCmdMsg(pCxt, TDMT_MND_CREATE_ENCRYPT_KEY, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq); - } else if (0 == strncasecmp(cfgReq.config, validConfigs[1], strlen(validConfigs[1]) + 1)) { - char* endptr = NULL; - int32_t maxCompactTasks = taosStr2Int32(cfgReq.value, &endptr, 10); - int32_t minMaxCompactTasks = MIN_MAX_COMPACT_TASKS; - int32_t maxMaxCompactTasks = MAX_MAX_COMPACT_TASKS; - - // check format - if (endptr == cfgReq.value || endptr[0] != '\0') { - tFreeSMCfgDnodeReq(&cfgReq); - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS, - "Invalid max compact tasks: %s", cfgReq.value); - } - - // check range - if (maxCompactTasks < minMaxCompactTasks || maxCompactTasks > maxMaxCompactTasks) { - tFreeSMCfgDnodeReq(&cfgReq); - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS, - "Invalid max compact tasks: %d, valid range [%d,%d]", maxCompactTasks, - minMaxCompactTasks, maxMaxCompactTasks); - } - - code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq); } else { code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 7af64c041d..f922a5e03e 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -254,6 +254,9 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); int32_t uploadCheckpointToS3(const char* id, const char* path); +int32_t deleteCheckpointFile(const char* id, const char* name); +int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, + int32_t transId); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ebde9fe50e..d3eba382c9 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -19,7 +19,6 @@ #include "tcs.h" static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); -static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId); #ifdef BUILD_NO_CALL static int32_t deleteCheckpoint(const char* id); @@ -230,8 +229,8 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream return code; } -static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, - int32_t transId) { +int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, + int32_t transId) { int32_t code = 0; int32_t vgId = pTask->pMeta->vgId; int32_t taskLevel = pTask->info.taskLevel; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 89f0ea9e1f..7259c0e49a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -546,10 +546,6 @@ void streamStateDestroy(SStreamState* pState, bool remove) { taosMemoryFreeClear(pState); } -int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { - return deleteExpiredCheckPoint(pState->pFileState, mark); -} - void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { @@ -617,8 +613,6 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); } -void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); } - int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 05edad0f5f..aaff58d1b4 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -667,18 +667,6 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe } } -int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) { - int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen); - int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); - if (pFileState->searchBuff != NULL) { - deleteHashSortRowBuff(pFileState, pKey); - } - if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) { - return TSDB_CODE_SUCCESS; - } - return TSDB_CODE_FAILED; -} - static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -868,10 +856,6 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); } -int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { - return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list); -} - int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t code = TSDB_CODE_SUCCESS; int64_t maxCheckPointId = 0; @@ -1227,10 +1211,6 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { return pFileState->pGroupIdMap; } -void setFillInfo(SStreamFileState* pFileState) { - pFileState->hasFillCatch = false; -} - void clearExpiredState(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -1261,6 +1241,7 @@ _end: } } +#ifdef BUILD_NO_CALL int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { int32_t code = TSDB_CODE_SUCCESS; @@ -1328,6 +1309,7 @@ _end: } return code; } +#endif int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal, int32_t* pVLen, int32_t* pWinCode) { diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index c8297d56b7..c993743dc3 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -390,9 +390,78 @@ TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest) extern int8_t tsS3EpNum; tsS3EpNum = 1; - code = uploadCheckpointToS3("123", "/tmp/backend5/stream"); - EXPECT_EQ(code, TSDB_CODE_SUCCESS); + code = uploadCheckpointToS3("123", "/tmp/backend5/stream/stream"); + EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); code = downloadCheckpointByNameS3("123", "/root/download", ""); EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); + + code = deleteCheckpointFile("aaa123", "bbb"); + EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE); } + +TEST(doCheckBeforeHandleChkptTriggerTest, doCheckBeforeHandleChkptTriggerFnTest) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, POINTER_BYTES); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + initTaskLock(pTask); + + const char *path = "/tmp/doCheckBeforeHandleChkptTriggerTest/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.checkpointId = 123; + code = doCheckBeforeHandleChkptTrigger(pTask, 100, NULL, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + pTask->chkInfo.pActiveInfo->failedId = 223; + code = doCheckBeforeHandleChkptTrigger(pTask, 200, NULL, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + SStreamDataBlock block; + block.srcTaskId = 456; + SStreamTask upTask; + upTask = *pTask; + upTask.id.taskId = 456; + streamTaskSetUpstreamInfo(pTask, &upTask); + pTask->chkInfo.pActiveInfo->failedId = 23; + code = doCheckBeforeHandleChkptTrigger(pTask, 123, &block, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); + + streamTaskSetUpstreamInfo(pTask, &upTask); + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 223; + + STaskCheckpointReadyInfo readyInfo; + readyInfo.upstreamTaskId = 4567; + block.srcTaskId = 4567; + void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1); + + initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1); + taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo); + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->allUpstreamTriggerRecv = 1; + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 1111; + code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0); + ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT); +} \ No newline at end of file diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 19a3f211b1..c806bdfbd8 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -2023,16 +2023,29 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { memcpy(pKey, cd.pKey, (size_t)cd.kLen); if (ppVal) { - // TODO: vLen may be zero - pVal = tdbRealloc(*ppVal, cd.vLen); - if (pVal == NULL) { - tdbFree(pKey); - return terrno; + if (cd.vLen > 0) { + pVal = tdbRealloc(*ppVal, cd.vLen); + if (pVal == NULL) { + tdbFree(pKey); + return terrno; + } + + memcpy(pVal, cd.pVal, (size_t)cd.vLen); + if (TDB_CELLDECODER_FREE_VAL(&cd)) { + tdbTrace("tdb/btree-next decoder: %p pVal free: %p", &cd, cd.pVal); + tdbFree(cd.pVal); + } + } else { + pVal = NULL; } *ppVal = pVal; *vLen = cd.vLen; - memcpy(pVal, cd.pVal, (size_t)cd.vLen); + } else { + if (TDB_CELLDECODER_FREE_VAL(&cd)) { + tdbTrace("tdb/btree-next2 decoder: %p pVal free: %p", &cd, cd.pVal); + tdbFree(cd.pVal); + } } ret = tdbBtcMoveToPrev(pBtc); diff --git a/source/libs/tdb/src/db/tdbUtil.c b/source/libs/tdb/src/db/tdbUtil.c index 2249123ef5..14845ed8ee 100644 --- a/source/libs/tdb/src/db/tdbUtil.c +++ b/source/libs/tdb/src/db/tdbUtil.c @@ -66,3 +66,10 @@ int tdbGetFileSize(tdb_fd_t fd, int szPage, SPgno *size) { *size = szBytes / szPage; return 0; } + +void tdbCloseDir(TdDirPtr *ppDir) { + int32_t ret = taosCloseDir(ppDir); + if (ret) { + tdbError("failed to close directory, reason:%s", tstrerror(ret)); + } +} diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index 993c3ffab8..cd2d4ce57a 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -71,12 +71,7 @@ typedef TdFilePtr tdb_fd_t; #define tdbGetDirEntryName taosGetDirEntryName #define tdbDirEntryBaseName taosDirEntryBaseName -static FORCE_INLINE void tdbCloseDir(TdDirPtr *ppDir) { - int32_t ret = taosCloseDir(ppDir); - if (ret) { - tdbError("failed to close directory, reason:%s", tstrerror(ret)); - } -} +void tdbCloseDir(TdDirPtr *ppDir); #define tdbOsRemove remove #define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL) diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index 3c2b67da01..5dd9ce568f 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -16,6 +16,10 @@ #ifndef _TD_TFS_INT_H_ #define _TD_TFS_INT_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include "os.h" #include "taosdef.h" @@ -74,6 +78,7 @@ typedef struct STfs { SHashObj *hash; // name to did map } STfs; +int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg); int32_t tfsNewDisk(int32_t level, int32_t id, int8_t disable, const char *dir, STfsDisk **ppDisk); STfsDisk *tfsFreeDisk(STfsDisk *pDisk); int32_t tfsUpdateDiskSize(STfsDisk *pDisk); diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index ecc55517b3..5021a6ae39 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -19,7 +19,6 @@ static int32_t tfsMount(STfs *pTfs, SDiskCfg *pCfg); static int32_t tfsCheck(STfs *pTfs); -static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg); static int32_t tfsFormatDir(char *idir, char *odir); static int32_t tfsGetDiskByName(STfs *pTfs, const char *dir, STfsDisk **ppDisk); static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir); @@ -245,13 +244,13 @@ void tfsDirname(const STfsFile *pFile, char *dest) { tstrncpy(tname, pFile->aname, TSDB_FILENAME_LEN); tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN); } - +#if 0 void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) { STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId); (void)snprintf(aname, TSDB_FILENAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); } - +#endif int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); } int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) { @@ -340,7 +339,7 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) { TAOS_RETURN(0); } - +#if 0 bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) { STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId); char aname[TMPNAME_LEN]; @@ -348,7 +347,7 @@ bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) { (void)snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); return taosDirExist(aname); } - +#endif int32_t tfsRmdir(STfs *pTfs, const char *rname) { if (rname[0] == 0) { TAOS_RETURN(0); @@ -515,7 +514,7 @@ _exit: TAOS_RETURN(code); } -static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) { +int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) { int32_t code = 0; char dirName[TSDB_FILENAME_LEN] = "\0"; @@ -577,32 +576,32 @@ static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) { } static int32_t tfsFormatDir(char *idir, char *odir) { + int32_t code = 0, lino = 0; wordexp_t wep = {0}; + int32_t dirLen = 0; + char tmp[PATH_MAX] = {0}; - int32_t code = wordexp(idir, &wep, 0); + code = wordexp(idir, &wep, 0); if (code != 0) { - TAOS_RETURN(TAOS_SYSTEM_ERROR(code)); + TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(code)); } - char tmp[PATH_MAX] = {0}; - if (taosRealPath(wep.we_wordv[0], tmp, PATH_MAX) != 0) { - code = TAOS_SYSTEM_ERROR(errno); - wordfree(&wep); - TAOS_RETURN(code); - } + TAOS_CHECK_EXIT(taosRealPath(wep.we_wordv[0], tmp, PATH_MAX)); - int32_t dirLen = strlen(tmp); + dirLen = strlen(tmp); if (dirLen < 0 || dirLen >= TSDB_FILENAME_LEN) { - wordfree(&wep); - code = TSDB_CODE_OUT_OF_RANGE; - fError("failed to mount %s to FS since %s, real path:%s, len:%d", idir, tstrerror(code), tmp, dirLen); - TAOS_RETURN(code); + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE); } tstrncpy(odir, tmp, TSDB_FILENAME_LEN); +_exit: wordfree(&wep); - TAOS_RETURN(0); + if (code != 0) { + fError("failed to mount %s to FS at line %d since %s, real path:%s, len:%d", idir, lino, tstrerror(code), tmp, + dirLen); + } + TAOS_RETURN(code); } static int32_t tfsCheck(STfs *pTfs) { diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index 2cfcdc6d0a..acc8168538 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -41,13 +41,13 @@ void tfsDestroyTier(STfsTier *pTier) { int32_t tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg, STfsDisk **ppDisk) { int32_t code = 0; int32_t lino = 0; + int32_t id = 0; STfsDisk *pDisk = NULL; if (pTier->ndisk >= TFS_MAX_DISKS_PER_TIER) { TAOS_CHECK_GOTO(TSDB_CODE_FS_TOO_MANY_MOUNT, &lino, _exit); } - int32_t id = 0; if (pTier->level == 0) { if (pTier->disks[0] != NULL) { id = pTier->ndisk; diff --git a/source/libs/tfs/test/CMakeLists.txt b/source/libs/tfs/test/CMakeLists.txt index 2fd0836a1d..050811f0f5 100644 --- a/source/libs/tfs/test/CMakeLists.txt +++ b/source/libs/tfs/test/CMakeLists.txt @@ -7,8 +7,13 @@ target_link_libraries( PUBLIC tfs PUBLIC gtest_main ) +target_include_directories( + tfs_test + PUBLIC "${TD_SOURCE_DIR}/include/libs/tfs" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) -# add_test( -# NAME tfs_test -# COMMAND tfs_test -# ) +add_test( + NAME tfs_test + COMMAND tfs_test +) diff --git a/source/libs/tfs/test/tfsTest.cpp b/source/libs/tfs/test/tfsTest.cpp index bb89fbe69f..1570cf173f 100644 --- a/source/libs/tfs/test/tfsTest.cpp +++ b/source/libs/tfs/test/tfsTest.cpp @@ -13,6 +13,7 @@ #include "os.h" #include "tfs.h" +#include "tfsInt.h" class TfsTest : public ::testing::Test { protected: @@ -280,6 +281,9 @@ TEST_F(TfsTest, 04_File) { const STfsFile *pf2 = tfsReaddir(pDir); EXPECT_EQ(pf2, nullptr); + pDir->pDir = taosOpenDir(fulldir); + EXPECT_NE(pDir->pDir, nullptr); + tfsClosedir(pDir); } @@ -744,3 +748,116 @@ TEST_F(TfsTest, 05_MultiDisk) { tfsClose(pTfs); } + +TEST_F(TfsTest, 06_Misc) { + // tfsDisk.c + STfsDisk *pDisk = NULL; + EXPECT_EQ(tfsNewDisk(0, 0, 0, NULL, &pDisk), TSDB_CODE_INVALID_PARA); + EXPECT_NE(tfsNewDisk(0, 0, 0, "", &pDisk), 0); + + STfsDisk disk = {0}; + EXPECT_EQ(tfsUpdateDiskSize(&disk), TSDB_CODE_INVALID_PARA); + + // tfsTier.c + STfsTier tfsTier = {0}; + EXPECT_EQ(taosThreadSpinInit(&tfsTier.lock, 0), 0); + EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK); + + tfsTier.ndisk = 3; + tfsTier.nAvailDisks = 1; + + tfsTier.disks[1] = &disk; + disk.disable = 1; + EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK); + disk.disable = 0; + disk.size.avail = 0; + EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK); + + tfsTier.ndisk = TFS_MAX_DISKS_PER_TIER; + SDiskCfg diskCfg = {0}; + tstrncpy(diskCfg.dir, "testDataDir", TSDB_FILENAME_LEN); + EXPECT_EQ(tfsMountDiskToTier(&tfsTier, &diskCfg, 0), TSDB_CODE_FS_TOO_MANY_MOUNT); + EXPECT_EQ(taosThreadSpinDestroy(&tfsTier.lock), 0); + + // tfs.c + STfs *pTfs = NULL; + EXPECT_EQ(tfsOpen(0, -1, &pTfs), TSDB_CODE_INVALID_PARA); + EXPECT_EQ(tfsOpen(0, 0, &pTfs), TSDB_CODE_INVALID_PARA); + EXPECT_EQ(tfsOpen(0, TFS_MAX_DISKS + 1, &pTfs), TSDB_CODE_INVALID_PARA); + taosMemoryFreeClear(pTfs); + + STfs tfs = {0}; + STfsTier *pTier = &tfs.tiers[0]; + EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, -1), false); + tfs.nlevel = 2; + pTier->ndisk = 3; + pTier->nAvailDisks = 1; + EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, 0), false); + pTier->disks[0] = &disk; + EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, 0), false); + + EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, -1, 0), false); + EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, tfs.nlevel + 1, 0), false); + EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, 0, -1), false); + EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, 0, pTier->ndisk), false); + + EXPECT_EQ(tfsGetDisksAtLevel(&tfs, -1), 0); + EXPECT_EQ(tfsGetDisksAtLevel(&tfs, tfs.nlevel), 0); + + EXPECT_EQ(tfsGetLevel(&tfs), tfs.nlevel); + + for (int32_t l = 0; l < tfs.nlevel; ++l) { + EXPECT_EQ(taosThreadSpinInit(&tfs.tiers[l].lock, 0), 0); + } + + SDiskID diskID = {0}; + disk.size.avail = TFS_MIN_DISK_FREE_SIZE; + EXPECT_EQ(tfsAllocDisk(&tfs, tfs.nlevel, &diskID), 0); + tfs.nlevel = 0; + diskID.level = 0; + EXPECT_EQ(tfsAllocDisk(&tfs, 0, &diskID), 0); + tfs.nlevel = 2; + + diskID.id = 10; + EXPECT_EQ(tfsMkdirAt(&tfs, NULL, diskID), TSDB_CODE_FS_INVLD_CFG); + + EXPECT_NE(tfsMkdirRecurAt(&tfs, NULL, diskID), 0); + + const char *rname = ""; + EXPECT_EQ(tfsRmdir(&tfs, rname), 0); + + EXPECT_EQ(tfsSearch(&tfs, -1, NULL), -1); + EXPECT_EQ(tfsSearch(&tfs, tfs.nlevel, NULL), -1); + + diskCfg.level = -1; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.level = TFS_MAX_TIERS; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.level = 0; + diskCfg.primary = -1; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.primary = 2; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.primary = 1; + diskCfg.disable = -1; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.disable = 2; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.disable = 0; + diskCfg.level = 1; + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + diskCfg.level = 0; + diskCfg.primary = 0; + tstrncpy(diskCfg.dir, "testDataDir1", TSDB_FILENAME_LEN); + EXPECT_NE(tfsCheckAndFormatCfg(&tfs, &diskCfg), 0); + + TdFilePtr pFile = taosCreateFile("testDataDir1", TD_FILE_CREATE); + EXPECT_NE(pFile, nullptr); + EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG); + EXPECT_EQ(taosCloseFile(&pFile), 0); + EXPECT_EQ(taosRemoveFile("testDataDir1"), 0); + + for (int32_t l = 0; l < tfs.nlevel; ++l) { + EXPECT_EQ(taosThreadSpinDestroy(&tfs.tiers[l].lock), 0); + } +} diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 468d9d9b50..3b84fc4574 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -151,7 +151,6 @@ typedef struct SCliThrd { TdThreadMutex msgMtx; SDelayQueue* delayQueue; SDelayQueue* timeoutQueue; - SDelayQueue* waitConnQueue; uint64_t nextTimeout; // next timeout STrans* pInst; // @@ -159,8 +158,6 @@ typedef struct SCliThrd { SHashObj* fqdn2ipCache; SCvtAddr* pCvtAddr; - SHashObj* failFastCache; - SHashObj* batchCache; SHashObj* connHeapCache; SCliReq* stopMsg; @@ -224,8 +221,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -SCliBatch* cliGetHeadFromList(SCliBatchList* pList); - static void destroyCliConnQTable(SCliConn* conn); static void cliHandleException(SCliConn* conn); @@ -1299,8 +1294,8 @@ static void cliHandleException(SCliConn* conn) { if (conn->registered) { int8_t ref = transGetRefCount(conn); if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { -// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, -// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); + // tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, + // conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); uv_close((uv_handle_t*)conn->stream, cliDestroy); } } @@ -2124,144 +2119,7 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) { tTrace("cli process batch size:%d", count); } } -SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { - if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) { - return NULL; - } - queue* hr = QUEUE_HEAD(&pList->wq); - QUEUE_REMOVE(hr); - pList->sending += 1; - pList->len -= 1; - - SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); - return batch; -} -static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq); - -static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port); - -static void destroyBatchList(SCliBatchList* pList); -static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) { - int32_t code = 0; - STrans* pInst = pThrd->pInst; - SReqCtx* pCtx = pReq->ctx; - - char* ip = EPSET_GET_INUSE_IP(pCtx->epSet); - uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet); - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - size_t klen = strlen(key); - SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); - if (ppBatchList == NULL || *ppBatchList == NULL) { - SCliBatchList* pBatchList = NULL; - code = createBatchList(&pBatchList, key, ip, port); - if (code != 0) { - destroyReq(pReq); - return; - } - - pBatchList->batchLenLimit = pInst->shareConnLimit; - - SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, pBatchList, pReq); - if (code != 0) { - destroyBatchList(pBatchList); - destroyReq(pReq); - return; - } - - code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); - if (code != 0) { - destroyBatchList(pBatchList); - } - } else { - if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { - SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, *ppBatchList, pReq); - if (code != 0) { - destroyReq(pReq); - cliDestroyBatch(pBatch); - } - } else { - queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); - SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); - if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) { - QUEUE_PUSH(&pBatch->wq, h); - pBatch->shareConnLimit += pReq->msg.contLen; - pBatch->wLen += 1; - } else { - SCliBatch* tBatch = NULL; - code = createBatch(&tBatch, *ppBatchList, pReq); - if (code != 0) { - destroyReq(pReq); - } - } - } - } - return; -} -static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { - SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); - if (pBatchList == NULL) { - tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - QUEUE_INIT(&pBatchList->wq); - pBatchList->port = port; - pBatchList->connMax = 1; - pBatchList->connCnt = 0; - pBatchList->batchLenLimit = 0; - pBatchList->len += 1; - - pBatchList->ip = taosStrdup(ip); - pBatchList->dst = taosStrdup(key); - if (pBatchList->ip == NULL || pBatchList->dst == NULL) { - taosMemoryFree(pBatchList->ip); - taosMemoryFree(pBatchList->dst); - taosMemoryFree(pBatchList); - tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - *ppBatchList = pBatchList; - return 0; -} -static void destroyBatchList(SCliBatchList* pList) { - if (pList == NULL) { - return; - } - while (!QUEUE_IS_EMPTY(&pList->wq)) { - queue* h = QUEUE_HEAD(&pList->wq); - QUEUE_REMOVE(h); - - SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); - cliDestroyBatch(pBatch); - } - taosMemoryFree(pList->ip); - taosMemoryFree(pList->dst); - taosMemoryFree(pList); -} -static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - if (pBatch == NULL) { - tError("failed to create batch since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, &pReq->q); - pBatch->wLen += 1; - pBatch->shareConnLimit = pReq->msg.contLen; - pBatch->pList = pList; - - QUEUE_PUSH(&pList->wq, &pBatch->listq); - pList->len += 1; - - *ppBatch = pBatch; - return 0; -} static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); } static void cliAsyncCb(uv_async_t* handle) { @@ -2494,10 +2352,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(code, NULL, _end); } - if ((code = transDQCreate(pThrd->loop, &pThrd->waitConnQueue)) != 0) { - TAOS_CHECK_GOTO(code, NULL, _end); - } - pThrd->destroyAhandleFp = pInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -2505,11 +2359,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(terrno, NULL, _end); } - pThrd->batchCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - if (pThrd->batchCache == NULL) { - TAOS_CHECK_GOTO(terrno, NULL, _end); - } - pThrd->connHeapCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->connHeapCache == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); @@ -2553,10 +2402,7 @@ _end: transDQDestroy(pThrd->delayQueue, NULL); transDQDestroy(pThrd->timeoutQueue, NULL); - transDQDestroy(pThrd->waitConnQueue, NULL); taosHashCleanup(pThrd->fqdn2ipCache); - taosHashCleanup(pThrd->failFastCache); - taosHashCleanup(pThrd->batchCache); taosHashCleanup(pThrd->pIdConnTable); taosArrayDestroy(pThrd->pQIdBuf); @@ -2580,7 +2426,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { transDQDestroy(pThrd->delayQueue, destroyReqAndAhanlde); transDQDestroy(pThrd->timeoutQueue, NULL); - transDQDestroy(pThrd->waitConnQueue, NULL); tDebug("thread destroy %" PRId64, pThrd->pid); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { @@ -2592,24 +2437,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); - void** pIter = taosHashIterate(pThrd->batchCache, NULL); - while (pIter != NULL) { - SCliBatchList* pBatchList = (SCliBatchList*)(*pIter); - while (!QUEUE_IS_EMPTY(&pBatchList->wq)) { - queue* h = QUEUE_HEAD(&pBatchList->wq); - QUEUE_REMOVE(h); - - SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); - cliDestroyBatch(pBatch); - } - taosMemoryFree(pBatchList->ip); - taosMemoryFree(pBatchList->dst); - taosMemoryFree(pBatchList); - - pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); - } - taosHashCleanup(pThrd->batchCache); - void* pIter2 = taosHashIterate(pThrd->connHeapCache, NULL); while (pIter2 != NULL) { SHeap* heap = (SHeap*)(pIter2); diff --git a/source/libs/transport/test/transUT2.cpp b/source/libs/transport/test/transUT2.cpp index 6dfb5e503a..fabe9e9c4f 100644 --- a/source/libs/transport/test/transUT2.cpp +++ b/source/libs/transport/test/transUT2.cpp @@ -615,6 +615,21 @@ TEST_F(TransEnv, http) { #endif } +#if 1 + STelemAddrMgmt mgt; + taosTelemetryMgtInit(&mgt, "telemetry.taosdata.com"); + int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT); + printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr); + + taosMsleep(2000); + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + for (int32_t i = 0; i < 1; i++) { + code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT); + printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr); + taosMsleep(2000); + } + taosTelemetryDestroy(&mgt); +#endif { STelemAddrMgmt mgt; taosTelemetryMgtInit(&mgt, "error"); diff --git a/source/util/inc/tlogInt.h b/source/util/inc/tlogInt.h new file mode 100644 index 0000000000..1d7f3a063d --- /dev/null +++ b/source/util/inc/tlogInt.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_LOG_INT_H_ +#define _TD_UTIL_LOG_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tlog.h" + +void taosOpenNewSlowLogFile(); +void taosLogObjSetToday(int64_t ts); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_LOG_INT_H_*/ diff --git a/source/util/src/tanalytics.c b/source/util/src/tanalytics.c index e68edd4b76..bf2cb4fd07 100644 --- a/source/util/src/tanalytics.c +++ b/source/util/src/tanalytics.c @@ -20,7 +20,7 @@ #ifdef USE_ANALYTICS #include -#define ANAL_ALGO_SPLIT "," +#define ANALYTICS_ALOG_SPLIT_CHAR "," typedef struct { int64_t ver; @@ -136,7 +136,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue, return false; } - pEnd = strstr(pStart, ANAL_ALGO_SPLIT); + pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR); if (optMaxLen > 0) { if (pEnd > pStart) { int32_t len = (int32_t)(pEnd - pStart); @@ -168,7 +168,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName); char *pos1 = strstr(option, buf); - char *pos2 = strstr(option, ANAL_ALGO_SPLIT); + char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR); if (pos1 != NULL) { *optValue = taosStr2Int64(pos1 + bufLen, NULL, 10); return true; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index a9eef1bfc9..4df5b322a2 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -483,7 +483,7 @@ static int32_t taosOpenNewLogFile() { return 0; } -static void taosOpenNewSlowLogFile() { +void taosOpenNewSlowLogFile() { (void)taosThreadMutexLock(&tsLogObj.logMutex); int64_t delta = taosGetTimestampSec() - tsLogObj.timestampToday; if (delta >= 0 && delta < 86400) { @@ -539,6 +539,8 @@ void taosResetLog() { } } +void taosLogObjSetToday(int64_t ts) { tsLogObj.timestampToday = ts; } + static bool taosCheckFileIsOpen(char *logFileName) { TdFilePtr pFile = taosOpenFile(logFileName, TD_FILE_WRITE); if (pFile == NULL) { @@ -619,6 +621,7 @@ static void processLogFileName(const char *logName, int32_t maxFileNum) { } static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) { + int32_t code = 0, lino = 0; #ifdef WINDOWS_STASH /* * always set maxFileNum to 1 @@ -653,39 +656,28 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) { // only an estimate for number of lines int64_t filesize = 0; - if (taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL) != 0) { - (void)printf("\nfailed to fstat log file:%s, reason:%s\n", name, strerror(errno)); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; - } + TAOS_CHECK_EXIT(taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL)); + tsLogObj.lines = (int32_t)(filesize / 60); if (taosLSeekFile(tsLogObj.logHandle->pFile, 0, SEEK_END) < 0) { - TAOS_UNUSED(printf("failed to seek to the end of log file:%s, reason:%s\n", name, tstrerror(terrno))); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; + TAOS_CHECK_EXIT(terrno); } - (void)snprintf(name, sizeof(name), "==================================================\n"); + (void)snprintf(name, sizeof(name), + "==================================================\n" + " new log file\n" + "==================================================\n"); if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) { - TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno))); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; - } - (void)snprintf(name, sizeof(name), " new log file \n"); - if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) { - TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno))); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; - } - (void)snprintf(name, sizeof(name), "==================================================\n"); - if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) { - TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno))); - taosUnLockLogFile(tsLogObj.logHandle->pFile); - return terrno; + TAOS_CHECK_EXIT(terrno); } - return 0; +_exit: + if (code != 0) { + taosUnLockLogFile(tsLogObj.logHandle->pFile); + TAOS_UNUSED(printf("failed to init normal log file:%s at line %d, reason:%s\n", name, lino, tstrerror(code))); + } + return code; } static void taosUpdateLogNums(ELogLevel level) { diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index cde1392216..ec05a4e415 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -137,6 +137,10 @@ add_test( NAME logTest COMMAND logTest ) +target_include_directories( + logTest + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) IF(COMPILER_SUPPORT_AVX2) MESSAGE(STATUS "AVX2 instructions is ACTIVATED") diff --git a/source/util/test/log.cpp b/source/util/test/log.cpp index ba32d2d639..1899aac2c4 100644 --- a/source/util/test/log.cpp +++ b/source/util/test/log.cpp @@ -2,7 +2,9 @@ #include #include #include +#include #include +#include #include using namespace std; @@ -44,3 +46,96 @@ TEST(log, check_log_refactor) { } taosCloseLog(); } + +extern char *tsLogOutput; +TEST(log, misc) { + // taosInitLog + const char *path = TD_TMP_DIR_PATH "td"; + taosRemoveDir(path); + taosMkDir(path); + tstrncpy(tsLogDir, path, PATH_MAX); + EXPECT_EQ(taosInitLog("taoslog", 1, true), 0); + + taosOpenNewSlowLogFile(); + taosLogObjSetToday(INT64_MIN); + taosPrintSlowLog("slow log test"); + + // test taosInitLogOutput + const char *pLogName = NULL; + tsLogOutput = (char *)taosMemCalloc(1, TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG); + tstrncpy(tsLogOutput, "stdout", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "stderr", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "/dev/null", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tsLogOutput[0] = '#'; + EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG); + tstrncpy(tsLogOutput, "/", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "\\", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "testLogOutput", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, "testLogOutputDir/testLogOutput", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), 0); + tstrncpy(tsLogOutput, ".", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG); + tstrncpy(tsLogOutput, "/..", TSDB_FILENAME_LEN); + EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG); + tsLogOutput[0] = 0; + + // test taosAssertDebug + tsAssert = false; + taosAssertDebug(true, __FILE__, __LINE__, 0, "test_assert_true_without_core"); + taosAssertDebug(false, __FILE__, __LINE__, 0, "test_assert_false_with_core"); + tsAssert = true; + + // test taosLogCrashInfo, taosReadCrashInfo and taosReleaseCrashLogFile + char nodeType[16] = "nodeType"; + char *pCrashMsg = (char *)taosMemoryCalloc(1, 16); + EXPECT_NE(pCrashMsg, nullptr); + tstrncpy(pCrashMsg, "crashMsg", 16); + +#if !defined(_TD_DARWIN_64) && !defined(WINDOWS) + pid_t pid = taosGetPId(); + EXPECT_EQ(pid > 0, true); + siginfo_t sigInfo = {0}; + sigInfo.si_pid = pid; + taosLogCrashInfo(nodeType, pCrashMsg, strlen(pCrashMsg), 0, &sigInfo); +#else + taosLogCrashInfo(nodeType, pCrashMsg, strlen(pCrashMsg), 0, nullptr); +#endif + + char crashInfo[PATH_MAX] = {0}; + snprintf(crashInfo, sizeof(crashInfo), "%s%s.%sCrashLog", tsLogDir, TD_DIRSEP, nodeType); + + char *pReadMsg = NULL; + int64_t readMsgLen = 0; + TdFilePtr pFile = NULL; + taosReadCrashInfo(crashInfo, &pReadMsg, &readMsgLen, &pFile); + EXPECT_NE(pReadMsg, nullptr); + EXPECT_NE(pFile, nullptr); + EXPECT_EQ(strncasecmp(pReadMsg, "crashMsg", strlen("crashMsg")), 0); + EXPECT_EQ(taosCloseFile(&pFile), 0); + taosMemoryFreeClear(pReadMsg); + + pFile = taosOpenFile(crashInfo, TD_FILE_WRITE); + EXPECT_NE(pFile, nullptr); + EXPECT_EQ(taosWriteFile(pFile, "00000", 1), 1); + EXPECT_EQ(taosCloseFile(&pFile), 0); + + taosReadCrashInfo(crashInfo, &pReadMsg, &readMsgLen, &pFile); + EXPECT_EQ(pReadMsg, nullptr); + EXPECT_EQ(pFile, nullptr); + + pFile = taosOpenFile(crashInfo, TD_FILE_WRITE); + EXPECT_NE(pFile, nullptr); + taosReleaseCrashLogFile(pFile, true); + + // clean up + taosRemoveDir(path); + + taosCloseLog(); +} diff --git a/tests/script/sh/stop_dnodes.sh b/tests/script/sh/stop_dnodes.sh index 8923804547..da2083b013 100755 --- a/tests/script/sh/stop_dnodes.sh +++ b/tests/script/sh/stop_dnodes.sh @@ -13,7 +13,7 @@ if [ -n "$PID" ]; then systemctl stop taosd fi -PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'` +PID=`ps -ef|grep -w taosd | grep -v grep | grep -v taosanode | awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID #pkill -9 taosd @@ -38,10 +38,10 @@ while [ -n "$PID" ]; do else lsof -nti:6030 | xargs kill -9 fi - PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'` + PID=`ps -ef|grep -w taos | grep -v grep |grep -v taosanode| awk '{print $2}'` done -PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` +PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode|awk '{print $2}'` while [ -n "$PID" ]; do echo kill -9 $PID #pkill -9 tmq_sim @@ -52,5 +52,5 @@ while [ -n "$PID" ]; do else lsof -nti:6030 | xargs kill -9 fi - PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'` + PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode| awk '{print $2}'` done \ No newline at end of file diff --git a/tests/script/tsim/analytics/basic0.sim b/tests/script/tsim/analytics/basic0.sim index 77c9184e8f..3ac49b1fc3 100644 --- a/tests/script/tsim/analytics/basic0.sim +++ b/tests/script/tsim/analytics/basic0.sim @@ -3,7 +3,17 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start sql connect -print =============== create anode +print =============== failed to create anode on '127.0.0.1:1101' +sql_error create anode '127.0.0.1:1101' + +sql show anodes +if $rows != 0 then + return -1 +endi + +sql_error drop anode 1 + +print ================ create anode sql create anode '192.168.1.116:6050' sql show anodes @@ -30,7 +40,7 @@ print $data00 $data01 $data02 sql use d0 print =============== create super table, include column type for count/sum/min/max/first -sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 tinyint, c5 bigint, c6 varchar(12)) tags (t1 int unsigned) sql show stables if $rows != 1 then @@ -42,10 +52,11 @@ sql create table ct1 using stb tags(1000) print ==================== insert data # input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40] -sql insert into ct1(ts, c1) values(now-1a, 5)(now+1a, 14)(now+2a, 15)(now+3a, 15)(now+4a, 14) -sql insert into ct1(ts, c1) values(now+5a, 19)(now+6a, 17)(now+7a, 16)(now+8a, 20)(now+9a, 22) -sql insert into ct1(ts, c1) values(now+10a, 8)(now+11a, 21)(now+12a, 28)(now+13a, 11)(now+14a, 9) -sql insert into ct1(ts, c1) values(now+15a, 29)(now+16a, 40) +sql insert into ct1(ts, c1, c2, c3, c4, c5, c6) values(now-1a, 5, 5, 5, 5, 5, 'a')(now+1a, 14, 14, 14, 14, 14, 'a')(now+2a, 15, 15, 15, 15, 15, 'a') +sql insert into ct1 values(now+3a, 15, 15, 15, 15, 15, 'a')(now+4a, 14, 14, 14, 14, 14, 'a')(now+5a, 19, 19, 19, 19, 19, 'a')(now+6a, 17, 17, 17, 17, 17, 'a') +sql insert into ct1 values(now+7a, 16, 16, 16, 16, 16, 'a')(now+8a, 20, 20, 20, 20, 20, 'a')(now+9a, 22, 22, 22, 22, 22, 'a') +sql insert into ct1 values(now+10a, 8, 8, 8, 8, 8, 'a')(now+11a, 21, 21, 21, 21, 21, 'a')(now+12a, 28, 28, 28, 28, 28, 'a')(now+13a, 11, 11, 11, 11, 11, 'a')(now+14a, 9, 9, 9, 9, 9, 'a') +sql insert into ct1 values(now+15a, 29, 29, 29, 29, 29, 'a')(now+16a, 40, 40, 40, 40, 40, 'a') sql select count(*) from ct1 if $data00 != 17 then @@ -58,6 +69,87 @@ if $data00 != 1 then return -1 endi +print ================= try every loaded anomaly detection algorithm +sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=lof'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=shesd'); +sql select count(*) from ct1 anomaly_window(c1, 'algo=grubbs'); + +print ================= try every column type of column +sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c2, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c3, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c4, 'algo=ksigma,k=2'); +sql select count(*) from ct1 anomaly_window(c5, 'algo=ksigma,k=2'); + +print =================== invalid column type +sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2'); +sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 + + +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1 +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1 +sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=0') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=-10') from ct1 +sql_error select forecast(c1, 'conf=50 ,algo = arima, every=0') from ct1 + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters, conf=50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, ' algo=holtwinters , conf=50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, ' algo = holtwinters , conf = 50 ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1 +sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1 + +print =================== valid column type +sql select forecast(c1, 'conf=50 ,algo = arima') from ct1 +sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c2, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c3, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c4, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 +sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1 +if $rows != 10 then + return -1 +endi + +if $data03 != 28 then + return -1 +endi + +if $data00 != @23-11-15 06:13:20.000@ then + print expect 23-11-15 06:13:20.000 , actual $data00 + return -1 +endi + +if $data10 != @23-11-15 06:13:20.002@ then + print expect 23-11-15 06:13:20.002 , actual $data10 + return -1 +endi + +if $data20 != @23-11-15 06:13:20.004@ then + return -1 +endi + +print test the every option and rows option + +sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=100,rows=5') from ct1 +if $rows != 5 then + return -1 +endi + +if $data00 != @23-11-15 06:13:20.000@ then + return -1 +endi + +if $data10 != @23-11-15 06:13:20.100@ then + return -1 +endi sql drop anode 1 sql show anodes @@ -66,6 +158,13 @@ if $rows != 0 then return -1 endi +sleep 1000 + +print ===================== query without anodes +sql_error select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1 +sql_error select count(*) from ct1 anomaly_window(c1, 'algo=iqr'); + + _OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT print =============== check