Merge pull request #29379 from taosdata/merge/3.0tomain

merge: from 3.0 to main branch
This commit is contained in:
Shengliang Guan 2024-12-28 16:19:04 +08:00 committed by GitHub
commit 1fb58e6b36
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
46 changed files with 1580 additions and 453 deletions

View File

@ -190,7 +190,8 @@ The effective value of charset is UTF-8.
|Parameter Name |Supported Version |Dynamic Modification|Description|
|-----------------------|-------------------------|--------------------|------------|
|supportVnodes | |Supported, effective immediately |Maximum number of vnodes supported by a dnode, range 0-4096, default value is twice the number of CPU cores + 5|
|numOfCommitThreads | |Supported, effective after restart|Maximum number of commit threads, range 0-1024, default value 4|
|numOfCommitThreads | |Supported, effective after restart|Maximum number of commit threads, range 1-1024, default value 4|
|numOfCompactThreads | |Supported, effective after restart|Maximum number of commit threads, range 1-16, default value 2|
|numOfMnodeReadThreads | |Supported, effective after restart|Number of Read threads for mnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4)|
|numOfVnodeQueryThreads | |Supported, effective after restart|Number of Query threads for vnode, range 0-1024, default value is twice the number of CPU cores (not exceeding 16)|
|numOfVnodeFetchThreads | |Supported, effective after restart|Number of Fetch threads for vnode, range 0-1024, default value is one quarter of the CPU cores (not exceeding 4)|

View File

@ -185,7 +185,8 @@ charset 的有效值是 UTF-8。
|参数名称|支持版本|动态修改|参数含义|
|--------------------------|----------|-------------------------|-|
|supportVnodes | |支持动态修改 立即生效 |dnode 支持的最大 vnode 数目,取值范围 0-4096默认值 CPU 核数的 2 倍 + 5|
|numOfCommitThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 0-1024默认值为 4|
|numOfCommitThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 1-1024默认值为 4|
|numOfCompactThreads | |支持动态修改 重启生效 |落盘线程的最大数量,取值范围 1-16默认值为 2|
|numOfMnodeReadThreads | |支持动态修改 重启生效 |mnode 的 Read 线程数目,取值范围 0-1024默认值为 CPU 核数的四分之一(不超过 4|
|numOfVnodeQueryThreads | |支持动态修改 重启生效 |vnode 的 Query 线程数目,取值范围 0-1024默认值为 CPU 核数的两倍(不超过 16|
|numOfVnodeFetchThreads | |支持动态修改 重启生效 |vnode 的 Fetch 线程数目,取值范围 0-1024默认值为 CPU 核数的四分之一(不超过 4|

View File

@ -112,9 +112,8 @@ extern int32_t tsNumOfSnodeWriteThreads;
extern int64_t tsQueueMemoryAllowed;
extern int32_t tsRetentionSpeedLimitMB;
extern const char *tsAlterCompactTaskKeywords;
extern int32_t tsNumOfCompactThreads;
extern int32_t tsNumOfRetentionThreads;
extern int32_t tsNumOfCompactThreads;
extern int32_t tsNumOfRetentionThreads;
// sync raft
extern int32_t tsElectInterval;

View File

@ -389,7 +389,6 @@ typedef struct SStateStore {
int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
void (*streamStateSetFillInfo)(SStreamState* pState);
void (*streamStateClearExpiredState)(SStreamState* pState);
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
@ -455,7 +454,6 @@ typedef struct SStateStore {
int32_t (*streamStateBegin)(SStreamState* pState);
void (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
void (*streamStateCopyBackend)(SStreamState* src, SStreamState* dst);
} SStateStore;

View File

@ -34,7 +34,6 @@ void streamStateClose(SStreamState* pState, bool remove);
int32_t streamStateBegin(SStreamState* pState);
void streamStateCommit(SStreamState* pState);
void streamStateDestroy(SStreamState* pState, bool remove);
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark);
int32_t streamStateDelTaskDb(SStreamState* pState);
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
@ -108,7 +107,6 @@ int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, con
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
// twa
void streamStateSetFillInfo(SStreamState* pState);
void streamStateClearExpiredState(SStreamState* pState);
void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);

View File

@ -67,7 +67,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);

View File

@ -148,7 +148,7 @@ int32_t tfsMkdirRecur(STfs *pTfs, const char *rname);
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId);
#if 0
/**
* @brief check directories exist in tfs.
*
@ -158,7 +158,7 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId);
* @return true for exist, false for not exist.
*/
bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId);
#endif
/**
* @brief Remove directory at all levels in tfs.
*
@ -241,7 +241,7 @@ void tfsBasename(const STfsFile *pFile, char *dest);
* @param dest The buffer where dirname will be saved.
*/
void tfsDirname(const STfsFile *pFile, char *dest);
#if 0
/**
* @brief Get the absolute file name of rname.
*
@ -251,7 +251,7 @@ void tfsDirname(const STfsFile *pFile, char *dest);
* @param aname absolute file name
*/
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname);
#endif
/**
* @brief Remove file in tfs.
*

View File

@ -66,9 +66,9 @@ int32_t s3Begin() {
void s3End() { S3_deinitialize(); }
int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ }
#if 0
static int32_t s3ListBucket(char const *bucketname);
#endif
static void s3DumpCfgByEp(int8_t epIndex) {
// clang-format off
(void)fprintf(stdout,
@ -291,7 +291,7 @@ static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) {
TAOS_RETURN(code);
}
#if 0
static int32_t s3ListBucket(char const *bucketname) {
int32_t code = 0;
@ -312,7 +312,7 @@ static int32_t s3ListBucket(char const *bucketname) {
TAOS_RETURN(code);
}
#endif
typedef struct growbuffer {
// The total number of bytes, and the start byte
int size;

View File

@ -102,9 +102,8 @@ int32_t tsMaxStreamBackendCache = 128; // M
int32_t tsPQSortMemThreshold = 16; // M
int32_t tsRetentionSpeedLimitMB = 0; // unlimited
const char *tsAlterCompactTaskKeywords = "max_compact_tasks";
int32_t tsNumOfCompactThreads = 2;
int32_t tsNumOfRetentionThreads = 1;
int32_t tsNumOfCompactThreads = 2;
int32_t tsNumOfRetentionThreads = 1;
// sync raft
int32_t tsElectInterval = 25 * 1000;
@ -744,8 +743,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX,
CFG_SCOPE_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(
cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -794,9 +794,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfCommitThreads = tsNumOfCores / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
tsNumOfCompactThreads = tsNumOfCommitThreads;
tsNumOfCompactThreads = TRANGE(tsNumOfCompactThreads, 2, 4);
tsNumOfSupportVnodes = tsNumOfCores * 2 + 5;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
@ -841,7 +838,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY, CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "maxCompactConcurrency", tsNumOfCompactThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCompactThreads", tsNumOfCompactThreads, 1, 16, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL) != 0);
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "memPoolFullFunc", tsMemPoolFullFunc, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_LOCAL) != 0);
@ -1038,10 +1035,8 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
pItem = cfgGetItem(pCfg, "maxCompactConcurrency");
pItem = cfgGetItem(pCfg, "numOfCompactThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfCompactThreads = numOfCores / 2;
tsNumOfCompactThreads = TRANGE(tsNumOfCompactThreads, 2, 4);
pItem->i32 = tsNumOfCompactThreads;
pItem->stype = stype;
}
@ -1546,7 +1541,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfCommitThreads");
tsNumOfCommitThreads = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "maxCompactConcurrency");
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "numOfCompactThreads");
tsNumOfCompactThreads = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "retentionSpeedLimitMB");
@ -2349,6 +2344,8 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize,
TAOS_RETURN(code);
}
extern void tsdbAlterNumCompactThreads();
static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = -1;
@ -2399,6 +2396,17 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
goto _exit;
}
if (strcasecmp(name, "numOfCompactThreads") == 0) {
#ifdef TD_ENTERPRISE
tsNumOfCompactThreads = pItem->i32;
code = TSDB_CODE_SUCCESS;
// tsdbAlterNumCompactThreads();
#else
code = TSDB_CODE_INVALID_CFG;
#endif
goto _exit;
}
{ // 'bool/int32_t/int64_t/float/double' variables with general modification function
static OptionNameAndVar debugOptions[] = {
{"dDebugFlag", &dDebugFlag}, {"vDebugFlag", &vDebugFlag},

View File

@ -40,6 +40,46 @@ add_test(
COMMAND dataformatTest
)
# cosCpTest.cpp
add_executable(cosCpTest "")
target_sources(
cosCpTest
PRIVATE
"cosCpTest.cpp"
)
target_link_libraries(cosCpTest gtest gtest_main util common)
target_include_directories(
cosCpTest
PUBLIC "${TD_SOURCE_DIR}/include/common"
PUBLIC "${TD_SOURCE_DIR}/include/util"
)
add_test(
NAME cosCpTest
COMMAND cosCpTest
)
if(TD_LINUX)
# cosTest.cpp
add_executable(cosTest "")
target_sources(
cosTest
PRIVATE
"cosTest.cpp"
)
target_link_libraries(cosTest gtest gtest_main util common)
target_include_directories(
cosTest
PUBLIC "${TD_SOURCE_DIR}/include/common"
PUBLIC "${TD_SOURCE_DIR}/include/util"
)
add_test(
NAME cosTest
COMMAND cosTest
)
endif()
if (${TD_LINUX})
# tmsg test
add_executable(tmsgTest "")
@ -60,4 +100,4 @@ if (${TD_LINUX})
add_custom_command(TARGET tmsgTest POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different ${MSG_TBL_FILE} $<TARGET_FILE_DIR:tmsgTest>
)
endif ()
endif ()

View File

@ -0,0 +1,305 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cos_cp.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, cpOpenCloseRemove) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS);
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS);
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cpBuild) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
int64_t lmtime = 20241220141705;
char const *upload_id = "upload-id-xxx";
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
(void)memset(&cp, 0, sizeof(cp));
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS);
cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size);
EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS);
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cpLoad) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
int64_t lmtime = 20241220141705;
char const *upload_id = "upload-id-xxx";
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
(void)memset(&cp, 0, sizeof(cp));
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
if (taosCheckExistFile(file_cp_path)) {
EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS);
EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true);
EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD);
EXPECT_EQ(cp.md5, std::string(""));
EXPECT_EQ(cp.thefile, nullptr);
EXPECT_EQ(std::string(cp.file_path), "./afile");
EXPECT_EQ(cp.file_size, 1024);
EXPECT_EQ(cp.file_last_modified, 20241220141705);
EXPECT_EQ(cp.file_md5, std::string(""));
EXPECT_EQ(cp.object_name, std::string(""));
EXPECT_EQ(cp.object_size, 0);
EXPECT_EQ(cp.object_last_modified, std::string(""));
EXPECT_EQ(cp.object_etag, std::string(""));
EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx"));
EXPECT_EQ(cp.part_num, 1);
EXPECT_EQ(cp.part_size, 8388608);
EXPECT_EQ(cp.parts[0].index, 0);
EXPECT_EQ(cp.parts[0].offset, 0);
EXPECT_EQ(cp.parts[0].size, 1024);
EXPECT_EQ(cp.parts[0].completed, 0);
EXPECT_EQ(cp.parts[0].etag, std::string(""));
EXPECT_EQ(cp.parts[0].crc64, 0);
}
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS);
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cpBuildUpdate) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
int64_t lmtime = 20241220141705;
char const *upload_id = "upload-id-xxx";
int seq = 1;
char *etags[1] = {"etags-1-xxx"};
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
(void)memset(&cp, 0, sizeof(cp));
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
EXPECT_EQ(cos_cp_open(file_cp_path, &cp), TSDB_CODE_SUCCESS);
cos_cp_build_upload(&cp, file, contentLength, lmtime, upload_id, chunk_size);
cos_cp_update(&cp, cp.parts[seq - 1].index, etags[seq - 1], 0);
EXPECT_EQ(cos_cp_dump(&cp), TSDB_CODE_SUCCESS);
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cpLoadUpdate) {
int32_t code = 0, lino = 0;
int64_t contentLength = 1024;
const int64_t MULTIPART_CHUNK_SIZE = 64 << 20; // multipart is 768M
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
SCheckpoint cp;
char const *file = "./afile";
char file_cp_path[TSDB_FILENAME_LEN];
int64_t lmtime = 20241220141705;
char const *upload_id = "upload-id-xxx";
(void)snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
(void)memset(&cp, 0, sizeof(cp));
cp.parts = (SCheckpointPart *)taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (!cp.parts) {
TAOS_CHECK_EXIT(terrno);
}
if (taosCheckExistFile(file_cp_path)) {
EXPECT_EQ(cos_cp_load(file_cp_path, &cp), TSDB_CODE_SUCCESS);
EXPECT_EQ(cos_cp_is_valid_upload(&cp, contentLength, lmtime), true);
EXPECT_EQ(cp.cp_type, COS_CP_TYPE_UPLOAD);
EXPECT_EQ(cp.md5, std::string(""));
EXPECT_EQ(cp.thefile, nullptr);
EXPECT_EQ(std::string(cp.file_path), "./afile");
EXPECT_EQ(cp.file_size, 1024);
EXPECT_EQ(cp.file_last_modified, 20241220141705);
EXPECT_EQ(cp.file_md5, std::string(""));
EXPECT_EQ(cp.object_name, std::string(""));
EXPECT_EQ(cp.object_size, 0);
EXPECT_EQ(cp.object_last_modified, std::string(""));
EXPECT_EQ(cp.object_etag, std::string(""));
EXPECT_EQ(cp.upload_id, std::string("upload-id-xxx"));
EXPECT_EQ(cp.part_num, 1);
EXPECT_EQ(cp.part_size, 8388608);
EXPECT_EQ(cp.parts[0].index, 0);
EXPECT_EQ(cp.parts[0].offset, 0);
EXPECT_EQ(cp.parts[0].size, 1024);
EXPECT_EQ(cp.parts[0].completed, 1);
EXPECT_EQ(cp.parts[0].etag, std::string("etags-1-xxx"));
EXPECT_EQ(cp.parts[0].crc64, 0);
}
if (cp.thefile) {
EXPECT_EQ(cos_cp_close(cp.thefile), TSDB_CODE_SUCCESS);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
EXPECT_EQ(cos_cp_remove(file_cp_path), TSDB_CODE_SUCCESS);
return;
_exit:
std::cout << "code: " << code << std::endl;
}

View File

@ -0,0 +1,185 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cos.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
int32_t cosInitEnv() {
int32_t code = 0;
bool isBlob = false;
extern int8_t tsS3Ablob;
extern char tsS3Hostname[][TSDB_FQDN_LEN];
extern char tsS3AccessKeyId[][TSDB_FQDN_LEN];
extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN];
extern char tsS3BucketName[TSDB_FQDN_LEN];
tsS3Ablob = isBlob;
/*
const char *hostname = "endpoint/<account-name>.blob.core.windows.net";
const char *accessKeyId = "<access-key-id/account-name>";
const char *accessKeySecret = "<access-key-secret/account-key>";
const char *bucketName = "<bucket/container-name>";
*/
// const char *hostname = "http://192.168.1.52:9000";
// const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel";
// const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX";
// const char *bucketName = "test-bucket";
const char *hostname = "192.168.1.52:9000";
const char *accessKeyId = "zOgllR6bSnw2Ah3mCNel";
const char *accessKeySecret = "cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX";
const char *bucketName = "ci-bucket19";
tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN);
tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN);
tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN);
tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN);
// setup s3 env
extern int8_t tsS3EpNum;
extern int8_t tsS3Https[TSDB_MAX_EP_NUM];
tsS3EpNum = 1;
tsS3Https[0] = false;
tstrncpy(tsTempDir, "/tmp/", PATH_MAX);
tsS3Enabled = true;
return code;
}
TEST(testCase, cosCpPutError) {
int32_t code = 0, lino = 0;
char const *objectName = "testObject";
EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS);
EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS);
#if defined(USE_S3)
EXPECT_EQ(s3Size(objectName), -1);
#else
EXPECT_EQ(s3Size(objectName), 0);
#endif
s3EvictCache("", 0);
s3End();
return;
_exit:
std::cout << "code: " << code << std::endl;
}
TEST(testCase, cosCpPut) {
int32_t code = 0, lino = 0;
int8_t with_cp = 0;
char *data = nullptr;
const long objectSize = 65 * 1024 * 1024;
char const *objectName = "cosut.bin";
const char object_name[] = "cosut.bin";
EXPECT_EQ(std::string(object_name), objectName);
EXPECT_EQ(cosInitEnv(), TSDB_CODE_SUCCESS);
EXPECT_EQ(s3Begin(), TSDB_CODE_SUCCESS);
{
data = (char *)taosMemoryCalloc(1, objectSize);
if (!data) {
TAOS_CHECK_EXIT(terrno);
}
for (int i = 0; i < objectSize / 2; ++i) {
data[i * 2 + 1] = 1;
}
char path[PATH_MAX] = {0};
char path_download[PATH_MAX] = {0};
int ds_len = strlen(TD_DIRSEP);
int tmp_len = strlen(tsTempDir);
(void)snprintf(path, PATH_MAX, "%s", tsTempDir);
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
(void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name);
} else {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name);
}
tstrncpy(path_download, path, strlen(path) + 1);
tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1);
TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH);
GTEST_ASSERT_NE(fp, nullptr);
int n = taosWriteFile(fp, data, objectSize);
GTEST_ASSERT_EQ(n, objectSize);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
code = s3PutObjectFromFile2(path, objectName, with_cp);
GTEST_ASSERT_EQ(code, 0);
with_cp = 1;
code = s3PutObjectFromFile2(path, objectName, with_cp);
GTEST_ASSERT_EQ(code, 0);
#if defined(USE_S3)
EXPECT_EQ(s3Size(objectName), objectSize);
#else
EXPECT_EQ(s3Size(objectName), 0);
#endif
s3End();
s3EvictCache("", 0);
taosMemoryFree(data);
EXPECT_EQ(taosRemoveFile(path), TSDB_CODE_SUCCESS);
}
return;
_exit:
if (data) {
taosMemoryFree(data);
s3End();
}
std::cout << "code: " << code << std::endl;
}

