Merge branch '3.0' into enh/TD-33262-3.0

This commit is contained in:
kailixu 2024-12-25 11:11:41 +08:00
commit 9e6ace9e9d
23 changed files with 870 additions and 2522 deletions

View File

@ -7,7 +7,8 @@ file_zh_changed = ''
file_en_changed = '' file_en_changed = ''
file_no_doc_changed = '1' file_no_doc_changed = '1'
file_only_tdgpt_change_except = '1' file_only_tdgpt_change_except = '1'
tdgpt_file = "forecastoperator.c\\|anomalywindowoperator.c\\|tanalytics.h\\|tanalytics.c\\|tdgpt_cases.task\\|analytics" tdgpt_file = "forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics"
def abortPreviousBuilds() { def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME def currentJobName = env.JOB_NAME
@ -69,7 +70,7 @@ def check_docs(){
returnStdout: true returnStdout: true
) )
file_no_doc_changed = sh ( def file_no_doc_changed = sh (
script: ''' script: '''
cd ${WKC} cd ${WKC}
git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" || : git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" || :
@ -80,7 +81,7 @@ def check_docs(){
file_only_tdgpt_change_except = sh ( file_only_tdgpt_change_except = sh (
script: ''' script: '''
cd ${WKC} cd ${WKC}
git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | grep -v ${tdgpt_file} || : git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/"|grep -v ".md$" | grep -Ev "forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics" ||:
''', ''',
returnStdout: true returnStdout: true
).trim() ).trim()
@ -572,7 +573,7 @@ pipeline {
cd ${WKC}/tests/parallel_test cd ${WKC}/tests/parallel_test
./run_scan_container.sh -d ${WKDIR} -b ${BRANCH_NAME}_${BUILD_ID} -f ${WKDIR}/tmp/${BRANCH_NAME}_${BUILD_ID}/docs_changed.txt ''' + extra_param + ''' ./run_scan_container.sh -d ${WKDIR} -b ${BRANCH_NAME}_${BUILD_ID} -f ${WKDIR}/tmp/${BRANCH_NAME}_${BUILD_ID}/docs_changed.txt ''' + extra_param + '''
''' '''
if ( file_no_doc_changed =~ /orecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics/ ) { if ( file_no_doc_changed =~ /forecastoperator.c|anomalywindowoperator.c|tanalytics.h|tanalytics.c|tdgpt_cases.task|analytics/ ) {
sh ''' sh '''
cd ${WKC}/tests/parallel_test cd ${WKC}/tests/parallel_test
export DEFAULT_RETRY_TIME=2 export DEFAULT_RETRY_TIME=2

View File

@ -2,7 +2,7 @@
# taosadapter # taosadapter
ExternalProject_Add(taosadapter ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG main GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG main GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taosws-rs # taosws-rs
ExternalProject_Add(taosws-rs ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
GIT_TAG main GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -99,7 +99,7 @@ def test_myfc(self):
s = loader.get_service("myfc") s = loader.get_service("myfc")
# 设置用于预测分析的数据 # 设置用于预测分析的数据
s.set_input_list(self.get_input_list()) s.set_input_list(self.get_input_list(), None)
# 检查预测结果应该全部为 1 # 检查预测结果应该全部为 1
r = s.set_params( r = s.set_params(
{"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30, "start_p": 0} {"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30, "start_p": 0}

View File

@ -44,10 +44,10 @@ class _MyAnomalyDetectionService(AbstractAnomalyDetectionService):
def set_params(self, params): def set_params(self, params):
"""该算法无需任何输入参数,直接重载父类该函数,不处理算法参数设置逻辑""" """该算法无需任何输入参数,直接重载父类该函数,不处理算法参数设置逻辑"""
pass return super().set_params(params)
``` ```
将该文件保存在 `./lib/taosanalytics/algo/ad/` 目录下,然后重启 taosanode 服务。在 TDengine 命令行接口 taos 中执行 `SHOW ANODES FULL` 就能够看到新加入的算法,然后应用就可以通过 SQL 语句调用该检测算法。 将该文件保存在 `./lib/taosanalytics/algo/ad/` 目录下,然后重启 taosanode 服务。在 TDengine 命令行接口 taos 中执行 `SHOW ANODES FULL` 就能够看到新加入的算法,然后就可以通过 SQL 语句调用该算法。
```SQL ```SQL
--- 对 col 列进行异常检测,通过指定 algo 参数为 myad 来调用新添加的异常检测类 --- 对 col 列进行异常检测,通过指定 algo 参数为 myad 来调用新添加的异常检测类
@ -65,7 +65,7 @@ def test_myad(self):
s = loader.get_service("myad") s = loader.get_service("myad")
# 设置需要进行检测的输入数据 # 设置需要进行检测的输入数据
s.set_input_list(AnomalyDetectionTest.input_list) s.set_input_list(AnomalyDetectionTest.input_list, None)
r = s.execute() r = s.execute()

View File

@ -237,18 +237,20 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentR
(void)memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes); (void)memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
} else { } else {
int32_t dataLen = varDataTLen(pData); int32_t dataLen = varDataTLen(pData);
if (meta->type == TSDB_DATA_TYPE_JSON) { // This is a piece of code to help users implement udf. It is only called during testing.
if (*pData == TSDB_DATA_TYPE_NULL) { // Currently, the json type is not supported and will not be called.
dataLen = 0; // if (meta->type == TSDB_DATA_TYPE_JSON) {
} else if (*pData == TSDB_DATA_TYPE_NCHAR) { // if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = varDataTLen(pData + sizeof(char)); // dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { // } else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = sizeof(int64_t); // dataLen = varDataTLen(pData + sizeof(char));
} else if (*pData == TSDB_DATA_TYPE_BOOL) { // } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = sizeof(char); // dataLen = sizeof(int64_t);
} // } else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen += sizeof(char); // dataLen = sizeof(char);
} // }
// dataLen += sizeof(char);
// }
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) { if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
uint32_t newSize = data->varLenCol.payloadAllocLen; uint32_t newSize = data->varLenCol.payloadAllocLen;

View File

@ -348,6 +348,7 @@ static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pC
req.compactId = compactId; req.compactId = compactId;
req.vgId = pVgroup->vgId; req.vgId = pVgroup->vgId;
req.dnodeId = dnodeid; req.dnodeId = dnodeid;
terrno = 0;
mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId); mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req); int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
@ -367,8 +368,10 @@ static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pC
pHead->contLen = htonl(contLen); pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
if ((contLen = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) { mTrace("vgId:%d, build compact vnode config req, contLen:%d", pVgroup->vgId, contLen);
terrno = contLen; int32_t ret = 0;
if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
terrno = ret;
return NULL; return NULL;
} }
*pContLen = contLen; *pContLen = contLen;
@ -401,6 +404,8 @@ static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *p
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_VND_KILL_COMPACT; action.msgType = TDMT_VND_KILL_COMPACT;
mTrace("trans:%d, kill compact msg len:%d", pTrans->id, contLen);
if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
TAOS_RETURN(code); TAOS_RETURN(code);

View File

@ -160,7 +160,7 @@ int32_t metaAlterSuperTable(SMeta* pMeta, int64_t version, SVCreateStbRe
int32_t metaDropSuperTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int32_t metaDropSuperTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int32_t metaCreateTable2(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** ppRsp); int32_t metaCreateTable2(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** ppRsp);
int32_t metaDropTable2(SMeta* pMeta, int64_t version, SVDropTbReq* pReq); int32_t metaDropTable2(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
int32_t metaTrimTables(SMeta* pMeta); int32_t metaTrimTables(SMeta* pMeta, int64_t version);
int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tbUids); int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tbUids);
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount); int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);

View File

@ -1645,7 +1645,6 @@ static int32_t metaHandleSuperTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry
metaFetchEntryFree(&pOldEntry); metaFetchEntryFree(&pOldEntry);
return code; return code;
} }
// TAOS_CHECK_RETURN(metaGetSubtables(pMeta, pEntry->uid, uids));
TAOS_CHECK_RETURN(tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type)); TAOS_CHECK_RETURN(tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type));
} else if (deltaCol == -1) { } else if (deltaCol == -1) {
int16_t cid = -1; int16_t cid = -1;
@ -1667,7 +1666,6 @@ static int32_t metaHandleSuperTableUpdate(SMeta *pMeta, const SMetaEntry *pEntry
metaFetchEntryFree(&pOldEntry); metaFetchEntryFree(&pOldEntry);
return code; return code;
} }
// TAOS_CHECK_RETURN(metaGetSubtables(pMeta, pEntry->uid, uids));
TAOS_CHECK_RETURN(tsdbCacheDropSTableColumn(pTsdb, uids, cid, hasPrimaryKey)); TAOS_CHECK_RETURN(tsdbCacheDropSTableColumn(pTsdb, uids, cid, hasPrimaryKey));
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -607,9 +607,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
} }
vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 ", state.applyTerm:%" PRId64 vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 ", state.applyTerm:%" PRId64
", conn.applyTerm:%" PRId64, ", conn.applyTerm:%" PRId64 ", contLen:%d",
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, pVnode->state.applyTerm, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, pVnode->state.applyTerm,
pMsg->info.conn.applyTerm); pMsg->info.conn.applyTerm, pMsg->contLen);
if (!(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm)) { if (!(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm)) {
return terrno = TSDB_CODE_INTERNAL_ERROR; return terrno = TSDB_CODE_INTERNAL_ERROR;
@ -843,7 +843,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb}; SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool redirected = false; bool redirected = false;
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_SCH_QUERY: case TDMT_SCH_QUERY:
@ -2145,7 +2145,7 @@ static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
pVnode->config.hashBegin, pVnode->config.hashEnd, ver); pVnode->config.hashBegin, pVnode->config.hashEnd, ver);
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd] // TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
code = metaTrimTables(pVnode->pMeta); code = metaTrimTables(pVnode->pMeta, ver);
return code; return code;
} }

