Merge branch '3.0' into feature/TD-14481-3.0

This commit is contained in:
Cary Xu 2022-05-11 07:43:34 +08:00
commit 6c5255b95a
68 changed files with 1232 additions and 476 deletions

View File

@ -0,0 +1,12 @@
# addr2line
ExternalProject_Add(addr2line
GIT_REPOSITORY https://github.com/davea42/libdwarf-addr2line.git
GIT_TAG master
SOURCE_DIR "${TD_CONTRIB_DIR}/addr2line"
BINARY_DIR "${TD_CONTRIB_DIR}/addr2line"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -75,6 +75,14 @@ IF (TD_WINDOWS)
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}")
ELSE ()
IF (${COVER} MATCHES "true")
MESSAGE(STATUS "Test coverage mode, add extra flags")
SET(GCC_COVERAGE_COMPILE_FLAGS "-fprofile-arcs -ftest-coverage")
SET(GCC_COVERAGE_LINK_FLAGS "-lgcov --coverage")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}")
ENDIF ()
IF (${SANITIZER} MATCHES "true")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")

View File

@ -48,6 +48,12 @@ IF(${TD_WINDOWS})
ENDIF ()
option(
BUILD_ADDR2LINE
"If build addr2line"
OFF
)
option(
BUILD_TEST
"If build unit tests using googletest"

View File

@ -0,0 +1,12 @@
# libdwarf
ExternalProject_Add(libdwarf
GIT_REPOSITORY https://github.com/davea42/libdwarf-code.git
GIT_TAG libdwarf-0.3.1
SOURCE_DIR "${TD_CONTRIB_DIR}/libdwarf"
BINARY_DIR "${TD_CONTRIB_DIR}/libdwarf"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -98,6 +98,12 @@ if(${BUILD_WITH_NURAFT})
cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT})
# addr2line
if(${BUILD_ADDR2LINE})
cat("${TD_SUPPORT_DIR}/libdwarf_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/addr2line_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_ADDR2LINE})
# download dependencies
configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
@ -327,7 +333,48 @@ if(${BUILD_WITH_SQLITE})
endif(NOT TD_WINDOWS)
endif(${BUILD_WITH_SQLITE})
# pthread
# addr2line
if(${BUILD_ADDR2LINE})
check_include_file( "sys/types.h" HAVE_SYS_TYPES_H)
check_include_file( "sys/stat.h" HAVE_SYS_STAT_H )
check_include_file( "inttypes.h" HAVE_INTTYPES_H )
check_include_file( "stddef.h" HAVE_STDDEF_H )
check_include_file( "stdlib.h" HAVE_STDLIB_H )
check_include_file( "string.h" HAVE_STRING_H )
check_include_file( "memory.h" HAVE_MEMORY_H )
check_include_file( "strings.h" HAVE_STRINGS_H )
check_include_file( "stdint.h" HAVE_STDINT_H )
check_include_file( "unistd.h" HAVE_UNISTD_H )
check_include_file( "sgidefs.h" HAVE_SGIDEFS_H )
check_include_file( "stdafx.h" HAVE_STDAFX_H )
check_include_file( "elf.h" HAVE_ELF_H )
check_include_file( "libelf.h" HAVE_LIBELF_H )
check_include_file( "libelf/libelf.h" HAVE_LIBELF_LIBELF_H)
check_include_file( "alloca.h" HAVE_ALLOCA_H )
check_include_file( "elfaccess.h" HAVE_ELFACCESS_H)
check_include_file( "sys/elf_386.h" HAVE_SYS_ELF_386_H )
check_include_file( "sys/elf_amd64.h" HAVE_SYS_ELF_AMD64_H)
check_include_file( "sys/elf_sparc.h" HAVE_SYS_ELF_SPARC_H)
check_include_file( "sys/ia64/elf.h" HAVE_SYS_IA64_ELF_H )
set(VERSION 0.3.1)
set(PACKAGE_VERSION "\"${VERSION}\"")
configure_file(libdwarf/cmake/config.h.cmake config.h)
file(GLOB_RECURSE LIBDWARF_SOURCES "libdwarf/src/lib/libdwarf/*.c")
add_library(libdwarf STATIC ${LIBDWARF_SOURCES})
set_target_properties(libdwarf PROPERTIES OUTPUT_NAME "libdwarf")
if(HAVE_LIBELF_H OR HAVE_LIBELF_LIBELF_H)
target_link_libraries(libdwarf PUBLIC libelf)
endif()
target_include_directories(libdwarf SYSTEM PUBLIC "libdwarf/src/lib/libdwarf" ${CMAKE_BINARY_DIR}/contrib)
file(READ "addr2line/addr2line.c" ADDR2LINE_CONTENT)
string(REPLACE "static int" "int" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}")
string(REPLACE "static void" "void" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}")
string(REPLACE "main(" "main_addr2line(" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}")
file(WRITE "addr2line/addr2line.c" "${ADDR2LINE_CONTENT}")
add_library(addr2line STATIC "addr2line/addr2line.c")
target_link_libraries(addr2line PUBLIC libdwarf dl z)
target_include_directories(addr2line PUBLIC "libdwarf/src/lib/libdwarf" )
endif(${BUILD_ADDR2LINE})
# ================================================================================================

View File

@ -121,6 +121,9 @@ extern char tsCompressor[];
extern int32_t tsDiskCfgNum;
extern SDiskCfg tsDiskCfg[];
// udf
extern bool tsStartUdfd;
// internal
extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;

View File

@ -328,6 +328,9 @@ typedef struct {
int8_t alterType;
int32_t numOfFields;
SArray* pFields;
int32_t ttl;
int32_t commentLen;
char* comment;
} SMAlterStbReq;
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
@ -1455,7 +1458,7 @@ typedef struct {
static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
SMqRebInfo* pRebInfo = (SMqRebInfo*)taosMemoryCalloc(1, sizeof(SMqRebInfo));
if (pRebInfo == NULL) {
goto _err;
return NULL;
}
strcpy(pRebInfo->key, key);
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
@ -1490,7 +1493,7 @@ typedef struct {
} SMVSubscribeRsp;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char name[TSDB_TOPIC_FNAME_LEN];
int8_t igNotExists;
} SMDropTopicReq;

View File

@ -16,8 +16,8 @@
#ifndef _TD_COMMON_NAME_H_
#define _TD_COMMON_NAME_H_
#include "tdef.h"
#include "tarray.h"
#include "tdef.h"
#ifdef __cplusplus
extern "C" {
@ -65,19 +65,19 @@ bool tNameDBNameEqual(SName* left, SName* right);
typedef struct {
// input
SArray *tags; // element is SSmlKV
const char *sTableName; // super table name
uint8_t sTableNameLen; // the length of super table name
SArray* tags; // element is SSmlKv
const char* sTableName; // super table name
uint8_t sTableNameLen; // the length of super table name
// output
char *childTableName; // must have size of TSDB_TABLE_NAME_LEN;
uint64_t uid; // child table uid, may be useful
char* childTableName; // must have size of TSDB_TABLE_NAME_LEN;
uint64_t uid; // child table uid, may be useful
} RandTableName;
void buildChildTableName(RandTableName *rName);
void buildChildTableName(RandTableName* rName);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_NAME_H_*/
#endif /*_TD_COMMON_NAME_H_*/

View File

@ -179,6 +179,7 @@ typedef enum ENodeType {
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_QUERY,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,

View File

@ -295,6 +295,37 @@ typedef struct SExplainStmt {
SNode* pQuery;
} SExplainStmt;
typedef struct SCmdMsgInfo {
int16_t msgType;
SEpSet epSet;
void* pMsg;
int32_t msgLen;
void* pExtension; // todo remove it soon
} SCmdMsgInfo;
typedef enum EQueryExecMode {
QUERY_EXEC_MODE_LOCAL = 1,
QUERY_EXEC_MODE_RPC,
QUERY_EXEC_MODE_SCHEDULE,
QUERY_EXEC_MODE_EMPTY_RESULT
} EQueryExecMode;
typedef struct SQuery {
ENodeType type;
EQueryExecMode execMode;
bool haveResultSet;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
int8_t precision;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
SArray* pDbList;
SArray* pTableList;
bool showRewrite;
int32_t placeholderNum;
} SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext);

View File

@ -48,36 +48,6 @@ typedef struct SParseContext {
bool isSuperUser;
} SParseContext;
typedef struct SCmdMsgInfo {
int16_t msgType;
SEpSet epSet;
void* pMsg;
int32_t msgLen;
void* pExtension; // todo remove it soon
} SCmdMsgInfo;
typedef enum EQueryExecMode {
QUERY_EXEC_MODE_LOCAL = 1,
QUERY_EXEC_MODE_RPC,
QUERY_EXEC_MODE_SCHEDULE,
QUERY_EXEC_MODE_EMPTY_RESULT
} EQueryExecMode;
typedef struct SQuery {
EQueryExecMode execMode;
bool haveResultSet;
SNode* pRoot;
int32_t numOfResCols;
SSchema* pResSchema;
int8_t precision;
SCmdMsgInfo* pCmdMsg;
int32_t msgType;
SArray* pDbList;
SArray* pTableList;
bool showRewrite;
int32_t placeholderNum;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
bool isInsertSql(const char* pStr, size_t length);
@ -103,9 +73,10 @@ void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen);
void* smlInitHandle(SQuery *pQuery);
void smlDestroyHandle(void *pHandle);
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen);
void* smlInitHandle(SQuery* pQuery);
void smlDestroyHandle(void* pHandle);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* colsSchema, SArray* cols, bool format,
STableMeta* pTableMeta, char* tableName, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
#ifdef __cplusplus

View File

@ -16,6 +16,7 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
#include "trpc.h"
#ifdef __cplusplus
@ -154,6 +155,10 @@ struct SStreamTask {
STaskDispatcherShuffle shuffleDispatcher;
};
// msg buffer
int32_t memUsed;
STaosQueue* inputQ;
// application storage
void* ahandle;
};
@ -194,6 +199,8 @@ typedef struct {
SArray* res; // SArray<SSDataBlock>
} SStreamSinkReq;
int32_t streamEnqueueData(SStreamTask* pTask, const void* input, int32_t inputType);
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId);
#ifdef __cplusplus