View File

@ -475,27 +475,6 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
extern void tsdbAlterNumCompactThreads();
static int32_t dmAlterMaxCompactTask(const char *value) {
int32_t max_compact_tasks;
char *endptr = NULL;
max_compact_tasks = taosStr2Int32(value, &endptr, 10);
if (endptr == value || endptr[0] != '\0') {
return TSDB_CODE_INVALID_MSG;
}
if (max_compact_tasks != tsNumOfCompactThreads) {
dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks);
tsNumOfCompactThreads = max_compact_tasks;
#ifdef TD_ENTERPRISE
(void)tsdbAlterNumCompactThreads();
#endif
}
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
SDCfgDnodeReq cfgReq = {0};
@ -509,10 +488,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return taosUpdateTfsItemDisable(pCfg, cfgReq.value, pMgmt->pTfs);
}
if (strncmp(cfgReq.config, tsAlterCompactTaskKeywords, strlen(tsAlterCompactTaskKeywords) + 1) == 0) {
return dmAlterMaxCompactTask(cfgReq.value);
}
dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
code = cfgGetAndSetItem(pCfg, &pItem, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_SERVER_CMD, true);

View File

@ -68,7 +68,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateSetFillInfo = streamStateSetFillInfo;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
@ -117,7 +116,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
pStore->streamStateCopyBackend = streamStateCopyBackend;
}

View File

@ -191,7 +191,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateSetFillInfo = streamStateSetFillInfo;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
@ -243,7 +242,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
pStore->streamStateCopyBackend = streamStateCopyBackend;
}

View File