View File

@ -142,6 +142,50 @@ target_link_libraries(
udf2_dup PUBLIC os ${LINK_JEMALLOC} udf2_dup PUBLIC os ${LINK_JEMALLOC}
) )
set(TARGET_NAMES
change_udf_normal
change_udf_no_init
change_udf_no_process
change_udf_no_destroy
change_udf_init_failed
change_udf_process_failed
change_udf_destory_failed
)
set(COMPILE_DEFINITIONS
CHANGE_UDF_NORMAL
CHANGE_UDF_NO_INIT
CHANGE_UDF_NO_PROCESS
CHANGE_UDF_NO_DESTROY
CHANGE_UDF_INIT_FAILED
CHANGE_UDF_PROCESS_FAILED
CHANGE_UDF_DESTORY_FAILED
)
foreach(index RANGE 0 6)
list(GET TARGET_NAMES ${index} target_name)
list(GET COMPILE_DEFINITIONS ${index} compile_def)
add_library(${target_name} STATIC MODULE test/change_udf.c)
target_include_directories(
${target_name}
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_compile_definitions(${target_name} PRIVATE ${compile_def})
IF(TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(${target_name} jemalloc)
ENDIF()
target_link_libraries(
${target_name} PUBLIC os ${LINK_JEMALLOC}
)
endforeach()
# SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin) # SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
add_executable(udfd src/udfd.c) add_executable(udfd src/udfd.c)

View File

@ -0,0 +1,172 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h"
// rename function name
#ifdef CHANGE_UDF_NORMAL
#define UDFNAME change_udf_normal
#define UDFNAMEINIT change_udf_normal_init
#define UDFNAMEDESTROY change_udf_normal_destroy
#elif defined(CHANGE_UDF_NO_INIT)
#define UDFNAME change_udf_no_init
#define UDFNAMEINIT change_udf_no_init_init
#define UDFNAMEDESTROY change_udf_no_init_destroy
#elif defined(CHANGE_UDF_NO_PROCESS)
#define UDFNAME change_udf_no_process
#define UDFNAMEINIT change_udf_no_process_init
#define UDFNAMEDESTROY change_udf_no_process_destroy
#elif defined(CHANGE_UDF_NO_DESTROY)
#define UDFNAME change_udf_no_destroy
#define UDFNAMEINIT change_udf_no_destroy_init
#define UDFNAMEDESTROY change_udf_no_destroy_destroy
#elif defined(CHANGE_UDF_INIT_FAILED)
#define UDFNAME change_udf_init_failed
#define UDFNAMEINIT change_udf_init_failed_init
#define UDFNAMEDESTROY change_udf_init_failed_destroy
#elif defined(CHANGE_UDF_PROCESS_FAILED)
#define UDFNAME change_udf_process_failed
#define UDFNAMEINIT change_udf_process_failed_init
#define UDFNAMEDESTROY change_udf_process_failed_destroy
#elif defined(CHANGE_UDF_DESTORY_FAILED)
#define UDFNAME change_udf_destory_failed
#define UDFNAMEINIT change_udf_destory_failed_init
#define UDFNAMEDESTROY change_udf_destory_failed_destroy
#else
#define UDFNAME change_udf_normal
#define UDFNAMEINIT change_udf_normal_init
#define UDFNAMEDESTROY change_udf_normal_destroy
#endif
// Init entry point for the renamed test UDF.
// Compiled out entirely when CHANGE_UDF_NO_INIT is defined, so the test
// matrix can cover a library that is missing its _init symbol.
#ifdef CHANGE_UDF_NO_INIT
#else
DLL_EXPORT int32_t UDFNAMEINIT() {
#ifdef CHANGE_UDF_INIT_FAILED
  // Variant that deliberately reports init failure.
  return -1;
#else
  return 0;
#endif  // ifdef CHANGE_UDF_INIT_FAILED
}
#endif  // ifdef CHANGE_UDF_NO_INIT
// Destroy entry point for the renamed test UDF.
// Compiled out when CHANGE_UDF_NO_DESTROY is defined (missing _destroy case).
// NOTE: "DESTORY" in the macro names is a typo kept for consistency with the
// CMake target names; do not "fix" it independently.
#ifdef CHANGE_UDF_NO_DESTROY
#else
DLL_EXPORT int32_t UDFNAMEDESTROY() {
#ifdef CHANGE_UDF_DESTORY_FAILED
  // Variant that deliberately reports destroy failure.
  return -1;
#else
  return 0;
#endif  // ifdef CHANGE_UDF_DESTORY_FAILED
}
#endif  // ifdef CHANGE_UDF_NO_DESTROY
// Scalar process entry point for the renamed test UDF.
// Compiled out when CHANGE_UDF_NO_PROCESS is defined (missing process case).
// For each row: emits NULL if any input column is NULL in that row,
// otherwise emits the constant int32 value 1.
#ifdef CHANGE_UDF_NO_PROCESS
#else
DLL_EXPORT int32_t UDFNAME(SUdfDataBlock *block, SUdfColumn *resultCol) {
#ifdef CHANGE_UDF_PROCESS_FAILED
  // Variant that deliberately reports process failure.
  return -1;
#else
  int32_t code = 0;
  SUdfColumnData *resultData = &resultCol->colData;
  for (int32_t i = 0; i < block->numOfRows; ++i) {
    int j = 0;
    for (; j < block->numOfCols; ++j) {
      if (udfColDataIsNull(block->udfCols[j], i)) {
        // Any NULL input in this row makes the whole result NULL.
        code = udfColDataSetNull(resultCol, i);
        if (code != 0) {
          return code;
        }
        break;
      }
    }
    // j == numOfCols means the inner loop ran to completion: no NULLs seen.
    if (j == block->numOfCols) {
      int32_t luckyNum = 1;
      code = udfColDataSet(resultCol, i, (char *)&luckyNum, false);
      if (code != 0) {
        return code;
      }
    }
  }
  // to simulate actual processing delay by udf
#ifdef LINUX
  usleep(1 * 1000);  // usleep takes sleep time in us (1 millionth of a second)
#endif  // ifdef LINUX
#ifdef WINDOWS
  Sleep(1);
#endif  // ifdef WINDOWS
  resultData->numOfRows = block->numOfRows;
  return 0;
#endif  // ifdef CHANGE_UDF_PROCESS_FAILED
}
#endif  // ifdef CHANGE_UDF_NO_PROCESS
/********************************************************************************************************************/
// udf revert functions
/********************************************************************************************************************/
// udf_reverse keeps no per-query state, so init/destroy simply report success.
DLL_EXPORT int32_t udf_reverse_init() { return 0; }
DLL_EXPORT int32_t udf_reverse_destroy() { return 0; }
// Reverse the first `len` bytes of `data` in place.
//
// Fix: the original initialized `j = len - 1` unconditionally; for len == 0
// that underflows size_t to SIZE_MAX, so the loop condition i < j held and
// the swap read data[0] / data[SIZE_MAX] — undefined behavior for an empty
// value. Lengths 0 and 1 are now no-ops.
static void reverse_data(char* data, size_t len) {
  if (len < 2) {
    return;  // nothing to swap; also avoids the len - 1 underflow
  }
  size_t i, j;
  char temp;
  for (i = 0, j = len - 1; i < j; i++, j--) {
    temp = data[i];
    data[i] = data[j];
    data[j] = temp;
  }
}
// Scalar UDF: for each row, take the last non-NULL-checked var-length cell
// scanned and emit it with its payload bytes reversed; a NULL in any input
// column makes the result for that row NULL.
//
// Fix: `buff` was only freed on the udfColDataSet error path; on success it
// leaked once per processed cell. udfColDataSet copies the value into the
// result column, so the scratch buffer can always be freed after the call.
DLL_EXPORT int32_t udf_reverse(SUdfDataBlock *block, SUdfColumn *resultCol) {
  int32_t code = 0;
  SUdfColumnData *resultData = &resultCol->colData;

  for (int32_t i = 0; i < block->numOfRows; ++i) {
    int j = 0;
    for (; j < block->numOfCols; ++j) {
      if (udfColDataIsNull(block->udfCols[j], i)) {
        // Any NULL input in this row makes the result NULL.
        code = udfColDataSetNull(resultCol, i);
        if (code != 0) {
          return code;
        }
        break;
      } else {
        int32_t oldLen = udfColDataGetDataLen(block->udfCols[j], i);
        char *pOldData = udfColDataGetData(block->udfCols[j], i);

        // Scratch copy: [VarDataLenT header][payload of oldLen bytes].
        char *buff = malloc(sizeof(VarDataLenT) + oldLen);
        if (buff == NULL) {
          return -1;
        }
        ((VarDataLenT *)buff)[0] = (VarDataLenT)oldLen;
        // NOTE(review): this memcpy copies header + payload from pOldData,
        // overwriting the length just stored above — presumably pOldData
        // points at the full var-data record; confirm against taosudf.h.
        memcpy(buff, pOldData, oldLen + sizeof(VarDataLenT));
        reverse_data(buff + sizeof(VarDataLenT), oldLen);

        code = udfColDataSet(resultCol, i, buff, false);
        free(buff);  // fix: previously leaked on the success path
        if (code != 0) {
          return code;
        }
      }
    }
  }
  // to simulate actual processing delay by udf
#ifdef LINUX
  usleep(1 * 1000);  // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
  Sleep(1);
#endif
  resultData->numOfRows = block->numOfRows;
  return 0;
}

View File

@ -603,11 +603,11 @@ int32_t checkItemDyn(SConfigItem *pItem, bool isServer) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (isServer) { if (isServer) {
if (pItem->dynScope == CFG_DYN_ENT_CLIENT || pItem->dynScope == CFG_DYN_ENT_CLIENT_LAZY) { if (pItem->dynScope == CFG_DYN_CLIENT || pItem->dynScope == CFG_DYN_CLIENT_LAZY) {
return TSDB_CODE_INVALID_CFG; return TSDB_CODE_INVALID_CFG;
} }
} else { } else {
if (pItem->dynScope == CFG_DYN_ENT_SERVER || pItem->dynScope == CFG_DYN_ENT_SERVER_LAZY) { if (pItem->dynScope == CFG_DYN_SERVER || pItem->dynScope == CFG_DYN_SERVER_LAZY) {
return TSDB_CODE_INVALID_CFG; return TSDB_CODE_INVALID_CFG;
} }
} }

View File

@ -0,0 +1,81 @@
import taos
import sys
import os
import subprocess
import glob
import shutil
import time
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.srvCtl import *
from frame.caseBase import *
from frame import *
from frame.autogen import *
from frame import epath
# from frame.server.dnodes import *
# from frame.server.cluster import *
class TDTestCase(TBase):
    """Cluster test: verify arbitrator assigns a vgroup leader after a replica dnode stops."""

    def init(self, conn, logSql, replicaVar=1):
        # NOTE(review): updatecfgDict is assigned but never passed on — the
        # framework presumably reads it as a class attribute elsewhere; confirm.
        updatecfgDict = {'dDebugFlag':131}
        super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1")
        self.valgrind = 0
        self.db = "test"
        self.stb = "meters"
        self.childtable_count = 10
        tdSql.init(conn.cursor(), logSql)

    def run(self):
        # Two-replica database on a 3-node cluster managed by an arbitrator.
        tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;')
        time.sleep(1)
        # Poll up to 100s until the arb group reports in-sync (column 4 == 1).
        count = 0
        while count < 100:
            tdSql.query("show arbgroups;")
            if tdSql.getData(0, 4) == 1:
                break
            tdLog.info("wait 1 seconds for is sync")
            time.sleep(1)
            count += 1
        # Columns 4 and 6 are the roles of the two replicas; stop whichever
        # dnode currently holds the follower's peer leader.
        tdSql.query("show db.vgroups;")
        if(tdSql.getData(0, 4) == "follower") and (tdSql.getData(0, 6) == "leader"):
            tdLog.info("stop dnode2")
            sc.dnodeStop(2)
        if(tdSql.getData(0, 6) == "follower") and (tdSql.getData(0, 4) == "leader"):
            tdLog.info("stop dnode 3")
            sc.dnodeStop(3)
        # Poll until the arbitrator marks the surviving replica "assigned "
        # (trailing space is part of the server's padded output — keep it).
        count = 0
        while count < 100:
            tdSql.query("show db.vgroups;")
            if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "):
                break
            tdLog.info("wait 1 seconds for set assigned")
            time.sleep(1)
            count += 1

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")


# Register the case for both CI platforms.
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -11,6 +11,7 @@
# #
,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py -N 3 -M 3 ,,y,army,./pytest.sh python3 ./test.py -f db-encrypt/basic.py -N 3 -M 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/arbitrator.py -N 3
,,n,army,python3 ./test.py -f storage/s3/s3Basic.py -N 3 ,,n,army,python3 ./test.py -f storage/s3/s3Basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f cluster/snapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py ,,y,army,./pytest.sh python3 ./test.py -f query/function/test_func_elapsed.py
@ -365,6 +366,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/telemetry.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/telemetry.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/backquote_check.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/backquote_check.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdMonitor.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdMonitor.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdNewMonitor.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosd_audit.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosd_audit.py
,,n,system-test,python3 ./test.py -f 0-others/taosdlog.py ,,n,system-test,python3 ./test.py -f 0-others/taosdlog.py
,,n,system-test,python3 ./test.py -f 0-others/taosdShell.py -N 5 -M 3 -Q 3 ,,n,system-test,python3 ./test.py -f 0-others/taosdShell.py -N 5 -M 3 -Q 3
@ -407,7 +409,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/persisit_config.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/persisit_config.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/qmemCtrl.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/qmemCtrl.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compact_vgroups.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compact_vgroups.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compact.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_create.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_create.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_insert.py

View File

@ -155,6 +155,9 @@ class TDSql:
try: try:
self.cursor.execute(sql) self.cursor.execute(sql)
self.queryResult = self.cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self.cursor.description)
except BaseException as e: except BaseException as e:
tdLog.info("err:%s" % (e)) tdLog.info("err:%s" % (e))
expectErrNotOccured = False expectErrNotOccured = False
@ -165,10 +168,6 @@ class TDSql:
if expectErrNotOccured: if expectErrNotOccured:
tdLog.exit("%s(%d) failed: sql:%s, expect error not occured" % (caller.filename, caller.lineno, sql)) tdLog.exit("%s(%d) failed: sql:%s, expect error not occured" % (caller.filename, caller.lineno, sql))
else: else:
self.queryRows = 0
self.queryCols = 0
self.queryResult = None
if fullMatched: if fullMatched:
if expectedErrno != None: if expectedErrno != None:
expectedErrno_rest = expectedErrno & 0x0000ffff expectedErrno_rest = expectedErrno & 0x0000ffff

View File

@ -0,0 +1,13 @@
"""TDengine error codes mirrored as the signed 32-bit values the client returns."""

import ctypes


def _signed32(value):
    """Reinterpret an unsigned hex literal as a two's-complement int32."""
    return ctypes.c_int32(value).value


# Base masks (high bit set, hence negative once reinterpreted as int32).
TAOS_SYSTEM_ERROR = _signed32(0x80FF0000)
TAOS_DEF_ERROR_CODE = _signed32(0x80000000)

# Specific codes: base mask OR'd with the module-specific low 16 bits.
TSDB_CODE_MND_FUNC_NOT_EXIST = TAOS_DEF_ERROR_CODE | 0x0374
TSDB_CODE_UDF_FUNC_EXEC_FAILURE = TAOS_DEF_ERROR_CODE | 0x290A
TSDB_CODE_TSC_INTERNAL_ERROR = TAOS_DEF_ERROR_CODE | 0x02FF

View File

@ -0,0 +1,85 @@
from util.log import *
from util.cases import *
from util.dnodes import *
from util.sql import *
import socket
import taos
class TDTestCase:
    """Test that KILL COMPACT terminates running compactions on two databases."""

    def init(self, conn, logSql, replicaVar=1):
        tdLog.debug(f"start to excute {__file__}")
        self.replicaVar = int(replicaVar)

    def run(self):
        # NOTE(review): tdSql.query is used even for DDL/commands here —
        # presumably because COMPACT returns a result row with the compact id.
        tdSql.query("CREATE DATABASE power KEEP 365 DURATION 10 BUFFER 16 WAL_LEVEL 1 vgroups 1 replica 1;")
        tdSql.query("CREATE DATABASE power1 KEEP 365 DURATION 10 BUFFER 16 WAL_LEVEL 1 vgroups 1 replica 1;")
        #first
        tdSql.query("compact database power;")
        tdLog.info("compact id:%d"%tdSql.queryResult[0][1])
        tdSql.query("show compact %d;"%tdSql.queryResult[0][1])
        tdLog.info("detail:%d"%tdSql.queryRows)
        #second
        tdSql.query("compact database power1;")
        tdLog.info("compact id:%d"%tdSql.queryResult[0][1])
        tdSql.query("show compact %d;"%tdSql.queryResult[0][1])
        tdLog.info("detail:%d"%tdSql.queryRows)
        #kill
        # Column 0 of `show compacts` is the compact id.
        tdSql.query("show compacts;")
        number1 = tdSql.queryResult[0][0]
        number2 = tdSql.queryResult[1][0]
        #first
        tdLog.info("kill compact %d;"%number1)
        tdSql.query("kill compact %d;"%number1)
        #second
        tdLog.info("kill compact %d;"%number2)
        tdSql.query("kill compact %d;"%number2)
        #show
        # Poll up to 50s: a killed compact disappears from `show compact <id>`
        # (zero detail rows for both means the kill took effect).
        count = 0
        tdLog.info("query progress")
        while count < 50:
            tdSql.query("show compact %d;"%number1)
            row1 = tdSql.queryRows
            tdSql.query("show compact %d;"%number2)
            row2 = tdSql.queryRows
            tdLog.info("compact%d:detail count:%d"%(number1, row1))
            tdLog.info("compact%d:detail count:%d"%(number2, row2))
            if row1 == 0 and row2 == 0 :
                break
            time.sleep(1)
            count +=1
            #tdLog.info("loop%d"%count)
        if row1 != 0 or row2 != 0:
            tdLog.exit("compact failed")

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")


# Register the case for both CI platforms.
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,243 @@
import taos
import sys
import time
import socket
# import pexpect
import os
import http.server
import gzip
import threading
import json
import pickle
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
telemetryPort = '6043'
serverPort = '7080'
hostname = socket.gethostname()
class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
    """Fake monitor endpoint: receives taosd's new-format (protocol 2) telemetry
    POST, validates the report's table/metric layout, then shuts the server down."""

    # Endpoint string the monitored taosd is configured to report to.
    hostPort = hostname + ":" + serverPort

    def telemetryInfoCheck(self, infoDict=''):
        """Validate the decoded telemetry JSON; tdLog.exit aborts on mismatch."""
        # Empty payload: nothing to validate.
        if len(infoDict) == 0:
            return
        if "ts" not in infoDict[0] or len(infoDict[0]["ts"]) == 0:
            tdLog.exit("ts is null!")
        # protocol must be 2 (the "new monitor" format under test).
        if "protocol" not in infoDict[0] or infoDict[0]["protocol"] != 2:
            tdLog.exit("protocol is null!")
        if "tables" not in infoDict[0]:
            tdLog.exit("tables is null!")
        if infoDict[0]["tables"][0]["name"] != "taosd_dnodes_info":
            tdLog.exit("taosd_dnodes_info is null!")

        # dnode_info ====================================
        # Expected metric names of taosd_dnodes_info, in report order.
        dnode_infos = ['disk_engine', 'system_net_in', 'vnodes_num', 'system_net_out', 'uptime', 'has_mnode', 'io_read_disk', 'error_log_count',
                       'io_read', 'cpu_cores', 'has_qnode', 'has_snode', 'disk_total', 'mem_engine', 'info_log_count', 'cpu_engine', 'io_write_disk',
                       'debug_log_count', 'disk_used', 'mem_total', 'io_write', 'masters', 'cpu_system',
                       'trace_log_count', 'mem_free']
        index = 0
        for elem in dnode_infos:
            tdLog.debug(f"elem: {index},{elem}")
            if infoDict[0]["tables"][0]["metric_groups"][0]["metrics"][index]["name"] != elem:
                tdLog.exit(f"{elem} is null!")
            index += 1

        if infoDict[0]["tables"][1]["name"] != "taosd_dnodes_log_dirs":
            tdLog.exit("taosd_dnodes_log_dirs is null!")

        # logdir
        if infoDict[0]["tables"][1]["metric_groups"][0]["tags"][3]["name"] != "data_dir_name":
            tdLog.exit("data_dir_name is null!")
        if infoDict[0]["tables"][1]["metric_groups"][0]["metrics"][0]["name"] != "total":
            tdLog.exit("total is null!")
        if infoDict[0]["tables"][1]["metric_groups"][0]["metrics"][1]["name"] != "used":
            tdLog.exit("used is null!")
        if infoDict[0]["tables"][1]["metric_groups"][0]["metrics"][2]["name"] != "avail":
            tdLog.exit("avail is null!")

        if infoDict[0]["tables"][2]["name"] != "taosd_dnodes_data_dirs":
            tdLog.exit("taosd_dnodes_data_dirs is null!")

        # data_dir
        # NOTE(review): the exit messages below don't match the metric being
        # checked (avail/total/used are shifted) — messages only, checks stand.
        if infoDict[0]["tables"][2]["metric_groups"][0]["tags"][3]["name"] != "data_dir_name":
            tdLog.exit("data_dir_name is null!")
        if infoDict[0]["tables"][2]["metric_groups"][0]["metrics"][0]["name"] != "avail":
            tdLog.exit("total is null!")
        if infoDict[0]["tables"][2]["metric_groups"][0]["metrics"][1]["name"] != "total":
            tdLog.exit("used is null!")
        if infoDict[0]["tables"][2]["metric_groups"][0]["metrics"][2]["name"] != "used":
            tdLog.exit("avail is null!")

        if infoDict[0]["tables"][3]["name"] != "taosd_cluster_info":
            tdLog.exit("taosd_cluster_info is null!")

        # cluster_info ====================================
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][0]["name"] != "cluster_uptime":
            tdLog.exit("cluster_uptime is null!")
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][1]["name"] != "dbs_total":
            tdLog.exit("dbs_total is null!")
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][4]["name"] != "vgroups_total":
            tdLog.exit("vgroups_total is null!")
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][5]["name"] != "vgroups_alive":
            tdLog.exit("vgroups_alive is null!")
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][10]["name"] != "connections_total":
            tdLog.exit("connections_total is null!")
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][13]["name"] != "dnodes_total":
            tdLog.exit("dnodes_total is null!")

        # grant_info ====================================
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][15]["name"] != "grants_expire_time":
            tdLog.exit("grants_expire_time is null!")
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][16]["name"] != "grants_timeseries_used":
            tdLog.exit("grants_timeseries_used is null!")
        if infoDict[0]["tables"][3]["metric_groups"][0]["metrics"][17]["name"] != "grants_timeseries_total":
            tdLog.exit("grants_timeseries_total is null!")

        # vgroup_infos ====================================
        vgroup_infos_nums = len(infoDict[0]["tables"][4]["metric_groups"])
        for index in range(vgroup_infos_nums):
            if infoDict[0]["tables"][4]["metric_groups"][index]["metrics"][0]["name"] != "tables_num":
                tdLog.exit("tables_num is null!")
            if infoDict[0]["tables"][4]["metric_groups"][index]["metrics"][1]["name"] != "status":
                tdLog.exit("status is null!")

        if infoDict[0]["tables"][5]["name"] != "taosd_dnodes_status":
            tdLog.exit("taosd_dnodes_status is null!")
        if infoDict[0]["tables"][6]["name"] != "taosd_mnodes_info":
            tdLog.exit("taosd_mnodes_info is null!")
        if infoDict[0]["tables"][7]["name"] != "taosd_vnodes_info":
            tdLog.exit("taosd_vnodes_info is null!")

    def do_GET(self):
        """
        process GET request (intentionally a no-op: taosd only POSTs telemetry)
        """

    def do_POST(self):
        """
        process POST request: decode (optionally gzipped) body, ack with 200,
        validate the telemetry JSON, then shut the server down to end the case
        """
        contentEncoding = self.headers["Content-Encoding"]
        if contentEncoding == 'gzip':
            req_body = self.rfile.read(int(self.headers["Content-Length"]))
            plainText = gzip.decompress(req_body).decode()
        else:
            plainText = self.rfile.read(int(self.headers["Content-Length"])).decode()
        print(plainText)

        # 1. send response code and header
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.end_headers()

        # 2. send response content
        #self.wfile.write(("Hello World: " + req_body + "\n").encode("utf-8"))

        # 3. check request body info
        infoDict = json.loads(plainText)
        #print("================")
        # print(infoDict)
        self.telemetryInfoCheck(infoDict)

        # 4. shutdown the server and exit case
        # shutdown() must run off the serving thread or it deadlocks.
        assassin = threading.Thread(target=self.server.shutdown)
        assassin.daemon = True
        assassin.start()
        print ("==== shutdown http server ====")