View File

@ -35,6 +35,7 @@ void *taosMemoryRealloc(void *ptr, int32_t size);
void *taosMemoryStrDup(void *ptr);
void taosMemoryFree(void *ptr);
int32_t taosMemorySize(void *ptr);
void taosPrintBackTrace();
#define taosMemoryFreeClear(ptr) \
do { \

View File

@ -633,6 +633,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN TAOS_DEF_ERROR_CODE(0, 0x2642)
#define TSDB_CODE_PAR_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x2643)
#define TSDB_CODE_PAR_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x2644)
#define TSDB_CODE_PAR_INVALID_STREAM_QUERY TAOS_DEF_ERROR_CODE(0, 0x2645)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)

View File

@ -254,7 +254,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo);
void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo);
void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid);

View File

@ -131,7 +131,7 @@ void destroyTscObj(void *pObj) {
taosMemoryFreeClear(pTscObj);
}
void *createTscObj(const char *user, const char *auth, const char *db, SAppInstInfo *pAppInfo) {
void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
if (NULL == pObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -145,6 +145,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
return NULL;
}
pObj->connType = connType;
pObj->pAppInfo = pAppInfo;
tstrncpy(pObj->user, user, sizeof(pObj->user));
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);

View File

@ -25,7 +25,7 @@
#include "tref.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static bool stringLengthCheck(const char* str, size_t maxsize) {
@ -491,7 +491,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo, int connType) {
STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pTscObj;
@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return NULL;
}
SMsgSendInfo* body = buildConnectMsg(pRequest, connType);
SMsgSendInfo* body = buildConnectMsg(pRequest);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
@ -527,7 +527,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj;
}
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -550,9 +550,10 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
}
taosMemoryFreeClear(db);
connectReq.connType = connType;
connectReq.pid = htonl(appInfo.pid);
connectReq.connType = pObj->connType;
connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));

View File

@ -563,7 +563,11 @@ const char *taos_get_server_info(TAOS *taos) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
// TODO
if (taos == NULL || sql == NULL) {
// todo directly call fp
}
taos_query_l(taos, sql, (int32_t) strlen(sql));
}
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {

View File

@ -187,7 +187,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->autoCommit = false;
conf->autoCommit = true;
conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
return conf;
@ -1307,7 +1307,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
}
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
// TODO
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_list_t* lst = tmq_list_new();
tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
return TMQ_RESP_ERR__FAIL;
}
}
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
}

View File

@ -262,7 +262,7 @@ static const SSysDbTableSchema topicSchema[] = {
static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
@ -275,7 +275,7 @@ static const SSysDbTableSchema consumerSchema[] = {
static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
};

View File

@ -169,6 +169,9 @@ uint32_t tsMaxRange = 500; // max range
uint32_t tsCurRange = 100; // range
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
// udf
bool tsStartUdfd = true;
// internal
int32_t tsTransPullupInterval = 6;
int32_t tsMqRebalanceInterval = 2;
@ -441,6 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "startUdfd", tsStartUdfd, 0) != 0) return -1;
return 0;
}
@ -581,6 +585,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsStartUdfd = cfgGetItem(pCfg, "startUdfd")->bval;
if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
}

View File

@ -607,6 +607,11 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
if (pReq->commentLen > 0) {
if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -639,6 +644,14 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
}
}
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
if (pReq->commentLen > 0) {
pReq->comment = taosMemoryMalloc(pReq->commentLen);
if (pReq->comment == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;

View File

@ -18,11 +18,9 @@
#include "tcommon.h"
#include "tstrbuild.h"
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
bool tscValidateTableNameLength(size_t len) {
return len < TSDB_TABLE_NAME_LEN;
}
bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; }
#if 0
// TODO refactor
@ -95,12 +93,12 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
* but in case of DST, the start time of one day need to be dynamically decided.
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char** tzname = _tzname;
#endif
#endif
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
start += timezone * t;
@ -142,10 +140,10 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
int32_t tNameLen(const SName* name) {
assert(name != NULL);
char tmp[12] = {0};
char tmp[12] = {0};
int32_t len = sprintf(tmp, "%d", name->acctId);
int32_t len1 = (int32_t) strlen(name->dbname);
int32_t len2 = (int32_t) strlen(name->tname);
int32_t len1 = (int32_t)strlen(name->dbname);
int32_t len2 = (int32_t)strlen(name->tname);
if (name->type == TSDB_DB_NAME_T) {
assert(len2 == 0);
@ -200,9 +198,7 @@ const char* tNameGetTableName(const SName* name) {
return &name->tname[0];
}
void tNameAssign(SName* dst, const SName* src) {
memcpy(dst, src, sizeof(SName));
}
void tNameAssign(SName* dst, const SName* src) { memcpy(dst, src, sizeof(SName)); }
int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t nameLen) {
assert(dst != NULL && dbName != NULL && nameLen > 0);
@ -244,7 +240,6 @@ bool tNameDBNameEqual(SName* left, SName* right) {
return (0 == strcmp(left->dbname, right->dbname));
}
int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
assert(dst != NULL && str != NULL && strlen(str) > 0);
@ -260,14 +255,14 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
if ((type & T_NAME_DB) == T_NAME_DB) {
dst->type = TSDB_DB_NAME_T;
char* start = (char*)((p == NULL)? str:(p+1));
char* start = (char*)((p == NULL) ? str : (p + 1));
int32_t len = 0;
p = strstr(start, TS_PATH_DELIMITER);
if (p == NULL) {
len = (int32_t) strlen(start);
len = (int32_t)strlen(start);
} else {
len = (int32_t) (p - start);
len = (int32_t)(p - start);
}
// too long account id or too long db name
@ -275,21 +270,21 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return -1;
}
memcpy (dst->dbname, start, len);
memcpy(dst->dbname, start, len);
dst->dbname[len] = 0;
}
if ((type & T_NAME_TABLE) == T_NAME_TABLE) {
dst->type = TSDB_TABLE_NAME_T;
char* start = (char*) ((p == NULL)? str: (p+1));
char* start = (char*)((p == NULL) ? str : (p + 1));
// too long account id or too long db name
int32_t len = (int32_t) strlen(start);
int32_t len = (int32_t)strlen(start);
if ((len >= tListLen(dst->tname)) || (len <= 0)) {
return -1;
}
memcpy (dst->tname, start, len);
memcpy(dst->tname, start, len);
dst->tname[len] = 0;
}
@ -305,14 +300,14 @@ static int compareKv(const void* p1, const void* p2) {
if (res != 0) {
return res;
} else {
return kvLen1-kvLen2;
return kvLen1 - kvLen2;
}
}
/*
* use stable name and tags to grearate child table name
*/
void buildChildTableName(RandTableName *rName) {
void buildChildTableName(RandTableName* rName) {
int32_t size = taosArrayGetSize(rName->tags);
ASSERT(size > 0);
taosArraySort(rName->tags, compareKv);
@ -320,19 +315,19 @@ void buildChildTableName(RandTableName *rName) {
SStringBuilder sb = {0};
taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen);
for (int j = 0; j < size; ++j) {
SSmlKv *tagKv = taosArrayGetP(rName->tags, j);
SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
tMD5Update(&context, (uint8_t*)keyJoined, (uint32_t)len);
tMD5Final(&context);
uint64_t digest1 = *(uint64_t*)(context.digest);
uint64_t digest2 = *(uint64_t*)(context.digest + 8);
snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016" PRIx64 "%016" PRIx64, digest1, digest2);
taosStringBuilderDestroy(&sb);
rName->uid = digest1;
}

View File

@ -29,6 +29,7 @@ enum {
MQ_CONSUMER_STATUS__LOST,
MQ_CONSUMER_STATUS__LOST_IN_REB,
MQ_CONSUMER_STATUS__LOST_REBD,
MQ_CONSUMER_STATUS__REMOVED,
};
int32_t mndInitConsumer(SMnode *pMnode);

View File

@ -486,6 +486,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
}
}
if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
/*pConsumerNew->updateType = */
/*}*/
goto SUBSCRIBE_OVER;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
@ -789,6 +797,10 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
if (pShow->pIter == NULL) break;
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
sdbRelease(pSdb, pConsumer);
continue;
}
taosRLockLatch(&pConsumer->lock);
@ -810,12 +822,12 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
// group id
char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN);
varDataSetLen(groupId, strlen(varDataVal(groupId)));
// consumer group
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN);
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)groupId, false);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// app id
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};

View File

@ -1550,10 +1550,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)t, false);
// single stable model
int8_t m = 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&m, false);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppend(pColInfo, rows, (const char *)b, false);

View File

@ -196,7 +196,9 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
return pVgEpNew;
}
void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); }
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg);
}
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0;

View File

@ -171,14 +171,21 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
return 0;
}
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) {
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
int32_t i = 0;
while (key[i] != TMQ_SEPARATOR) {
i++;
}
memcpy(cgroup, key, i);
cgroup[i] = 0;
strcpy(topic, &key[i + 1]);
if (fullName) {
strcpy(topic, &key[i + 1]);
} else {
while (key[i] != '.') {
i++;
}
strcpy(topic, &key[i + 1]);
}
return 0;
}
@ -426,7 +433,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup);
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
taosArrayPush(pConsumerNew->rebNewTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
@ -444,7 +451,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup);
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
@ -494,7 +501,7 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
// split sub key and extract topic
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup);
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic);
taosRLockLatch(&pTopic->lock);
@ -747,7 +754,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
// topic and cgroup
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup);
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
varDataSetLen(topic, strlen(varDataVal(topic)));
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
@ -790,7 +797,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
// topic and cgroup
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup);
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
varDataSetLen(topic, strlen(varDataVal(topic)));
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));