@ -38,6 +38,8 @@ TDBlockBlobClient TDBlockBlobClient::CreateFromConnectionString(const std::strin
return newClient;
}
TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {}
#if 0
TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, std::shared_ptr<StorageSharedKeyCredential> credential,
const BlobClientOptions& options)
: BlobClient(blobUrl, std::move(credential), options) {}
@ -50,8 +52,6 @@ TDBlockBlobClient::TDBlockBlobClient(const std::string&
TDBlockBlobClient::TDBlockBlobClient(const std::string& blobUrl, const BlobClientOptions& options)
: BlobClient(blobUrl, options) {}
TDBlockBlobClient::TDBlockBlobClient(BlobClient blobClient) : BlobClient(std::move(blobClient)) {}
TDBlockBlobClient TDBlockBlobClient::WithSnapshot(const std::string& snapshot) const {
TDBlockBlobClient newClient(*this);
if (snapshot.empty()) {
@ -74,47 +74,6 @@ TDBlockBlobClient TDBlockBlobClient::WithVersionId(const std::string& versionId)
return newClient;
}
Azure::Response<Models::UploadBlockBlobResult> TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content,
const UploadBlockBlobOptions& options,
const Azure::Core::Context& context) const {
_detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions;
if (options.TransactionalContentHash.HasValue()) {
if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value;
} else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value;
}
}
protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
protocolLayerOptions.Tier = options.AccessTier;
protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
if (m_customerProvidedKey.HasValue()) {
protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
}
protocolLayerOptions.EncryptionScope = m_encryptionScope;
if (options.ImmutabilityPolicy.HasValue()) {
protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn;
protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode;
}
protocolLayerOptions.LegalHold = options.HasLegalHold;
return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context);
}
Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
const uint8_t* buffer, size_t bufferSize, const UploadBlockBlobFromOptions& options,
const Azure::Core::Context& context) const {
@ -270,6 +229,47 @@ Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom
return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(result),
std::move(commitBlockListResponse.RawResponse));
}
#endif
Azure::Response<Models::UploadBlockBlobResult> TDBlockBlobClient::Upload(Azure::Core::IO::BodyStream& content,
const UploadBlockBlobOptions& options,
const Azure::Core::Context& context) const {
_detail::BlockBlobClient::UploadBlockBlobOptions protocolLayerOptions;
if (options.TransactionalContentHash.HasValue()) {
if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Md5) {
protocolLayerOptions.TransactionalContentMD5 = options.TransactionalContentHash.Value().Value;
} else if (options.TransactionalContentHash.Value().Algorithm == HashAlgorithm::Crc64) {
protocolLayerOptions.TransactionalContentCrc64 = options.TransactionalContentHash.Value().Value;
}
}
protocolLayerOptions.BlobContentType = options.HttpHeaders.ContentType;
protocolLayerOptions.BlobContentEncoding = options.HttpHeaders.ContentEncoding;
protocolLayerOptions.BlobContentLanguage = options.HttpHeaders.ContentLanguage;
protocolLayerOptions.BlobContentMD5 = options.HttpHeaders.ContentHash.Value;
protocolLayerOptions.BlobContentDisposition = options.HttpHeaders.ContentDisposition;
protocolLayerOptions.BlobCacheControl = options.HttpHeaders.CacheControl;
protocolLayerOptions.Metadata = std::map<std::string, std::string>(options.Metadata.begin(), options.Metadata.end());
protocolLayerOptions.BlobTagsString = _detail::TagsToString(options.Tags);
protocolLayerOptions.Tier = options.AccessTier;
protocolLayerOptions.LeaseId = options.AccessConditions.LeaseId;
protocolLayerOptions.IfModifiedSince = options.AccessConditions.IfModifiedSince;
protocolLayerOptions.IfUnmodifiedSince = options.AccessConditions.IfUnmodifiedSince;
protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
protocolLayerOptions.IfTags = options.AccessConditions.TagConditions;
if (m_customerProvidedKey.HasValue()) {
protocolLayerOptions.EncryptionKey = m_customerProvidedKey.Value().Key;
protocolLayerOptions.EncryptionKeySha256 = m_customerProvidedKey.Value().KeyHash;
protocolLayerOptions.EncryptionAlgorithm = m_customerProvidedKey.Value().Algorithm.ToString();
}
protocolLayerOptions.EncryptionScope = m_encryptionScope;
if (options.ImmutabilityPolicy.HasValue()) {
protocolLayerOptions.ImmutabilityPolicyExpiry = options.ImmutabilityPolicy.Value().ExpiresOn;
protocolLayerOptions.ImmutabilityPolicyMode = options.ImmutabilityPolicy.Value().PolicyMode;
}
protocolLayerOptions.LegalHold = options.HasLegalHold;
return _detail::BlockBlobClient::Upload(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context);
}
Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom(
const std::string& fileName, int64_t offset, int64_t size, const UploadBlockBlobFromOptions& options,
@ -349,7 +349,7 @@ Azure::Response<Models::UploadBlockBlobFromResult> TDBlockBlobClient::UploadFrom
return Azure::Response<Models::UploadBlockBlobFromResult>(std::move(result),
std::move(commitBlockListResponse.RawResponse));
}
#if 0
Azure::Response<Models::UploadBlockBlobFromUriResult> TDBlockBlobClient::UploadFromUri(
const std::string& sourceUri, const UploadBlockBlobFromUriOptions& options,
const Azure::Core::Context& context) const {
@ -396,7 +396,7 @@ Azure::Response<Models::UploadBlockBlobFromUriResult> TDBlockBlobClient::UploadF
return _detail::BlockBlobClient::UploadFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
}
#endif
Azure::Response<Models::StageBlockResult> TDBlockBlobClient::StageBlock(const std::string& blockId,
Azure::Core::IO::BodyStream& content,
const StageBlockOptions& options,
@ -419,7 +419,7 @@ Azure::Response<Models::StageBlockResult> TDBlockBlobClient::StageBlock(const st
protocolLayerOptions.EncryptionScope = m_encryptionScope;
return _detail::BlockBlobClient::StageBlock(*m_pipeline, m_blobUrl, content, protocolLayerOptions, context);
}
#if 0
Azure::Response<Models::StageBlockFromUriResult> TDBlockBlobClient::StageBlockFromUri(
const std::string& blockId, const std::string& sourceUri, const StageBlockFromUriOptions& options,
const Azure::Core::Context& context) const {
@ -457,7 +457,7 @@ Azure::Response<Models::StageBlockFromUriResult> TDBlockBlobClient::StageBlockFr
return _detail::BlockBlobClient::StageBlockFromUri(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
}
#endif
Azure::Response<Models::CommitBlockListResult> TDBlockBlobClient::CommitBlockList(
const std::vector<std::string>& blockIds, const CommitBlockListOptions& options,
const Azure::Core::Context& context) const {
@ -492,7 +492,7 @@ Azure::Response<Models::CommitBlockListResult> TDBlockBlobClient::CommitBlockLis
return _detail::BlockBlobClient::CommitBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions, context);
}
#if 0
Azure::Response<Models::GetBlockListResult> TDBlockBlobClient::GetBlockList(const GetBlockListOptions& options,
const Azure::Core::Context& context) const {
_detail::BlockBlobClient::GetBlockBlobBlockListOptions protocolLayerOptions;
@ -502,6 +502,7 @@ Azure::Response<Models::GetBlockListResult> TDBlockBlobClient::GetBlockList(cons
return _detail::BlockBlobClient::GetBlockList(*m_pipeline, m_blobUrl, protocolLayerOptions,
_internal::WithReplicaStatus(context));
}
#endif
/*
Azure::Response<Models::QueryBlobResult> TDBlockBlobClient::Query(const std::string& querySqlExpression,
const QueryBlobOptions& options,

View File

@ -0,0 +1,210 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
// clang-format off
#include "td_block_blob_client.hpp"
#include "az.h"
// clang-format on
using namespace Azure::Storage;
using namespace Azure::Storage::Blobs;
extern int8_t tsS3Enabled;
extern char tsS3BucketName[TSDB_FQDN_LEN];
static int32_t azInitEnv() {
int32_t code = 0;
extern int8_t tsS3EpNum;
extern char tsS3Hostname[][TSDB_FQDN_LEN];
extern char tsS3AccessKeyId[][TSDB_FQDN_LEN];
extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN];
/* TCS parameter format
tsS3Hostname[0] = "<endpoint>/<account-name>.blob.core.windows.net";
tsS3AccessKeyId[0] = "<access-key-id/account-name>";
tsS3AccessKeySecret[0] = "<access-key-secret/account-key>";
tsS3BucketName = "<bucket/container-name>";
*/
const char *hostname = "<endpoint>/<account-name>.blob.core.windows.net";
const char *accessKeyId = "<access-key-id/account-name>";
const char *accessKeySecret = "<access-key-secret/account-key>";
const char *bucketName = "<bucket/container-name>";
if (hostname[0] != '<') {
tstrncpy(&tsS3Hostname[0][0], hostname, TSDB_FQDN_LEN);
tstrncpy(&tsS3AccessKeyId[0][0], accessKeyId, TSDB_FQDN_LEN);
tstrncpy(&tsS3AccessKeySecret[0][0], accessKeySecret, TSDB_FQDN_LEN);
tstrncpy(tsS3BucketName, bucketName, TSDB_FQDN_LEN);
} else {
const char *accountId = getenv("ablob_account_id");
if (!accountId) {
return -1;
}
const char *accountSecret = getenv("ablob_account_secret");
if (!accountSecret) {
return -1;
}
const char *containerName = getenv("ablob_container");
if (!containerName) {
return -1;
}
TAOS_STRCPY(&tsS3Hostname[0][0], accountId);
TAOS_STRCAT(&tsS3Hostname[0][0], ".blob.core.windows.net");
TAOS_STRCPY(&tsS3AccessKeyId[0][0], accountId);
TAOS_STRCPY(&tsS3AccessKeySecret[0][0], accountSecret);
TAOS_STRCPY(tsS3BucketName, containerName);
}
tstrncpy(tsTempDir, "/tmp/", PATH_MAX);
tsS3Enabled = true;
return code;
}
// TEST(AzTest, DISABLED_InterfaceTest) {
TEST(AzETest, InterfaceTest) {
int code = 0;
bool check = false;
bool withcp = false;
code = azInitEnv();
if (code) {
std::cout << "ablob env init failed with: " << code << std::endl;
return;
}
GTEST_ASSERT_EQ(code, 0);
GTEST_ASSERT_EQ(tsS3Enabled, 1);
code = azBegin();
GTEST_ASSERT_EQ(code, 0);
code = azCheckCfg();
GTEST_ASSERT_EQ(code, 0);
const int size = 4096;
char data[size] = {0};
for (int i = 0; i < size / 2; ++i) {
data[i * 2 + 1] = 1;
}
const char object_name[] = "azut.bin";
char path[PATH_MAX] = {0};
char path_download[PATH_MAX] = {0};
int ds_len = strlen(TD_DIRSEP);
int tmp_len = strlen(tsTempDir);
(void)snprintf(path, PATH_MAX, "%s", tsTempDir);
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
(void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name);
} else {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name);
}
tstrncpy(path_download, path, strlen(path) + 1);
tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1);
TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH);
GTEST_ASSERT_NE(fp, nullptr);
int n = taosWriteFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
code = azPutObjectFromFileOffset(path, object_name, 0, size);
GTEST_ASSERT_EQ(code, 0);
uint8_t *pBlock = NULL;
code = azGetObjectBlock(object_name, 0, size, check, &pBlock);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(pBlock[i * 2], 0);
GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1);
}
taosMemoryFree(pBlock);
code = azGetObjectToFile(object_name, path_download);
GTEST_ASSERT_EQ(code, 0);
{
TdFilePtr fp = taosOpenFile(path, TD_FILE_READ);
GTEST_ASSERT_NE(fp, nullptr);
(void)memset(data, 0, size);
int64_t n = taosReadFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(data[i * 2], 0);
GTEST_ASSERT_EQ(data[i * 2 + 1], 1);
}
}
azDeleteObjectsByPrefix(object_name);
// list object to check
code = azPutObjectFromFile2(path, object_name, withcp);
GTEST_ASSERT_EQ(code, 0);
code = azGetObjectsByPrefix(object_name, tsTempDir);
GTEST_ASSERT_EQ(code, 0);
{
TdFilePtr fp = taosOpenFile(path, TD_FILE_READ);
GTEST_ASSERT_NE(fp, nullptr);
(void)memset(data, 0, size);
int64_t n = taosReadFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(data[i * 2], 0);
GTEST_ASSERT_EQ(data[i * 2 + 1], 1);
}
}
TDBlockBlobClient blobClient =
TDBlockBlobClient::CreateFromConnectionString(std::getenv("ablob_cs"), std::string(tsS3BucketName), object_name);
const char *object_name_arr[] = {object_name};
code = azDeleteObjects(object_name_arr, 1);
GTEST_ASSERT_EQ(code, 0);
azEnd();
}