class TDTestCase:
    # Telemetry/monitor reporting test case: configures taosd to push
    # monitor data to the local HTTP server (RequestHandlerImpl) started
    # in run().
    global hostname
    global serverPort
    # On Windows with a remote dnode, remoteIP may hold either a Python
    # dict literal (with a "host" key) or a bare hostname/IP string.
    if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
        try:
            # NOTE(review): eval() on configured test input — acceptable only
            # because remoteIP comes from trusted test config; do not reuse
            # this pattern on untrusted data (ast.literal_eval would be safer).
            config = eval(tdDnodes.dnodes[0].remoteIP )
            hostname = config["host"]
        except Exception:
            hostname = tdDnodes.dnodes[0].remoteIP
    rpcDebugFlagVal = '143'
    # Client-side config; placeholder values are filled in immediately below.
    clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
    clientCfgDict["serverPort"] = serverPort
    clientCfgDict["firstEp"] = hostname + ':' + serverPort
    clientCfgDict["secondEp"] = hostname + ':' + serverPort
    # NOTE(review): the '135' placeholder above is overwritten to '143' here,
    # while updatecfgDict below keeps its own rpcDebugFlag at '135' — confirm
    # whether both were meant to use rpcDebugFlagVal.
    clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
    clientCfgDict["fqdn"] = hostname
    # Server-side (dnode) config consumed by the test framework.
    updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
    updatecfgDict["clientCfg"] = clientCfgDict
    updatecfgDict["serverPort"] = serverPort
    updatecfgDict["firstEp"] = hostname + ':' + serverPort
    updatecfgDict["secondEp"] = hostname + ':' + serverPort
    updatecfgDict["fqdn"] = hostname
    updatecfgDict["monitorFqdn"] = hostname
    # monitorPort presumably must match the global telemetryPort the HTTP
    # server listens on in run() — TODO confirm.
    updatecfgDict["monitorPort"] = '6043'
    updatecfgDict["monitor"] = '1'
    updatecfgDict["monitorInterval"] = "5"
    updatecfgDict["monitorMaxLogs"] = "10"
    updatecfgDict["monitorComp"] = "1"
    updatecfgDict["monitorForceV2"] = "1"
    updatecfgDict["audit"] = '0'
    print ("===================: ", updatecfgDict)

    def init(self, conn, logSql, replicaVar=1):
        """Framework hook: store replica count and bind tdSql to the connection."""
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor())

    def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
        """
        Create a small database/table so the dnode has something to report,
        then block serving HTTP until do_POST validates one telemetry
        report and shuts the server down.
        """
        tdSql.prepare()
        # time.sleep(2)
        vgroups = "4"
        sql = "create database db3 vgroups " + vgroups
        tdSql.query(sql)
        sql = "create table db3.stb (ts timestamp, f int) tags (t int)"
        tdSql.query(sql)
        sql = "create table db3.tb using db3.stb tags (1)"
        tdSql.query(sql)
        # Create the HTTP server: bind ip/port and install the request handler.
        # On Windows with a remote dnode the handler class is pickled and the
        # server is started remotely via remoteExec.
        if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
            RequestHandlerImplStr = base64.b64encode(pickle.dumps(RequestHandlerImpl)).decode()
            cmdStr = "import pickle\nimport http\nRequestHandlerImpl=pickle.loads(base64.b64decode(\"%s\".encode()))\nclass NewRequestHandlerImpl(RequestHandlerImpl):\n hostPort = \'%s\'\nhttp.server.HTTPServer((\"\", %s), NewRequestHandlerImpl).serve_forever()"%(RequestHandlerImplStr,hostname+":"+serverPort,telemetryPort)
            tdDnodes.dnodes[0].remoteExec({}, cmdStr)
        else:
            serverAddress = ("", int(telemetryPort))
            # serve_forever() returns once do_POST triggers server.shutdown()
            http.server.HTTPServer(serverAddress, RequestHandlerImpl).serve_forever()

    def stop(self):
        """Framework hook: release the SQL connection and log success."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")
# Register this case with the test framework for both platforms.
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -11,6 +11,7 @@ from util.log import *
from util.sql import * from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.tserror import *
import subprocess import subprocess
class TDTestCase: class TDTestCase:
@ -56,8 +57,32 @@ class TDTestCase:
else: else:
self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_no_init = subprocess.Popen('find %s -name "libchange_udf_no_init.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_no_process = subprocess.Popen('find %s -name "libchange_udf_no_process.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_no_destroy = subprocess.Popen('find %s -name "libchange_udf_no_destroy.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_init_failed = subprocess.Popen('find %s -name "libchange_udf_init_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_process_failed = subprocess.Popen('find %s -name "libchange_udf_process_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_destroy_failed = subprocess.Popen('find %s -name "libchange_udf_destory_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_normal = subprocess.Popen('find %s -name "libchange_udf_normal.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1 = self.libudf1.replace('\r','').replace('\n','') self.libudf1 = self.libudf1.replace('\r','').replace('\n','')
self.libudf2 = self.libudf2.replace('\r','').replace('\n','') self.libudf2 = self.libudf2.replace('\r','').replace('\n','')
self.libchange_udf_no_init = self.libchange_udf_no_init.replace('\r','').replace('\n','')
self.libchange_udf_no_process = self.libchange_udf_no_process.replace('\r','').replace('\n','')
self.libchange_udf_normal = self.libchange_udf_normal.replace('\r','').replace('\n','')
self.libchange_udf_no_destroy = self.libchange_udf_no_destroy.replace('\r','').replace('\n','')
self.libchange_udf_init_failed = self.libchange_udf_init_failed.replace('\r','').replace('\n','')
self.libchange_udf_process_failed = self.libchange_udf_process_failed.replace('\r','').replace('\n','')
self.libchange_udf_destroy_failed = self.libchange_udf_destroy_failed.replace('\r','').replace('\n','')
tdLog.info(f"udf1 so path is {self.libudf1}")
tdLog.info(f"udf2 so path is {self.libudf2}")
tdLog.info(f"change_udf_no_init so path is {self.libchange_udf_no_init}")
tdLog.info(f"change_udf_no_process so path is {self.libchange_udf_no_process}")
tdLog.info(f"change_udf_no_destroy so path is {self.libchange_udf_no_destroy}")
tdLog.info(f"change_udf_init_failed so path is {self.libchange_udf_init_failed}")
tdLog.info(f"change_udf_process_failed so path is {self.libchange_udf_process_failed}")
tdLog.info(f"change_udf_destroy_failed so path is {self.libchange_udf_destroy_failed}")
tdLog.info(f"change_udf_normal so path is {self.libchange_udf_normal}")
def prepare_data(self): def prepare_data(self):
@ -664,13 +689,118 @@ class TDTestCase:
path = ''.join(random.choice(letters) for i in range(5000)) path = ''.join(random.choice(letters) for i in range(5000))
os.system(f"udfd -c {path}") os.system(f"udfd -c {path}")
def test_change_udf_normal(self, func_name):
    """
    Create *func_name* from the known-good UDF library, verify it shows up
    in `show functions`, exercise it over every column of db.tb, then drop
    it and confirm further use fails with "function not exist".
    """
    lib_path = self.libchange_udf_normal
    tdSql.execute(f"create function {func_name} as '{lib_path}' outputtype int")
    for row in tdSql.getResult("show functions"):
        if func_name in row[0]:
            tdLog.info(f"create {func_name} functions success, using {lib_path}")
            break
    wide_query = f"select num1 , {func_name}(num1) ,num2 ,{func_name}(num2),num3 ,{func_name}(num3),num4 ,{func_name}(num4) from db.tb"
    tdSql.query(wide_query, TSDB_CODE_UDF_FUNC_EXEC_FAILURE)
    # expected first row: (num1, udf, num2, udf, num3, udf, num4, udf)
    expected_row0 = [None, None, 1, 1, 1.000000000, 1, "binary1", 1]
    for col, value in enumerate(expected_row0):
        tdSql.checkData(0, col, value)
    tdSql.query(f"select {func_name}(num1) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE)
    tdSql.execute(f"drop function {func_name}")
    tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
    tdLog.info(f"change udf test finished, using {lib_path}")
def test_change_udf_failed(self, func_name, lib_name):
    """
    Create *func_name* from a deliberately broken UDF library *lib_name*
    and verify: queries fail before creation (function not exist), fail
    during its lifetime (UDF exec failure), and fail again after drop.
    """
    tdLog.info(f"test change udf start: using {lib_name}")
    wide_query = f"select num1 , {func_name}(num1) ,num2 ,{func_name}(num2),num3 ,{func_name}(num3),num4 ,{func_name}(num4) from db.tb"
    tdSql.error(wide_query, TSDB_CODE_MND_FUNC_NOT_EXIST)
    tdSql.execute(f"create function {func_name} as '{lib_name}' outputtype int")
    for row in tdSql.getResult("show functions"):
        if func_name in row[0]:
            tdLog.info(f"create {func_name} functions success, using {lib_name}")
            break
    # creation succeeds, but every execution through the broken lib must fail
    tdSql.error(wide_query, TSDB_CODE_UDF_FUNC_EXEC_FAILURE)
    tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE)
    tdSql.execute(f"drop function {func_name}")
    tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
    tdLog.info(f"change udf test finished, using {lib_name}")
def test_change_udf_reverse(self):
    """
    Recreate the same UDF under different output types: with
    nchar(256) output the query must fail, while with varchar(256) it
    must return the reversed binary column values.
    """
    tdSql.execute("create database if not exists db duration 100")
    tdSql.execute("use db")
    func_name = "udf_reverse"
    lib_path = self.libchange_udf_normal
    # round 1: nchar output — expected to fail at query time
    tdSql.execute(f"create function {func_name} as '{lib_path}' outputtype nchar(256)")
    for row in tdSql.getResult("show functions"):
        if func_name in row[0]:
            tdLog.info(f"create {func_name} functions success, using {lib_path}")
            break
    tdSql.error(f"select {func_name}(c8) from db.t1", TSDB_CODE_TSC_INTERNAL_ERROR)
    tdSql.execute(f"drop function {func_name}")
    tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
    self.test_change_udf_normal("change_udf_normal")
    # round 2: varchar output — the reverse UDF works
    tdSql.execute(f"create function {func_name} as '{lib_path}' outputtype varchar(256)")
    for row in tdSql.getResult("show functions"):
        if func_name in row[0]:
            tdLog.info(f"create {func_name} functions success, using {lib_path}")
            break
    tdSql.query(f"select {func_name}(c8) from db.t1 order by ts")
    # c8 values "binary1".."binary3" come back reversed; first row is NULL
    expected = [None, "1yranib", "2yranib", "3yranib"]
    for idx, value in enumerate(expected):
        tdSql.checkData(idx, 0, value)
def unexpected_using_test(self):
    """
    Exercise UDF misuse paths: creation from a nonexistent library must
    fail with a system error, then each deliberately broken UDF variant
    is alternated with a known-good one to prove a bad library never
    poisons subsequent UDF use.

    Fix: the destroy-failed variant was registered under the inconsistent
    name "libchange_udf_destroy_failed"; it now uses
    "change_udf_destroy_failed" like every sibling case.
    """
    tdSql.execute("use db ")
    # create function with wrong file path must fail (errno 2: no such file)
    tdSql.error("create function udf1 as '%s_wrongpath' outputtype int;"%self.libudf1, TAOS_SYSTEM_ERROR|2)
    tdSql.error("create aggregate function udf2 as '%s_wrongpath' outputtype double;"%self.libudf2, TAOS_SYSTEM_ERROR|2)
    tdSql.execute("create function udf1 as '%s' outputtype int;"%self.libudf1)
    tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb")
    # each broken variant, interleaved with a healthy UDF round-trip so a
    # failure in one variant cannot mask state left behind by another
    broken_variants = [
        ("change_udf_no_init", self.libchange_udf_no_init),
        ("change_udf_no_process", self.libchange_udf_no_process),
        ("change_udf_no_destroy", self.libchange_udf_no_destroy),
        ("change_udf_init_failed", self.libchange_udf_init_failed),
        ("change_udf_process_failed", self.libchange_udf_process_failed),
        ("change_udf_destroy_failed", self.libchange_udf_destroy_failed),
    ]
    for func_name, lib_name in broken_variants:
        self.test_change_udf_normal("change_udf_normal")
        self.test_change_udf_failed(func_name, lib_name)
    # udf1 must still work after all the broken libraries were tried
    tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb")
    tdSql.execute("drop function udf1")
    tdSql.error("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
print(" env is ok for all ") print(" env is ok for all ")
self.test_udfd_cmd() self.test_udfd_cmd()
self.prepare_udf_so() self.prepare_udf_so()
self.prepare_data() self.prepare_data()
self.unexpected_using_test()
self.create_udf_function() self.create_udf_function()
self.test_change_udf_reverse()
self.basic_udf_query() self.basic_udf_query()
self.loop_kill_udfd() self.loop_kill_udfd()
tdSql.execute(" drop function udf1 ") tdSql.execute(" drop function udf1 ")

View File

@ -0,0 +1,50 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
from util.cases import *
from util.sql import *
from util.dnodes import *
from util.log import *
class TDTestCase:
    """
    Verify vnode status after losing one replica: create a 1-vgroup,
    2-replica database, stop the dnode hosting the follower vnode, and
    check that a remaining vnode reports status "assigned".
    """

    def init(self, conn, logSql, replicaVar=1):
        """Framework hook: store replica count and bind tdSql to the connection."""
        tdLog.debug(f"start to init {__file__}")
        self.replicaVar = int(replicaVar)
        tdSql.init(conn.cursor(), logSql)

    def run(self):
        tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;')
        time.sleep(1)
        tdSql.query("show db.vgroups;")
        # columns 4 and 6 hold the vnode roles of the two replicas
        # (presumably dnode2 and dnode3 respectively — confirm against schema);
        # stop whichever dnode hosts the follower
        if(tdSql.queryResult[0][4] == "follower") and (tdSql.queryResult[0][6] == "leader"):
            tdLog.info("stop dnode2")
            sc.dnodeStop(2)
        if(tdSql.queryResult[0][6] == "follower") and (tdSql.queryResult[0][4] == "leader"):
            tdLog.info("stop dnode 3")
            sc.dnodeStop(3)
        tdLog.info("wait 10 seconds")
        time.sleep(10)
        tdSql.query("show db.vgroups;")
        # at least one replica must now report "assigned"
        if(tdSql.queryResult[0][4] != "assigned") and (tdSql.queryResult[0][6] != "assigned"):
            # fixed typo in failure message ("aasigned" -> "assigned")
            tdLog.exit("failed to set assigned")

    def stop(self):
        """Framework hook: release the SQL connection and log success."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")
# Register this case with the test framework for both platforms.
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())