View File

@ -298,6 +298,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
@ -307,16 +309,22 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
return -1;
}
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
return -1;
}
if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast);
taosMemoryFree(topicObj.sql);
return -1;
}
} else {
@ -331,6 +339,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFreeClear(topicObj.ast);
taosMemoryFreeClear(topicObj.sql);
taosMemoryFreeClear(topicObj.physicalPlan);
return -1;
}

View File

@ -459,7 +459,7 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) {
if (strcmp(alterReq.dbname, "*") != 0) {
if (strcmp(alterReq.dbname, "1.*") != 0) {
int32_t len = strlen(alterReq.dbname) + 1;
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
if (pDb == NULL) {
@ -483,7 +483,7 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_DB) {
if (strcmp(alterReq.dbname, "*") != 0) {
if (strcmp(alterReq.dbname, "1.*") != 0) {
int32_t len = strlen(alterReq.dbname) + 1;
SDbObj *pDb = mndAcquireDb(pMnode, alterReq.dbname);
if (pDb == NULL) {

View File

@ -233,17 +233,19 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType != TDMT_VND_SUBMIT) return 0;
// make sure msgType == TDMT_VND_SUBMIT
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) {
return -1;
}
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
return -1;
}
memcpy(data, msg, msgLen);
// make sure msgType == TDMT_VND_SUBMIT
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) {
return -1;
}
SRpcMsg req = {
.msgType = TDMT_VND_STREAM_TRIGGER,
.pCont = data,

View File

@ -62,11 +62,6 @@ enum {
* 2. when all data within queried time window, it is also denoted as query_completed
*/
TASK_COMPLETED = 0x2u,
/* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option
*/
TASK_OVER = 0x4u,
};
typedef struct SResultRowCell {
@ -288,12 +283,6 @@ typedef struct SOperatorInfo {
SOperatorFpSet fpSet;
} SOperatorInfo;
typedef struct {
int32_t numOfTags;
int32_t numOfCols;
SColumnInfo* colList;
} SQueriedTableInfo;
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
@ -629,8 +618,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
@ -711,8 +699,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput);
#endif
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
@ -737,7 +723,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model);

View File

@ -202,7 +202,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER);
return isTaskKilled(pTaskInfo);
}
void qDestroyTask(qTaskInfo_t qTaskHandle) {

View File

@ -107,7 +107,6 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables);
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
@ -620,7 +619,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
continue;
}
if (functionNeedToExecute(&pCtx[k])) {
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
pCtx[k].fpSet.process(&pCtx[k]);
}
@ -803,9 +802,12 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs; // this can be set during create the struct
if (pCtx[k].fpSet.process != NULL)
pCtx[k].startTs = startTs;
// this can be set during create the struct
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) {
pCtx[k].fpSet.process(&pCtx[k]);
}
}
}
}
@ -922,6 +924,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
if (IS_VAR_DATA_TYPE(type)) {
// todo disable this
// if (pResultRow->key == NULL) {
// pResultRow->key = taosMemoryMalloc(varDataTLen(pData));
// varDataCopy(pResultRow->key, pData);
@ -1075,7 +1078,7 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock*
}
// set the output buffer for the selectivity + tag query
static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
int32_t num = 0;
SqlFunctionCtx* p = NULL;
@ -1087,7 +1090,7 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
pValCtx[num++] = &pCtx[i];
} else {
} else if (fmIsAggFunc(pCtx[i].functionId)) {
p = &pCtx[i];
}
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
@ -1215,7 +1218,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
(int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize);
}
setCtxTagColumnInfo(pFuncCtx, numOfOutput);
setSelectValueColumnInfo(pFuncCtx, numOfOutput);
return pFuncCtx;
}
@ -1451,8 +1454,6 @@ static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, in
}
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
@ -2009,8 +2010,8 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
}
// todo merged with the build group result.
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo,
int32_t* rowCellInfoOffset) {
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRowPosition* pPos = &pResultRowInfo->pPosition[i];
@ -2023,17 +2024,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
// }
for (int32_t j = 0; j < numOfOutput; ++j) {
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
struct SResultRowEntryInfo* pResInfo = getResultCell(pRow, j, rowCellInfoOffset);
if (!isRowEntryInitialized(pResInfo)) {
continue;
}
if (pCtx[j].fpSet.process) { // TODO set the dummy function, to avoid the check for null ptr.
// pCtx[j].fpSet.finalize(&pCtx[j]);
}
if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes;
}
@ -2187,17 +2182,15 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
int32_t start = 0;
int32_t step = -1;
int32_t step = 1;
// qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv));
assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC);
if (orderType == TSDB_ORDER_ASC) {
start = pGroupResInfo->index;
step = 1;
} else { // desc order copy all data
start = numOfRows - pGroupResInfo->index - 1;
step = -1;
}
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
@ -2227,10 +2220,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
colDataAppend(pColInfoData, pBlock->info.rows, in, pCtx[j].resultInfo->isNullRes);
// expand the result into multiple rows. E.g., _wstartts, top(k, 20)
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for(int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
@ -3682,10 +3678,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
bool newgroup = true;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
@ -3698,6 +3692,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
int32_t order = getTableScanOrder(pOperator);
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->pScalarExprInfo != NULL) {
int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
@ -3730,8 +3726,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf,
&pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset);
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo,
pAggInfo->binfo.rowCellInfoOffset);
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false);
OPTR_SET_OPENED(pOperator);
@ -4014,7 +4010,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
}
// todo dynamic set tags
// todo set tags
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// if (pTableQueryInfo != NULL) {
// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
@ -4028,7 +4025,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
pProjectInfo->pPseudoColInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
@ -4279,11 +4275,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) {
STableKeyInfo* pk = taosArrayGet(pa, j);
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++];
// pTQueryInfo->uid = pk->uid;
pTQueryInfo->lastKey = pk->lastKey;
// pTQueryInfo->groupIndex = i;
}
}
@ -4302,7 +4295,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error;
}
int32_t numOfRows = 1;
int32_t numOfRows = 10;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows);
@ -4313,9 +4306,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error;
}
pOperator->resultInfo.capacity = 4096;
pOperator->resultInfo.threshold = 4096 * 0.75;
int32_t numOfGroup = 10; // todo replaced with true value
pInfo->groupId = INT32_MIN;
initResultRowInfo(&pInfo->binfo.resultRowInfo, numOfGroup);
@ -4545,42 +4535,6 @@ _error:
return NULL;
}
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
int32_t j = 0;
if (TSDB_COL_IS_TAG(pExpr->pParam[0].pCol->type)) {
if (pExpr->pParam[0].pCol->colId == TSDB_TBNAME_COLUMN_INDEX) {
return TSDB_TBNAME_COLUMN_INDEX;
}
while (j < pTableInfo->numOfTags) {
if (pExpr->pParam[0].pCol->colId == pTagCols[j].colId) {
return j;
}
j += 1;
}
} /*else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { // user specified column data
return TSDB_UD_COLUMN_INDEX;
} else {
while (j < pTableInfo->numOfCols) {
if (pExpr->colInfo.colId == pTableInfo->colList[j].colId) {
return j;
}
j += 1;
}
}*/
return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value
}
bool validateExprColumnInfo(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
int32_t j = getColumnIndexInSource(pTableInfo, pExpr, pTagCols);
return j != INT32_MIN;
}
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision,
const char* name) {
SResSchema s = {0};
@ -4739,7 +4693,7 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode);
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
SInterval interval = {
@ -5240,28 +5194,6 @@ _complete:
return code;
}
static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs,
int32_t numOfOutput, int32_t tagLen, bool superTable) {
for (int32_t i = 0; i < numOfOutput; ++i) {
int16_t functId = getExprFunctionId(&pExprs[i]);
if (functId == FUNCTION_TOP || functId == FUNCTION_BOTTOM) {
int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols);
if (j < 0 || j >= pTableInfo->numOfCols) {
return TSDB_CODE_QRY_INVALID_MSG;
} else {
SColumnInfo* pCol = &pTableInfo->colList[j];
// int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.param[0].i,
// &pExprs[i].base.resSchema.type, &pExprs[i].base.resSchema.bytes,
// &pExprs[i].base.interBytes, tagLen, superTable, NULL);
// assert(ret == TSDB_CODE_SUCCESS);
}
}
}
return TSDB_CODE_SUCCESS;
}
void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);

View File

@ -304,8 +304,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo,
pInfo->binfo.rowCellInfoOffset);
// if (!stableQuery) { // finalize include the update of result rows
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
// } else {

View File

@ -798,8 +798,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
closeAllResultRows(&pInfo->binfo.resultRowInfo);
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf,
&pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo,
pInfo->binfo.rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
OPTR_SET_OPENED(pOperator);
@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
pBInfo->rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
@ -1293,7 +1293,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
pBInfo->rowCellInfoOffset);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);

View File