View File

@ -199,3 +199,132 @@ TEST(AzTest, InterfaceTest) {
azEnd();
}
// TEST(AzTest, DISABLED_InterfaceTestBig) {
TEST(AzTest, InterfaceTestBig) {
int code = 0;
bool check = false;
bool withcp = false;
code = azInitEnv();
if (code) {
std::cout << "ablob env init failed with: " << code << std::endl;
return;
}
GTEST_ASSERT_EQ(code, 0);
GTEST_ASSERT_EQ(tsS3Enabled, 1);
code = azBegin();
GTEST_ASSERT_EQ(code, 0);
code = azCheckCfg();
GTEST_ASSERT_EQ(code, 0);
const int size = 256 * 1024 * 1024 + 1;
char *data = (char *)taosMemoryCalloc(1, size);
if (!data) {
std::cout << "code: " << code << "terrno: " << terrno << std::endl;
return;
}
for (int i = 0; i < size / 2; ++i) {
data[i * 2 + 1] = 1;
}
const char object_name[] = "azut.bin";
char path[PATH_MAX] = {0};
char path_download[PATH_MAX] = {0};
int ds_len = strlen(TD_DIRSEP);
int tmp_len = strlen(tsTempDir);
(void)snprintf(path, PATH_MAX, "%s", tsTempDir);
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
(void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", object_name);
} else {
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", object_name);
}
tstrncpy(path_download, path, strlen(path) + 1);
tstrncpy(path_download + strlen(path), ".download", strlen(".download") + 1);
TdFilePtr fp = taosOpenFile(path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_WRITE_THROUGH);
GTEST_ASSERT_NE(fp, nullptr);
int n = taosWriteFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
code = azPutObjectFromFileOffset(path, object_name, 0, size);
GTEST_ASSERT_EQ(code, 0);
uint8_t *pBlock = NULL;
code = azGetObjectBlock(object_name, 0, size, check, &pBlock);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(pBlock[i * 2], 0);
GTEST_ASSERT_EQ(pBlock[i * 2 + 1], 1);
}
taosMemoryFree(pBlock);
code = azGetObjectToFile(object_name, path_download);
GTEST_ASSERT_EQ(code, 0);
{
TdFilePtr fp = taosOpenFile(path, TD_FILE_READ);
GTEST_ASSERT_NE(fp, nullptr);
(void)memset(data, 0, size);
int64_t n = taosReadFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(data[i * 2], 0);
GTEST_ASSERT_EQ(data[i * 2 + 1], 1);
}
}
azDeleteObjectsByPrefix(object_name);
// list object to check
code = azPutObjectFromFile2(path, object_name, withcp);
GTEST_ASSERT_EQ(code, 0);
code = azGetObjectsByPrefix(object_name, tsTempDir);
GTEST_ASSERT_EQ(code, 0);
{
TdFilePtr fp = taosOpenFile(path, TD_FILE_READ);
GTEST_ASSERT_NE(fp, nullptr);
(void)memset(data, 0, size);
int64_t n = taosReadFile(fp, data, size);
GTEST_ASSERT_EQ(n, size);
code = taosCloseFile(&fp);
GTEST_ASSERT_EQ(code, 0);
for (int i = 0; i < size / 2; ++i) {
GTEST_ASSERT_EQ(data[i * 2], 0);
GTEST_ASSERT_EQ(data[i * 2 + 1], 1);
}
}
const char *object_name_arr[] = {object_name};
code = azDeleteObjects(object_name_arr, 1);
GTEST_ASSERT_EQ(code, 0);
taosMemoryFree(data);
azEnd();
}

View File

