diff --git a/include/common/systable.h b/include/common/systable.h
index 37593144d8..f0f8ac8cf6 100644
--- a/include/common/systable.h
+++ b/include/common/systable.h
@@ -65,6 +65,11 @@ extern "C" {
#define TSDB_PERFS_TABLE_TRANS "perf_trans"
#define TSDB_PERFS_TABLE_APPS "perf_apps"
+#define TSDB_AUDIT_DB "audit"
+#define TSDB_AUDIT_STB_OPERATION "operations"
+#define TSDB_AUDIT_CTB_OPERATION "t_operations_"
+#define TSDB_AUDIT_CTB_OPERATION_LEN 13
+
typedef struct SSysDbTableSchema {
const char* name;
const int32_t type;
diff --git a/include/common/tglobal.h b/include/common/tglobal.h
index f23bb4d51b..04e9b5a380 100644
--- a/include/common/tglobal.h
+++ b/include/common/tglobal.h
@@ -232,7 +232,7 @@ struct SConfig *taosGetCfg();
void taosSetAllDebugFlag(int32_t flag);
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
-int8_t taosGranted();
+int8_t taosGranted(int8_t type);
#ifdef __cplusplus
}
diff --git a/include/common/tgrant.h b/include/common/tgrant.h
index dbca2ac90c..2fa6dde8f6 100644
--- a/include/common/tgrant.h
+++ b/include/common/tgrant.h
@@ -30,6 +30,9 @@ extern "C" {
#define GRANT_HEART_BEAT_MIN 2
#define GRANT_ACTIVE_CODE "activeCode"
+#define GRANT_FLAG_ALL (0x01)
+#define GRANT_FLAG_AUDIT (0x02)
+#define GRANT_FLAG_VIEW (0x04)
typedef enum {
TSDB_GRANT_ALL,
@@ -50,11 +53,13 @@ typedef enum {
TSDB_GRANT_SUBSCRIPTION,
TSDB_GRANT_AUDIT,
TSDB_GRANT_CSV,
+ TSDB_GRANT_VIEW,
TSDB_GRANT_MULTI_TIER,
TSDB_GRANT_BACKUP_RESTORE,
} EGrantType;
-int32_t grantCheck(EGrantType grant);
+int32_t grantCheck(EGrantType grant); // less
+int32_t grantCheckLE(EGrantType grant); // less or equal
char* tGetMachineId();
#ifndef TD_UNIQ_GRANT
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
@@ -69,7 +74,7 @@ int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, ch
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
- {.name = "state", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
+ {.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 4a1a43324b..f195aca18f 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -3925,6 +3925,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
+#define SUBMIT_REQ_FROM_FILE 0x4
typedef struct {
int32_t flags;
diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h
index b34b998d76..1957bcbb24 100644
--- a/include/libs/catalog/catalog.h
+++ b/include/libs/catalog/catalog.h
@@ -358,7 +358,7 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pAuth, SUserAuthRes* pRes);
-int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo *pAuth, SUserAuthRes* pRes, bool* exists);
+int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo *pAuth, SUserAuthRes* pRes, bool* exists);
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h
index 24fd79d3d7..7a1b655931 100644
--- a/include/libs/nodes/plannodes.h
+++ b/include/libs/nodes/plannodes.h
@@ -724,8 +724,10 @@ typedef struct SSubplan {
SNode* pTagCond;
SNode* pTagIndexCond;
bool showRewrite;
- int32_t rowsThreshold;
+ bool isView;
+ bool isAudit;
bool dynamicRowThreshold;
+ int32_t rowsThreshold;
} SSubplan;
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;
diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h
index 6a41f4607b..2ac2c3ccbd 100644
--- a/include/libs/parser/parser.h
+++ b/include/libs/parser/parser.h
@@ -86,8 +86,10 @@ typedef struct SParseContext {
bool enableSysInfo;
bool async;
bool hasInvisibleCol;
- const char* svrVer;
+ bool isView;
+ bool isAudit;
bool nodeOffline;
+ const char* svrVer;
SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos
int64_t allocatorId;
diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h
index 1b523c0323..707d70b71b 100644
--- a/include/libs/planner/planner.h
+++ b/include/libs/planner/planner.h
@@ -32,6 +32,8 @@ typedef struct SPlanContext {
bool streamQuery;
bool rSmaQuery;
bool showRewrite;
+ bool isView;
+ bool isAudit;
int8_t triggerType;
int64_t watermark;
int64_t deleteMark;
diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h
index f79a0a0718..8d6cd6a3c0 100644
--- a/include/libs/qcom/query.h
+++ b/include/libs/qcom/query.h
@@ -66,7 +66,11 @@ typedef enum {
#define QUERY_RSP_POLICY_QUICK 1
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
-#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
+#define QUERY_MSG_MASK_AUDIT() (1 << 1)
+#define QUERY_MSG_MASK_VIEW() (1 << 2)
+#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
+#define TEST_AUDIT_MASK(m) (((m) & QUERY_MSG_MASK_AUDIT()) != 0)
+#define TEST_VIEW_MASK(m) (((m) & QUERY_MSG_MASK_VIEW()) != 0)
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
@@ -338,6 +342,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname))
+#define IS_AUDIT_DBNAME(_dbname) ((*(_dbname) == 'a') && (0 == strcmp(_dbname, TSDB_AUDIT_DB)))
+#define IS_AUDIT_STB_NAME(_stbname) ((*(_stbname) == 'o') && (0 == strcmp(_stbname, TSDB_AUDIT_STB_OPERATION)))
+#define IS_AUDIT_CTB_NAME(_ctbname) \
+ ((*(_ctbname) == 't') && (0 == strncmp(_ctbname, TSDB_AUDIT_CTB_OPERATION, TSDB_AUDIT_CTB_OPERATION_LEN)))
+
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index fed4081ccb..c603f9f5ac 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -13,6 +13,9 @@
* along with this program. If not, see .
*/
+#ifndef _STREAM_STATE_H_
+#define _STREAM_STATE_H_
+
#include "tdatablock.h"
#include "rocksdb/c.h"
@@ -20,9 +23,6 @@
#include "tsimplehash.h"
#include "tstreamFileState.h"
-#ifndef _STREAM_STATE_H_
-#define _STREAM_STATE_H_
-
#ifdef __cplusplus
extern "C" {
#endif
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index dce8fffe11..2135bb706b 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -13,6 +13,9 @@
* along with this program. If not, see .
*/
+#ifndef _STREAM_H_
+#define _STREAM_H_
+
#include "os.h"
#include "streamState.h"
#include "tdatablock.h"
@@ -26,9 +29,6 @@
extern "C" {
#endif
-#ifndef _STREAM_H_
-#define _STREAM_H_
-
#define ONE_MiB_F (1048576.0)
#define ONE_KiB_F (1024.0)
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index 94fe80b901..c0c20a0fde 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -575,8 +575,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821)
#define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822)
#define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823)
-#define TSDB_CODE_GRANT_CSV_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0824)
-#define TSDB_CODE_GRANT_AUDIT_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0825)
// sync
// #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x
diff --git a/include/util/tdef.h b/include/util/tdef.h
index 4698d50e67..f136005026 100644
--- a/include/util/tdef.h
+++ b/include/util/tdef.h
@@ -287,6 +287,7 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_VALUE_LEN 256
#define TSDB_CLUSTER_VALUE_LEN 1000
+#define TSDB_GRANT_LOG_COL_LEN 15072
#define TSDB_ACTIVE_KEY_LEN 109
#define TSDB_CONN_ACTIVE_KEY_LEN 255
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 63a65d7c95..a146712cab 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -843,7 +843,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
view->version = htonl(view->version);
}
- tscDebug("hb got %d expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);
+ tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index c1e1da617d..d3d8ee1dc1 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -1154,6 +1154,8 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
+ .isView = pWrapper->pParseCtx->isView,
+ .isAudit = pWrapper->pParseCtx->isAudit,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user,
diff --git a/source/common/src/systable.c b/source/common/src/systable.c
index 77083d0425..47eac317ec 100644
--- a/source/common/src/systable.c
+++ b/source/common/src/systable.c
@@ -349,21 +349,21 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
};
static const SSysDbTableSchema useGrantsFullSchema[] = {
- {.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
- {.name = "display_name", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
- {.name = "expire", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
- {.name = "limits", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
+ {.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "display_name", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "expire", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "limits", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema useGrantsLogsSchema[] = {
- {.name = "state", .bytes = 1536 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
- {.name = "active", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
- {.name = "machine", .bytes = 9088 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
+ {.name = "state", .bytes = 1536 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "active", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "machine", .bytes = TSDB_GRANT_LOG_COL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema useMachinesSchema[] = {
{.name = "id", .bytes = TSDB_CLUSTER_ID_LEN + 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
- {.name = "machine", .bytes = 6016 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+ {.name = "machine", .bytes = 7552 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysTableMeta infosMeta[] = {
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index 46c4d613fb..444a4c0ccc 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -1801,4 +1801,17 @@ void taosSetAllDebugFlag(int32_t flag) {
if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist
}
-int8_t taosGranted() { return atomic_load_8(&tsGrant); }
+int8_t taosGranted(int8_t type) {
+ switch (type) {
+ case TSDB_GRANT_ALL:
+ return atomic_load_8(&tsGrant) & GRANT_FLAG_ALL;
+ case TSDB_GRANT_AUDIT:
+ return atomic_load_8(&tsGrant) & GRANT_FLAG_AUDIT;
+ case TSDB_GRANT_VIEW:
+ return atomic_load_8(&tsGrant) & GRANT_FLAG_VIEW;
+ default:
+ ASSERTS(0, "undefined grant type:%" PRIi8, type);
+ break;
+ }
+ return 0;
+}
\ No newline at end of file
diff --git a/source/common/src/tgrant.c b/source/common/src/tgrant.c
index f212d71362..8e4fe9febb 100644
--- a/source/common/src/tgrant.c
+++ b/source/common/src/tgrant.c
@@ -19,5 +19,6 @@
#ifndef _GRANT
int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;}
+int32_t grantCheckLE(EGrantType grant) {return TSDB_CODE_SUCCESS;}
#endif
\ No newline at end of file
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index bedd72759d..554205decb 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -556,7 +556,7 @@ typedef struct {
} SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
-void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete);
+void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h
index 372612274f..4d1125a340 100644
--- a/source/dnode/mnode/impl/inc/mndStream.h
+++ b/source/dnode/mnode/impl/inc/mndStream.h
@@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
+void mndInitExecInfo();
#ifdef __cplusplus
}
diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c
index 1a55a161bf..f2b279276e 100644
--- a/source/dnode/mnode/impl/src/mndCluster.c
+++ b/source/dnode/mnode/impl/src/mndCluster.c
@@ -409,7 +409,7 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
}
{ // audit
- auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, cfgReq.sqlLen);
+ auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, TMIN(cfgReq.sqlLen, GRANT_ACTIVE_HEAD_LEN << 1));
}
_exit:
tFreeSMCfgClusterReq(&cfgReq);
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 3ce548a4f6..753076f1f3 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -107,7 +107,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
goto FAILED;
}
- if ((terrno = grantCheck(TSDB_GRANT_SUBSCRIPTION)) < 0) {
+ if ((terrno = grantCheckLE(TSDB_GRANT_SUBSCRIPTION)) < 0) {
code = terrno;
goto FAILED;
}
@@ -240,9 +240,10 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbR
}
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
strcpy(data->topic, topic);
- if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIPTION) < 0) {
+ if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
+ grantCheckLE(TSDB_GRANT_SUBSCRIPTION) < 0) {
data->noPrivilege = 1;
- } else{
+ } else {
data->noPrivilege = 0;
}
mndReleaseTopic(pMnode, pTopic);
diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c
index 0b85f8fd5a..2ec6e09d12 100644
--- a/source/dnode/mnode/impl/src/mndGrant.c
+++ b/source/dnode/mnode/impl/src/mndGrant.c
@@ -79,11 +79,6 @@ char *tGetMachineId() { return NULL; };
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t mndProcessConfigGrantReq(SMnode *pMnode, SRpcMsg *pReq, SMCfgClusterReq *pCfg) { return 0; }
-#else
-#ifndef TD_UNIQ_GRANT
-char *tGetMachineId() { return NULL; };
-int32_t mndProcessConfigGrantReq(SMnode *pMnode, SRpcMsg *pReq, SMCfgClusterReq *pCfg) { return 0; }
-#endif
#endif
void mndGenerateMachineCode() { grantParseParameter(); }
\ No newline at end of file
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index fc460ac67c..0a78914011 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
-static void freeCheckpointCandEntry(void *);
-static void freeTaskList(void *param);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
@@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
- taosThreadMutexInit(&execInfo.lock, NULL);
- _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
-
- execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
- execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
- execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
- execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
- execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
-
- taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
- taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
+ mndInitExecInfo();
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
@@ -1628,7 +1616,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
- if(grantCheck(TSDB_GRANT_STREAMS) < 0){
+ if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){
terrno = TSDB_CODE_GRANT_EXPIRED;
return -1;
}
@@ -2117,16 +2105,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
-void freeCheckpointCandEntry(void *param) {
- SCheckpointCandEntry *pEntry = param;
- taosMemoryFreeClear(pEntry->pName);
-}
-
-void freeTaskList(void* param) {
- SArray** pList = (SArray **)param;
- taosArrayDestroy(*pList);
-}
-
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
int32_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
@@ -2202,4 +2180,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock);
return 0;
-}
\ No newline at end of file
+}
diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c
index 4426ab0672..97474fa851 100644
--- a/source/dnode/mnode/impl/src/mndStreamHb.c
+++ b/source/dnode/mnode/impl/src/mndStreamHb.c
@@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
- if(grantCheck(TSDB_GRANT_STREAMS) < 0){
+ if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){
if(suspendAllStreams(pMnode, &pReq->info) < 0){
return -1;
}
@@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pFailedTasks) > 0) {
- bool allReady = true;
- SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
- taosArrayDestroy(p);
+ bool allReady = true;
+ if (pMnode != NULL) {
+ SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
+ taosArrayDestroy(p);
+ } else {
+ allReady = false;
+ }
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
- pInfo->checkpointId, pInfo->transId);
+ pInfo->checkpointId, pInfo->transId);
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
}
diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c
index 235c604b27..3cabce2201 100644
--- a/source/dnode/mnode/impl/src/mndStreamUtil.c
+++ b/source/dnode/mnode/impl/src/mndStreamUtil.c
@@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
taosWUnLockLatch(&pStream->lock);
return 0;
}
+
+static void freeCheckpointCandEntry(void *param) {
+ SCheckpointCandEntry *pEntry = param;
+ taosMemoryFreeClear(pEntry->pName);
+}
+
+static void freeTaskList(void* param) {
+ SArray** pList = (SArray **)param;
+ taosArrayDestroy(*pList);
+}
+
+void mndInitExecInfo() {
+ taosThreadMutexInit(&execInfo.lock, NULL);
+ _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
+
+ execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
+ execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
+ execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
+ execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
+ execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
+
+ taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
+ taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
+}
diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt
index a002b20bde..bc5b5125f1 100644
--- a/source/dnode/mnode/impl/test/CMakeLists.txt
+++ b/source/dnode/mnode/impl/test/CMakeLists.txt
@@ -4,7 +4,7 @@ add_subdirectory(acct)
#add_subdirectory(db)
#add_subdirectory(dnode)
add_subdirectory(func)
-#add_subdirectory(mnode)
+add_subdirectory(stream)
add_subdirectory(profile)
add_subdirectory(qnode)
add_subdirectory(sdb)
diff --git a/source/dnode/mnode/impl/test/stream/CMakeLists.txt b/source/dnode/mnode/impl/test/stream/CMakeLists.txt
new file mode 100644
index 0000000000..b1bb62735f
--- /dev/null
+++ b/source/dnode/mnode/impl/test/stream/CMakeLists.txt
@@ -0,0 +1,13 @@
+SET(CMAKE_CXX_STANDARD 11)
+
+aux_source_directory(. MNODE_STREAM_TEST_SRC)
+add_executable(streamTest ${MNODE_STREAM_TEST_SRC})
+target_link_libraries(
+ streamTest
+ PRIVATE dnode gtest
+)
+
+add_test(
+ NAME streamTest
+ COMMAND streamTest
+)
diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp
new file mode 100644
index 0000000000..a3babad80c
--- /dev/null
+++ b/source/dnode/mnode/impl/test/stream/stream.cpp
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wwrite-strings"
+#pragma GCC diagnostic ignored "-Wunused-function"
+#pragma GCC diagnostic ignored "-Wunused-variable"
+#pragma GCC diagnostic ignored "-Wsign-compare"
+
+#include
+#include
+#include "../../inc/mndStream.h"
+
+namespace {
+SRpcMsg buildHbReq() {
+ SStreamHbMsg msg = {0};
+ msg.vgId = 1;
+ msg.numOfTasks = 5;
+ msg.pTaskStatus = taosArrayInit(4, sizeof(STaskStatusEntry));
+
+ for (int32_t i = 0; i < 4; ++i) {
+ STaskStatusEntry entry = {0};
+ entry.nodeId = i + 1;
+ entry.stage = 1;
+ entry.id.taskId = i + 1;
+ entry.id.streamId = 999;
+
+ if (i == 0) {
+ entry.stage = 4;
+ }
+
+ taosArrayPush(msg.pTaskStatus, &entry);
+ }
+
+ // (p->checkpointId != 0) && p->checkpointFailed
+ // add failed checkpoint info
+ {
+ STaskStatusEntry entry = {0};
+ entry.nodeId = 5;
+ entry.stage = 1;
+
+ entry.id.taskId = 5;
+ entry.id.streamId = 999;
+
+ entry.checkpointId = 1;
+ entry.checkpointFailed = true;
+
+ taosArrayPush(msg.pTaskStatus, &entry);
+ }
+
+ int32_t tlen = 0;
+ int32_t code = 0;
+ SEncoder encoder;
+ void* buf = NULL;
+ SRpcMsg msg1 = {0};
+ msg1.info.noResp = 1;
+
+ tEncodeSize(tEncodeStreamHbMsg, &msg, tlen, code);
+ if (code < 0) {
+ goto _end;
+ }
+
+ buf = rpcMallocCont(tlen);
+ if (buf == NULL) {
+ goto _end;
+ }
+
+ tEncoderInit(&encoder, (uint8_t*)buf, tlen);
+ if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) {
+ rpcFreeCont(buf);
+ goto _end;
+ }
+ tEncoderClear(&encoder);
+
+ initRpcMsg(&msg1, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
+
+ taosArrayDestroy(msg.pTaskStatus);
+ return msg1;
+
+_end:
+ return msg1;
+}
+
+void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskId) {
+ SStreamExecInfo* pExecNode = &execInfo;
+
+ pTask->id.streamId = streamId;
+ pTask->id.taskId = taskId;
+ pTask->info.nodeId = nodeId;
+
+ STaskId id;
+ id.streamId = pTask->id.streamId;
+ id.taskId = pTask->id.taskId;
+
+ STaskStatusEntry entry;
+ streamTaskStatusInit(&entry, pTask);
+
+ entry.stage = 1;
+ entry.status = TASK_STATUS__READY;
+
+ taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
+ taosArrayPush(pExecNode->pTaskList, &id);
+}
+void initStreamExecInfo() {
+ SStreamExecInfo* pExecNode = &execInfo;
+
+ SStreamTask task = {0};
+ setTask(&task, 1, 999, 1);
+ setTask(&task, 1, 999, 2);
+ setTask(&task, 1, 999, 3);
+ setTask(&task, 1, 999, 4);
+ setTask(&task, 2, 999, 5);
+}
+
+void initNodeInfo() {
+ execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
+ SNodeEntry entry = {0};
+ entry.nodeId = 2;
+ entry.stageUpdated = true;
+ taosArrayPush(execInfo.pNodeList, &entry);
+}
+} // namespace
+
+int main(int argc, char** argv) {
+ testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+TEST(mndHbTest, handle_error_in_hb) {
+ mndInitExecInfo();
+ initStreamExecInfo();
+ initNodeInfo();
+
+ SRpcMsg msg = buildHbReq();
+ int32_t code = mndProcessStreamHb(&msg);
+
+ rpcFreeCont(msg.pCont);
+}
+
+#pragma GCC diagnostic pop
\ No newline at end of file
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index 3ec6adee41..e4d7b11176 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -238,6 +238,11 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
TSDB_CHECK_CODE(code, lino, _exit);
}
+ if (submitTbData.flags & SUBMIT_REQ_FROM_FILE) {
+ code = grantCheck(TSDB_GRANT_CSV);
+ TSDB_CHECK_CODE(code, lino, _exit);
+ }
+
int64_t uid;
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);
diff --git a/source/libs/catalog/src/ctgRent.c b/source/libs/catalog/src/ctgRent.c
index 457285b147..a19eb19c02 100755
--- a/source/libs/catalog/src/ctgRent.c
+++ b/source/libs/catalog/src/ctgRent.c
@@ -300,7 +300,3 @@ int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName,
return TSDB_CODE_SUCCESS;
}
-
-
-
-
diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c
index 0602016268..4a2fe2416f 100644
--- a/source/libs/executor/src/streameventwindowoperator.c
+++ b/source/libs/executor/src/streameventwindowoperator.c
@@ -349,6 +349,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
+ curWin.winInfo.pStatePos->beUpdated = true;
SSessionKey key = {0};
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c
index 93f8aff377..6b689264b5 100644
--- a/source/libs/executor/src/streamtimewindowoperator.c
+++ b/source/libs/executor/src/streamtimewindowoperator.c
@@ -2083,6 +2083,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
+ winInfo.pStatePos->beUpdated = true;
SSessionKey key = {0};
getSessionHashKey(&winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
@@ -2286,6 +2287,10 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
SResultWindowInfo* pWinInfo = pIte;
+ if (!pWinInfo->pStatePos->beUpdated) {
+ continue;
+ }
+ pWinInfo->pStatePos->beUpdated = false;
saveResult(*pWinInfo, pStUpdated);
}
return TSDB_CODE_SUCCESS;
@@ -3425,6 +3430,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
+ curWin.winInfo.pStatePos->beUpdated = true;
SSessionKey key = {0};
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index 00cd82847d..8c27515542 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -3330,6 +3330,8 @@ static const char* jkSubplanTagIndexCond = "TagIndexCond";
static const char* jkSubplanShowRewrite = "ShowRewrite";
static const char* jkSubplanRowsThreshold = "RowThreshold";
static const char* jkSubplanDynamicRowsThreshold = "DyRowThreshold";
+static const char* jkSubplanIsView = "IsView";
+static const char* jkSubplanIsAudit = "IsAudit";
static int32_t subplanToJson(const void* pObj, SJson* pJson) {
const SSubplan* pNode = (const SSubplan*)pObj;
@@ -3368,6 +3370,12 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->showRewrite);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddBoolToObject(pJson, jkSubplanIsView, pNode->isView);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddBoolToObject(pJson, jkSubplanIsAudit, pNode->isAudit);
+ }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanRowsThreshold, pNode->rowsThreshold);
}
@@ -3415,6 +3423,12 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->showRewrite);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetBoolValue(pJson, jkSubplanIsView, &pNode->isView);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetBoolValue(pJson, jkSubplanIsAudit, &pNode->isAudit);
+ }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanRowsThreshold, &pNode->rowsThreshold);
}
diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c
index ad0cb6a169..9fa63df667 100644
--- a/source/libs/nodes/src/nodesMsgFuncs.c
+++ b/source/libs/nodes/src/nodesMsgFuncs.c
@@ -3970,6 +3970,12 @@ static int32_t subplanInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->dynamicRowThreshold);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tlvEncodeValueBool(pEncoder, pNode->isView);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tlvEncodeValueBool(pEncoder, pNode->isAudit);
+ }
return code;
}
@@ -4025,7 +4031,12 @@ static int32_t msgToSubplanInline(STlvDecoder* pDecoder, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->dynamicRowThreshold);
}
-
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tlvDecodeValueBool(pDecoder, &pNode->isView);
+ }
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tlvDecodeValueBool(pDecoder, &pNode->isAudit);
+ }
return code;
}
diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index 512dfdaef2..7d10d1f2df 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -2144,13 +2144,15 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
pStmt->pTableCxtHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
-
int32_t numOfRows = 0;
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
if (TSDB_CODE_SUCCESS == code) {
pStmt->totalRowsNum += numOfRows;
pStmt->totalTbNum += 1;
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
+ if (rowsDataCxt.pTableDataCxt && rowsDataCxt.pTableDataCxt->pData) {
+ rowsDataCxt.pTableDataCxt->pData->flags |= SUBMIT_REQ_FROM_FILE;
+ }
if (!pStmt->fileProcessing) {
taosCloseFile(&pStmt->fp);
} else {
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 30f57a64dd..63891fc78b 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -3165,6 +3165,19 @@ static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTabl
return TSDB_CODE_SUCCESS;
}
+static int32_t translateAudit(STranslateContext* pCxt, SRealTableNode* pRealTable, SName* pName) {
+ if (pRealTable->pMeta->tableType == TSDB_SUPER_TABLE) {
+ if (IS_AUDIT_DBNAME(pName->dbname) && IS_AUDIT_STB_NAME(pName->tname)) {
+ pCxt->pParseCxt->isAudit = true;
+ }
+ } else if (pRealTable->pMeta->tableType == TSDB_CHILD_TABLE) {
+ if (IS_AUDIT_DBNAME(pName->dbname) && IS_AUDIT_CTB_NAME(pName->tname)) {
+ pCxt->pParseCxt->isAudit = true;
+ }
+ }
+ return 0;
+}
+
int32_t translateTable(STranslateContext* pCxt, SNode** pTable) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(*pTable)) {
@@ -3184,7 +3197,8 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable) {
if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType) {
return translateView(pCxt, pTable, &name);
}
-#endif
+ translateAudit(pCxt, pRealTable, &name);
+#endif
code = setTableVgroupList(pCxt, &name, pRealTable);
if (TSDB_CODE_SUCCESS == code) {
code = setTableIndex(pCxt, &name, pRealTable);
@@ -8267,7 +8281,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
return code;
}
- code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1);
+ code = nodesListAppend((*pSelect1)->pGroupByList, nodesCloneNode((const SNode*)pNode1));
if (code) {
return code;
}
@@ -8280,18 +8294,17 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
pNode2->groupingSetType = GP_TYPE_NORMAL;
pNode2->pParameterList = nodesMakeList();
if (NULL == pNode2->pParameterList) {
- nodesDestroyNode((SNode*)pNode1);
+ nodesDestroyNode((SNode*)pNode2);
return TSDB_CODE_OUT_OF_MEMORY;
}
- code = nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2);
+ code = nodesListAppend(pNode2->pParameterList, nodesCloneNode((const SNode*)pFunc2));
if (code) {
nodesDestroyNode((SNode*)pNode2);
return code;
}
- code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
- return code;
+ return nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c
index 6cd2b0f972..11f51fec83 100644
--- a/source/libs/planner/src/planPhysiCreater.c
+++ b/source/libs/planner/src/planPhysiCreater.c
@@ -2187,6 +2187,8 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
pSubplan->level = pLogicSubplan->level;
pSubplan->rowsThreshold = 4096;
pSubplan->dynamicRowThreshold = false;
+ pSubplan->isView = pCxt->pPlanCxt->isView;
+ pSubplan->isAudit = pCxt->pPlanCxt->isAudit;
if (NULL != pCxt->pPlanCxt->pUser) {
snprintf(pSubplan->user, sizeof(pSubplan->user), "%s", pCxt->pPlanCxt->pUser);
}
diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c
index 9a1c309ab0..66ec460861 100644
--- a/source/libs/qworker/src/qwMsg.c
+++ b/source/libs/qworker/src/qwMsg.c
@@ -360,10 +360,24 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
- if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && !taosGranted()) {
- QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
- tFreeSSubQueryMsg(&msg);
- QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
+ if (chkGrant) {
+ if ((!TEST_SHOW_REWRITE_MASK(msg.msgMask))) {
+ if (!taosGranted(TSDB_GRANT_ALL)) {
+ QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
+ tFreeSSubQueryMsg(&msg);
+ QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
+ }
+ if ((TEST_VIEW_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_VIEW)) {
+ QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask);
+ tFreeSSubQueryMsg(&msg);
+ QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
+ }
+ if ((TEST_AUDIT_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_AUDIT)) {
+ QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask);
+ tFreeSSubQueryMsg(&msg);
+ QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
+ }
+ }
}
uint64_t sId = msg.sId;
diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c
index 5c67c7974f..1c0b31109e 100644
--- a/source/libs/scheduler/src/schRemote.c
+++ b/source/libs/scheduler/src/schRemote.c
@@ -1109,6 +1109,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
qMsg.refId = pJob->refId;
qMsg.execId = pTask->execId;
qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
+ qMsg.msgMask |= (pTask->plan->isView) ? QUERY_MSG_MASK_VIEW() : 0;
+ qMsg.msgMask |= (pTask->plan->isAudit) ? QUERY_MSG_MASK_AUDIT() : 0;
qMsg.taskType = TASK_TYPE_TEMP;
qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);
diff --git a/source/util/src/tbase64.c b/source/util/src/tbase64.c
index f6f12fef97..a2f4ddbc51 100644
--- a/source/util/src/tbase64.c
+++ b/source/util/src/tbase64.c
@@ -15,8 +15,6 @@
#define _DEFAULT_SOURCE
#include "tbase64.h"
-#include
-#include
static char basis_64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index 9fcca86744..7510b89736 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -462,8 +462,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mism
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit")
-TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CSV_LIMITED, "Csv has reached the licensed upper limit")
-TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_AUDIT_LIMITED, "Audit has reached the licensed upper limit")
// sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 9ea03b4e6b..103e67be46 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -350,6 +350,7 @@ fi
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/ts-4272.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py
+,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py
@@ -567,7 +568,7 @@ fi
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4382.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4403.py
-
+,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_td28163.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py
diff --git a/tests/script/sh/checkAsan.sh b/tests/script/sh/checkAsan.sh
index 9f67d437e2..d2f1e13e8f 100755
--- a/tests/script/sh/checkAsan.sh
+++ b/tests/script/sh/checkAsan.sh
@@ -72,7 +72,7 @@ python_error=$(cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l)
#0 0x7f2d64f5a808 in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cc:144
#1 0x7f2d63fcf459 in strerror /build/glibc-SzIz7B/glibc-2.31/string/strerror.c:38
-runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | wc -l)
+runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cpp" | grep -v "sclvector.c" | wc -l)
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim
index 67678963ea..ce5c57572e 100644
--- a/tests/script/tsim/stream/windowClose.sim
+++ b/tests/script/tsim/stream/windowClose.sim
@@ -290,6 +290,213 @@ if $data32 != $now32 then
return -1
endi
+print step 2 max delay 2s
+sql create database test15 vgroups 4;
+sql use test15;
+sql create table t1(ts timestamp, a int, b int , c int, d double);
+
+sql create stream stream15 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s);
+
+sleep 1000
+
+sql insert into t1 values(1648791213000,1,2,3,1.0);
+sql insert into t1 values(1648791233001,2,2,3,1.1);
+
+$loop_count = 0
+
+loop4:
+
+sleep 1000
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+sql select * from streamt13;
+
+if $rows != 2 then
+ print ======rows=$rows
+ goto loop4
+endi
+
+$now02 = $data02
+$now12 = $data12
+
+
+print step1 max delay 2s......... sleep 3s
+sleep 3000
+
+sql select * from streamt13;
+
+
+if $data02 != $now02 then
+ print ======data02=$data02
+ return -1
+endi
+
+if $data12 != $now12 then
+ print ======data12=$data12
+ return -1
+endi
+
+print step1 max delay 2s......... sleep 3s
+sleep 3000
+
+sql select * from streamt13;
+
+
+if $data02 != $now02 then
+ print ======data02=$data02
+ return -1
+endi
+
+if $data12 != $now12 then
+ print ======data12=$data12
+ return -1
+endi
+
+print session max delay over
+
+print step 3 max delay 2s
+sql create database test16 vgroups 4;
+sql use test16;
+sql create table t1(ts timestamp, a int, b int , c int, d double);
+
+sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a);
+
+sleep 1000
+
+sql insert into t1 values(1648791213000,1,2,3,1.0);
+sql insert into t1 values(1648791233001,2,2,3,1.1);
+
+$loop_count = 0
+
+loop5:
+
+sleep 1000
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+sql select * from streamt13;
+
+if $rows != 2 then
+ print ======rows=$rows
+ goto loop5
+endi
+
+$now02 = $data02
+$now12 = $data12
+
+
+print step1 max delay 2s......... sleep 3s
+sleep 3000
+
+sql select * from streamt13;
+
+
+if $data02 != $now02 then
+ print ======data02=$data02
+ return -1
+endi
+
+if $data12 != $now12 then
+ print ======data12=$data12
+ return -1
+endi
+
+print step1 max delay 2s......... sleep 3s
+sleep 3000
+
+sql select * from streamt13;
+
+
+if $data02 != $now02 then
+ print ======data02=$data02
+ return -1
+endi
+
+if $data12 != $now12 then
+ print ======data12=$data12
+ return -1
+endi
+
+print state max delay over
+
+print step 4 max delay 2s
+sql create database test17 vgroups 4;
+sql use test17;
+sql create table t1(ts timestamp, a int, b int , c int, d double);
+
+sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9;
+
+sleep 1000
+
+sql insert into t1 values(1648791213000,1,2,3,1.0);
+sql insert into t1 values(1648791213001,9,2,3,1.0);
+
+sql insert into t1 values(1648791233001,1,2,3,1.1);
+sql insert into t1 values(1648791233009,9,2,3,1.1);
+
+$loop_count = 0
+
+loop6:
+
+sleep 1000
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+sql select * from streamt13;
+
+if $rows != 2 then
+ print ======rows=$rows
+ goto loop6
+endi
+
+$now02 = $data02
+$now12 = $data12
+
+
+print step1 max delay 2s......... sleep 3s
+sleep 3000
+
+sql select * from streamt13;
+
+
+if $data02 != $now02 then
+ print ======data02=$data02
+ return -1
+endi
+
+if $data12 != $now12 then
+ print ======data12=$data12
+ return -1
+endi
+
+print step1 max delay 2s......... sleep 3s
+sleep 3000
+
+sql select * from streamt13;
+
+
+if $data02 != $now02 then
+ print ======data02=$data02
+ return -1
+endi
+
+if $data12 != $now12 then
+ print ======data12=$data12
+ return -1
+endi
+
+print event max delay over
+
print ======over
system sh/exec.sh -n dnode1 -s stop -x SIGINT
diff --git a/tests/system-test/1-insert/test_ts4479.py b/tests/system-test/1-insert/test_ts4479.py
new file mode 100644
index 0000000000..be9789b5fc
--- /dev/null
+++ b/tests/system-test/1-insert/test_ts4479.py
@@ -0,0 +1,75 @@
+import os
+import sys
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.dnodes import tdDnodes
+from math import inf
+import taos
+
+
+class TDTestCase:
+ """Verify inserting varbinary type data of ts-4479
+ """
+ def init(self, conn, logSql, replicaVer=1):
+ tdLog.debug("start to execute %s" % __file__)
+ tdSql.init(conn.cursor(), True)
+ self.conn = conn
+ self.db_name = "db"
+ self.stable_name = "st"
+
+ def run(self):
+ tdSql.execute("create database if not exists %s" % self.db_name)
+ tdSql.execute("use %s" % self.db_name)
+ # create super table
+ tdSql.execute("create table %s (ts timestamp, c1 varbinary(65517)) tags (t1 varbinary(16382))" % self.stable_name)
+
+ # varbinary tag length is more than 16382
+ tag = os.urandom(16383).hex()
+ tdSql.error("create table ct using st tags(%s);" % ('\\x' + tag))
+
+ # create child table with max column and tag length
+ child_table_list = []
+ for i in range(2):
+ child_table_name = "ct_" + str(i+1)
+ child_table_list.append(child_table_name)
+ tag = os.urandom(16382).hex()
+ tdSql.execute("create table %s using st tags('%s');" % (child_table_name, '\\x' + tag))
+ tdLog.info("create table %s successfully" % child_table_name)
+
+ # varbinary column length is more than 65517
+ value = os.urandom(65518).hex()
+ tdSql.error("insert into ct_1 values(now, '\\x%s');" % value)
+
+ # insert data
+ for i in range(10):
+ sql = "insert into table_name values"
+ for j in range(5):
+ value = os.urandom(65517).hex()
+ sql += "(now+%ss, '%s')," % (str(j+1), '\\x' + value)
+ for child_table in child_table_list:
+ tdSql.execute(sql.replace("table_name", child_table))
+ tdLog.info("Insert data into %s successfully" % child_table)
+ tdLog.info("Insert data round %s successfully" % str(i+1))
+ tdSql.execute("flush database %s" % self.db_name)
+
+ # insert \\x to varbinary column
+ tdSql.execute("insert into ct_1 values(now, '\\x');")
+ tdSql.query("select * from ct_1 where c1 = '\\x';")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 1, b'')
+
+ # insert \\x to varbinary tag
+ tdSql.execute("create table ct_3 using st tags('\\x');")
+ tdSql.execute("insert into ct_3 values(now, '\\x45');")
+ tdSql.query("select * from st where t1='';")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 2, b'')
+
+ def stop(self):
+ tdSql.execute("drop database if exists %s" % self.db_name)
+ tdSql.close()
+ tdLog.success("%s successfully executed" % __file__)
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
diff --git a/tests/system-test/2-query/test_td28163.py b/tests/system-test/2-query/test_td28163.py
new file mode 100644
index 0000000000..df727f6c5a
--- /dev/null
+++ b/tests/system-test/2-query/test_td28163.py
@@ -0,0 +1,265 @@
+import random
+import itertools
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.sqlset import *
+from util import constant
+from util.common import *
+
+
+class TDTestCase:
+ """Verify the jira TD-28163
+ """
+ def init(self, conn, logSql, replicaVar=1):
+ self.replicaVar = int(replicaVar)
+ tdLog.debug("start to execute %s" % __file__)
+ tdSql.init(conn.cursor())
+
+ def prepareData(self):
+ # db
+ tdSql.execute("create database if not exists db")
+ tdSql.execute("use db")
+
+ # super table
+ tdSql.execute("create stable st(ts timestamp, c_ts_empty timestamp, c_int int, c_int_empty int, c_unsigned_int int unsigned, \
+ c_unsigned_int_empty int unsigned, c_bigint bigint, c_bigint_empty bigint, c_unsigned_bigint bigint unsigned, \
+ c_unsigned_bigint_empty bigint unsigned, c_float float, c_float_empty float, c_double double, c_double_empty double, \
+ c_binary binary(16), c_binary_empty binary(16), c_smallint smallint, c_smallint_empty smallint, \
+ c_smallint_unsigned smallint unsigned, c_smallint_unsigned_empty smallint unsigned, c_tinyint tinyint, \
+ c_tinyint_empty tinyint, c_tinyint_unsigned tinyint unsigned, c_tinyint_unsigned_empty tinyint unsigned, \
+ c_bool bool, c_bool_empty bool, c_nchar nchar(16), c_nchar_empty nchar(16), c_varchar varchar(16), \
+ c_varchar_empty varchar(16), c_varbinary varbinary(16), c_varbinary_empty varbinary(16)) \
+ tags(t_timestamp timestamp, t_timestamp_empty timestamp, t_int int, t_int_empty int, \
+ t_unsigned_int int unsigned, t_unsigned_int_empty int unsigned, t_bigint bigint, t_bigint_empty bigint, \
+ t_unsigned_bigint bigint unsigned, t_unsigned_bigint_empty bigint unsigned, t_float float, t_float_empty float, \
+ t_double double, t_double_empty double, t_binary binary(16), t_binary_empty binary(16), t_smallint smallint, \
+ t_smallint_empty smallint, t_smallint_unsigned smallint unsigned, t_smallint_unsigned_empty smallint unsigned, \
+ t_tinyint tinyint, t_tinyint_empty tinyint, t_tinyint_unsigned tinyint unsigned, t_tinyint_unsigned_empty tinyint unsigned, \
+ t_bool bool, t_bool_empty bool, t_nchar nchar(16), t_nchar_empty nchar(16), t_varchar varchar(16), \
+ t_varchar_empty varchar(16), t_varbinary varbinary(16), t_varbinary_empty varbinary(16));")
+
+ # child tables
+ start_ts = 1704085200000
+ tags = [
+ "'2024-01-01 13:00:01', null, 1, null, 1, null, 1111111111111111, null, 1111111111111111, null, 1.1, null, 1.11, null, 'aaaaaaaa', '', 1, null, 1, null, 1, null, 1, null, True, null, 'ncharaa', null, 'varcharaa', null, '0x7661726331', null",
+ "'2024-01-01 13:00:02', null, 2, null, 2, null, 2222222222222222, null, 2222222222222222, null, 2.2, null, 2.22, null, 'bbbbbbbb', '', 2, null, 2, null, 2, null, 2, null, False, null, 'ncharbb', null, 'varcharbb', null, '0x7661726332', null",
+ "'2024-01-01 13:00:03', null, 3, null, 3, null, 3333333333333333, null, 3333333333333333, null, 3.3, null, 3.33, null, 'cccccccc', '', 3, null, 3, null, 3, null, 3, null, True, null, 'ncharcc', null, 'varcharcc', null, '0x7661726333', null",
+ "'2024-01-01 13:00:04', null, 4, null, 4, null, 4444444444444444, null, 4444444444444444, null, 4.4, null, 4.44, null, 'dddddddd', '', 4, null, 4, null, 4, null, 4, null, False, null, 'nchardd', null, 'varchardd', null, '0x7661726334', null",
+ "'2024-01-01 13:00:05', null, 5, null, 5, null, 5555555555555555, null, 5555555555555555, null, 5.5, null, 5.55, null, 'eeeeeeee', '', 5, null, 5, null, 5, null, 5, null, True, null, 'ncharee', null, 'varcharee', null, '0x7661726335', null",
+ ]
+ for i in range(5):
+ tdSql.execute(f"create table ct{i+1} using st tags({tags[i]});")
+
+ # insert data
+ data = "null, 1, null, 1, null, 1111111111111111, null, 1111111111111111, null, 1.1, null, 1.11, null, 'aaaaaaaa', null, 1, null, 1, null, 1, null, 1, null, True, null, 'ncharaa', null, 'varcharaa', null, '0x7661726331', null"
+ for round in range(100):
+ sql = f"insert into ct{i+1} values"
+ for j in range(100):
+ sql += f"({start_ts + (round * 100 + j + 1) * 1000}, {data})"
+ sql += ";"
+ tdSql.execute(sql)
+ tdLog.debug("Prepare data successfully")
+
+ def test_query_with_filter(self):
+ # total row number
+ tdSql.query("select count(*) from st;")
+ total_rows = tdSql.queryResult[0][0]
+ tdLog.debug("Total row number is %s" % total_rows)
+
+ # start_ts and end_ts
+ tdSql.query("select first(ts), last(ts) from st;")
+ start_ts = tdSql.queryResult[0][0]
+ end_ts = tdSql.queryResult[0][1]
+ tdLog.debug("start_ts is %s, end_ts is %s" % (start_ts, end_ts))
+
+ filter_dic = {
+ "all_filter_list": ["ts <= now", "t_timestamp <= now", f"ts between '{start_ts}' and '{end_ts}'",
+ f"t_timestamp between '{start_ts}' and '{end_ts}'", "c_ts_empty is null",
+ "t_timestamp_empty is null", "ts > '1970-01-01 00:00:00'", "t_int in (1, 2, 3, 4, 5)",
+ "c_int=1", "c_int_empty is null", "c_unsigned_int=1", "c_unsigned_int_empty is null",
+ "c_unsigned_int in (1, 2, 3, 4, 5)", "c_unsigned_int_empty is null", "c_bigint=1111111111111111",
+ "c_bigint_empty is null", "c_unsigned_bigint in (1111111111111111)", "c_unsigned_bigint_empty is null",
+ "c_float=1.1", "c_float_empty is null", "c_double=1.11", "c_double_empty is null", "c_binary='aaaaaaaa'",
+ "c_binary_empty is null", "c_smallint=1", "c_smallint_empty is null", "c_smallint_unsigned=1",
+ "c_smallint_unsigned_empty is null", "c_tinyint=1", "c_tinyint_empty is null", "c_tinyint_unsigned=1",
+ "c_tinyint_unsigned_empty is null", "c_bool=True", "c_bool_empty is null", "c_nchar='ncharaa'",
+ "c_nchar_empty is null", "c_varchar='varcharaa'", "c_varchar_empty is null", "c_varbinary='0x7661726331'",
+ "c_varbinary_empty is null"],
+ "empty_filter_list": ["ts > now", "t_timestamp > now", "c_ts_empty is not null","t_timestamp_empty is not null",
+ "ts <= '1970-01-01 00:00:00'", "c_ts_empty < '1970-01-01 00:00:00'", "c_int <> 1", "c_int_empty is not null",
+ "t_int in (10, 11)", "t_int_empty is not null"]
+ }
+ for filter in filter_dic["all_filter_list"]:
+ tdLog.debug("Execute query with filter '%s'" % filter)
+ tdSql.query(f"select * from st where {filter};")
+ tdSql.checkRows(total_rows)
+
+ for filter in filter_dic["empty_filter_list"]:
+ tdLog.debug("Execute query with filter '%s'" % filter)
+ tdSql.query(f"select * from st where {filter};")
+ tdSql.checkRows(0)
+
+ def test_query_with_groupby(self):
+ tdSql.query("select count(*) from st group by tbname;")
+ tdSql.checkRows(5)
+ tdSql.checkData(0, 0, 10000)
+
+ tdSql.query("select count(c_unsigned_int_empty + c_int_empty * c_float_empty - c_double_empty + c_smallint_empty / c_tinyint_empty) from st where c_int_empty is null group by tbname;")
+ tdSql.checkRows(5)
+ tdSql.checkData(0, 0, 0)
+
+ tdSql.query("select sum(t_unsigned_int_empty + t_int_empty * t_float_empty - t_double_empty + t_smallint_empty / t_tinyint_empty) from st where t_int_empty is null group by tbname;")
+ tdSql.checkRows(5)
+ tdSql.checkData(0, 0, None)
+
+ tdSql.query("select max(c_bigint_empty) from st group by tbname, t_bigint_empty, t_float_empty, t_double_empty;")
+ tdSql.checkRows(5)
+ tdSql.checkData(0, 0, None)
+
+ tdSql.query("select min(t_double) as v from st where c_nchar like '%aa%' and t_double is not null group by tbname, t_bigint_empty, t_float_empty, t_double_empty order by v limit 1;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 1.11)
+
+ tdSql.query("select top(c_float, 1) as v from st where c_nchar like '%aa%' group by tbname order by v desc slimit 1 limit 1;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 1.1)
+
+ tdSql.query("select first(ts) from st where c_varchar is not null partition by tbname order by ts slimit 1;")
+ tdSql.checkRows(5)
+ tdSql.checkData(0, 0, '2024-01-01 13:00:01.000')
+
+ tdSql.query("select first(c_nchar_empty) from st group by tbname;")
+ tdSql.checkRows(0)
+
+ tdSql.query("select first(ts), first(c_nchar_empty) from st group by tbname, ts order by ts slimit 1 limit 1;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, '2024-01-01 13:00:01.000')
+ tdSql.checkData(0, 1, None)
+
+ tdSql.query("select first(c_nchar_empty) from st group by t_timestamp_empty order by t_timestamp;")
+ tdSql.checkRows(0)
+
+ tdSql.query("select last(ts), last(c_nchar_empty) from st group by tbname, ts order by ts slimit 1 limit 1;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, '2024-01-01 13:00:01.000')
+ tdSql.checkData(0, 1, None)
+
+ tdSql.query("select elapsed(ts, 1s) t from st where c_int = 1 and c_nchar like '%aa%' group by tbname order by t desc slimit 1 limit 1;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 9999)
+
+ tdSql.query("select elapsed(ts, 1s) t from st where c_int_empty is not null and c_nchar like '%aa%' group by tbname order by t desc slimit 1 limit 1;")
+ tdSql.checkRows(0)
+
+ def test_query_with_join(self):
+ tdSql.query("select count(*) from st as t1 join st as t2 on t1.ts = t2.ts and t1.c_float_empty is not null;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 0)
+
+ tdSql.query("select count(t1.c_ts_empty) as v from st as t1 join st as t2 on t1.ts = t2.ts and t1.c_float_empty is null order by v desc;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 0)
+
+ tdSql.query("select avg(t1.c_tinyint), sum(t2.c_bigint) from st t1, st t2 where t1.ts=t2.ts and t1.c_int > t2.c_int;")
+ tdSql.checkRows(0)
+
+ tdSql.query("select avg(t1.c_tinyint), sum(t2.c_bigint) from st t1, st t2 where t1.ts=t2.ts and t1.c_int <= t2.c_int;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 1)
+ tdSql.checkData(0, 1, 1076616672134475760)
+
+ tdSql.query("select count(t1.c_float_empty) from st t1, st t2 where t1.ts=t2.ts and t1.c_int = t2.c_int and t1.t_int_empty=t2.t_int_empty;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 0)
+
+ def test_query_with_window(self):
+ # time window
+ tdSql.query("select sum(c_int_empty) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m) fill(value, 10);")
+ tdSql.checkRows(841)
+ tdSql.checkData(0, 0, 10)
+
+ tdSql.query("select _wstart, _wend, sum(c_int) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m);")
+ tdSql.checkRows(65)
+
+ # status window
+ tdSql.error("select _wstart, count(*) from st state_window(t_bool);")
+ tdSql.query("select _wstart, count(*) from st partition by tbname state_window(c_bool);")
+ tdSql.checkRows(5)
+ tdSql.checkData(0, 1, 10000)
+
+ # session window
+ tdSql.query("select _wstart, count(*) from st partition by tbname, t_int session(ts, 1m);")
+ tdSql.checkRows(5)
+ tdSql.checkData(0, 1, 10000)
+
+ # event window
+ tdSql.query("select _wstart, _wend, count(*) from (select * from st order by ts, tbname) event_window start with t_bool=true end with t_bool=false;")
+ tdSql.checkRows(20000)
+
+ def test_query_with_union(self):
+ tdSql.query("select count(ts) from (select * from ct1 union select * from ct2 union select * from ct3);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 10000)
+
+ tdSql.query("select count(ts) from (select * from ct1 union all select * from ct2 union all select * from ct3);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 30000)
+
+ tdSql.query("select count(*) from (select * from ct1 union select * from ct2 union select * from ct3);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 10000)
+
+ tdSql.query("select count(c_ts_empty) from (select * from ct1 union select * from ct2 union select * from ct3);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 0)
+
+ tdSql.query("select count(*) from (select ts from st union select c_ts_empty from st);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 10001)
+
+ tdSql.query("select count(*) from (select ts from st union all select c_ts_empty from st);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 100000)
+
+ tdSql.query("select count(ts) from (select ts from st union select c_ts_empty from st);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 10000)
+
+ tdSql.query("select count(ts) from (select ts from st union all select c_ts_empty from st);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 50000)
+
+ def test_nested_query(self):
+ tdSql.query("select elapsed(ts, 1s) from (select * from (select * from st where c_int = 1) where c_int_empty is null);")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 9999)
+
+ tdSql.query("select first(ts) as t, avg(c_int) as v from (select * from (select * from st where c_int = 1) where c_int_empty is null) group by t_timestamp order by t_timestamp desc slimit 1 limit 1;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, '2024-01-01 13:00:01.000')
+ tdSql.checkData(0, 1, 1)
+
+ tdSql.query("select max(c_tinyint) from (select c_tinyint, tbname from st where c_float_empty is null or t_int_empty is null) group by tbname order by c_tinyint desc slimit 1 limit 1;")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 0, 1)
+
+ tdSql.query("select top(c_int, 3) from (select c_int, tbname from st where t_int in (2, 3)) group by tbname slimit 3;")
+ tdSql.checkRows(6)
+ tdSql.checkData(0, 0, 1)
+
+ def run(self):
+ self.prepareData()
+ self.test_query_with_filter()
+ self.test_query_with_groupby()
+ self.test_query_with_join()
+ self.test_query_with_window()
+ self.test_query_with_union()
+ self.test_nested_query()
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success("%s successfully executed" % __file__)
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())