@ -23,34 +23,34 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "tglobal.h"
#include "executor.h"
#include "executorimpl.h"
#include "function.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "tdatablock.h"
#include "trpc.h"
#include "stub.h"
#include "executor.h"
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tname.h"
#include "trpc.h"
#include "tvariant.h"
namespace {
enum {
data_rand = 0x1,
data_asc = 0x2,
data_asc = 0x2,
data_desc = 0x3,
};
typedef struct SDummyInputInfo {
int32_t totalPages; // numOfPages
int32_t totalPages; // numOfPages
int32_t current;
int32_t startVal;
int32_t type;
int32_t numOfRowsPerPage;
int32_t numOfCols; // number of columns
int32_t numOfCols; // number of columns
int64_t tsStart;
SSDataBlock* pBlock;
} SDummyInputInfo;
@ -75,26 +75,26 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo);
// SColumnInfoData colInfo1 = {0};
// colInfo1.info.type = TSDB_DATA_TYPE_BINARY;
// colInfo1.info.bytes = 40;
// colInfo1.info.colId = 2;
//
// colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t);
// colInfo1.varmeta.length = 0;
// colInfo1.varmeta.offset = static_cast<int32_t*>(taosMemoryCalloc(1, numOfRows * sizeof(int32_t)));
//
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
// SColumnInfoData colInfo1 = {0};
// colInfo1.info.type = TSDB_DATA_TYPE_BINARY;
// colInfo1.info.bytes = 40;
// colInfo1.info.colId = 2;
//
// colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t);
// colInfo1.varmeta.length = 0;
// colInfo1.varmeta.offset = static_cast<int32_t*>(taosMemoryCalloc(1, numOfRows * sizeof(int32_t)));
//
// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1);
} else {
blockDataCleanup(pInfo->pBlock);
}
SSDataBlock* pBlock = pInfo->pBlock;
char buf[128] = {0};
char b1[128] = {0};
char buf[128] = {0};
char b1[128] = {0};
int32_t v = 0;
for(int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) {
for (int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
if (pInfo->type == data_desc) {
@ -107,11 +107,11 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
}
pBlock->info.rows = pInfo->numOfRowsPerPage;
@ -137,7 +137,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
colInfo.info.bytes = sizeof(int64_t);
colInfo.info.colId = 1;
colInfo.pData = static_cast<char*>(taosMemoryCalloc(pInfo->numOfRowsPerPage, sizeof(int64_t)));
// colInfo.nullbitmap = static_cast<char*>(taosMemoryCalloc(1, (pInfo->numOfRowsPerPage + 7) / 8));
// colInfo.nullbitmap = static_cast<char*>(taosMemoryCalloc(1, (pInfo->numOfRowsPerPage + 7) / 8));
taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo);
@ -156,11 +156,11 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pInfo->pBlock;
char buf[128] = {0};
char b1[128] = {0};
char buf[128] = {0};
char b1[128] = {0};
int64_t ts = 0;
int32_t v = 0;
for(int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) {
int32_t v = 0;
for (int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
ts = (++pInfo->tsStart);
@ -177,11 +177,11 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
colDataAppend(pColInfo1, i, reinterpret_cast<const char*>(&v), false);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
// sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf);
//
// SColumnInfoData* pColInfo2 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
// colDataAppend(pColInfo2, i, b1, false);
}
pBlock->info.rows = pInfo->numOfRowsPerPage;
@ -191,10 +191,10 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
blockDataUpdateTsWindow(pBlock);
return pBlock;
}
SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type, int32_t numOfCols) {
SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type,
int32_t numOfCols) {
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(taosMemoryCalloc(1, sizeof(SOperatorInfo)));
pOperator->name = "dummyInputOpertor4Test";
@ -204,24 +204,25 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
pOperator->fpSet.getNextFn = get2ColsDummyBlock;
}
SDummyInputInfo *pInfo = (SDummyInputInfo*) taosMemoryCalloc(1, sizeof(SDummyInputInfo));
SDummyInputInfo* pInfo = (SDummyInputInfo*)taosMemoryCalloc(1, sizeof(SDummyInputInfo));
pInfo->totalPages = numOfBlocks;
pInfo->startVal = startVal;
pInfo->startVal = startVal;
pInfo->numOfRowsPerPage = rowsPerPage;
pInfo->type = type;
pInfo->tsStart = 1620000000000;
pInfo->type = type;
pInfo->tsStart = 1620000000000;
pOperator->info = pInfo;
return pOperator;
}
}
} // namespace
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, build_executor_tree_Test) {
const char* msg = "{\n"
const char* msg =
"{\n"
" \"NodeType\": \"48\",\n"
" \"Name\": \"PhysiSubplan\",\n"
" \"PhysiSubplan\": {\n"
@ -938,16 +939,19 @@ TEST(testCase, build_executor_tree_Test) {
SExecTaskInfo* pTaskInfo = nullptr;
DataSinkHandle sinkHandle = nullptr;
SReadHandle handle = { reinterpret_cast<void*>(0x1), reinterpret_cast<void*>(0x1), NULL };
SReadHandle handle = {reinterpret_cast<void*>(0x1), reinterpret_cast<void*>(0x1), NULL};
struct SSubplan *plan = NULL;
int32_t code = qStringToSubplan(msg, &plan);
struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(msg, &plan);
ASSERT_EQ(code, 0);
code = qCreateExecTask(&handle, 2, 1, plan, (void**) &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
ASSERT_EQ(code, 0);
}
TEST(testCase, index_plan_test) {
// add later
EXPECT_EQ(0, 0);
}
#if 0
TEST(testCase, inMem_sort_Test) {
@ -983,19 +987,19 @@ TEST(testCase, inMem_sort_Test) {
typedef struct su {
int32_t v;
char *c;
char* c;
} su;
int32_t cmp(const void* p1, const void* p2) {
su* v1 = (su*) p1;
su* v2 = (su*) p2;
su* v1 = (su*)p1;
su* v2 = (su*)p2;
int32_t x1 = *(int32_t*) v1->c;
int32_t x2 = *(int32_t*) v2->c;
int32_t x1 = *(int32_t*)v1->c;
int32_t x2 = *(int32_t*)v2->c;
if (x1 == x2) {
return 0;
} else {
return x1 < x2? -1:1;
return x1 < x2 ? -1 : 1;
}
}
@ -1228,4 +1232,4 @@ TEST(testCase, time_interval_Operator_Test) {
}
#endif
#pragma GCC diagnosti
#pragma GCC diagnosti

View File

@ -25,6 +25,7 @@ extern "C" {
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);

View File

@ -987,6 +987,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "_c0",
.type = FUNCTION_TYPE_ROWTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "tbname",
.type = FUNCTION_TYPE_TBNAME,
@ -1064,7 +1074,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateSelectValue,
.getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
.initFunc = functionSetup,
.sprocessFunc = NULL,
.processFunc = NULL,
.finalizeFunc = NULL
}
};

View File

@ -184,7 +184,6 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
/*cleanupResultRowEntry(pResInfo);*/
char* in = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
@ -192,6 +191,10 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) {
return 0;
}
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

View File

@ -18,6 +18,7 @@
#include "tudf.h"
#include "tudfInt.h"
#include "tarray.h"
#include "tglobal.h"
#include "tdatablock.h"
#include "querynodes.h"
#include "builtinsimpl.h"
@ -138,6 +139,10 @@ static void udfWatchUdfd(void *args) {
}
int32_t udfStartUdfd(int32_t startDnodeId) {
if (!tsStartUdfd) {
fnInfo("start udfd is disabled.")
return 0;
}
SUdfdData *pData = &udfdGlobal;
if (pData->startCalled) {
fnInfo("dnode-mgmt start udfd already called");

View File

@ -278,9 +278,9 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
break;
}
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
printf("json val: %s\n", c->colVal);
if (0 != strncmp(c->colVal, term->colName, term->nColName)) {
continue;
// printf("json val: %s\n", c->colVal);
if (0 != strncmp(c->colVal, pCt->colVal, skip)) {
break;
}
TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType);
@ -640,30 +640,30 @@ static int indexFindCh(char* a, char c) {
return p - a;
}
static int indexCacheJsonTermCompareImpl(char* a, char* b) {
int alen = indexFindCh(a, '&');
int blen = indexFindCh(b, '&');
// int alen = indexFindCh(a, '&');
// int blen = indexFindCh(b, '&');
int cmp = strncmp(a, b, MIN(alen, blen));
if (cmp == 0) {
cmp = alen - blen;
if (cmp != 0) {
return cmp;
}
cmp = *(a + alen) - *(b + blen);
if (cmp != 0) {
return cmp;
}
alen += 2;
blen += 2;
cmp = strcmp(a + alen, b + blen);
}
return cmp;
// int cmp = strncmp(a, b, MIN(alen, blen));
// if (cmp == 0) {
// cmp = alen - blen;
// if (cmp != 0) {
// return cmp;
// }
// cmp = *(a + alen) - *(b + blen);
// if (cmp != 0) {
// return cmp;
// }
// alen += 2;
// blen += 2;
// cmp = strcmp(a + alen, b + blen);
//}
return 0;
}
static int32_t indexCacheJsonTermCompare(const void* l, const void* r) {
CacheTerm* lt = (CacheTerm*)l;
CacheTerm* rt = (CacheTerm*)r;
// compare colVal
int cmp = indexCacheJsonTermCompareImpl(lt->colVal, rt->colVal);
int32_t cmp = strcmp(lt->colVal, rt->colVal);
if (cmp == 0) {
return rt->version - lt->version;
}
@ -704,6 +704,8 @@ static bool indexCacheIteratorNext(Iterate* itera) {
iv->type = ct->operaType;
iv->ver = ct->version;
iv->colVal = tstrdup(ct->colVal);
// printf("col Val: %s\n", iv->colVal);
// iv->colType = cv->colType;
taosArrayPush(iv->val, &ct->uid);
}

View File

@ -334,7 +334,12 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
FstSlice* s = &rt->data;
char* ch = (char*)fstSliceData(s, NULL);
TExeCond cond = cmpFn(ch, p, tem->colType);
// if (0 != strncmp(ch, tem->colName, tem->nColName)) {
// swsResultDestroy(rt);
// break;
//}
TExeCond cond = cmpFn(ch, p, tem->colType);
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
@ -455,16 +460,22 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
FstSlice h = fstSliceCreate((uint8_t*)p, skip);
fstStreamBuilderSetRange(sb, &h, ctype);
fstSliceDestroy(&h);
// FstSlice h = fstSliceCreate((uint8_t*)p, skip);
// fstStreamBuilderSetRange(sb, &h, ctype);
// fstSliceDestroy(&h);
StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
FstSlice* s = &rt->data;
char* ch = (char*)fstSliceData(s, NULL);
TExeCond cond = cmpFn(ch, p, tem->colType);
char* ch = (char*)fstSliceData(s, NULL);
if (0 != strncmp(ch, p, skip)) {
swsResultDestroy(rt);
break;
}
TExeCond cond = cmpFn(ch + skip, tem->colVal, tem->colType);
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
@ -594,13 +605,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
if (tfileWriteData(tw, v) != 0) {
indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
(int)taosArrayGetSize(v->tableId));
// printf("write faile\n");
} else {
// printf("write sucee\n");
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
// (int)taosArrayGetSize(v->tableId));
// indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx));
}
}
fstBuilderFinish(tw->fb);
fstBuilderDestroy(tw->fb);
tw->fb = NULL;
@ -845,18 +859,24 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
uint8_t colType = header->colType;
colType = INDEX_TYPE_GET_TYPE(colType);
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
if (fstBuilderInsert(write->fb, key, tval->offset)) {
fstSliceDestroy(&key);
return 0;
}
FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
if (fstBuilderInsert(write->fb, key, tval->offset)) {
fstSliceDestroy(&key);
return -1;
} else {
// handle other type later
return 0;
}
return 0;
return -1;
// if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
// FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
// if (fstBuilderInsert(write->fb, key, tval->offset)) {
// fstSliceDestroy(&key);
// return 0;
// }
// fstSliceDestroy(&key);
// return -1;
//} else {
// // handle other type later
//}
}
static int tfileWriteFooter(TFileWriter* write) {
char buf[sizeof(tfileMagicNumber) + 1] = {0};
@ -913,8 +933,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
// TODO(yihao): opt later
WriterCtx* ctx = reader->ctx;
char block[1024] = {0};
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
// add block cache
char block[1024] = {0};
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
assert(nread >= sizeof(uint32_t));
char* p = block;

View File

@ -272,9 +272,8 @@ void checkFstCheckIterator1() {
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
fw->Put("Hello world", 1);
fw->Put("Hello worle", 2);
fw->Put("hello worlf", 4);
fw->Put("test1&^D&10", 1);
fw->Put("test2&^D&10", 2);
delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64);
@ -645,11 +644,11 @@ int main(int argc, char* argv[]) {
// iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
//}
checkFstCheckIterator1();
checkFstCheckIterator2();
checkFstCheckIteratorPrefix();
checkFstCheckIteratorRange1();
checkFstCheckIteratorRange2();
checkFstCheckIteratorRange3();
// checkFstCheckIterator2();
// checkFstCheckIteratorPrefix();
// checkFstCheckIteratorRange1();
// checkFstCheckIteratorRange2();
// checkFstCheckIteratorRange3();
// checkFstLongTerm();
// checkFstPrefixSearch();

View File

@ -181,3 +181,240 @@ TEST_F(JsonEnv, testWriteMillonData) {
}
}
}
TEST_F(JsonEnv, testWriteJsonNumberData) {
{
std::string colName("test");
std::string colVal("10");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test2");
std::string colVal("20");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test2");
std::string colVal("15");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test2");
std::string colVal("15");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
}
TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
{
std::string colName("test1");
std::string colVal("10");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test");
std::string colVal("xxxxxxxxxxxxxxxxxxx");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test1");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test1");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test1");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test1");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test1");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test1");
std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
}