@ -61,9 +61,13 @@ static int32_t anomalyCacheBlock(SAnomalyWindowOperatorInfo* pInfo, SSDataBlock*
int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
size_t keyBufSize = 0;
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
const char* id = GET_TASKID(pTaskInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAnomalyWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnomalyWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SAnomalyWindowPhysiNode* pAnomalyNode = (SAnomalyWindowPhysiNode*)physiNode;
@ -74,13 +78,13 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
}
if (!taosAnalGetOptStr(pAnomalyNode->anomalyOpt, "algo", pInfo->algoName, sizeof(pInfo->algoName))) {
qError("failed to get anomaly_window algorithm name from %s", pAnomalyNode->anomalyOpt);
qError("%s failed to get anomaly_window algorithm name from %s", id, pAnomalyNode->anomalyOpt);
code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
goto _error;
}
if (taosAnalGetAlgoUrl(pInfo->algoName, ANAL_ALGO_TYPE_ANOMALY_DETECT, pInfo->algoUrl, sizeof(pInfo->algoUrl)) != 0) {
qError("failed to get anomaly_window algorithm url from %s", pInfo->algoName);
qError("%s failed to get anomaly_window algorithm url from %s", id, pInfo->algoName);
code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
goto _error;
}
@ -94,20 +98,18 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pAnomalyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
size_t keyBufSize = 0;
int32_t num = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pAnomalyNode->window.pFuncs, NULL, &pExprInfo, &num);
QUERY_CHECK_CODE(code, lino, _error);
initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, id, pTaskInfo->streamInfo.pState,
&pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pAnomalyNode->window.node.pOutputDataBlockDesc);
@ -124,27 +126,19 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
pInfo->anomalyCol = extractColumnFromColumnNode(pColNode);
pInfo->anomalyKey.type = pInfo->anomalyCol.type;
pInfo->anomalyKey.bytes = pInfo->anomalyCol.bytes;
pInfo->anomalyKey.pData = taosMemoryCalloc(1, pInfo->anomalyCol.bytes);
if (pInfo->anomalyKey.pData == NULL) {
goto _error;
}
QUERY_CHECK_NULL(pInfo->anomalyKey.pData, code, lino, _error, terrno)
int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize + pInfo->anomalyKey.bytes;
pInfo->anomalySup.pResultRow = taosMemoryCalloc(1, itemSize);
if (pInfo->anomalySup.pResultRow == NULL) {
code = terrno;
goto _error;
}
QUERY_CHECK_NULL(pInfo->anomalySup.pResultRow, code, lino, _error, terrno)
pInfo->anomalySup.blocks = taosArrayInit(16, sizeof(SSDataBlock*));
if (pInfo->anomalySup.blocks == NULL) {
code = terrno;
goto _error;
}
QUERY_CHECK_NULL(pInfo->anomalySup.blocks, code, lino, _error, terrno)
pInfo->anomalySup.windows = taosArrayInit(16, sizeof(STimeWindow));
if (pInfo->anomalySup.windows == NULL) {
code = terrno;
goto _error;
}
QUERY_CHECK_NULL(pInfo->anomalySup.windows, code, lino, _error, terrno)
code = filterInitFromNode((SNode*)pAnomalyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);
@ -162,18 +156,21 @@ int32_t createAnomalywindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* p
*pOptrInfo = pOperator;
qDebug("anomaly_window operator is created, algo:%s url:%s opt:%s", pInfo->algoName, pInfo->algoUrl,
qDebug("%s anomaly_window operator is created, algo:%s url:%s opt:%s", id, pInfo->algoName, pInfo->algoUrl,
pInfo->anomalyOpt);
return TSDB_CODE_SUCCESS;
_error:
qError("%s failed to create anomaly_window operator, line:%d algo:%s code:%s", id, lino, pAnomalyNode->anomalyOpt,
tstrerror(code));
if (pInfo != NULL) {
anomalyDestroyOperatorInfo(pInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
qError("failed to create anomaly_window operator, algo:%s code:0x%x", pInfo->algoName, code);
return code;
}

View File

@ -1441,7 +1441,7 @@ static int32_t doSetUserTableMetaInfo(SStoreMetaReader* pMetaReaderFn, SStoreMet
SMetaReader mr1 = {0};
pMetaReaderFn->initReader(&mr1, pVnode, META_READER_NOLOCK, pMetaFn);
int64_t suid = pMReader->me.ctbEntry.suid;
code = pMetaReaderFn->getTableEntryByUid(&mr1, suid);
if (code != TSDB_CODE_SUCCESS) {
@ -1752,7 +1752,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
@ -2284,7 +2284,7 @@ static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) {
// db_name
pColInfoData = taosArrayGet(p->pDataBlock, index++);
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
code = colDataSetVal(pColInfoData, numOfRows, db, false);
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
QUERY_CHECK_CODE(code, lino, _end);
// vgroup_id

View File

@ -28,7 +28,7 @@
extern "C" {
#endif
typedef enum { MATCH, JUMP, SPLIT, RANGE } InstType;
typedef enum { INS_MATCH, INS_JUMP, INS_SPLIT, INS_RANGE } InstType;
typedef struct MatchValue {
#ifdef WINDOWS

View File

@ -159,14 +159,14 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r
if (false == sparSetGet(set, i, &ip)) continue;
Inst *inst = taosArrayGet(builder->dfa->insts, ip);
if (inst->ty == JUMP || inst->ty == SPLIT) {
if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) {
continue;
} else if (inst->ty == RANGE) {
} else if (inst->ty == INS_RANGE) {
if (taosArrayPush(tinsts, &ip) == NULL) {
code = terrno;
goto _exception;
}
} else if (inst->ty == MATCH) {
} else if (inst->ty == INS_MATCH) {
isMatch = true;
if (taosArrayPush(tinsts, &ip) == NULL) {
code = terrno;
@ -234,11 +234,11 @@ void dfaAdd(FstDfa *dfa, FstSparseSet *set, uint32_t ip) {
}
bool succ = sparSetAdd(set, ip, NULL);
Inst *inst = taosArrayGet(dfa->insts, ip);
if (inst->ty == MATCH || inst->ty == RANGE) {
if (inst->ty == INS_MATCH || inst->ty == INS_RANGE) {
// do nothing
} else if (inst->ty == JUMP) {
} else if (inst->ty == INS_JUMP) {
dfaAdd(dfa, set, inst->jv.step);
} else if (inst->ty == SPLIT) {
} else if (inst->ty == INS_SPLIT) {
dfaAdd(dfa, set, inst->sv.len1);
dfaAdd(dfa, set, inst->sv.len2);
}
@ -253,11 +253,11 @@ bool dfaRun(FstDfa *dfa, FstSparseSet *from, FstSparseSet *to, uint8_t byte) {
if (false == sparSetGet(from, i, &ip)) continue;
Inst *inst = taosArrayGet(dfa->insts, ip);
if (inst->ty == JUMP || inst->ty == SPLIT) {
if (inst->ty == INS_JUMP || inst->ty == INS_SPLIT) {
continue;
} else if (inst->ty == MATCH) {
} else if (inst->ty == INS_MATCH) {
isMatch = true;
} else if (inst->ty == RANGE) {
} else if (inst->ty == INS_RANGE) {
if (inst->rv.start <= byte && byte <= inst->rv.end) {
dfaAdd(dfa, to, ip + 1);
}

View File

@ -17,6 +17,7 @@
#include "tglobal.h"
#include "tskiplist.h"
#include "tutil.h"
#include "indexFstDfa.h"
class UtilEnv : public ::testing::Test {
protected:
@ -41,6 +42,29 @@ class UtilEnv : public ::testing::Test {
SArray *rslt;
};
class UtilComm : public ::testing::Test {
protected:
virtual void SetUp() {
// src = (SArray *)taosArrayInit(2, sizeof(void *));
// for (int i = 0; i < 3; i++) {
// SArray *m = taosArrayInit(10, sizeof(uint64_t));
// taosArrayPush(src, &m);
// }
// rslt = (SArray *)taosArrayInit(10, sizeof(uint64_t));
}
virtual void TearDown() {
// for (int i = 0; i < taosArrayGetSize(src); i++) {
// SArray *m = (SArray *)taosArrayGetP(src, i);
// taosArrayDestroy(m);
// }
// taosArrayDestroy(src);
}
// SArray *src;
// SArray *rslt;
};
static void clearSourceArray(SArray *p) {
for (int i = 0; i < taosArrayGetSize(p); i++) {
SArray *m = (SArray *)taosArrayGetP(p, i);
@ -369,3 +393,35 @@ TEST_F(UtilEnv, testDictComm) {
EXPECT_EQ(COMMON_INPUTS[v], i);
}
}
TEST_F(UtilComm, testCompress) {
for (int32_t i = 0; i < 6; i++) {
_cache_range_compare cmpFunc = idxGetCompare((RangeType)(i));
//char[32]a = 0, b = 1;
char a[32] = {0};
char b[32] = {1};
for (int32_t j = 0; j < TSDB_DATA_TYPE_MAX; j++) {
cmpFunc(a, b, j);
}
}
}
TEST_F(UtilComm, testfstDfa) {
{
FstDfaBuilder *builder = dfaBuilderCreate(NULL);
ASSERT_TRUE(builder != NULL);
dfaBuilderDestroy(builder);
}
{
SArray *pInst = taosArrayInit(32, sizeof(uint8_t));
for (int32_t i = 0; i < 26; i++) {
uint8_t v = 'a' + i;
taosArrayPush(pInst, &v);
}
FstDfaBuilder *builder = dfaBuilderCreate(pInst);
FstDfa *dfa = dfaBuilderBuild(builder);
dfaBuilderDestroy(builder);
}
}

View File

@ -10086,7 +10086,6 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
const char* validConfigs[] = {
"encrypt_key",
tsAlterCompactTaskKeywords,
};
if (0 == strncasecmp(cfgReq.config, validConfigs[0], strlen(validConfigs[0]) + 1)) {
int32_t klen = strlen(cfgReq.value);
@ -10097,28 +10096,6 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
ENCRYPT_KEY_LEN_MIN, ENCRYPT_KEY_LEN);
}
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_ENCRYPT_KEY, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
} else if (0 == strncasecmp(cfgReq.config, validConfigs[1], strlen(validConfigs[1]) + 1)) {
char* endptr = NULL;
int32_t maxCompactTasks = taosStr2Int32(cfgReq.value, &endptr, 10);
int32_t minMaxCompactTasks = MIN_MAX_COMPACT_TASKS;
int32_t maxMaxCompactTasks = MAX_MAX_COMPACT_TASKS;
// check format
if (endptr == cfgReq.value || endptr[0] != '\0') {
tFreeSMCfgDnodeReq(&cfgReq);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS,
"Invalid max compact tasks: %s", cfgReq.value);
}
// check range
if (maxCompactTasks < minMaxCompactTasks || maxCompactTasks > maxMaxCompactTasks) {
tFreeSMCfgDnodeReq(&cfgReq);
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_DNODE_INVALID_COMPACT_TASKS,
"Invalid max compact tasks: %d, valid range [%d,%d]", maxCompactTasks,
minMaxCompactTasks, maxMaxCompactTasks);
}
code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
} else {
code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
}

View File

@ -254,6 +254,9 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList);
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
int32_t uploadCheckpointToS3(const char* id, const char* path);
int32_t deleteCheckpointFile(const char* id, const char* name);
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId);
#ifdef __cplusplus
}

View File

@ -19,7 +19,6 @@
#include "tcs.h"
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(const char* id, const char* name);
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
#ifdef BUILD_NO_CALL
static int32_t deleteCheckpoint(const char* id);
@ -230,8 +229,8 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream
return code;
}
static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId) {
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId) {
int32_t code = 0;
int32_t vgId = pTask->pMeta->vgId;
int32_t taskLevel = pTask->info.taskLevel;

View File

@ -546,10 +546,6 @@ void streamStateDestroy(SStreamState* pState, bool remove) {
taosMemoryFreeClear(pState);
}
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
return deleteExpiredCheckPoint(pState->pFileState, mark);
}
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
@ -617,8 +613,6 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void**
void streamStateClearExpiredState(SStreamState* pState) { clearExpiredState(pState->pFileState); }
void streamStateSetFillInfo(SStreamState* pState) { setFillInfo(pState->pFileState); }
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);

View File

@ -667,18 +667,6 @@ void deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLe
}
}
int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, keyLen);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
if (pFileState->searchBuff != NULL) {
deleteHashSortRowBuff(pFileState, pKey);
}
if (code_buff == TSDB_CODE_SUCCESS || code_file == TSDB_CODE_SUCCESS) {
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -868,10 +856,6 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
}
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list);
}
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t maxCheckPointId = 0;
@ -1227,10 +1211,6 @@ SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
return pFileState->pGroupIdMap;
}
void setFillInfo(SStreamFileState* pFileState) {
pFileState->hasFillCatch = false;
}
void clearExpiredState(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1261,6 +1241,7 @@ _end:
}
}
#ifdef BUILD_NO_CALL
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
@ -1328,6 +1309,7 @@ _end:
}
return code;
}
#endif
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode) {

View File

@ -390,9 +390,78 @@ TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest)
extern int8_t tsS3EpNum;
tsS3EpNum = 1;
code = uploadCheckpointToS3("123", "/tmp/backend5/stream");
EXPECT_EQ(code, TSDB_CODE_SUCCESS);
code = uploadCheckpointToS3("123", "/tmp/backend5/stream/stream");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
code = downloadCheckpointByNameS3("123", "/root/download", "");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
code = deleteCheckpointFile("aaa123", "bbb");
EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
}
TEST(doCheckBeforeHandleChkptTriggerTest, doCheckBeforeHandleChkptTriggerFnTest) {
SStreamTask* pTask = NULL;
int64_t uid = 2222222222222;
SArray* array = taosArrayInit(4, POINTER_BYTES);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array,
false, 1, &pTask);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
initTaskLock(pTask);
const char *path = "/tmp/doCheckBeforeHandleChkptTriggerTest/stream";
code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0);
ASSERT(pState != NULL);
pTask->pBackend = pState->pTdbState->pOwner->pBackend;
code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.checkpointId = 123;
code = doCheckBeforeHandleChkptTrigger(pTask, 100, NULL, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
pTask->chkInfo.pActiveInfo->failedId = 223;
code = doCheckBeforeHandleChkptTrigger(pTask, 200, NULL, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
SStreamDataBlock block;
block.srcTaskId = 456;
SStreamTask upTask;
upTask = *pTask;
upTask.id.taskId = 456;
streamTaskSetUpstreamInfo(pTask, &upTask);
pTask->chkInfo.pActiveInfo->failedId = 23;
code = doCheckBeforeHandleChkptTrigger(pTask, 123, &block, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
streamTaskSetUpstreamInfo(pTask, &upTask);
streamTaskSetStatusReady(pTask);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
ASSERT_EQ(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->activeId = 223;
STaskCheckpointReadyInfo readyInfo;
readyInfo.upstreamTaskId = 4567;
block.srcTaskId = 4567;
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1);
initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1);
taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo);
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_NE(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->allUpstreamTriggerRecv = 1;
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_NE(code, TSDB_CODE_SUCCESS);
pTask->chkInfo.pActiveInfo->activeId = 1111;
code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
}

View File

@ -2023,16 +2023,29 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
memcpy(pKey, cd.pKey, (size_t)cd.kLen);
if (ppVal) {
// TODO: vLen may be zero
pVal = tdbRealloc(*ppVal, cd.vLen);
if (pVal == NULL) {
tdbFree(pKey);
return terrno;
if (cd.vLen > 0) {
pVal = tdbRealloc(*ppVal, cd.vLen);
if (pVal == NULL) {
tdbFree(pKey);
return terrno;
}
memcpy(pVal, cd.pVal, (size_t)cd.vLen);
if (TDB_CELLDECODER_FREE_VAL(&cd)) {
tdbTrace("tdb/btree-next decoder: %p pVal free: %p", &cd, cd.pVal);
tdbFree(cd.pVal);
}
} else {
pVal = NULL;
}
*ppVal = pVal;
*vLen = cd.vLen;
memcpy(pVal, cd.pVal, (size_t)cd.vLen);
} else {
if (TDB_CELLDECODER_FREE_VAL(&cd)) {
tdbTrace("tdb/btree-next2 decoder: %p pVal free: %p", &cd, cd.pVal);
tdbFree(cd.pVal);
}
}
ret = tdbBtcMoveToPrev(pBtc);

View File

@ -66,3 +66,10 @@ int tdbGetFileSize(tdb_fd_t fd, int szPage, SPgno *size) {
*size = szBytes / szPage;
return 0;
}
void tdbCloseDir(TdDirPtr *ppDir) {
int32_t ret = taosCloseDir(ppDir);
if (ret) {
tdbError("failed to close directory, reason:%s", tstrerror(ret));
}
}

View File

@ -71,12 +71,7 @@ typedef TdFilePtr tdb_fd_t;
#define tdbGetDirEntryName taosGetDirEntryName
#define tdbDirEntryBaseName taosDirEntryBaseName
static FORCE_INLINE void tdbCloseDir(TdDirPtr *ppDir) {
int32_t ret = taosCloseDir(ppDir);
if (ret) {
tdbError("failed to close directory, reason:%s", tstrerror(ret));
}
}
void tdbCloseDir(TdDirPtr *ppDir);
#define tdbOsRemove remove
#define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL)

View File

@ -16,6 +16,10 @@
#ifndef _TD_TFS_INT_H_
#define _TD_TFS_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
@ -74,6 +78,7 @@ typedef struct STfs {
SHashObj *hash; // name to did map
} STfs;
int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg);
int32_t tfsNewDisk(int32_t level, int32_t id, int8_t disable, const char *dir, STfsDisk **ppDisk);
STfsDisk *tfsFreeDisk(STfsDisk *pDisk);
int32_t tfsUpdateDiskSize(STfsDisk *pDisk);

View File

@ -19,7 +19,6 @@
static int32_t tfsMount(STfs *pTfs, SDiskCfg *pCfg);
static int32_t tfsCheck(STfs *pTfs);
static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg);
static int32_t tfsFormatDir(char *idir, char *odir);
static int32_t tfsGetDiskByName(STfs *pTfs, const char *dir, STfsDisk **ppDisk);
static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir);
@ -245,13 +244,13 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
tstrncpy(tname, pFile->aname, TSDB_FILENAME_LEN);
tstrncpy(dest, taosDirName(tname), TSDB_FILENAME_LEN);
}
#if 0
void tfsAbsoluteName(STfs *pTfs, SDiskID diskId, const char *rname, char *aname) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
(void)snprintf(aname, TSDB_FILENAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
}
#endif
int32_t tfsRemoveFile(const STfsFile *pFile) { return taosRemoveFile(pFile->aname); }
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) {
@ -340,7 +339,7 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
TAOS_RETURN(0);
}
#if 0
bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
char aname[TMPNAME_LEN];
@ -348,7 +347,7 @@ bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) {
(void)snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
return taosDirExist(aname);
}
#endif
int32_t tfsRmdir(STfs *pTfs, const char *rname) {
if (rname[0] == 0) {
TAOS_RETURN(0);
@ -515,7 +514,7 @@ _exit:
TAOS_RETURN(code);
}
static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
int32_t code = 0;
char dirName[TSDB_FILENAME_LEN] = "\0";
@ -577,32 +576,32 @@ static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
}
static int32_t tfsFormatDir(char *idir, char *odir) {
int32_t code = 0, lino = 0;
wordexp_t wep = {0};
int32_t dirLen = 0;
char tmp[PATH_MAX] = {0};
int32_t code = wordexp(idir, &wep, 0);
code = wordexp(idir, &wep, 0);
if (code != 0) {
TAOS_RETURN(TAOS_SYSTEM_ERROR(code));
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(code));
}
char tmp[PATH_MAX] = {0};
if (taosRealPath(wep.we_wordv[0], tmp, PATH_MAX) != 0) {
code = TAOS_SYSTEM_ERROR(errno);
wordfree(&wep);
TAOS_RETURN(code);
}
TAOS_CHECK_EXIT(taosRealPath(wep.we_wordv[0], tmp, PATH_MAX));
int32_t dirLen = strlen(tmp);
dirLen = strlen(tmp);
if (dirLen < 0 || dirLen >= TSDB_FILENAME_LEN) {
wordfree(&wep);
code = TSDB_CODE_OUT_OF_RANGE;
fError("failed to mount %s to FS since %s, real path:%s, len:%d", idir, tstrerror(code), tmp, dirLen);
TAOS_RETURN(code);
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_RANGE);
}
tstrncpy(odir, tmp, TSDB_FILENAME_LEN);
_exit:
wordfree(&wep);
TAOS_RETURN(0);
if (code != 0) {
fError("failed to mount %s to FS at line %d since %s, real path:%s, len:%d", idir, lino, tstrerror(code), tmp,
dirLen);
}
TAOS_RETURN(code);
}
static int32_t tfsCheck(STfs *pTfs) {

View File

@ -41,13 +41,13 @@ void tfsDestroyTier(STfsTier *pTier) {
int32_t tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg, STfsDisk **ppDisk) {
int32_t code = 0;
int32_t lino = 0;
int32_t id = 0;
STfsDisk *pDisk = NULL;
if (pTier->ndisk >= TFS_MAX_DISKS_PER_TIER) {
TAOS_CHECK_GOTO(TSDB_CODE_FS_TOO_MANY_MOUNT, &lino, _exit);
}
int32_t id = 0;
if (pTier->level == 0) {
if (pTier->disks[0] != NULL) {
id = pTier->ndisk;

View File

@ -7,8 +7,13 @@ target_link_libraries(
PUBLIC tfs
PUBLIC gtest_main
)
target_include_directories(
tfs_test
PUBLIC "${TD_SOURCE_DIR}/include/libs/tfs"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
# add_test(
# NAME tfs_test
# COMMAND tfs_test
# )
add_test(
NAME tfs_test
COMMAND tfs_test
)

View File

@ -13,6 +13,7 @@
#include "os.h"
#include "tfs.h"
#include "tfsInt.h"
class TfsTest : public ::testing::Test {
protected:
@ -280,6 +281,9 @@ TEST_F(TfsTest, 04_File) {
const STfsFile *pf2 = tfsReaddir(pDir);
EXPECT_EQ(pf2, nullptr);
pDir->pDir = taosOpenDir(fulldir);
EXPECT_NE(pDir->pDir, nullptr);
tfsClosedir(pDir);
}
@ -744,3 +748,116 @@ TEST_F(TfsTest, 05_MultiDisk) {
tfsClose(pTfs);
}
TEST_F(TfsTest, 06_Misc) {
// tfsDisk.c
STfsDisk *pDisk = NULL;
EXPECT_EQ(tfsNewDisk(0, 0, 0, NULL, &pDisk), TSDB_CODE_INVALID_PARA);
EXPECT_NE(tfsNewDisk(0, 0, 0, "", &pDisk), 0);
STfsDisk disk = {0};
EXPECT_EQ(tfsUpdateDiskSize(&disk), TSDB_CODE_INVALID_PARA);
// tfsTier.c
STfsTier tfsTier = {0};
EXPECT_EQ(taosThreadSpinInit(&tfsTier.lock, 0), 0);
EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK);
tfsTier.ndisk = 3;
tfsTier.nAvailDisks = 1;
tfsTier.disks[1] = &disk;
disk.disable = 1;
EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK);
disk.disable = 0;
disk.size.avail = 0;
EXPECT_EQ(tfsAllocDiskOnTier(&tfsTier), TSDB_CODE_FS_NO_VALID_DISK);
tfsTier.ndisk = TFS_MAX_DISKS_PER_TIER;
SDiskCfg diskCfg = {0};
tstrncpy(diskCfg.dir, "testDataDir", TSDB_FILENAME_LEN);
EXPECT_EQ(tfsMountDiskToTier(&tfsTier, &diskCfg, 0), TSDB_CODE_FS_TOO_MANY_MOUNT);
EXPECT_EQ(taosThreadSpinDestroy(&tfsTier.lock), 0);
// tfs.c
STfs *pTfs = NULL;
EXPECT_EQ(tfsOpen(0, -1, &pTfs), TSDB_CODE_INVALID_PARA);
EXPECT_EQ(tfsOpen(0, 0, &pTfs), TSDB_CODE_INVALID_PARA);
EXPECT_EQ(tfsOpen(0, TFS_MAX_DISKS + 1, &pTfs), TSDB_CODE_INVALID_PARA);
taosMemoryFreeClear(pTfs);
STfs tfs = {0};
STfsTier *pTier = &tfs.tiers[0];
EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, -1), false);
tfs.nlevel = 2;
pTier->ndisk = 3;
pTier->nAvailDisks = 1;
EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, 0), false);
pTier->disks[0] = &disk;
EXPECT_EQ(tfsDiskSpaceAvailable(&tfs, 0), false);
EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, -1, 0), false);
EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, tfs.nlevel + 1, 0), false);
EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, 0, -1), false);
EXPECT_EQ(tfsDiskSpaceSufficient(&tfs, 0, pTier->ndisk), false);
EXPECT_EQ(tfsGetDisksAtLevel(&tfs, -1), 0);
EXPECT_EQ(tfsGetDisksAtLevel(&tfs, tfs.nlevel), 0);
EXPECT_EQ(tfsGetLevel(&tfs), tfs.nlevel);
for (int32_t l = 0; l < tfs.nlevel; ++l) {
EXPECT_EQ(taosThreadSpinInit(&tfs.tiers[l].lock, 0), 0);
}
SDiskID diskID = {0};
disk.size.avail = TFS_MIN_DISK_FREE_SIZE;
EXPECT_EQ(tfsAllocDisk(&tfs, tfs.nlevel, &diskID), 0);
tfs.nlevel = 0;
diskID.level = 0;
EXPECT_EQ(tfsAllocDisk(&tfs, 0, &diskID), 0);
tfs.nlevel = 2;
diskID.id = 10;
EXPECT_EQ(tfsMkdirAt(&tfs, NULL, diskID), TSDB_CODE_FS_INVLD_CFG);
EXPECT_NE(tfsMkdirRecurAt(&tfs, NULL, diskID), 0);
const char *rname = "";
EXPECT_EQ(tfsRmdir(&tfs, rname), 0);
EXPECT_EQ(tfsSearch(&tfs, -1, NULL), -1);
EXPECT_EQ(tfsSearch(&tfs, tfs.nlevel, NULL), -1);
diskCfg.level = -1;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.level = TFS_MAX_TIERS;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.level = 0;
diskCfg.primary = -1;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.primary = 2;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.primary = 1;
diskCfg.disable = -1;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.disable = 2;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.disable = 0;
diskCfg.level = 1;
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
diskCfg.level = 0;
diskCfg.primary = 0;
tstrncpy(diskCfg.dir, "testDataDir1", TSDB_FILENAME_LEN);
EXPECT_NE(tfsCheckAndFormatCfg(&tfs, &diskCfg), 0);
TdFilePtr pFile = taosCreateFile("testDataDir1", TD_FILE_CREATE);
EXPECT_NE(pFile, nullptr);
EXPECT_EQ(tfsCheckAndFormatCfg(&tfs, &diskCfg), TSDB_CODE_FS_INVLD_CFG);
EXPECT_EQ(taosCloseFile(&pFile), 0);
EXPECT_EQ(taosRemoveFile("testDataDir1"), 0);
for (int32_t l = 0; l < tfs.nlevel; ++l) {
EXPECT_EQ(taosThreadSpinDestroy(&tfs.tiers[l].lock), 0);
}
}

