diff --git a/cmake/cmake.define b/cmake/cmake.define index e5ef08acb8..f90024e032 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -75,6 +75,14 @@ IF (TD_WINDOWS) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}") ELSE () + IF (${COVER} MATCHES "true") + MESSAGE(STATUS "Test coverage mode, add extra flags") + SET(GCC_COVERAGE_COMPILE_FLAGS "-fprofile-arcs -ftest-coverage") + SET(GCC_COVERAGE_LINK_FLAGS "-lgcov --coverage") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}") + ENDIF () + IF (${SANITIZER} MATCHES "true") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") diff --git a/include/common/tglobal.h b/include/common/tglobal.h index da5158abb5..97e6491b98 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -121,6 +121,9 @@ extern char tsCompressor[]; extern int32_t tsDiskCfgNum; extern SDiskCfg tsDiskCfg[]; +// udf +extern bool tsStartUdfd; + // internal extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9ec80293a8..909b7d0877 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -328,6 +328,9 @@ typedef struct { int8_t alterType; int32_t numOfFields; SArray* pFields; + int32_t ttl; + int32_t commentLen; + char* comment; } SMAlterStbReq; int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq); @@ -1455,7 +1458,7 @@ typedef struct { static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) { SMqRebInfo* pRebInfo = (SMqRebInfo*)taosMemoryCalloc(1, sizeof(SMqRebInfo)); if (pRebInfo == NULL) { - goto _err; + return NULL; } strcpy(pRebInfo->key, key); pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t)); diff --git a/include/common/tname.h b/include/common/tname.h index ae2dc32335..28f97d1028 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -16,8 +16,8 @@ #ifndef _TD_COMMON_NAME_H_ #define _TD_COMMON_NAME_H_ -#include "tdef.h" #include "tarray.h" +#include "tdef.h" #ifdef __cplusplus extern "C" { @@ -65,19 +65,19 @@ bool tNameDBNameEqual(SName* left, SName* right); typedef struct { // input - SArray *tags; // element is SSmlKV - const char *sTableName; // super table name - uint8_t sTableNameLen; // the length of super table name + SArray* tags; // element is SSmlKv + const char* sTableName; // super table name + uint8_t sTableNameLen; // the length of super table name // output - char *childTableName; // must have size of TSDB_TABLE_NAME_LEN; - uint64_t uid; // child table uid, may be useful + char* childTableName; // must have size of TSDB_TABLE_NAME_LEN; + uint64_t uid; // child table uid, may be useful } RandTableName; -void buildChildTableName(RandTableName *rName); +void buildChildTableName(RandTableName* rName); #ifdef __cplusplus } #endif -#endif /*_TD_COMMON_NAME_H_*/ +#endif /*_TD_COMMON_NAME_H_*/ diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 702dd50df5..05c9da1a8a 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -179,6 +179,7 @@ typedef enum ENodeType { QUERY_NODE_KILL_CONNECTION_STMT, QUERY_NODE_KILL_QUERY_STMT, QUERY_NODE_KILL_TRANSACTION_STMT, + QUERY_NODE_QUERY, // logic plan node QUERY_NODE_LOGIC_PLAN_SCAN, diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 0a835dae83..c0e2ec91e0 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -295,6 +295,37 @@ typedef struct SExplainStmt { SNode* pQuery; } SExplainStmt; +typedef struct SCmdMsgInfo { + int16_t msgType; + SEpSet epSet; + void* pMsg; + int32_t msgLen; + void* pExtension; // todo remove it soon +} SCmdMsgInfo; + +typedef enum EQueryExecMode { + QUERY_EXEC_MODE_LOCAL = 1, + QUERY_EXEC_MODE_RPC, + QUERY_EXEC_MODE_SCHEDULE, + QUERY_EXEC_MODE_EMPTY_RESULT +} EQueryExecMode; + +typedef struct SQuery { + ENodeType type; + EQueryExecMode execMode; + bool haveResultSet; + SNode* pRoot; + int32_t numOfResCols; + SSchema* pResSchema; + int8_t precision; + SCmdMsgInfo* pCmdMsg; + int32_t msgType; + SArray* pDbList; + SArray* pTableList; + bool showRewrite; + int32_t placeholderNum; +} SQuery; + void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 998d45aee1..a8bcac1c42 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -48,36 +48,6 @@ typedef struct SParseContext { bool isSuperUser; } SParseContext; -typedef struct SCmdMsgInfo { - int16_t msgType; - SEpSet epSet; - void* pMsg; - int32_t msgLen; - void* pExtension; // todo remove it soon -} SCmdMsgInfo; - -typedef enum EQueryExecMode { - QUERY_EXEC_MODE_LOCAL = 1, - QUERY_EXEC_MODE_RPC, - QUERY_EXEC_MODE_SCHEDULE, - QUERY_EXEC_MODE_EMPTY_RESULT -} EQueryExecMode; - -typedef struct SQuery { - EQueryExecMode execMode; - bool haveResultSet; - SNode* pRoot; - int32_t numOfResCols; - SSchema* pResSchema; - int8_t precision; - SCmdMsgInfo* pCmdMsg; - int32_t msgType; - SArray* pDbList; - SArray* pTableList; - bool showRewrite; - int32_t placeholderNum; -} SQuery; - int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery); bool isInsertSql(const char* pStr, size_t length); @@ -103,9 +73,10 @@ void destroyBoundColumnInfo(void* pBoundInfo); int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t msgBufLen); -void* smlInitHandle(SQuery *pQuery); -void smlDestroyHandle(void *pHandle); -int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *colsSchema, SArray *cols, bool format, STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen); +void* smlInitHandle(SQuery* pQuery); +void smlDestroyHandle(void* pHandle); +int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* colsSchema, SArray* cols, bool format, + STableMeta* pTableMeta, char* tableName, char* msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); #ifdef __cplusplus diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e277622c40..56e6a39ce8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -16,6 +16,7 @@ #include "tdatablock.h" #include "tmsg.h" #include "tmsgcb.h" +#include "tqueue.h" #include "trpc.h" #ifdef __cplusplus @@ -154,6 +155,10 @@ struct SStreamTask { STaskDispatcherShuffle shuffleDispatcher; }; + // msg buffer + int32_t memUsed; + STaosQueue* inputQ; + // application storage void* ahandle; }; @@ -194,6 +199,8 @@ typedef struct { SArray* res; // SArray } SStreamSinkReq; +int32_t streamEnqueueData(SStreamTask* pTask, const void* input, int32_t inputType); + int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId); #ifdef __cplusplus diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4079fcc2fa..52b2f0c670 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -633,6 +633,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN TAOS_DEF_ERROR_CODE(0, 0x2642) #define TSDB_CODE_PAR_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x2643) #define TSDB_CODE_PAR_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x2644) +#define TSDB_CODE_PAR_INVALID_STREAM_QUERY TAOS_DEF_ERROR_CODE(0, 0x2645) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b021651c16..57d4031835 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -254,7 +254,7 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); -void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); +void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo); void destroyTscObj(void* pObj); STscObj* acquireTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 596c3d3fbf..819c50275b 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -131,7 +131,7 @@ void destroyTscObj(void *pObj) { taosMemoryFreeClear(pTscObj); } -void *createTscObj(const char *user, const char *auth, const char *db, SAppInstInfo *pAppInfo) { +void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) { STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj)); if (NULL == pObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -145,6 +145,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI return NULL; } + pObj->connType = connType; pObj->pAppInfo = pAppInfo; tstrncpy(pObj->user, user, sizeof(pObj->user)); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 97c7d2bad1..53aebe751b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -25,7 +25,7 @@ #include "tref.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); -static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType); +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static bool stringLengthCheck(const char* str, size_t maxsize) { @@ -491,7 +491,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType) { - STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo); + STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo); if (NULL == pTscObj) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return pTscObj; @@ -504,7 +504,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return NULL; } - SMsgSendInfo* body = buildConnectMsg(pRequest, connType); + SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); @@ -527,7 +527,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t return pTscObj; } -static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) { +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -550,9 +550,10 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) { } taosMemoryFreeClear(db); - connectReq.connType = connType; - connectReq.pid = htonl(appInfo.pid); + connectReq.connType = pObj->connType; + connectReq.pid = htonl(appInfo.pid); connectReq.startTime = htobe64(appInfo.startTime); + tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user)); tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd)); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9d75586917..ae6aee13d5 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -563,7 +563,11 @@ const char *taos_get_server_info(TAOS *taos) { } void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { - // TODO + if (taos == NULL || sql == NULL) { + // todo directly call fp + } + + taos_query_l(taos, sql, (int32_t) strlen(sql)); } void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index d674b8286b..c768e001c5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -187,7 +187,7 @@ typedef struct { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); - conf->autoCommit = false; + conf->autoCommit = true; conf->autoCommitInterval = 5000; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; return conf; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c878109711..f017d22b39 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -169,6 +169,9 @@ uint32_t tsMaxRange = 500; // max range uint32_t tsCurRange = 100; // range char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR +// udf +bool tsStartUdfd = true; + // internal int32_t tsTransPullupInterval = 6; int32_t tsMqRebalanceInterval = 2; @@ -441,6 +444,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; + if (cfgAddBool(pCfg, "startUdfd", tsStartUdfd, 0) != 0) return -1; return 0; } @@ -581,6 +585,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; + tsStartUdfd = cfgGetItem(pCfg, "startUdfd")->bval; + if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 85b55611dc..9524fb1f44 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -607,6 +607,11 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1; } + if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1; + if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1; + if (pReq->commentLen > 0) { + if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1; + } tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -639,6 +644,14 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq } } + if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1; + if (pReq->commentLen > 0) { + pReq->comment = taosMemoryMalloc(pReq->commentLen); + if (pReq->comment == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/common/src/tname.c b/source/common/src/tname.c index 62ba4bfb79..56fbfed8ff 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -18,11 +18,9 @@ #include "tcommon.h" #include "tstrbuild.h" -#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T) +#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T) -bool tscValidateTableNameLength(size_t len) { - return len < TSDB_TABLE_NAME_LEN; -} +bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; } #if 0 // TODO refactor @@ -95,12 +93,12 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in * but in case of DST, the start time of one day need to be dynamically decided. */ // todo refactor to extract function that is available for Linux/Windows/Mac platform - #if defined(WINDOWS) && _MSC_VER >= 1900 +#if defined(WINDOWS) && _MSC_VER >= 1900 // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 int64_t timezone = _timezone; int32_t daylight = _daylight; char** tzname = _tzname; - #endif +#endif int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L; start += timezone * t; @@ -142,10 +140,10 @@ int32_t tNameExtractFullName(const SName* name, char* dst) { int32_t tNameLen(const SName* name) { assert(name != NULL); - char tmp[12] = {0}; + char tmp[12] = {0}; int32_t len = sprintf(tmp, "%d", name->acctId); - int32_t len1 = (int32_t) strlen(name->dbname); - int32_t len2 = (int32_t) strlen(name->tname); + int32_t len1 = (int32_t)strlen(name->dbname); + int32_t len2 = (int32_t)strlen(name->tname); if (name->type == TSDB_DB_NAME_T) { assert(len2 == 0); @@ -200,9 +198,7 @@ const char* tNameGetTableName(const SName* name) { return &name->tname[0]; } -void tNameAssign(SName* dst, const SName* src) { - memcpy(dst, src, sizeof(SName)); -} +void tNameAssign(SName* dst, const SName* src) { memcpy(dst, src, sizeof(SName)); } int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t nameLen) { assert(dst != NULL && dbName != NULL && nameLen > 0); @@ -244,7 +240,6 @@ bool tNameDBNameEqual(SName* left, SName* right) { return (0 == strcmp(left->dbname, right->dbname)); } - int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { assert(dst != NULL && str != NULL && strlen(str) > 0); @@ -260,14 +255,14 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { if ((type & T_NAME_DB) == T_NAME_DB) { dst->type = TSDB_DB_NAME_T; - char* start = (char*)((p == NULL)? str:(p+1)); + char* start = (char*)((p == NULL) ? str : (p + 1)); int32_t len = 0; p = strstr(start, TS_PATH_DELIMITER); if (p == NULL) { - len = (int32_t) strlen(start); + len = (int32_t)strlen(start); } else { - len = (int32_t) (p - start); + len = (int32_t)(p - start); } // too long account id or too long db name @@ -275,21 +270,21 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) { return -1; } - memcpy (dst->dbname, start, len); + memcpy(dst->dbname, start, len); dst->dbname[len] = 0; } if ((type & T_NAME_TABLE) == T_NAME_TABLE) { dst->type = TSDB_TABLE_NAME_T; - char* start = (char*) ((p == NULL)? str: (p+1)); + char* start = (char*)((p == NULL) ? str : (p + 1)); // too long account id or too long db name - int32_t len = (int32_t) strlen(start); + int32_t len = (int32_t)strlen(start); if ((len >= tListLen(dst->tname)) || (len <= 0)) { return -1; } - memcpy (dst->tname, start, len); + memcpy(dst->tname, start, len); dst->tname[len] = 0; } @@ -305,14 +300,14 @@ static int compareKv(const void* p1, const void* p2) { if (res != 0) { return res; } else { - return kvLen1-kvLen2; + return kvLen1 - kvLen2; } } /* * use stable name and tags to grearate child table name */ -void buildChildTableName(RandTableName *rName) { +void buildChildTableName(RandTableName* rName) { int32_t size = taosArrayGetSize(rName->tags); ASSERT(size > 0); taosArraySort(rName->tags, compareKv); @@ -320,19 +315,19 @@ void buildChildTableName(RandTableName *rName) { SStringBuilder sb = {0}; taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen); for (int j = 0; j < size; ++j) { - SSmlKv *tagKv = taosArrayGetP(rName->tags, j); + SSmlKv* tagKv = taosArrayGetP(rName->tags, j); taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen); } - size_t len = 0; - char* keyJoined = taosStringBuilderGetResult(&sb, &len); + size_t len = 0; + char* keyJoined = taosStringBuilderGetResult(&sb, &len); T_MD5_CTX context; tMD5Init(&context); - tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len); + tMD5Update(&context, (uint8_t*)keyJoined, (uint32_t)len); tMD5Final(&context); uint64_t digest1 = *(uint64_t*)(context.digest); uint64_t digest2 = *(uint64_t*)(context.digest + 8); - snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); + snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016" PRIx64 "%016" PRIx64, digest1, digest2); taosStringBuilderDestroy(&sb); rName->uid = digest1; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 70bdda5855..645f2ff0e7 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1550,10 +1550,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)t, false); - // single stable model - int8_t m = 0; pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)&m, false); + colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols); colDataAppend(pColInfo, rows, (const char *)b, false); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a2c628b8a1..8225eca659 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -196,7 +196,9 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { return pVgEpNew; } -void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); } +void tDeleteSMqVgEp(SMqVgEp *pVgEp) { + if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg); +} int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 01149f793f..b62de0e06e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -298,6 +298,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); return -1; } @@ -307,16 +309,22 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); return -1; } if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); return -1; } if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + taosMemoryFree(topicObj.ast); + taosMemoryFree(topicObj.sql); return -1; } } else { @@ -331,6 +339,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + taosMemoryFreeClear(topicObj.ast); + taosMemoryFreeClear(topicObj.sql); taosMemoryFreeClear(topicObj.physicalPlan); return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f607e0367e..6ca60945cd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -233,17 +233,19 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType != TDMT_VND_SUBMIT) return 0; + // make sure msgType == TDMT_VND_SUBMIT + if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) { + return -1; + } + + if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; + void* data = taosMemoryMalloc(msgLen); if (data == NULL) { return -1; } memcpy(data, msg, msgLen); - // make sure msgType == TDMT_VND_SUBMIT - if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) { - return -1; - } - SRpcMsg req = { .msgType = TDMT_VND_STREAM_TRIGGER, .pCont = data, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 98559f974d..93e81aa70e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -62,11 +62,6 @@ enum { * 2. when all data within queried time window, it is also denoted as query_completed */ TASK_COMPLETED = 0x2u, - - /* when the result is not completed return to client, this status will be - * usually used in case of interval query with interpolation option - */ - TASK_OVER = 0x4u, }; typedef struct SResultRowCell { @@ -288,12 +283,6 @@ typedef struct SOperatorInfo { SOperatorFpSet fpSet; } SOperatorInfo; -typedef struct { - int32_t numOfTags; - int32_t numOfCols; - SColumnInfo* colList; -} SQueriedTableInfo; - typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_READY = 0x2, @@ -629,8 +618,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); -void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, - SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); +void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, @@ -711,8 +699,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pE #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); -SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, - SExprInfo* pExpr, int32_t numOfOutput); #endif int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, @@ -737,7 +723,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); int32_t getMaximumIdleDurationSec(); -void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index ba77950912..3cc75a815d 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -202,7 +202,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER); + return isTaskKilled(pTaskInfo); } void qDestroyTask(qTaskInfo_t qTaskHandle) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 005d39b525..33f0c440ec 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -107,7 +107,6 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); -static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo); static void releaseQueryBuf(size_t numOfTables); static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); @@ -620,7 +619,7 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* continue; } - if (functionNeedToExecute(&pCtx[k])) { + if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { pCtx[k].fpSet.process(&pCtx[k]); } @@ -803,9 +802,12 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { - pCtx[k].startTs = startTs; // this can be set during create the struct - if (pCtx[k].fpSet.process != NULL) + pCtx[k].startTs = startTs; + // this can be set during create the struct + // todo add a dummy funtion to avoid process check + if (pCtx[k].fpSet.process != NULL) { pCtx[k].fpSet.process(&pCtx[k]); + } } } } @@ -922,6 +924,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { if (IS_VAR_DATA_TYPE(type)) { // todo disable this + // if (pResultRow->key == NULL) { // pResultRow->key = taosMemoryMalloc(varDataTLen(pData)); // varDataCopy(pResultRow->key, pData); @@ -1075,7 +1078,7 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* } // set the output buffer for the selectivity + tag query -static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { int32_t num = 0; SqlFunctionCtx* p = NULL; @@ -1087,7 +1090,7 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { for (int32_t i = 0; i < numOfOutput; ++i) { if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) { pValCtx[num++] = &pCtx[i]; - } else { + } else if (fmIsAggFunc(pCtx[i].functionId)) { p = &pCtx[i]; } // if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { @@ -1215,7 +1218,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize); } - setCtxTagColumnInfo(pFuncCtx, numOfOutput); + setSelectValueColumnInfo(pFuncCtx, numOfOutput); return pFuncCtx; } @@ -1451,8 +1454,6 @@ static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, in } } -#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) - // static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, // SqlFunctionCtx *pCtx, int32_t numOfRows) { // STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -2009,8 +2010,8 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { } // todo merged with the build group result. -void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, - SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { +void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, + int32_t* rowCellInfoOffset) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) { SResultRowPosition* pPos = &pResultRowInfo->pPosition[i]; @@ -2023,17 +2024,11 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD // } for (int32_t j = 0; j < numOfOutput; ++j) { - pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); - - struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; + struct SResultRowEntryInfo* pResInfo = getResultCell(pRow, j, rowCellInfoOffset); if (!isRowEntryInitialized(pResInfo)) { continue; } - if (pCtx[j].fpSet.process) { // TODO set the dummy function, to avoid the check for null ptr. - // pCtx[j].fpSet.finalize(&pCtx[j]); - } - if (pRow->numOfRows < pResInfo->numOfRes) { pRow->numOfRows = pResInfo->numOfRes; } @@ -2187,17 +2182,15 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased int32_t numOfResult = pBlock->info.rows; // there are already exists result rows int32_t start = 0; - int32_t step = -1; + int32_t step = 1; // qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv)); assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC); if (orderType == TSDB_ORDER_ASC) { start = pGroupResInfo->index; - step = 1; } else { // desc order copy all data start = numOfRows - pGroupResInfo->index - 1; - step = -1; } for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { @@ -2227,10 +2220,13 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor } else { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - - char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - colDataAppend(pColInfoData, pBlock->info.rows, in, pCtx[j].resultInfo->isNullRes); + // expand the result into multiple rows. E.g., _wstartts, top(k, 20) + // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for(int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } } } @@ -3682,10 +3678,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SOptrBasicInfo* pInfo = &pAggInfo->binfo; - int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; - bool newgroup = true; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -3698,6 +3692,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // } + int32_t order = getTableScanOrder(pOperator); + // there is an scalar expression that needs to be calculated before apply the group aggregation. if (pAggInfo->pScalarExprInfo != NULL) { int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, @@ -3730,8 +3726,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } closeAllResultRows(&pAggInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, - &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo, + pAggInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false); OPTR_SET_OPENED(pOperator); @@ -4014,7 +4010,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } } - // todo dynamic set tags + // todo set tags + // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // if (pTableQueryInfo != NULL) { // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); @@ -4028,7 +4025,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, pTaskInfo->code); } @@ -4279,11 +4275,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) { STableKeyInfo* pk = taosArrayGet(pa, j); - STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; - // pTQueryInfo->uid = pk->uid; pTQueryInfo->lastKey = pk->lastKey; - // pTQueryInfo->groupIndex = i; } } @@ -4302,7 +4295,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - int32_t numOfRows = 1; + int32_t numOfRows = 10; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, numOfRows); @@ -4313,9 +4306,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pOperator->resultInfo.capacity = 4096; - pOperator->resultInfo.threshold = 4096 * 0.75; - int32_t numOfGroup = 10; // todo replaced with true value pInfo->groupId = INT32_MIN; initResultRowInfo(&pInfo->binfo.resultRowInfo, numOfGroup); @@ -4545,42 +4535,6 @@ _error: return NULL; } -static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { - int32_t j = 0; - - if (TSDB_COL_IS_TAG(pExpr->pParam[0].pCol->type)) { - if (pExpr->pParam[0].pCol->colId == TSDB_TBNAME_COLUMN_INDEX) { - return TSDB_TBNAME_COLUMN_INDEX; - } - - while (j < pTableInfo->numOfTags) { - if (pExpr->pParam[0].pCol->colId == pTagCols[j].colId) { - return j; - } - - j += 1; - } - - } /*else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) { // user specified column data - return TSDB_UD_COLUMN_INDEX; - } else { - while (j < pTableInfo->numOfCols) { - if (pExpr->colInfo.colId == pTableInfo->colList[j].colId) { - return j; - } - - j += 1; - } - }*/ - - return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value -} - -bool validateExprColumnInfo(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { - int32_t j = getColumnIndexInSource(pTableInfo, pExpr, pTagCols); - return j != INT32_MIN; -} - static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) { SResSchema s = {0}; @@ -4739,7 +4693,7 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu static SArray* createSortInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); -static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode); +static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { SInterval interval = { @@ -5240,28 +5194,6 @@ _complete: return code; } -static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs, - int32_t numOfOutput, int32_t tagLen, bool superTable) { - for (int32_t i = 0; i < numOfOutput; ++i) { - int16_t functId = getExprFunctionId(&pExprs[i]); - - if (functId == FUNCTION_TOP || functId == FUNCTION_BOTTOM) { - int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); - if (j < 0 || j >= pTableInfo->numOfCols) { - return TSDB_CODE_QRY_INVALID_MSG; - } else { - SColumnInfo* pCol = &pTableInfo->colList[j]; - // int32_t ret = getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.param[0].i, - // &pExprs[i].base.resSchema.type, &pExprs[i].base.resSchema.bytes, - // &pExprs[i].base.interBytes, tagLen, superTable, NULL); - // assert(ret == TSDB_CODE_SUCCESS); - } - } - } - - return TSDB_CODE_SUCCESS; -} - void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) { const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2ba3e257b2..e3a507bf7c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -304,8 +304,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, - &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, + pInfo->binfo.rowCellInfoOffset); // if (!stableQuery) { // finalize include the update of result rows // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); // } else { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ebde1c7997..738f4821bd 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -798,8 +798,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, - &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, + pInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); OPTR_SET_OPENED(pOperator); @@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -1293,7 +1293,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c62a7f00cc..a45fdab030 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -25,6 +25,7 @@ extern "C" { bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)); int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult); EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index f88d84f50a..80bfb626d8 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -987,6 +987,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = NULL, .finalizeFunc = NULL }, + { + .name = "_c0", + .type = FUNCTION_TYPE_ROWTS, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, + .translateFunc = translateTimePseudoColumn, + .getEnvFunc = getTimePseudoFuncEnv, + .initFunc = NULL, + .sprocessFunc = NULL, + .finalizeFunc = NULL + }, { .name = "tbname", .type = FUNCTION_TYPE_TBNAME, @@ -1064,7 +1074,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateSelectValue, .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. .initFunc = functionSetup, - .sprocessFunc = NULL, + .processFunc = NULL, .finalizeFunc = NULL } }; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d1b16aef71..effc4888a8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -184,7 +184,6 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; - /*cleanupResultRowEntry(pResInfo);*/ char* in = GET_ROWCELL_INTERBUF(pResInfo); colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes); @@ -192,6 +191,10 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) { + return 0; +} + int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) { int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 11502b4c47..a577ea200f 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -18,6 +18,7 @@ #include "tudf.h" #include "tudfInt.h" #include "tarray.h" +#include "tglobal.h" #include "tdatablock.h" #include "querynodes.h" #include "builtinsimpl.h" @@ -138,6 +139,10 @@ static void udfWatchUdfd(void *args) { } int32_t udfStartUdfd(int32_t startDnodeId) { + if (!tsStartUdfd) { + fnInfo("start udfd is disabled.") + return 0; + } SUdfdData *pData = &udfdGlobal; if (pData->startCalled) { fnInfo("dnode-mgmt start udfd already called"); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index e8ac562072..a9481593e0 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -907,6 +907,13 @@ SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, S return (SNode*)pStmt; } +static SNode* createAlterTableStmtFinalize(SNode* pRealTable, SAlterTableStmt* pStmt) { + strcpy(pStmt->dbName, ((SRealTableNode*)pRealTable)->table.dbName); + strcpy(pStmt->tableName, ((SRealTableNode*)pRealTable)->table.tableName); + nodesDestroyNode(pRealTable); + return (SNode*)pStmt; +} + SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions) { if (NULL == pRealTable) { return NULL; @@ -915,7 +922,7 @@ SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, CHECK_OUT_OF_MEM(pStmt); pStmt->alterType = TSDB_ALTER_TABLE_UPDATE_OPTIONS; pStmt->pOptions = (STableOptions*)pOptions; - return (SNode*)pStmt; + return createAlterTableStmtFinalize(pRealTable, pStmt); } SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, @@ -928,7 +935,7 @@ SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, pStmt->alterType = alterType; strncpy(pStmt->colName, pColName->z, pColName->n); pStmt->dataType = dataType; - return (SNode*)pStmt; + return createAlterTableStmtFinalize(pRealTable, pStmt); } SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, const SToken* pColName) { @@ -939,7 +946,7 @@ SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_ CHECK_OUT_OF_MEM(pStmt); pStmt->alterType = alterType; strncpy(pStmt->colName, pColName->z, pColName->n); - return (SNode*)pStmt; + return createAlterTableStmtFinalize(pRealTable, pStmt); } SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, @@ -952,7 +959,7 @@ SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int pStmt->alterType = alterType; strncpy(pStmt->colName, pOldColName->z, pOldColName->n); strncpy(pStmt->newColName, pNewColName->z, pNewColName->n); - return (SNode*)pStmt; + return createAlterTableStmtFinalize(pRealTable, pStmt); } SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, const SToken* pTagName, SNode* pVal) { @@ -964,7 +971,7 @@ SNode* createAlterTableSetTag(SAstCreateContext* pCxt, SNode* pRealTable, const pStmt->alterType = TSDB_ALTER_TABLE_UPDATE_TAG_VAL; strncpy(pStmt->colName, pTagName->z, pTagName->n); pStmt->pVal = (SValueNode*)pVal; - return (SNode*)pStmt; + return createAlterTableStmtFinalize(pRealTable, pStmt); } SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) { diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 01327c08ef..abc7ddb17f 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -214,6 +214,7 @@ static SKeyword keywordTable[] = { {"WINDOW_CLOSE", TK_WINDOW_CLOSE}, {"WITH", TK_WITH}, {"WRITE", TK_WRITE}, + {"_C0", TK_ROWTS}, {"_QENDTS", TK_QENDTS}, {"_QSTARTTS", TK_QSTARTTS}, {"_ROWTS", TK_ROWTS}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0bb552606a..cdcb2592a7 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -782,6 +782,7 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, bool* pHasSel return DEAL_RES_ERROR; } strcpy(pFunc->functionName, "_select_value"); + strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName); pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode); if (TSDB_CODE_SUCCESS == pCxt->errCode) { translateFunction(pCxt, pFunc); @@ -2540,10 +2541,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm pReq->igExists = pStmt->ignoreExists; pReq->xFilesFactor = pStmt->pOptions->filesFactor; pReq->delay = pStmt->pOptions->delay; + pReq->ttl = pStmt->pOptions->ttl; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pTags, &pReq->pTags); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols); pReq->numOfTags = LIST_LENGTH(pStmt->pTags); + if ('\0' != pStmt->pOptions->comment[0]) { + pReq->comment = strdup(pStmt->pOptions->comment); + if (NULL == pReq->comment) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pReq->commentLen = strlen(pStmt->pOptions->comment) + 1; + } SName tableName; tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pReq->name); @@ -2602,6 +2611,18 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS } static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterReq) { + if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) { + pAlterReq->ttl = pStmt->pOptions->ttl; + if ('\0' != pStmt->pOptions->comment[0]) { + pAlterReq->comment = strdup(pStmt->pOptions->comment); + if (NULL == pAlterReq->comment) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pAlterReq->commentLen = strlen(pStmt->pOptions->comment) + 1; + } + return TSDB_CODE_SUCCESS; + } + pAlterReq->pFields = taosArrayInit(2, sizeof(TAOS_FIELD)); if (NULL == pAlterReq->pFields) { return TSDB_CODE_OUT_OF_MEMORY; @@ -2614,7 +2635,7 @@ static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterR case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: { - TAOS_FIELD field = {.type = pStmt->dataType.type, .bytes = pStmt->dataType.bytes}; + TAOS_FIELD field = {.type = pStmt->dataType.type, .bytes = calcTypeBytes(pStmt->dataType)}; strcpy(field.name, pStmt->colName); taosArrayPush(pAlterReq->pFields, &field); break; @@ -2625,7 +2646,7 @@ static int32_t setAlterTableField(SAlterTableStmt* pStmt, SMAlterStbReq* pAlterR strcpy(oldField.name, pStmt->colName); taosArrayPush(pAlterReq->pFields, &oldField); TAOS_FIELD newField = {0}; - strcpy(oldField.name, pStmt->newColName); + strcpy(newField.name, pStmt->newColName); taosArrayPush(pAlterReq->pFields, &newField); break; } @@ -2642,7 +2663,7 @@ static int32_t translateAlterTable(STranslateContext* pCxt, SAlterTableStmt* pSt SName tableName; tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), alterReq.name); alterReq.alterType = pStmt->alterType; - if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType || TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType) { + if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == pStmt->alterType) { return TSDB_CODE_FAILED; } else { if (TSDB_CODE_SUCCESS != setAlterTableField(pStmt, &alterReq)) { @@ -2929,7 +2950,6 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS SName name; tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); tNameGetFullDbName(&name, pReq->name); - /*tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);*/ pReq->igExists = pStmt->ignoreExists; pReq->withTbName = pStmt->pOptions->withTable; pReq->withSchema = pStmt->pOptions->withSchema; @@ -2993,7 +3013,8 @@ static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt SMDropTopicReq dropReq = {0}; SName name; - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), dropReq.name); + tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); + tNameGetFullDbName(&name, dropReq.name); dropReq.igNotExists = pStmt->ignoreNotExists; return buildCmdMsg(pCxt, TDMT_MND_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq); @@ -3033,28 +3054,48 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq); } -static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { - SCMCreateStreamReq createReq = {0}; +static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { + if (NULL == pStmt->pQuery) { + return TSDB_CODE_SUCCESS; + } - createReq.igExists = pStmt->ignoreExists; + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if (QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) { + return TSDB_CODE_SUCCESS; + } + } + + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY); +} + +static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) { + SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId}; + strcpy(name.dbname, ((SRealTableNode*)(((SSelectStmt*)pStmt)->pFromTable))->table.dbName); + tNameGetFullDbName(&name, pDbFName); +} + +static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { + pReq->igExists = pStmt->ignoreExists; SName name; - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), createReq.name); + tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), pReq->name); if ('\0' != pStmt->targetTabName[0]) { strcpy(name.dbname, pStmt->targetDbName); strcpy(name.tname, pStmt->targetTabName); - tNameExtractFullName(&name, createReq.targetStbFullName); + tNameExtractFullName(&name, pReq->targetStbFullName); } int32_t code = translateQuery(pCxt, pStmt->pQuery); if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL); + getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); + code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); } if (TSDB_CODE_SUCCESS == code) { - createReq.sql = strdup(pCxt->pParseCxt->pSql); - if (NULL == createReq.sql) { + pReq->sql = strdup(pCxt->pParseCxt->pSql); + if (NULL == pReq->sql) { code = TSDB_CODE_OUT_OF_MEMORY; } } @@ -3064,11 +3105,20 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* : TSDB_CODE_SUCCESS; } if (TSDB_CODE_SUCCESS == code) { - createReq.triggerType = pStmt->pOptions->triggerType; - createReq.watermark = - (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); + pReq->triggerType = pStmt->pOptions->triggerType; + pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); } + return code; +} + +static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { + SCMCreateStreamReq createReq = {0}; + + int32_t code = checkCreateStream(pCxt, pStmt); + if (TSDB_CODE_SUCCESS == code) { + code = buildCreateStreamReq(pCxt, pStmt, &createReq); + } if (TSDB_CODE_SUCCESS == code) { code = buildCmdMsg(pCxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, &createReq); } @@ -3166,7 +3216,7 @@ static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) { req.alterType = TSDB_ALTER_USER_REMOVE_WRITE_DB; } strcpy(req.user, pStmt->userName); - strcpy(req.dbname, pStmt->dbName); + sprintf(req.dbname, "%d.%s", pCxt->pParseCxt->acctId, pStmt->dbName); return buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &req); } diff --git a/source/libs/parser/test/parInitialATest.cpp b/source/libs/parser/test/parInitialATest.cpp index 5bd9eb5c43..4ddb7736b2 100644 --- a/source/libs/parser/test/parInitialATest.cpp +++ b/source/libs/parser/test/parInitialATest.cpp @@ -19,7 +19,7 @@ using namespace std; namespace ParserTest { -class ParserInitialATest : public ParserTestBase {}; +class ParserInitialATest : public ParserDdlTest {}; TEST_F(ParserInitialATest, alterAccount) { useDb("root", "test"); @@ -72,16 +72,103 @@ TEST_F(ParserInitialATest, alterDatabase) { TEST_F(ParserInitialATest, alterTable) { useDb("root", "test"); - // run("ALTER TABLE t1 TTL 10"); - // run("ALTER TABLE t1 COMMENT 'test'"); + SMAlterStbReq expect = {0}; + + auto setAlterStbReqFunc = [&](const char* pTbname, int8_t alterType, int32_t numOfFields = 0, + const char* pField1Name = nullptr, int8_t field1Type = 0, int32_t field1Bytes = 0, + const char* pField2Name = nullptr, const char* pComment = nullptr, + int32_t ttl = TSDB_DEFAULT_TABLE_TTL) { + int32_t len = snprintf(expect.name, sizeof(expect.name), "0.test.%s", pTbname); + expect.name[len] = '\0'; + expect.alterType = alterType; + expect.ttl = ttl; + if (nullptr != pComment) { + expect.comment = strdup(pComment); + expect.commentLen = strlen(pComment) + 1; + } + + expect.numOfFields = numOfFields; + if (NULL == expect.pFields) { + expect.pFields = taosArrayInit(2, sizeof(TAOS_FIELD)); + TAOS_FIELD field = {0}; + taosArrayPush(expect.pFields, &field); + taosArrayPush(expect.pFields, &field); + } + + TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 0); + if (NULL != pField1Name) { + strcpy(pField->name, pField1Name); + pField->name[strlen(pField1Name)] = '\0'; + } else { + memset(pField, 0, sizeof(TAOS_FIELD)); + } + pField->type = field1Type; + pField->bytes = field1Bytes > 0 ? field1Bytes : (field1Type > 0 ? tDataTypes[field1Type].bytes : 0); + + pField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 1); + if (NULL != pField2Name) { + strcpy(pField->name, pField2Name); + pField->name[strlen(pField2Name)] = '\0'; + } else { + memset(pField, 0, sizeof(TAOS_FIELD)); + } + pField->type = 0; + pField->bytes = 0; + }; + + setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) { + ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_ALTER_TABLE_STMT); + SMAlterStbReq req = {0}; + ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMAlterStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); + ASSERT_EQ(std::string(req.name), std::string(expect.name)); + ASSERT_EQ(req.alterType, expect.alterType); + ASSERT_EQ(req.numOfFields, expect.numOfFields); + if (expect.numOfFields > 0) { + TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(req.pFields, 0); + TAOS_FIELD* pExpectField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 0); + ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name)); + ASSERT_EQ(pField->type, pExpectField->type); + ASSERT_EQ(pField->bytes, pExpectField->bytes); + } + if (expect.numOfFields > 1) { + TAOS_FIELD* pField = (TAOS_FIELD*)taosArrayGet(req.pFields, 1); + TAOS_FIELD* pExpectField = (TAOS_FIELD*)taosArrayGet(expect.pFields, 1); + ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name)); + ASSERT_EQ(pField->type, pExpectField->type); + ASSERT_EQ(pField->bytes, pExpectField->bytes); + } + }); + + setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, nullptr, 10); + run("ALTER TABLE t1 TTL 10"); + + setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_OPTIONS, 0, nullptr, 0, 0, nullptr, "test"); + run("ALTER TABLE t1 COMMENT 'test'"); + + setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_ADD_COLUMN, 1, "cc1", TSDB_DATA_TYPE_BIGINT); run("ALTER TABLE t1 ADD COLUMN cc1 BIGINT"); + + setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_DROP_COLUMN, 1, "c1"); run("ALTER TABLE t1 DROP COLUMN c1"); + + setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, 1, "c1", TSDB_DATA_TYPE_VARCHAR, + 20 + VARSTR_HEADER_SIZE); run("ALTER TABLE t1 MODIFY COLUMN c1 VARCHAR(20)"); + + setAlterStbReqFunc("t1", TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, 2, "c1", 0, 0, "cc1"); run("ALTER TABLE t1 RENAME COLUMN c1 cc1"); + setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_ADD_TAG, 1, "tag11", TSDB_DATA_TYPE_BIGINT); run("ALTER TABLE st1 ADD TAG tag11 BIGINT"); + + setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_DROP_TAG, 1, "tag1"); run("ALTER TABLE st1 DROP TAG tag1"); + + setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, 1, "tag1", TSDB_DATA_TYPE_VARCHAR, + 20 + VARSTR_HEADER_SIZE); run("ALTER TABLE st1 MODIFY TAG tag1 VARCHAR(20)"); + + setAlterStbReqFunc("st1", TSDB_ALTER_TABLE_UPDATE_TAG_NAME, 2, "tag1", 0, 0, "tag11"); run("ALTER TABLE st1 RENAME TAG tag1 tag11"); // run("ALTER TABLE st1s1 SET TAG tag1=10"); diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index 540560fcd7..cf364aba5c 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -19,7 +19,7 @@ using namespace std; namespace ParserTest { -class ParserInitialCTest : public ParserTestBase {}; +class ParserInitialCTest : public ParserDdlTest {}; // todo compact @@ -97,17 +97,143 @@ TEST_F(ParserInitialCTest, createSnode) { TEST_F(ParserInitialCTest, createStable) { useDb("root", "test"); + SMCreateStbReq expect = {0}; + + auto setCreateStbReqFunc = [&](const char* pTbname, int8_t igExists = 0, + float xFilesFactor = TSDB_DEFAULT_ROLLUP_FILE_FACTOR, + int32_t delay = TSDB_DEFAULT_ROLLUP_DELAY, int32_t ttl = TSDB_DEFAULT_TABLE_TTL, + const char* pComment = nullptr) { + memset(&expect, 0, sizeof(SMCreateStbReq)); + int32_t len = snprintf(expect.name, sizeof(expect.name), "0.test.%s", pTbname); + expect.name[len] = '\0'; + expect.igExists = igExists; + expect.xFilesFactor = xFilesFactor; + expect.delay = delay; + expect.ttl = ttl; + if (nullptr != pComment) { + expect.comment = strdup(pComment); + expect.commentLen = strlen(pComment) + 1; + } + }; + + auto addFieldToCreateStbReqFunc = [&](bool col, const char* pFieldName, uint8_t type, int32_t bytes = 0, + int8_t flags = SCHEMA_SMA_ON) { + SField field = {0}; + strcpy(field.name, pFieldName); + field.type = type; + field.bytes = bytes > 0 ? bytes : tDataTypes[type].bytes; + field.flags = flags; + + if (col) { + if (NULL == expect.pColumns) { + expect.pColumns = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SField)); + } + taosArrayPush(expect.pColumns, &field); + expect.numOfColumns += 1; + } else { + if (NULL == expect.pTags) { + expect.pTags = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SField)); + } + taosArrayPush(expect.pTags, &field); + expect.numOfTags += 1; + } + }; + + setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) { + ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_TABLE_STMT); + SMCreateStbReq req = {0}; + ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateStbReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); + + ASSERT_EQ(std::string(req.name), std::string(expect.name)); + ASSERT_EQ(req.igExists, expect.igExists); + ASSERT_EQ(req.xFilesFactor, expect.xFilesFactor); + ASSERT_EQ(req.delay, expect.delay); + ASSERT_EQ(req.ttl, expect.ttl); + ASSERT_EQ(req.numOfColumns, expect.numOfColumns); + ASSERT_EQ(req.numOfTags, expect.numOfTags); + ASSERT_EQ(req.commentLen, expect.commentLen); + ASSERT_EQ(req.ast1Len, expect.ast1Len); + ASSERT_EQ(req.ast2Len, expect.ast2Len); + + if (expect.numOfColumns > 0) { + ASSERT_EQ(taosArrayGetSize(req.pColumns), expect.numOfColumns); + ASSERT_EQ(taosArrayGetSize(req.pColumns), taosArrayGetSize(expect.pColumns)); + for (int32_t i = 0; i < expect.numOfColumns; ++i) { + SField* pField = (SField*)taosArrayGet(req.pColumns, i); + SField* pExpectField = (SField*)taosArrayGet(expect.pColumns, i); + ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name)); + ASSERT_EQ(pField->type, pExpectField->type); + ASSERT_EQ(pField->bytes, pExpectField->bytes); + ASSERT_EQ(pField->flags, pExpectField->flags); + } + } + if (expect.numOfTags > 0) { + ASSERT_EQ(taosArrayGetSize(req.pTags), expect.numOfTags); + ASSERT_EQ(taosArrayGetSize(req.pTags), taosArrayGetSize(expect.pTags)); + for (int32_t i = 0; i < expect.numOfTags; ++i) { + SField* pField = (SField*)taosArrayGet(req.pTags, i); + SField* pExpectField = (SField*)taosArrayGet(expect.pTags, i); + ASSERT_EQ(std::string(pField->name), std::string(pExpectField->name)); + ASSERT_EQ(pField->type, pExpectField->type); + ASSERT_EQ(pField->bytes, pExpectField->bytes); + ASSERT_EQ(pField->flags, pExpectField->flags); + } + } + if (expect.commentLen > 0) { + ASSERT_EQ(std::string(req.comment), std::string(expect.comment)); + } + if (expect.ast1Len > 0) { + ASSERT_EQ(std::string(req.pAst1), std::string(expect.pAst1)); + } + if (expect.ast2Len > 0) { + ASSERT_EQ(std::string(req.pAst2), std::string(expect.pAst2)); + } + }); + + setCreateStbReqFunc("t1"); + addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP); + addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT); + addFieldToCreateStbReqFunc(false, "id", TSDB_DATA_TYPE_INT); run("create stable t1(ts timestamp, c1 int) TAGS(id int)"); + setCreateStbReqFunc("t1", 1, 0.1, 2, 100, "test create table"); + addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP, 0, 0); + addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT); + addFieldToCreateStbReqFunc(true, "c2", TSDB_DATA_TYPE_UINT); + addFieldToCreateStbReqFunc(true, "c3", TSDB_DATA_TYPE_BIGINT); + addFieldToCreateStbReqFunc(true, "c4", TSDB_DATA_TYPE_UBIGINT, 0, 0); + addFieldToCreateStbReqFunc(true, "c5", TSDB_DATA_TYPE_FLOAT, 0, 0); + addFieldToCreateStbReqFunc(true, "c6", TSDB_DATA_TYPE_DOUBLE, 0, 0); + addFieldToCreateStbReqFunc(true, "c7", TSDB_DATA_TYPE_BINARY, 20 + VARSTR_HEADER_SIZE, 0); + addFieldToCreateStbReqFunc(true, "c8", TSDB_DATA_TYPE_SMALLINT, 0, 0); + addFieldToCreateStbReqFunc(true, "c9", TSDB_DATA_TYPE_USMALLINT, 0, 0); + addFieldToCreateStbReqFunc(true, "c10", TSDB_DATA_TYPE_TINYINT, 0, 0); + addFieldToCreateStbReqFunc(true, "c11", TSDB_DATA_TYPE_UTINYINT, 0, 0); + addFieldToCreateStbReqFunc(true, "c12", TSDB_DATA_TYPE_BOOL, 0, 0); + addFieldToCreateStbReqFunc(true, "c13", TSDB_DATA_TYPE_NCHAR, 30 * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 0); + addFieldToCreateStbReqFunc(true, "c14", TSDB_DATA_TYPE_VARCHAR, 50 + VARSTR_HEADER_SIZE, 0); + addFieldToCreateStbReqFunc(false, "a1", TSDB_DATA_TYPE_TIMESTAMP); + addFieldToCreateStbReqFunc(false, "a2", TSDB_DATA_TYPE_INT); + addFieldToCreateStbReqFunc(false, "a3", TSDB_DATA_TYPE_UINT); + addFieldToCreateStbReqFunc(false, "a4", TSDB_DATA_TYPE_BIGINT); + addFieldToCreateStbReqFunc(false, "a5", TSDB_DATA_TYPE_UBIGINT); + addFieldToCreateStbReqFunc(false, "a6", TSDB_DATA_TYPE_FLOAT); + addFieldToCreateStbReqFunc(false, "a7", TSDB_DATA_TYPE_DOUBLE); + addFieldToCreateStbReqFunc(false, "a8", TSDB_DATA_TYPE_BINARY, 20 + VARSTR_HEADER_SIZE); + addFieldToCreateStbReqFunc(false, "a9", TSDB_DATA_TYPE_SMALLINT); + addFieldToCreateStbReqFunc(false, "a10", TSDB_DATA_TYPE_USMALLINT); + addFieldToCreateStbReqFunc(false, "a11", TSDB_DATA_TYPE_TINYINT); + addFieldToCreateStbReqFunc(false, "a12", TSDB_DATA_TYPE_UTINYINT); + addFieldToCreateStbReqFunc(false, "a13", TSDB_DATA_TYPE_BOOL); + addFieldToCreateStbReqFunc(false, "a14", TSDB_DATA_TYPE_NCHAR, 30 * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); + addFieldToCreateStbReqFunc(false, "a15", TSDB_DATA_TYPE_VARCHAR, 50 + VARSTR_HEADER_SIZE); run("create stable if not exists test.t1(" - "ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), c8 " - "SMALLINT, " - "c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, c13 NCHAR(30), " - "c15 VARCHAR(50)) " - "TAGS (tsa TIMESTAMP, a1 INT, a2 INT UNSIGNED, a3 BIGINT, a4 BIGINT UNSIGNED, a5 FLOAT, a6 DOUBLE, a7 " - "BINARY(20), a8 SMALLINT, " - "a9 SMALLINT UNSIGNED COMMENT 'test column comment', a10 TINYINT, a11 TINYINT UNSIGNED, a12 BOOL, a13 NCHAR(30), " - "a15 VARCHAR(50)) " + "ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), " + "c8 SMALLINT, c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, " + "c13 NCHAR(30), c14 VARCHAR(50)) " + "TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, " + "a8 BINARY(20), a9 SMALLINT, a10 SMALLINT UNSIGNED COMMENT 'test column comment', a11 TINYINT, " + "a12 TINYINT UNSIGNED, a13 BOOL, a14 NCHAR(30), a15 VARCHAR(50)) " "TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (min) FILE_FACTOR 0.1 DELAY 2"); } diff --git a/source/libs/parser/test/parTestUtil.cpp b/source/libs/parser/test/parTestUtil.cpp index a9ff756a95..250ac1c528 100644 --- a/source/libs/parser/test/parTestUtil.cpp +++ b/source/libs/parser/test/parTestUtil.cpp @@ -49,6 +49,8 @@ struct TerminateFlag : public exception { class ParserTestBaseImpl { public: + ParserTestBaseImpl(ParserTestBase* pBase) : pBase_(pBase) {} + void useDb(const string& acctId, const string& db) { caseEnv_.acctId_ = acctId; caseEnv_.db_ = db; @@ -156,11 +158,13 @@ class ParserTestBaseImpl { void doParse(SParseContext* pCxt, SQuery** pQuery) { DO_WITH_THROW(parse, pCxt, pQuery); + ASSERT_NE(*pQuery, nullptr); res_.parsedAst_ = toString((*pQuery)->pRoot); } void doTranslate(SParseContext* pCxt, SQuery* pQuery) { DO_WITH_THROW(translate, pCxt, pQuery); + checkQuery(pQuery, PARSER_STAGE_TRANSLATE); res_.translatedAst_ = toString(pQuery->pRoot); } @@ -178,12 +182,15 @@ class ParserTestBaseImpl { return str; } - caseEnv caseEnv_; - stmtEnv stmtEnv_; - stmtRes res_; + void checkQuery(const SQuery* pQuery, ParserStage stage) { pBase_->checkDdl(pQuery, stage); } + + caseEnv caseEnv_; + stmtEnv stmtEnv_; + stmtRes res_; + ParserTestBase* pBase_; }; -ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl()) {} +ParserTestBase::ParserTestBase() : impl_(new ParserTestBaseImpl(this)) {} ParserTestBase::~ParserTestBase() {} @@ -193,4 +200,6 @@ void ParserTestBase::run(const std::string& sql, int32_t expect, ParserStage che return impl_->run(sql, expect, checkStage); } +void ParserTestBase::checkDdl(const SQuery* pQuery, ParserStage stage) { return; } + } // namespace ParserTest diff --git a/source/libs/parser/test/parTestUtil.h b/source/libs/parser/test/parTestUtil.h index 0e8703b2c8..c7d7ead8db 100644 --- a/source/libs/parser/test/parTestUtil.h +++ b/source/libs/parser/test/parTestUtil.h @@ -18,6 +18,9 @@ #include +#define ALLOW_FORBID_FUNC + +#include "querynodes.h" #include "taoserror.h" namespace ParserTest { @@ -34,10 +37,32 @@ class ParserTestBase : public testing::Test { void useDb(const std::string& acctId, const std::string& db); void run(const std::string& sql, int32_t expect = TSDB_CODE_SUCCESS, ParserStage checkStage = PARSER_STAGE_ALL); + virtual void checkDdl(const SQuery* pQuery, ParserStage stage); + private: std::unique_ptr impl_; }; +class ParserDdlTest : public ParserTestBase { + public: + void setCheckDdlFunc(const std::function& func) { checkDdl_ = func; } + + virtual void checkDdl(const SQuery* pQuery, ParserStage stage) { + ASSERT_NE(pQuery, nullptr); + ASSERT_EQ(pQuery->haveResultSet, false); + ASSERT_NE(pQuery->pRoot, nullptr); + ASSERT_EQ(pQuery->numOfResCols, 0); + ASSERT_EQ(pQuery->pResSchema, nullptr); + ASSERT_EQ(pQuery->precision, 0); + if (nullptr != checkDdl_) { + checkDdl_(pQuery, stage); + } + } + + private: + std::function checkDdl_; +}; + extern bool g_isDump; } // namespace ParserTest diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 43822cd311..cf7dd50103 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -113,7 +113,9 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB } int tdbBtreeClose(SBTree *pBt) { - // TODO + if (pBt) { + tdbOsFree(pBt); + } return 0; } diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index ceaac6dff1..383807cc35 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -66,7 +66,10 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn } int tdbDbClose(TDB *pDb) { - // TODO + if (pDb) { + tdbBtreeClose(pDb->pBt); + tdbOsFree(pDb); + } return 0; } diff --git a/source/libs/tdb/src/db/tdbEnv.c b/source/libs/tdb/src/db/tdbEnv.c index d64b31234e..c0c1343a4f 100644 --- a/source/libs/tdb/src/db/tdbEnv.c +++ b/source/libs/tdb/src/db/tdbEnv.c @@ -56,7 +56,7 @@ int tdbEnvOpen(const char *rootDir, int szPage, int pages, TENV **ppEnv) { pEnv->nPgrHash = 8; tsize = sizeof(SPager *) * pEnv->nPgrHash; - pEnv->pgrHash = tdbRealloc(pEnv->pgrHash, tsize); + pEnv->pgrHash = tdbOsMalloc(tsize); if (pEnv->pgrHash == NULL) { return -1; } @@ -69,7 +69,19 @@ int tdbEnvOpen(const char *rootDir, int szPage, int pages, TENV **ppEnv) { } int tdbEnvClose(TENV *pEnv) { - // TODO + SPager *pPager; + + if (pEnv) { + for (pPager = pEnv->pgrList; pPager; pPager = pEnv->pgrList) { + pEnv->pgrList = pPager->pNext; + tdbPagerClose(pPager); + } + + tdbPCacheClose(pEnv->pCache); + tdbOsFree(pEnv->pgrHash); + tdbOsFree(pEnv); + } + return 0; } diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index 646b83298e..aa05687426 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -292,6 +292,10 @@ static int tdbPCacheOpenImpl(SPCache *pCache) { pPage->pFreeNext = pCache->pFree; pCache->pFree = pPage; pCache->nFree++; + + // add to local list + pPage->pCacheNext = pCache->pList; + pCache->pList = pPage; } // Open the hash table @@ -317,9 +321,10 @@ static int tdbPCacheCloseImpl(SPCache *pCache) { for (pPage = pCache->pList; pPage; pPage = pCache->pList) { pCache->pList = pPage->pCacheNext; - tdbPageDestroy(pPage, NULL, NULL); + tdbPageDestroy(pPage, tdbDefaultFree, NULL); } + tdbOsFree(pCache->pgHash); tdbPCacheDestroyLock(pCache); return 0; } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 1941592602..fbd5cb3aac 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -88,7 +88,13 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) { } int tdbPagerClose(SPager *pPager) { - // TODO + if (pPager) { + if (pPager->inTran) { + tdbOsClose(pPager->jfd); + } + tdbOsClose(pPager->fd); + tdbOsFree(pPager); + } return 0; } diff --git a/source/os/src/osTimezone.c b/source/os/src/osTimezone.c index cbf20f02cd..575d5bc187 100644 --- a/source/os/src/osTimezone.c +++ b/source/os/src/osTimezone.c @@ -125,7 +125,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) { strcpy(outTimezoneStr, tz); } -#elif defined(_TD_DARWIN_64) +#else char buf[4096] = {0}; char *tz = NULL; { @@ -170,64 +170,5 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) { */ snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %+03ld00)", tz, tm1.tm_isdst ? tzname[daylight] : tzname[0], -timezone / 3600); - -#else - /* - * NOTE: do not remove it. - * Enforce set the correct daylight saving time(DST) flag according - * to current time - */ - time_t tx1 = taosGetTimestampSec(); - struct tm tm1; - taosLocalTime(&tx1, &tm1); - - /* load time zone string from /etc/timezone */ - // FILE *f = fopen("/etc/timezone", "r"); - errno = 0; - TdFilePtr pFile = taosOpenFile("/etc/timezone", TD_FILE_READ); - char buf[68] = {0}; - if (pFile != NULL) { - int len = taosReadFile(pFile, buf, 64); - if (len < 64 && taosGetErrorFile(pFile)) { - taosCloseFile(&pFile); - printf("read /etc/timezone error, reason:%s", strerror(errno)); - return; - } - - taosCloseFile(&pFile); - - buf[sizeof(buf) - 1] = 0; - char *lineEnd = strstr(buf, "\n"); - if (lineEnd != NULL) { - *lineEnd = 0; - } - - // for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables - if (strlen(buf) > 0) { - setenv("TZ", buf, 1); - } - } - // get and set default timezone - tzset(); - - /* - * get CURRENT time zone. - * system current time zone is affected by daylight saving time(DST) - * - * e.g., the local time zone of London in DST is GMT+01:00, - * otherwise is GMT+00:00 - */ - int32_t tz = (-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR; - *tsTimezone = tz; - tz += daylight; - - /* - * format example: - * - * Asia/Shanghai (CST, +0800) - * Europe/London (BST, +0100) - */ - snprintf(outTimezoneStr, TD_TIMEZONE_LEN, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz)); - #endif } diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 14503e7068..9dcbafca7a 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -911,7 +911,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) void taosStopCacheRefreshWorker(void) { stopRefreshWorker = true; - taosThreadJoin(cacheRefreshWorker, NULL); + if(cacheThreadInit != PTHREAD_ONCE_INIT) taosThreadJoin(cacheRefreshWorker, NULL); taosArrayDestroy(pCacheArrayList); } diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 8a29a0a7a6..9dcd485194 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -460,8 +460,7 @@ class TDDnodes: processID = subprocess.check_output( psCmd, shell=True).decode("utf-8") - binPath = os.path.dirname(os.path.realpath(__file__)) - binPath = binPath + "/../../../debug/" + binPath = self.dnodes[0].getPath() + "/../../../" tdLog.debug("binPath %s" % (binPath)) binPath = os.path.realpath(binPath) tdLog.debug("binPath real path %s" % (binPath)) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index f2b9e0caab..9f197b16c8 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -70,6 +70,11 @@ ./test.sh -f tsim/tmq/basic2.sim ./test.sh -f tsim/tmq/basic3.sim ./test.sh -f tsim/tmq/basic4.sim +./test.sh -f tsim/tmq/basic1Of2Cons.sim +./test.sh -f tsim/tmq/basic2Of2Cons.sim +./test.sh -f tsim/tmq/basic3Of2Cons.sim +./test.sh -f tsim/tmq/basic4Of2Cons.sim +./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim # --- stable ./test.sh -f tsim/stable/disk.sim diff --git a/tests/script/sh/copy_udf.sh b/tests/script/sh/copy_udf.sh index 7b5b5f4720..c3c300fb7b 100755 --- a/tests/script/sh/copy_udf.sh +++ b/tests/script/sh/copy_udf.sh @@ -4,9 +4,10 @@ set +e #set -x echo "Executing copy_udf.sh" - -SCRIPT_DIR=`pwd` +SCRIPT_DIR=`dirname $0` cd $SCRIPT_DIR/../ +SCRIPT_DIR=`pwd` +echo "SCRIPT_DIR: ${SCRIPT_DIR}" IN_TDINTERNAL="community" if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then @@ -16,6 +17,7 @@ else fi TAOS_DIR=`pwd` +echo "find udf library in $TAOS_DIR" UDF1_DIR=`find $TAOS_DIR -name "libudf1.so"|grep lib|head -n1` UDF2_DIR=`find $TAOS_DIR -name "libudf2.so"|grep lib|head -n1` diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index d2893ade57..6290403214 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -3,6 +3,7 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/cfg.sh -n dnode1 -c wallevel -v 2 system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode1 -c startUdfd -v 1 print ========= start dnode1 as LEADER system sh/exec.sh -n dnode1 -s start @@ -65,25 +66,25 @@ if $data00 != 1.414213562 then endi sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2); -#sql select udf1(f1, f2) from t2; -#print $rows , $data00 , $data10 , $data20 , $data30 -#if $rows != 4 then -# return -1 -#endi -#if $data00 != 88 then -# return -1 -#endi -#if $data10 != 88 then -# return -1 -#endi -# -#if $data20 != NULL then -# return -1 -#endi -# -#if $data30 != NULL then -# return -1 -#endi +sql select udf1(f1, f2) from t2; +print $rows , $data00 , $data10 , $data20 , $data30 +if $rows != 4 then + return -1 +endi +if $data00 != 88 then + return -1 +endi +if $data10 != 88 then + return -1 +endi + +if $data20 != NULL then + return -1 +endi + +if $data30 != NULL then + return -1 +endi sql select udf2(f1, f2) from t2; print $rows, $data00 diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 5c5b1f543a..a6b4408cdc 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -18,6 +18,7 @@ python3 ./test.py -f 2-query/char_length.py python3 ./test.py -f 2-query/upper.py python3 ./test.py -f 2-query/lower.py python3 ./test.py -f 2-query/join.py +python3 ./test.py -f 2-query/cast.py # python3 ./test.py -f 2-query/concat.py # after wal ,crash occured # python3 ./test.py -f 2-query/concat_ws.py