View File

@ -907,6 +907,13 @@ SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, S
return (SNode*)pStmt;
}
static SNode* createAlterTableStmtFinalize(SNode* pRealTable, SAlterTableStmt* pStmt) {
strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName);
strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName);
nodesDestroyNode(pRealTable);
return (SNode*)pStmt;
}
SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions) {
if (NULL == pRealTable) {
return NULL;
@ -915,7 +922,7 @@ SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable,
CHECK_OUT_OF_MEM(pStmt);
pStmt->alterType = TSDB_ALTER_TABLE_UPDATE_OPTIONS;
pStmt->pOptions = (STableOptions*)pOptions;
return (SNode*)pStmt;
return createAlterTableStmtFinalize(pRealTable, pStmt);
}
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType,
@ -928,7 +935,7 @@ SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable,
pStmt->alterType = alterType;
strncpy(pStmt->colName, pColName->z, pColName->n);
pStmt->dataType = dataType;
return (SNode*)pStmt;
return createAlterTableStmtFinalize(pRealTable, pStmt);
}
SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, const SToken* pColName) {
@ -939,7 +946,7 @@ SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_
CHECK_OUT_OF_MEM(pStmt);
pStmt->alterType = alterType;
strncpy(pStmt->colName, pColName->z, pColName->n);
return (SNode*)pStmt;
return createAlterTableStmtFinalize(pRealTable, pStmt);
}
SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType,
@ -952,7 +959,7 @@ SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int
pStmt->alterType = alterType;
strncpy(pStmt->colName, pOldColName->z, pOldColName->n);
strncpy(pStmt->newColName, pNewColName->z, pNewColName->n);
return (SNode*)pStmt;
return createAlterTableStmtFinalize(pRealTable, pStmt);
}
SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, const SToken* pTagName, SNode* pVal) {
@ -964,7 +971,7 @@ SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, const
pStmt->alterType = TSDB_ALTER_TABLE_UPDATE_TAG_VAL;
strncpy(pStmt->colName, pTagName->z, pTagName->n);
pStmt->pVal = (SValueNode*)pVal;
return (SNode*)pStmt;
return createAlterTableStmtFinalize(pRealTable, pStmt);
}
SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) {

View File

@ -214,6 +214,7 @@ static SKeyword keywordTable[] = {
{"WINDOW_CLOSE", TK_WINDOW_CLOSE},
{"WITH", TK_WITH},
{"WRITE", TK_WRITE},
{"_C0", TK_ROWTS},
{"_QENDTS", TK_QENDTS},
{"_QSTARTTS", TK_QSTARTTS},
{"_ROWTS", TK_ROWTS},

View File

@ -782,6 +782,7 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, bool* pHasSel
return DEAL_RES_ERROR;
}
strcpy(pFunc->functionName, "_select_value");
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
translateFunction(pCxt, pFunc);
@ -2540,10 +2541,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
pReq->igExists = pStmt->ignoreExists;
pReq->xFilesFactor = pStmt->pOptions->filesFactor;
pReq->delay = pStmt->pOptions->delay;
pReq->ttl = pStmt->pOptions->ttl;
columnDefNodeToField(pStmt->pCols, &pReq->pColumns);
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
if ('\0' != pStmt->pOptions->comment[0]) {
pReq->comment = strdup(pStmt->pOptions->comment);
if (NULL == pReq->comment) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->commentLen = strlen(pStmt->pOptions->comment) + 1;
}
SName tableName;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pReq->name);
@ -2602,6 +2611,18 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS
}
static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterReq) {
if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) {
pAlterReq->ttl = pStmt->pOptions->ttl;
if ('\0' != pStmt->pOptions->comment[0]) {
pAlterReq->comment = strdup(pStmt->pOptions->comment);
if (NULL == pAlterReq->comment) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pAlterReq->commentLen = strlen(pStmt->pOptions->comment) + 1;
}
return TSDB_CODE_SUCCESS;
}
pAlterReq->pFields = taosArrayInit(2, sizeof(TAOS_FIELD));
if (NULL == pAlterReq->pFields) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -2614,7 +2635,7 @@ static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterR
case TSDB_ALTER_TABLE_DROP_COLUMN:
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: {
TAOS_FIELD field = {.type = pStmt->dataType.type, .bytes = pStmt->dataType.bytes};
TAOS_FIELD field = {.type = pStmt->dataType.type, .bytes = calcTypeBytes(pStmt->dataType)};
strcpy(field.name, pStmt->colName);
taosArrayPush(pAlterReq->pFields, &field);
break;
@ -2625,7 +2646,7 @@ static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterR
strcpy(oldField.name, pStmt->colName);
taosArrayPush(pAlterReq->pFields, &oldField);
TAOS_FIELD newField = {0};
strcpy(oldField.name, pStmt->newColName);
strcpy(newField.name, pStmt->newColName);
taosArrayPush(pAlterReq->pFields, &newField);
break;
}
@ -2642,7 +2663,7 @@ static int32_t translateAlterTable(STranslateContext* pCxt, SAlterTableStmt* pSt
SName tableName;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), alterReq.name);
alterReq.alterType = pStmt->alterType;
if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType || TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType) {
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType) {
return TSDB_CODE_FAILED;
} else {
if (TSDB_CODE_SUCCESS != setAlterTableField(pStmt, &alterReq)) {
@ -2929,7 +2950,6 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
tNameGetFullDbName(&name, pReq->name);
/*tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);*/
pReq->igExists = pStmt->ignoreExists;
pReq->withTbName = pStmt->pOptions->withTable;
pReq->withSchema = pStmt->pOptions->withSchema;
@ -2993,7 +3013,8 @@ static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt
SMDropTopicReq dropReq = {0};
SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), dropReq.name);
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
tNameGetFullDbName(&name, dropReq.name);
dropReq.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq);
@ -3033,28 +3054,48 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq);
}
static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SCMCreateStreamReq createReq = {0};
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
if (NULL == pStmt->pQuery) {
return TSDB_CODE_SUCCESS;
}
createReq.igExists = pStmt->ignoreExists;
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) {
return TSDB_CODE_SUCCESS;
}
}
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY);
}
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
strcpy(name.dbname, ((SRealTableNode*)(((SSelectStmt*)pStmt)->pFromTable))->table.dbName);
tNameGetFullDbName(&name, pDbFName);
}
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), createReq.name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), pReq->name);
if ('\0' != pStmt->targetTabName[0]) {
strcpy(name.dbname, pStmt->targetDbName);
strcpy(name.tname, pStmt->targetTabName);
tNameExtractFullName(&name, createReq.targetStbFullName);
tNameExtractFullName(&name, pReq->targetStbFullName);
}
int32_t code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
createReq.sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == createReq.sql) {
pReq->sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pReq->sql) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
@ -3064,11 +3105,20 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
: TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS == code) {
createReq.triggerType = pStmt->pOptions->triggerType;
createReq.watermark =
(NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->triggerType = pStmt->pOptions->triggerType;
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
}
return code;
}
static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SCMCreateStreamReq createReq = {0};
int32_t code = checkCreateStream(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateStreamReq(pCxt, pStmt, &createReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq);
}
@ -3166,7 +3216,7 @@ static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
req.alterType = TSDB_ALTER_USER_REMOVE_WRITE_DB;
}
strcpy(req.user, pStmt->userName);
strcpy(req.dbname, pStmt->dbName);
sprintf(req.dbname, "%d.%s", pCxt->pParseCxt->acctId, pStmt->dbName);
return buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &req);
}