View File

@ -151,7 +151,6 @@ typedef struct SCliThrd {
TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
SDelayQueue* timeoutQueue;
SDelayQueue* waitConnQueue;
uint64_t nextTimeout; // next timeout
STrans* pInst; //
@ -159,8 +158,6 @@ typedef struct SCliThrd {
SHashObj* fqdn2ipCache;
SCvtAddr* pCvtAddr;
SHashObj* failFastCache;
SHashObj* batchCache;
SHashObj* connHeapCache;
SCliReq* stopMsg;
@ -224,8 +221,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
static void destroyCliConnQTable(SCliConn* conn);
static void cliHandleException(SCliConn* conn);
@ -1299,8 +1294,8 @@ static void cliHandleException(SCliConn* conn) {
if (conn->registered) {
int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
}
@ -2124,144 +2119,7 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) {
tTrace("cli process batch size:%d", count);
}
}
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
return NULL;
}
queue* hr = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(hr);
pList->sending += 1;
pList->len -= 1;
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
return batch;
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq);
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port);
static void destroyBatchList(SCliBatchList* pList);
static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
int32_t code = 0;
STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq->ctx;
char* ip = EPSET_GET_INUSE_IP(pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
size_t klen = strlen(key);
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = NULL;
code = createBatchList(&pBatchList, key, ip, port);
if (code != 0) {
destroyReq(pReq);
return;
}
pBatchList->batchLenLimit = pInst->shareConnLimit;
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, pBatchList, pReq);
if (code != 0) {
destroyBatchList(pBatchList);
destroyReq(pReq);
return;
}
code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
if (code != 0) {
destroyBatchList(pBatchList);
}
} else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
cliDestroyBatch(pBatch);
}
} else {
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->shareConnLimit += pReq->msg.contLen;
pBatch->wLen += 1;
} else {
SCliBatch* tBatch = NULL;
code = createBatch(&tBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
}
}
}
}
return;
}
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
if (pBatchList == NULL) {
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
QUEUE_INIT(&pBatchList->wq);
pBatchList->port = port;
pBatchList->connMax = 1;
pBatchList->connCnt = 0;
pBatchList->batchLenLimit = 0;
pBatchList->len += 1;
pBatchList->ip = taosStrdup(ip);
pBatchList->dst = taosStrdup(key);
if (pBatchList->ip == NULL || pBatchList->dst == NULL) {
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList);
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
*ppBatchList = pBatchList;
return 0;
}
static void destroyBatchList(SCliBatchList* pList) {
if (pList == NULL) {
return;
}
while (!QUEUE_IS_EMPTY(&pList->wq)) {
queue* h = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pList->ip);
taosMemoryFree(pList->dst);
taosMemoryFree(pList);
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
if (pBatch == NULL) {
tError("failed to create batch since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, &pReq->q);
pBatch->wLen += 1;
pBatch->shareConnLimit = pReq->msg.contLen;
pBatch->pList = pList;
QUEUE_PUSH(&pList->wq, &pBatch->listq);
pList->len += 1;
*ppBatch = pBatch;
return 0;
}
static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); }
static void cliAsyncCb(uv_async_t* handle) {
@ -2494,10 +2352,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
if ((code = transDQCreate(pThrd->loop, &pThrd->waitConnQueue)) != 0) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
pThrd->destroyAhandleFp = pInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
@ -2505,11 +2359,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
pThrd->batchCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->batchCache == NULL) {
TAOS_CHECK_GOTO(terrno, NULL, _end);
}
pThrd->connHeapCache = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
if (pThrd->connHeapCache == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
@ -2553,10 +2402,7 @@ _end:
transDQDestroy(pThrd->delayQueue, NULL);
transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->batchCache);
taosHashCleanup(pThrd->pIdConnTable);
taosArrayDestroy(pThrd->pQIdBuf);
@ -2580,7 +2426,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->delayQueue, destroyReqAndAhanlde);
transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
tDebug("thread destroy %" PRId64, pThrd->pid);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
@ -2592,24 +2437,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache);
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) {
SCliBatchList* pBatchList = (SCliBatchList*)(*pIter);
while (!QUEUE_IS_EMPTY(&pBatchList->wq)) {
queue* h = QUEUE_HEAD(&pBatchList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList);
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
taosHashCleanup(pThrd->batchCache);
void* pIter2 = taosHashIterate(pThrd->connHeapCache, NULL);
while (pIter2 != NULL) {
SHeap* heap = (SHeap*)(pIter2);

View File

@ -615,6 +615,21 @@ TEST_F(TransEnv, http) {
#endif
}
#if 1
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "telemetry.taosdata.com");
int32_t code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, "test", strlen("test"),HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
for (int32_t i = 0; i < 1; i++) {
code = taosSendTelemReport(&mgt,tsTelemUri, tsTelemPort, pCont, len,HTTP_FLAT);
printf("old addr:%s new addr:%s\n",mgt.defaultAddr, mgt.cachedAddr);
taosMsleep(2000);
}
taosTelemetryDestroy(&mgt);
#endif
{
STelemAddrMgmt mgt;
taosTelemetryMgtInit(&mgt, "error");

32
source/util/inc/tlogInt.h Normal file
View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_LOG_INT_H_
#define _TD_UTIL_LOG_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
void taosOpenNewSlowLogFile();
void taosLogObjSetToday(int64_t ts);
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_LOG_INT_H_*/

View File

@ -20,7 +20,7 @@
#ifdef USE_ANALYTICS
#include <curl/curl.h>
#define ANAL_ALGO_SPLIT ","
#define ANALYTICS_ALOG_SPLIT_CHAR ","
typedef struct {
int64_t ver;
@ -136,7 +136,7 @@ bool taosAnalGetOptStr(const char *option, const char *optName, char *optValue,
return false;
}
pEnd = strstr(pStart, ANAL_ALGO_SPLIT);
pEnd = strstr(pStart, ANALYTICS_ALOG_SPLIT_CHAR);
if (optMaxLen > 0) {
if (pEnd > pStart) {
int32_t len = (int32_t)(pEnd - pStart);
@ -168,7 +168,7 @@ bool taosAnalGetOptInt(const char *option, const char *optName, int64_t *optValu
int32_t bufLen = tsnprintf(buf, sizeof(buf), "%s=", optName);
char *pos1 = strstr(option, buf);
char *pos2 = strstr(option, ANAL_ALGO_SPLIT);
char *pos2 = strstr(option, ANALYTICS_ALOG_SPLIT_CHAR);
if (pos1 != NULL) {
*optValue = taosStr2Int64(pos1 + bufLen, NULL, 10);
return true;

View File

@ -483,7 +483,7 @@ static int32_t taosOpenNewLogFile() {
return 0;
}
static void taosOpenNewSlowLogFile() {
void taosOpenNewSlowLogFile() {
(void)taosThreadMutexLock(&tsLogObj.logMutex);
int64_t delta = taosGetTimestampSec() - tsLogObj.timestampToday;
if (delta >= 0 && delta < 86400) {
@ -539,6 +539,8 @@ void taosResetLog() {
}
}
void taosLogObjSetToday(int64_t ts) { tsLogObj.timestampToday = ts; }
static bool taosCheckFileIsOpen(char *logFileName) {
TdFilePtr pFile = taosOpenFile(logFileName, TD_FILE_WRITE);
if (pFile == NULL) {
@ -619,6 +621,7 @@ static void processLogFileName(const char *logName, int32_t maxFileNum) {
}
static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
int32_t code = 0, lino = 0;
#ifdef WINDOWS_STASH
/*
* always set maxFileNum to 1
@ -653,39 +656,28 @@ static int32_t taosInitNormalLog(const char *logName, int32_t maxFileNum) {
// only an estimate for number of lines
int64_t filesize = 0;
if (taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL) != 0) {
(void)printf("\nfailed to fstat log file:%s, reason:%s\n", name, strerror(errno));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
TAOS_CHECK_EXIT(taosFStatFile(tsLogObj.logHandle->pFile, &filesize, NULL));
tsLogObj.lines = (int32_t)(filesize / 60);
if (taosLSeekFile(tsLogObj.logHandle->pFile, 0, SEEK_END) < 0) {
TAOS_UNUSED(printf("failed to seek to the end of log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
TAOS_CHECK_EXIT(terrno);
}
(void)snprintf(name, sizeof(name), "==================================================\n");
(void)snprintf(name, sizeof(name),
"==================================================\n"
" new log file\n"
"==================================================\n");
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
(void)snprintf(name, sizeof(name), " new log file \n");
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
}
(void)snprintf(name, sizeof(name), "==================================================\n");
if (taosWriteFile(tsLogObj.logHandle->pFile, name, (uint32_t)strlen(name)) <= 0) {
TAOS_UNUSED(printf("failed to write to log file:%s, reason:%s\n", name, tstrerror(terrno)));
taosUnLockLogFile(tsLogObj.logHandle->pFile);
return terrno;
TAOS_CHECK_EXIT(terrno);
}
return 0;
_exit:
if (code != 0) {
taosUnLockLogFile(tsLogObj.logHandle->pFile);
TAOS_UNUSED(printf("failed to init normal log file:%s at line %d, reason:%s\n", name, lino, tstrerror(code)));
}
return code;
}
static void taosUpdateLogNums(ELogLevel level) {

View File

@ -137,6 +137,10 @@ add_test(
NAME logTest
COMMAND logTest
)
target_include_directories(
logTest
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
IF(COMPILER_SUPPORT_AVX2)
MESSAGE(STATUS "AVX2 instructions is ACTIVATED")

View File

@ -2,7 +2,9 @@
#include <stdlib.h>
#include <time.h>
#include <random>
#include <tdef.h>
#include <tlog.h>
#include <tlogInt.h>
#include <iostream>
using namespace std;
@ -44,3 +46,96 @@ TEST(log, check_log_refactor) {
}
taosCloseLog();
}
extern char *tsLogOutput;
TEST(log, misc) {
// taosInitLog
const char *path = TD_TMP_DIR_PATH "td";
taosRemoveDir(path);
taosMkDir(path);
tstrncpy(tsLogDir, path, PATH_MAX);
EXPECT_EQ(taosInitLog("taoslog", 1, true), 0);
taosOpenNewSlowLogFile();
taosLogObjSetToday(INT64_MIN);
taosPrintSlowLog("slow log test");
// test taosInitLogOutput
const char *pLogName = NULL;
tsLogOutput = (char *)taosMemCalloc(1, TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG);
tstrncpy(tsLogOutput, "stdout", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "stderr", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "/dev/null", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tsLogOutput[0] = '#';
EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG);
tstrncpy(tsLogOutput, "/", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "\\", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "testLogOutput", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, "testLogOutputDir/testLogOutput", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), 0);
tstrncpy(tsLogOutput, ".", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG);
tstrncpy(tsLogOutput, "/..", TSDB_FILENAME_LEN);
EXPECT_EQ(taosInitLogOutput(&pLogName), TSDB_CODE_INVALID_CFG);
tsLogOutput[0] = 0;
// test taosAssertDebug
tsAssert = false;
taosAssertDebug(true, __FILE__, __LINE__, 0, "test_assert_true_without_core");
taosAssertDebug(false, __FILE__, __LINE__, 0, "test_assert_false_with_core");
tsAssert = true;
// test taosLogCrashInfo, taosReadCrashInfo and taosReleaseCrashLogFile
char nodeType[16] = "nodeType";
char *pCrashMsg = (char *)taosMemoryCalloc(1, 16);
EXPECT_NE(pCrashMsg, nullptr);
tstrncpy(pCrashMsg, "crashMsg", 16);
#if !defined(_TD_DARWIN_64) && !defined(WINDOWS)
pid_t pid = taosGetPId();
EXPECT_EQ(pid > 0, true);
siginfo_t sigInfo = {0};
sigInfo.si_pid = pid;
taosLogCrashInfo(nodeType, pCrashMsg, strlen(pCrashMsg), 0, &sigInfo);
#else
taosLogCrashInfo(nodeType, pCrashMsg, strlen(pCrashMsg), 0, nullptr);
#endif
char crashInfo[PATH_MAX] = {0};
snprintf(crashInfo, sizeof(crashInfo), "%s%s.%sCrashLog", tsLogDir, TD_DIRSEP, nodeType);
char *pReadMsg = NULL;
int64_t readMsgLen = 0;
TdFilePtr pFile = NULL;
taosReadCrashInfo(crashInfo, &pReadMsg, &readMsgLen, &pFile);
EXPECT_NE(pReadMsg, nullptr);
EXPECT_NE(pFile, nullptr);
EXPECT_EQ(strncasecmp(pReadMsg, "crashMsg", strlen("crashMsg")), 0);
EXPECT_EQ(taosCloseFile(&pFile), 0);
taosMemoryFreeClear(pReadMsg);
pFile = taosOpenFile(crashInfo, TD_FILE_WRITE);
EXPECT_NE(pFile, nullptr);
EXPECT_EQ(taosWriteFile(pFile, "00000", 1), 1);
EXPECT_EQ(taosCloseFile(&pFile), 0);
taosReadCrashInfo(crashInfo, &pReadMsg, &readMsgLen, &pFile);
EXPECT_EQ(pReadMsg, nullptr);
EXPECT_EQ(pFile, nullptr);
pFile = taosOpenFile(crashInfo, TD_FILE_WRITE);
EXPECT_NE(pFile, nullptr);
taosReleaseCrashLogFile(pFile, true);
// clean up
taosRemoveDir(path);
taosCloseLog();
}

View File

@ -13,7 +13,7 @@ if [ -n "$PID" ]; then
systemctl stop taosd
fi
PID=`ps -ef|grep -w taosd | grep -v grep | awk '{print $2}'`
PID=`ps -ef|grep -w taosd | grep -v grep | grep -v taosanode | awk '{print $2}'`
while [ -n "$PID" ]; do
echo kill -9 $PID
#pkill -9 taosd
@ -38,10 +38,10 @@ while [ -n "$PID" ]; do
else
lsof -nti:6030 | xargs kill -9
fi
PID=`ps -ef|grep -w taos | grep -v grep | awk '{print $2}'`
PID=`ps -ef|grep -w taos | grep -v grep |grep -v taosanode| awk '{print $2}'`
done
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'`
PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode|awk '{print $2}'`
while [ -n "$PID" ]; do
echo kill -9 $PID
#pkill -9 tmq_sim
@ -52,5 +52,5 @@ while [ -n "$PID" ]; do
else
lsof -nti:6030 | xargs kill -9
fi
PID=`ps -ef|grep -w tmq_sim | grep -v grep | awk '{print $2}'`
PID=`ps -ef|grep -w tmq_sim | grep -v grep | grep -v taosanode| awk '{print $2}'`
done

View File

@ -3,7 +3,17 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== create anode
print =============== failed to create anode on '127.0.0.1:1101'
sql_error create anode '127.0.0.1:1101'
sql show anodes
if $rows != 0 then
return -1
endi
sql_error drop anode 1
print ================ create anode
sql create anode '192.168.1.116:6050'
sql show anodes
@ -30,7 +40,7 @@ print $data00 $data01 $data02
sql use d0
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double, c4 tinyint, c5 bigint, c6 varchar(12)) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
@ -42,10 +52,11 @@ sql create table ct1 using stb tags(1000)
print ==================== insert data
# input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
sql insert into ct1(ts, c1) values(now-1a, 5)(now+1a, 14)(now+2a, 15)(now+3a, 15)(now+4a, 14)
sql insert into ct1(ts, c1) values(now+5a, 19)(now+6a, 17)(now+7a, 16)(now+8a, 20)(now+9a, 22)
sql insert into ct1(ts, c1) values(now+10a, 8)(now+11a, 21)(now+12a, 28)(now+13a, 11)(now+14a, 9)
sql insert into ct1(ts, c1) values(now+15a, 29)(now+16a, 40)
sql insert into ct1(ts, c1, c2, c3, c4, c5, c6) values(now-1a, 5, 5, 5, 5, 5, 'a')(now+1a, 14, 14, 14, 14, 14, 'a')(now+2a, 15, 15, 15, 15, 15, 'a')
sql insert into ct1 values(now+3a, 15, 15, 15, 15, 15, 'a')(now+4a, 14, 14, 14, 14, 14, 'a')(now+5a, 19, 19, 19, 19, 19, 'a')(now+6a, 17, 17, 17, 17, 17, 'a')
sql insert into ct1 values(now+7a, 16, 16, 16, 16, 16, 'a')(now+8a, 20, 20, 20, 20, 20, 'a')(now+9a, 22, 22, 22, 22, 22, 'a')
sql insert into ct1 values(now+10a, 8, 8, 8, 8, 8, 'a')(now+11a, 21, 21, 21, 21, 21, 'a')(now+12a, 28, 28, 28, 28, 28, 'a')(now+13a, 11, 11, 11, 11, 11, 'a')(now+14a, 9, 9, 9, 9, 9, 'a')
sql insert into ct1 values(now+15a, 29, 29, 29, 29, 29, 'a')(now+16a, 40, 40, 40, 40, 40, 'a')
sql select count(*) from ct1
if $data00 != 17 then
@ -58,6 +69,87 @@ if $data00 != 1 then
return -1
endi
print ================= try every loaded anomaly detection algorithm
sql select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma');
sql select count(*) from ct1 anomaly_window(c1, 'algo=lof');
sql select count(*) from ct1 anomaly_window(c1, 'algo=shesd');
sql select count(*) from ct1 anomaly_window(c1, 'algo=grubbs');
print ================= try every column type of column
sql select count(*) from ct1 anomaly_window(c1, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c2, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c3, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c4, 'algo=ksigma,k=2');
sql select count(*) from ct1 anomaly_window(c5, 'algo=ksigma,k=2');
print =================== invalid column type
sql_error select count(*) from ct1 anomaly_window(c6, 'algo=ksigma,k=2');
sql_error select forecast(c6, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=0.5,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=119,wncheck=1,period=0') from ct1
sql_error select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters1,conf=0.5,wncheck=1,period=0') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=0') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, rows=-10') from ct1
sql_error select forecast(c1, 'conf=50 ,algo = arima, every=0') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters, conf=50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, ' algo=holtwinters , conf=50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, ' algo = holtwinters , conf = 50 ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, ,') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, , ,') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, a =') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'conf=50 ,algo = holtwinters, = a ,') from ct1
print =================== valid column type
sql select forecast(c1, 'conf=50 ,algo = arima') from ct1
sql select forecast(c1, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c2, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c3, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c4, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=2') from ct1
if $rows != 10 then
return -1
endi
if $data03 != 28 then
return -1
endi
if $data00 != @23-11-15 06:13:20.000@ then
print expect 23-11-15 06:13:20.000 , actual $data00
return -1
endi
if $data10 != @23-11-15 06:13:20.002@ then
print expect 23-11-15 06:13:20.002 , actual $data10
return -1
endi
if $data20 != @23-11-15 06:13:20.004@ then
return -1
endi
print test the every option and rows option
sql select _frowts, _flow, _fhigh, forecast(c1, 'algo=holtwinters,conf=50,wncheck=1,period=0,start=1700000000000,every=100,rows=5') from ct1
if $rows != 5 then
return -1
endi
if $data00 != @23-11-15 06:13:20.000@ then
return -1
endi
if $data10 != @23-11-15 06:13:20.100@ then
return -1
endi
sql drop anode 1
sql show anodes
@ -66,6 +158,13 @@ if $rows != 0 then
return -1
endi
sleep 1000
print ===================== query without anodes
sql_error select forecast(c5, 'conf=50 ,algo = arima, rows=1') from ct1
sql_error select count(*) from ct1 anomaly_window(c1, 'algo=iqr');
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check