View File

@ -19,7 +19,7 @@ using namespace std;
namespace ParserTest {
class ParserInitialATest : public ParserTestBase {};
class ParserInitialATest : public ParserDdlTest {};
TEST_F(ParserInitialATest, alterAccount) {
useDb("root", "test");
@ -72,16 +72,103 @@ TEST_F(ParserInitialATest, alterDatabase) {
TEST_F(ParserInitialATest, alterTable) {
useDb("root", "test");
// run("ALTER TABLE t1 TTL 10");
// run("ALTER TABLE t1 COMMENT 'test'");
SMAlterStbReq expect = {0};
auto setAlterStbReqFunc = [&](const char* pTbname, int8_t alterType, int32_t numOfFields = 0,
const char* pField1Name = nullptr, int8_t field1Type = 0, int32_t field1Bytes = 0,
const char* pField2Name = nullptr, const char* pComment = nullptr,
int32_t ttl = TSDB_DEFAULT_TABLE_TTL) {
int32_t len = snprintf(expect.name, sizeof(expect.name), "0.test.%s", pTbname);
expect.name[len] = '\0';
expect.alterType = alterType;
expect.ttl = ttl;
if (nullptr != pComment) {
expect.comment = strdup(pComment);
expect.commentLen = strlen(pComment) + 1;
}
expect.numOfFields = numOfFields;
if (NULL == expect.pFields) {
expect.pFields = taosArrayInit(2, sizeof(TAOS_FIELD));
TAOS_FIELD field = {0};
taosArrayPush(expect.pFields, &field);
taosArrayPush(expect.pFields, &field);
}
TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 0);
if (NULL != pField1Name) {
strcpy(pField->name, pField1Name);
pField->name[strlen(pField1Name)] = '\0';
} else {
memset(pField, 0, sizeof(TAOS_FIELD));
}
pField->type = field1Type;
pField->bytes = field1Bytes > 0 ? field1Bytes : (field1Type > 0 ? tDataTypes[field1Type].bytes : 0);
pField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 1);
if (NULL != pField2Name) {
strcpy(pField->name, pField2Name);
pField->name[strlen(pField2Name)] = '\0';
} else {
memset(pField, 0, sizeof(TAOS_FIELD));
}
pField->type = 0;
pField->bytes = 0;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_TABLE_STMT);
SMAlterStbReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMAlterStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.alterType, expect.alterType);
ASSERT_EQ(req.numOfFields, expect.numOfFields);
if (expect.numOfFields > 0) {
TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(req.pFields, 0);
TAOS_FIELD* pExpectField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 0);
ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name));
ASSERT_EQ(pField->type, pExpectField->type);
ASSERT_EQ(pField->bytes, pExpectField->bytes);
}
if (expect.numOfFields > 1) {
TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(req.pFields, 1);
TAOS_FIELD* pExpectField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 1);
ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name));
ASSERT_EQ(pField->type, pExpectField->type);
ASSERT_EQ(pField->bytes, pExpectField->bytes);
}
});
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, nullptr, 10);
run("ALTER TABLE t1 TTL 10");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, "test");
run("ALTER TABLE t1 COMMENT 'test'");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_ADD_COLUMN, 1, "cc1", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE t1 ADD COLUMN cc1 BIGINT");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_DROP_COLUMN, 1, "c1");
run("ALTER TABLE t1 DROP COLUMN c1");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, 1, "c1", TSDB_DATA_TYPE_VARCHAR,
20 + VARSTR_HEADER_SIZE);
run("ALTER TABLE t1 MODIFY COLUMN c1 VARCHAR(20)");
setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1");
run("ALTER TABLE t1 RENAME COLUMN c1 cc1");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_ADD_TAG, 1, "tag11", TSDB_DATA_TYPE_BIGINT);
run("ALTER TABLE st1 ADD TAG tag11 BIGINT");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_DROP_TAG, 1, "tag1");
run("ALTER TABLE st1 DROP TAG tag1");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, 1, "tag1", TSDB_DATA_TYPE_VARCHAR,
20 + VARSTR_HEADER_SIZE);
run("ALTER TABLE st1 MODIFY TAG tag1 VARCHAR(20)");
setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_NAME, 2, "tag1", 0, 0, "tag11");
run("ALTER TABLE st1 RENAME TAG tag1 tag11");
// run("ALTER TABLE st1s1 SET TAG tag1=10");

View File

@ -19,7 +19,7 @@ using namespace std;
namespace ParserTest {
class ParserInitialCTest : public ParserTestBase {};
class ParserInitialCTest : public ParserDdlTest {};
// todo compact
@ -97,17 +97,143 @@ TEST_F(ParserInitialCTest, createSnode) {
TEST_F(ParserInitialCTest, createStable) {
useDb("root", "test");
SMCreateStbReq expect = {0};
auto setCreateStbReqFunc = [&](const char* pTbname, int8_t igExists = 0,
float xFilesFactor = TSDB_DEFAULT_ROLLUP_FILE_FACTOR,
int32_t delay = TSDB_DEFAULT_ROLLUP_DELAY, int32_t ttl = TSDB_DEFAULT_TABLE_TTL,
const char* pComment = nullptr) {
memset(&expect, 0, sizeof(SMCreateStbReq));
int32_t len = snprintf(expect.name, sizeof(expect.name), "0.test.%s", pTbname);
expect.name[len] = '\0';
expect.igExists = igExists;
expect.xFilesFactor = xFilesFactor;
expect.delay = delay;
expect.ttl = ttl;
if (nullptr != pComment) {
expect.comment = strdup(pComment);
expect.commentLen = strlen(pComment) + 1;
}
};
auto addFieldToCreateStbReqFunc = [&](bool col, const char* pFieldName, uint8_t type, int32_t bytes = 0,
int8_t flags = SCHEMA_SMA_ON) {
SField field = {0};
strcpy(field.name, pFieldName);
field.type = type;
field.bytes = bytes > 0 ? bytes : tDataTypes[type].bytes;
field.flags = flags;
if (col) {
if (NULL == expect.pColumns) {
expect.pColumns = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SField));
}
taosArrayPush(expect.pColumns, &field);
expect.numOfColumns += 1;
} else {
if (NULL == expect.pTags) {
expect.pTags = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SField));
}
taosArrayPush(expect.pTags, &field);
expect.numOfTags += 1;
}
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_TABLE_STMT);
SMCreateStbReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(req.igExists, expect.igExists);
ASSERT_EQ(req.xFilesFactor, expect.xFilesFactor);
ASSERT_EQ(req.delay, expect.delay);
ASSERT_EQ(req.ttl, expect.ttl);
ASSERT_EQ(req.numOfColumns, expect.numOfColumns);
ASSERT_EQ(req.numOfTags, expect.numOfTags);
ASSERT_EQ(req.commentLen, expect.commentLen);
ASSERT_EQ(req.ast1Len, expect.ast1Len);
ASSERT_EQ(req.ast2Len, expect.ast2Len);
if (expect.numOfColumns > 0) {
ASSERT_EQ(taosArrayGetSize(req.pColumns), expect.numOfColumns);
ASSERT_EQ(taosArrayGetSize(req.pColumns), taosArrayGetSize(expect.pColumns));
for (int32_t i = 0; i < expect.numOfColumns; ++i) {
SField* pField = (SField*)taosArrayGet(req.pColumns, i);
SField* pExpectField = (SField*)taosArrayGet(expect.pColumns, i);
ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name));
ASSERT_EQ(pField->type, pExpectField->type);
ASSERT_EQ(pField->bytes, pExpectField->bytes);
ASSERT_EQ(pField->flags, pExpectField->flags);
}
}
if (expect.numOfTags > 0) {
ASSERT_EQ(taosArrayGetSize(req.pTags), expect.numOfTags);
ASSERT_EQ(taosArrayGetSize(req.pTags), taosArrayGetSize(expect.pTags));
for (int32_t i = 0; i < expect.numOfTags; ++i) {
SField* pField = (SField*)taosArrayGet(req.pTags, i);
SField* pExpectField = (SField*)taosArrayGet(expect.pTags, i);
ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name));
ASSERT_EQ(pField->type, pExpectField->type);
ASSERT_EQ(pField->bytes, pExpectField->bytes);
ASSERT_EQ(pField->flags, pExpectField->flags);
}
}
if (expect.commentLen > 0) {
ASSERT_EQ(std::string(req.comment), std::string(expect.comment));
}
if (expect.ast1Len > 0) {
ASSERT_EQ(std::string(req.pAst1), std::string(expect.pAst1));
}
if (expect.ast2Len > 0) {
ASSERT_EQ(std::string(req.pAst2), std::string(expect.pAst2));
}
});
setCreateStbReqFunc("t1");
addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP);
addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT);
addFieldToCreateStbReqFunc(false, "id", TSDB_DATA_TYPE_INT);
run("create stable t1(ts timestamp, c1 int) TAGS(id int)");
setCreateStbReqFunc("t1", 1, 0.1, 2, 100, "test create table");
addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT);
addFieldToCreateStbReqFunc(true, "c2", TSDB_DATA_TYPE_UINT);
addFieldToCreateStbReqFunc(true, "c3", TSDB_DATA_TYPE_BIGINT);
addFieldToCreateStbReqFunc(true, "c4", TSDB_DATA_TYPE_UBIGINT, 0, 0);
addFieldToCreateStbReqFunc(true, "c5", TSDB_DATA_TYPE_FLOAT, 0, 0);
addFieldToCreateStbReqFunc(true, "c6", TSDB_DATA_TYPE_DOUBLE, 0, 0);
addFieldToCreateStbReqFunc(true, "c7", TSDB_DATA_TYPE_BINARY, 20 + VARSTR_HEADER_SIZE, 0);
addFieldToCreateStbReqFunc(true, "c8", TSDB_DATA_TYPE_SMALLINT, 0, 0);
addFieldToCreateStbReqFunc(true, "c9", TSDB_DATA_TYPE_USMALLINT, 0, 0);
addFieldToCreateStbReqFunc(true, "c10", TSDB_DATA_TYPE_TINYINT, 0, 0);
addFieldToCreateStbReqFunc(true, "c11", TSDB_DATA_TYPE_UTINYINT, 0, 0);
addFieldToCreateStbReqFunc(true, "c12", TSDB_DATA_TYPE_BOOL, 0, 0);
addFieldToCreateStbReqFunc(true, "c13", TSDB_DATA_TYPE_NCHAR, 30 * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 0);
addFieldToCreateStbReqFunc(true, "c14", TSDB_DATA_TYPE_VARCHAR, 50 + VARSTR_HEADER_SIZE, 0);
addFieldToCreateStbReqFunc(false, "a1", TSDB_DATA_TYPE_TIMESTAMP);
addFieldToCreateStbReqFunc(false, "a2", TSDB_DATA_TYPE_INT);
addFieldToCreateStbReqFunc(false, "a3", TSDB_DATA_TYPE_UINT);
addFieldToCreateStbReqFunc(false, "a4", TSDB_DATA_TYPE_BIGINT);
addFieldToCreateStbReqFunc(false, "a5", TSDB_DATA_TYPE_UBIGINT);
addFieldToCreateStbReqFunc(false, "a6", TSDB_DATA_TYPE_FLOAT);
addFieldToCreateStbReqFunc(false, "a7", TSDB_DATA_TYPE_DOUBLE);
addFieldToCreateStbReqFunc(false, "a8", TSDB_DATA_TYPE_BINARY, 20 + VARSTR_HEADER_SIZE);
addFieldToCreateStbReqFunc(false, "a9", TSDB_DATA_TYPE_SMALLINT);
addFieldToCreateStbReqFunc(false, "a10", TSDB_DATA_TYPE_USMALLINT);
addFieldToCreateStbReqFunc(false, "a11", TSDB_DATA_TYPE_TINYINT);
addFieldToCreateStbReqFunc(false, "a12", TSDB_DATA_TYPE_UTINYINT);
addFieldToCreateStbReqFunc(false, "a13", TSDB_DATA_TYPE_BOOL);
addFieldToCreateStbReqFunc(false, "a14", TSDB_DATA_TYPE_NCHAR, 30 * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
addFieldToCreateStbReqFunc(false, "a15", TSDB_DATA_TYPE_VARCHAR, 50 + VARSTR_HEADER_SIZE);
run("create stable if not exists test.t1("
"ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), c8 "
"SMALLINT, "
"c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, c13 NCHAR(30), "
"c15 VARCHAR(50)) "
"TAGS (tsa TIMESTAMP, a1 INT, a2 INT UNSIGNED, a3 BIGINT, a4 BIGINT UNSIGNED, a5 FLOAT, a6 DOUBLE, a7 "
"BINARY(20), a8 SMALLINT, "
"a9 SMALLINT UNSIGNED COMMENT 'test column comment', a10 TINYINT, a11 TINYINT UNSIGNED, a12 BOOL, a13 NCHAR(30), "
"a15 VARCHAR(50)) "
"ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), "
"c8 SMALLINT, c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, "
"c13 NCHAR(30), c14 VARCHAR(50)) "
"TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, "
"a8 BINARY(20), a9 SMALLINT, a10 SMALLINT UNSIGNED COMMENT 'test column comment', a11 TINYINT, "
"a12 TINYINT UNSIGNED, a13 BOOL, a14 NCHAR(30), a15 VARCHAR(50)) "
"TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (min) FILE_FACTOR 0.1 DELAY 2");
}

View File

@ -49,6 +49,8 @@ struct TerminateFlag : public exception {
class ParserTestBaseImpl {
public:
ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase) {}
void useDb(const string& acctId, const string& db) {
caseEnv_.acctId_ = acctId;
caseEnv_.db_ = db;
@ -156,11 +158,13 @@ class ParserTestBaseImpl {
void doParse(SParseContext* pCxt, SQuery** pQuery) {
DO_WITH_THROW(parse, pCxt, pQuery);
ASSERT_NE(*pQuery, nullptr);
res_.parsedAst_ = toString((*pQuery)->pRoot);
}
void doTranslate(SParseContext* pCxt, SQuery* pQuery) {
DO_WITH_THROW(translate, pCxt, pQuery);
checkQuery(pQuery, PARSER_STAGE_TRANSLATE);
res_.translatedAst_ = toString(pQuery->pRoot);
}
@ -178,12 +182,15 @@ class ParserTestBaseImpl {
return str;
}
caseEnv caseEnv_;
stmtEnv stmtEnv_;
stmtRes res_;
void checkQuery(const SQuery* pQuery, ParserStage stage) { pBase_->checkDdl(pQuery, stage); }
caseEnv caseEnv_;
stmtEnv stmtEnv_;
stmtRes res_;
ParserTestBase* pBase_;
};
ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl()) {}
ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl(this)) {}
ParserTestBase::~ParserTestBase() {}
@ -193,4 +200,6 @@ void ParserTestBase::run(const std::string& sql, int32_t expect, ParserStage che
return impl_->run(sql, expect, checkStage);
}
void ParserTestBase::checkDdl(const SQuery* pQuery, ParserStage stage) { return; }
} // namespace ParserTest

View File

@ -18,6 +18,9 @@
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "querynodes.h"
#include "taoserror.h"
namespace ParserTest {
@ -34,10 +37,32 @@ class ParserTestBase : public testing::Test {
void useDb(const std::string& acctId, const std::string& db);
void run(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_ALL);
virtual void checkDdl(const SQuery* pQuery, ParserStage stage);
private:
std::unique_ptr<ParserTestBaseImpl> impl_;
};
class ParserDdlTest : public ParserTestBase {
public:
void setCheckDdlFunc(const std::function<void(const SQuery*, ParserStage)>& func) { checkDdl_ = func; }
virtual void checkDdl(const SQuery* pQuery, ParserStage stage) {
ASSERT_NE(pQuery, nullptr);
ASSERT_EQ(pQuery->haveResultSet, false);
ASSERT_NE(pQuery->pRoot, nullptr);
ASSERT_EQ(pQuery->numOfResCols, 0);
ASSERT_EQ(pQuery->pResSchema, nullptr);
ASSERT_EQ(pQuery->precision, 0);
if (nullptr != checkDdl_) {
checkDdl_(pQuery, stage);
}
}
private:
std::function<void(const SQuery*, ParserStage)> checkDdl_;
};
extern bool g_isDump;
} // namespace ParserTest

View File

@ -113,7 +113,9 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB
}
int tdbBtreeClose(SBTree *pBt) {
// TODO
if (pBt) {
tdbOsFree(pBt);
}
return 0;
}

View File

@ -66,7 +66,10 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn
}
int tdbDbClose(TDB *pDb) {
// TODO
if (pDb) {
tdbBtreeClose(pDb->pBt);
tdbOsFree(pDb);
}
return 0;
}

View File

@ -56,7 +56,7 @@ int tdbEnvOpen(const char *rootDir, int szPage, int pages, TENV **ppEnv) {
pEnv->nPgrHash = 8;
tsize = sizeof(SPager *) * pEnv->nPgrHash;
pEnv->pgrHash = tdbRealloc(pEnv->pgrHash, tsize);
pEnv->pgrHash = tdbOsMalloc(tsize);
if (pEnv->pgrHash == NULL) {
return -1;
}
@ -69,7 +69,19 @@ int tdbEnvOpen(const char *rootDir, int szPage, int pages, TENV **ppEnv) {
}
int tdbEnvClose(TENV *pEnv) {
// TODO
SPager *pPager;
if (pEnv) {
for (pPager = pEnv->pgrList; pPager; pPager = pEnv->pgrList) {
pEnv->pgrList = pPager->pNext;
tdbPagerClose(pPager);
}
tdbPCacheClose(pEnv->pCache);
tdbOsFree(pEnv->pgrHash);
tdbOsFree(pEnv);
}
return 0;
}

View File

@ -292,6 +292,10 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
pPage->pFreeNext = pCache->pFree;
pCache->pFree = pPage;
pCache->nFree++;
// add to local list
pPage->pCacheNext = pCache->pList;
pCache->pList = pPage;
}
// Open the hash table
@ -317,9 +321,10 @@ static int tdbPCacheCloseImpl(SPCache *pCache) {
for (pPage = pCache->pList; pPage; pPage = pCache->pList) {
pCache->pList = pPage->pCacheNext;
tdbPageDestroy(pPage, NULL, NULL);
tdbPageDestroy(pPage, tdbDefaultFree, NULL);
}
tdbOsFree(pCache->pgHash);
tdbPCacheDestroyLock(pCache);
return 0;
}

View File

@ -88,7 +88,13 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
}
int tdbPagerClose(SPager *pPager) {
// TODO
if (pPager) {
if (pPager->inTran) {
tdbOsClose(pPager->jfd);
}
tdbOsClose(pPager->fd);
tdbOsFree(pPager);
}
return 0;
}

View File

@ -105,8 +105,8 @@ typedef void* queue[2];
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit
#define TRANS_RETRY_INTERVAL 5 // ms retry interval
#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
typedef struct {

View File

@ -17,15 +17,25 @@ endif ()
if(USE_TD_MEMORY)
add_definitions(-DUSE_TD_MEMORY)
endif ()
if(BUILD_ADDR2LINE)
target_include_directories(
os
PUBLIC "${TD_SOURCE_DIR}/contrib/libdwarf/src/lib/libdwarf"
)
add_definitions(-DUSE_ADDR2LINE)
target_link_libraries(
os PUBLIC addr2line dl z
)
endif ()
target_link_libraries(
os pthread
os PUBLIC pthread
)
if(NOT TD_WINDOWS)
target_link_libraries(
os dl m rt
os PUBLIC dl m rt
)
else()
target_link_libraries(
os ws2_32 iconv msvcregex wcwidth winmm
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm
)
endif(NOT TD_WINDOWS)

View File

@ -17,7 +17,7 @@
#include <malloc.h>
#include "os.h"
#ifdef USE_TD_MEMORY
#if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE)
#define TD_MEMORY_SYMBOL ('T' << 24 | 'A' << 16 | 'O' << 8 | 'S')
@ -71,14 +71,112 @@ int32_t taosBackTrace(void **buffer, int32_t size) {
return frame;
}
#endif
// char **taosBackTraceSymbols(int32_t *size) {
// void *buffer[20] = {NULL};
// *size = taosBackTrace(buffer, 20);
// return backtrace_symbols(buffer, *size);
// }
#ifdef USE_ADDR2LINE
#include "osThread.h"
#include "libdwarf.h"
#include "dwarf.h"
#define DW_PR_DUu "llu"
typedef struct lookup_table
{
Dwarf_Line *table;
Dwarf_Line_Context *ctxts;
int cnt;
Dwarf_Addr low;
Dwarf_Addr high;
} lookup_tableT;
extern int create_lookup_table(Dwarf_Debug dbg, lookup_tableT *lookup_table);
extern void delete_lookup_table(lookup_tableT *lookup_table);
size_t addr = 0;
lookup_tableT lookup_table;
Dwarf_Debug tDbg;
static TdThreadOnce traceThreadInit = PTHREAD_ONCE_INIT;
void endTrace() {
if (traceThreadInit != PTHREAD_ONCE_INIT) {
delete_lookup_table(&lookup_table);
dwarf_finish(tDbg);
}
}
void startTrace() {
int ret;
Dwarf_Ptr errarg = 0;
FILE *fp = fopen("/proc/self/maps", "r");
fscanf(fp, "%lx-", &addr);
fclose(fp);
ret = dwarf_init_path("/proc/self/exe", NULL, 0, DW_GROUPNUMBER_ANY, NULL, errarg, &tDbg, NULL);
if (ret == DW_DLV_NO_ENTRY) {
printf("Unable to open file");
return;
}
ret = create_lookup_table(tDbg, &lookup_table);
if (ret != DW_DLV_OK) {
printf("Unable to create lookup table");
return;
}
atexit(endTrace);
}
static void print_line(Dwarf_Debug dbg, Dwarf_Line line, Dwarf_Addr pc) {
char *linesrc = "??";
Dwarf_Unsigned lineno = 0;
if (line) {
dwarf_linesrc(line, &linesrc, NULL);
dwarf_lineno(line, &lineno, NULL);
}
printf("%s:%" DW_PR_DUu "\n", linesrc, lineno);
if (line) dwarf_dealloc(dbg, linesrc, DW_DLA_STRING);
}
void taosPrintBackTrace() {
int size = 20;
void **buffer[20];
Dwarf_Addr pc;
int32_t frame = 0;
void **ebp;
void **ret = NULL;
size_t func_frame_distance = 0;
taosThreadOnce(&traceThreadInit, startTrace);
if (buffer != NULL && size > 0) {
ebp = taosGetEbp();
func_frame_distance = (size_t)*ebp - (size_t)ebp;
while (ebp && frame < size && (func_frame_distance < (1ULL << 24)) && (func_frame_distance > 0)) {
ret = ebp + 1;
buffer[frame++] = *ret;
ebp = (void **)(*ebp);
func_frame_distance = (size_t)*ebp - (size_t)ebp;
}
for (size_t i = 0; i < frame; i++) {
pc = (size_t)buffer[i] - addr;
if (pc > 0) {
if (pc >= lookup_table.low && pc < lookup_table.high) {
Dwarf_Line line = lookup_table.table[pc - lookup_table.low];
if (line) print_line(tDbg, line, pc);
}
}
}
}
}
#endif
#endif
#endif
#ifndef USE_ADDR2LINE
void taosPrintBackTrace() { return; }
#endif
void *taosMemoryMalloc(int32_t size) {

View File

@ -125,7 +125,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
strcpy(outTimezoneStr, tz);
}
#elif defined(_TD_DARWIN_64)
#else
char buf[4096] = {0};
char *tz = NULL;
{
@ -170,64 +170,5 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
*/
snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %+03ld00)", tz, tm1.tm_isdst ? tzname[daylight] : tzname[0],
-timezone / 3600);
#else
/*
* NOTE: do not remove it.
* Enforce set the correct daylight saving time(DST) flag according
* to current time
*/
time_t tx1 = taosGetTimestampSec();
struct tm tm1;
taosLocalTime(&tx1, &tm1);
/* load time zone string from /etc/timezone */
// FILE *f = fopen("/etc/timezone", "r");
errno = 0;
TdFilePtr pFile = taosOpenFile("/etc/timezone", TD_FILE_READ);
char buf[68] = {0};
if (pFile != NULL) {
int len = taosReadFile(pFile, buf, 64);
if (len < 64 && taosGetErrorFile(pFile)) {
taosCloseFile(&pFile);
printf("read /etc/timezone error, reason:%s", strerror(errno));
return;
}
taosCloseFile(&pFile);
buf[sizeof(buf) - 1] = 0;
char *lineEnd = strstr(buf, "\n");
if (lineEnd != NULL) {
*lineEnd = 0;
}
// for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
if (strlen(buf) > 0) {
setenv("TZ", buf, 1);
}
}
// get and set default timezone
tzset();
/*
* get CURRENT time zone.
* system current time zone is affected by daylight saving time(DST)
*
* e.g., the local time zone of London in DST is GMT+01:00,
* otherwise is GMT+00:00
*/
int32_t tz = (-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR;
*tsTimezone = tz;
tz += daylight;
/*
* format example:
*
* Asia/Shanghai (CST, +0800)
* Europe/London (BST, +0100)
*/
snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
#endif
}

View File

@ -911,7 +911,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1)
void taosStopCacheRefreshWorker(void) {
stopRefreshWorker = true;
taosThreadJoin(cacheRefreshWorker, NULL);
if(cacheThreadInit != PTHREAD_ONCE_INIT) taosThreadJoin(cacheRefreshWorker, NULL);
taosArrayDestroy(pCacheArrayList);
}

View File

@ -460,8 +460,7 @@ class TDDnodes:
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8")
binPath = os.path.dirname(os.path.realpath(__file__))
binPath = binPath + "/../../../debug/"
binPath = self.dnodes[0].getPath() + "/../../../"
tdLog.debug("binPath %s" % (binPath))
binPath = os.path.realpath(binPath)
tdLog.debug("binPath real path %s" % (binPath))

View File

@ -70,6 +70,11 @@
./test.sh -f tsim/tmq/basic2.sim
./test.sh -f tsim/tmq/basic3.sim
./test.sh -f tsim/tmq/basic4.sim
./test.sh -f tsim/tmq/basic1Of2Cons.sim
./test.sh -f tsim/tmq/basic2Of2Cons.sim
./test.sh -f tsim/tmq/basic3Of2Cons.sim
./test.sh -f tsim/tmq/basic4Of2Cons.sim
./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim
# --- stable
./test.sh -f tsim/stable/disk.sim

View File

@ -4,9 +4,10 @@ set +e
#set -x
echo "Executing copy_udf.sh"
SCRIPT_DIR=`pwd`
SCRIPT_DIR=`dirname $0`
cd $SCRIPT_DIR/../
SCRIPT_DIR=`pwd`
echo "SCRIPT_DIR: ${SCRIPT_DIR}"
IN_TDINTERNAL="community"
if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then
@ -16,6 +17,7 @@ else
fi
TAOS_DIR=`pwd`
echo "find udf library in $TAOS_DIR"
UDF1_DIR=`find $TAOS_DIR -name "libudf1.so"|grep lib|head -n1`
UDF2_DIR=`find $TAOS_DIR -name "libudf2.so"|grep lib|head -n1`

View File

@ -3,6 +3,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c startUdfd -v 1
print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start
@ -65,25 +66,25 @@ if $data00 != 1.414213562 then
endi
sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
#sql select udf1(f1, f2) from t2;
#print $rows , $data00 , $data10 , $data20 , $data30
#if $rows != 4 then
# return -1
#endi
#if $data00 != 88 then
# return -1
#endi
#if $data10 != 88 then
# return -1
#endi
#
#if $data20 != NULL then
# return -1
#endi
#
#if $data30 != NULL then
# return -1
#endi
sql select udf1(f1, f2) from t2;
print $rows , $data00 , $data10 , $data20 , $data30
if $rows != 4 then
return -1
endi
if $data00 != 88 then
return -1
endi
if $data10 != 88 then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
sql select udf2(f1, f2) from t2;
print $rows, $data00

View File

@ -18,6 +18,7 @@ python3 ./test.py -f 2-query/char_length.py
python3 ./test.py -f 2-query/upper.py
python3 ./test.py -f 2-query/lower.py
python3 ./test.py -f 2-query/join.py
python3 ./test.py -f 2-query/cast.py
# python3 ./test.py -f 2-query/concat.py # after wal ,crash occured
# python3 ./test.py -f 2-query/concat_ws.py

View File

@ -331,12 +331,6 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
err = tmq_consumer_close(pInfo->tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
pInfo->consumeMsgCnt = totalMsgs;
pInfo->consumeRowCnt = totalRows;
@ -372,6 +366,13 @@ void* consumeThreadFunc(void* param) {
return NULL;
}
err = tmq_consumer_close(pInfo->tmq);
if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
pInfo->tmq = NULL;
// save consume result into consumeresult table
saveConsumeResult(pInfo);