Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-22023
This commit is contained in:
commit
c588770699
|
@ -65,6 +65,11 @@ extern "C" {
|
||||||
#define TSDB_PERFS_TABLE_TRANS "perf_trans"
|
#define TSDB_PERFS_TABLE_TRANS "perf_trans"
|
||||||
#define TSDB_PERFS_TABLE_APPS "perf_apps"
|
#define TSDB_PERFS_TABLE_APPS "perf_apps"
|
||||||
|
|
||||||
|
#define TSDB_AUDIT_DB "audit"
|
||||||
|
#define TSDB_AUDIT_STB_OPERATION "operations"
|
||||||
|
#define TSDB_AUDIT_CTB_OPERATION "t_operations_"
|
||||||
|
#define TSDB_AUDIT_CTB_OPERATION_LEN 13
|
||||||
|
|
||||||
typedef struct SSysDbTableSchema {
|
typedef struct SSysDbTableSchema {
|
||||||
const char* name;
|
const char* name;
|
||||||
const int32_t type;
|
const int32_t type;
|
||||||
|
|
|
@ -232,7 +232,7 @@ struct SConfig *taosGetCfg();
|
||||||
void taosSetAllDebugFlag(int32_t flag);
|
void taosSetAllDebugFlag(int32_t flag);
|
||||||
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
|
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
|
||||||
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
|
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
|
||||||
int8_t taosGranted();
|
int8_t taosGranted(int8_t type);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,9 @@ extern "C" {
|
||||||
|
|
||||||
#define GRANT_HEART_BEAT_MIN 2
|
#define GRANT_HEART_BEAT_MIN 2
|
||||||
#define GRANT_ACTIVE_CODE "activeCode"
|
#define GRANT_ACTIVE_CODE "activeCode"
|
||||||
|
#define GRANT_FLAG_ALL (0x01)
|
||||||
|
#define GRANT_FLAG_AUDIT (0x02)
|
||||||
|
#define GRANT_FLAG_VIEW (0x04)
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_GRANT_ALL,
|
TSDB_GRANT_ALL,
|
||||||
|
@ -50,11 +53,13 @@ typedef enum {
|
||||||
TSDB_GRANT_SUBSCRIPTION,
|
TSDB_GRANT_SUBSCRIPTION,
|
||||||
TSDB_GRANT_AUDIT,
|
TSDB_GRANT_AUDIT,
|
||||||
TSDB_GRANT_CSV,
|
TSDB_GRANT_CSV,
|
||||||
|
TSDB_GRANT_VIEW,
|
||||||
TSDB_GRANT_MULTI_TIER,
|
TSDB_GRANT_MULTI_TIER,
|
||||||
TSDB_GRANT_BACKUP_RESTORE,
|
TSDB_GRANT_BACKUP_RESTORE,
|
||||||
} EGrantType;
|
} EGrantType;
|
||||||
|
|
||||||
int32_t grantCheck(EGrantType grant);
|
int32_t grantCheck(EGrantType grant); // less
|
||||||
|
int32_t grantCheckLE(EGrantType grant); // less or equal
|
||||||
char* tGetMachineId();
|
char* tGetMachineId();
|
||||||
#ifndef TD_UNIQ_GRANT
|
#ifndef TD_UNIQ_GRANT
|
||||||
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
|
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
|
||||||
|
@ -69,7 +74,7 @@ int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, ch
|
||||||
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
||||||
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
||||||
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
||||||
{.name = "state", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
{.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
||||||
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
||||||
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
||||||
{.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
{.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
|
||||||
|
|
|
@ -3925,6 +3925,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||||
|
|
||||||
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
|
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
|
||||||
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
||||||
|
#define SUBMIT_REQ_FROM_FILE 0x4
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t flags;
|
int32_t flags;
|
||||||
|
|
|
@ -358,7 +358,7 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f
|
||||||
|
|
||||||
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pAuth, SUserAuthRes* pRes);
|
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pAuth, SUserAuthRes* pRes);
|
||||||
|
|
||||||
int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo *pAuth, SUserAuthRes* pRes, bool* exists);
|
int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo *pAuth, SUserAuthRes* pRes, bool* exists);
|
||||||
|
|
||||||
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
|
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
|
||||||
|
|
||||||
|
|
|
@ -724,8 +724,10 @@ typedef struct SSubplan {
|
||||||
SNode* pTagCond;
|
SNode* pTagCond;
|
||||||
SNode* pTagIndexCond;
|
SNode* pTagIndexCond;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
int32_t rowsThreshold;
|
bool isView;
|
||||||
|
bool isAudit;
|
||||||
bool dynamicRowThreshold;
|
bool dynamicRowThreshold;
|
||||||
|
int32_t rowsThreshold;
|
||||||
} SSubplan;
|
} SSubplan;
|
||||||
|
|
||||||
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;
|
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;
|
||||||
|
|
|
@ -86,8 +86,10 @@ typedef struct SParseContext {
|
||||||
bool enableSysInfo;
|
bool enableSysInfo;
|
||||||
bool async;
|
bool async;
|
||||||
bool hasInvisibleCol;
|
bool hasInvisibleCol;
|
||||||
const char* svrVer;
|
bool isView;
|
||||||
|
bool isAudit;
|
||||||
bool nodeOffline;
|
bool nodeOffline;
|
||||||
|
const char* svrVer;
|
||||||
SArray* pTableMetaPos; // sql table pos => catalog data pos
|
SArray* pTableMetaPos; // sql table pos => catalog data pos
|
||||||
SArray* pTableVgroupPos; // sql table pos => catalog data pos
|
SArray* pTableVgroupPos; // sql table pos => catalog data pos
|
||||||
int64_t allocatorId;
|
int64_t allocatorId;
|
||||||
|
|
|
@ -32,6 +32,8 @@ typedef struct SPlanContext {
|
||||||
bool streamQuery;
|
bool streamQuery;
|
||||||
bool rSmaQuery;
|
bool rSmaQuery;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
|
bool isView;
|
||||||
|
bool isAudit;
|
||||||
int8_t triggerType;
|
int8_t triggerType;
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
int64_t deleteMark;
|
int64_t deleteMark;
|
||||||
|
|
|
@ -66,7 +66,11 @@ typedef enum {
|
||||||
#define QUERY_RSP_POLICY_QUICK 1
|
#define QUERY_RSP_POLICY_QUICK 1
|
||||||
|
|
||||||
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
|
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
|
||||||
#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
|
#define QUERY_MSG_MASK_AUDIT() (1 << 1)
|
||||||
|
#define QUERY_MSG_MASK_VIEW() (1 << 2)
|
||||||
|
#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
|
||||||
|
#define TEST_AUDIT_MASK(m) (((m) & QUERY_MSG_MASK_AUDIT()) != 0)
|
||||||
|
#define TEST_VIEW_MASK(m) (((m) & QUERY_MSG_MASK_VIEW()) != 0)
|
||||||
|
|
||||||
typedef struct STableComInfo {
|
typedef struct STableComInfo {
|
||||||
uint8_t numOfTags; // the number of tags in schema
|
uint8_t numOfTags; // the number of tags in schema
|
||||||
|
@ -338,6 +342,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
||||||
|
|
||||||
#define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname))
|
#define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname))
|
||||||
|
|
||||||
|
#define IS_AUDIT_DBNAME(_dbname) ((*(_dbname) == 'a') && (0 == strcmp(_dbname, TSDB_AUDIT_DB)))
|
||||||
|
#define IS_AUDIT_STB_NAME(_stbname) ((*(_stbname) == 'o') && (0 == strcmp(_stbname, TSDB_AUDIT_STB_OPERATION)))
|
||||||
|
#define IS_AUDIT_CTB_NAME(_ctbname) \
|
||||||
|
((*(_ctbname) == 't') && (0 == strncmp(_ctbname, TSDB_AUDIT_CTB_OPERATION, TSDB_AUDIT_CTB_OPERATION_LEN)))
|
||||||
|
|
||||||
#define qFatal(...) \
|
#define qFatal(...) \
|
||||||
do { \
|
do { \
|
||||||
if (qDebugFlag & DEBUG_FATAL) { \
|
if (qDebugFlag & DEBUG_FATAL) { \
|
||||||
|
|
|
@ -13,6 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#ifndef _STREAM_STATE_H_
|
||||||
|
#define _STREAM_STATE_H_
|
||||||
|
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
||||||
#include "rocksdb/c.h"
|
#include "rocksdb/c.h"
|
||||||
|
@ -20,9 +23,6 @@
|
||||||
#include "tsimplehash.h"
|
#include "tsimplehash.h"
|
||||||
#include "tstreamFileState.h"
|
#include "tstreamFileState.h"
|
||||||
|
|
||||||
#ifndef _STREAM_STATE_H_
|
|
||||||
#define _STREAM_STATE_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -13,6 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#ifndef _STREAM_H_
|
||||||
|
#define _STREAM_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "streamState.h"
|
#include "streamState.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -26,9 +29,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifndef _STREAM_H_
|
|
||||||
#define _STREAM_H_
|
|
||||||
|
|
||||||
#define ONE_MiB_F (1048576.0)
|
#define ONE_MiB_F (1048576.0)
|
||||||
#define ONE_KiB_F (1024.0)
|
#define ONE_KiB_F (1024.0)
|
||||||
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
|
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
|
||||||
|
|
|
@ -575,8 +575,6 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821)
|
#define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821)
|
||||||
#define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822)
|
#define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822)
|
||||||
#define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823)
|
#define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823)
|
||||||
#define TSDB_CODE_GRANT_CSV_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0824)
|
|
||||||
#define TSDB_CODE_GRANT_AUDIT_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0825)
|
|
||||||
|
|
||||||
// sync
|
// sync
|
||||||
// #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x
|
// #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x
|
||||||
|
|
|
@ -287,6 +287,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_DNODE_VALUE_LEN 256
|
#define TSDB_DNODE_VALUE_LEN 256
|
||||||
|
|
||||||
#define TSDB_CLUSTER_VALUE_LEN 1000
|
#define TSDB_CLUSTER_VALUE_LEN 1000
|
||||||
|
#define TSDB_GRANT_LOG_COL_LEN 15072
|
||||||
|
|
||||||
#define TSDB_ACTIVE_KEY_LEN 109
|
#define TSDB_ACTIVE_KEY_LEN 109
|
||||||
#define TSDB_CONN_ACTIVE_KEY_LEN 255
|
#define TSDB_CONN_ACTIVE_KEY_LEN 255
|
||||||
|
|
|
@ -843,7 +843,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
|
||||||
view->version = htonl(view->version);
|
view->version = htonl(view->version);
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("hb got %d expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);
|
tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum);
|
||||||
|
|
||||||
if (NULL == req->info) {
|
if (NULL == req->info) {
|
||||||
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||||
|
|
|
@ -1154,6 +1154,8 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
||||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||||
.pAstRoot = pQuery->pRoot,
|
.pAstRoot = pQuery->pRoot,
|
||||||
.showRewrite = pQuery->showRewrite,
|
.showRewrite = pQuery->showRewrite,
|
||||||
|
.isView = pWrapper->pParseCtx->isView,
|
||||||
|
.isAudit = pWrapper->pParseCtx->isAudit,
|
||||||
.pMsg = pRequest->msgBuf,
|
.pMsg = pRequest->msgBuf,
|
||||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||||
.pUser = pRequest->pTscObj->user,
|
.pUser = pRequest->pTscObj->user,
|
||||||
|
|
|
@ -349,21 +349,21 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema useGrantsFullSchema[] = {
|
static const SSysDbTableSchema useGrantsFullSchema[] = {
|
||||||
{.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "display_name", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "display_name", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "expire", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "expire", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "limits", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "limits", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema useGrantsLogsSchema[] = {
|
static const SSysDbTableSchema useGrantsLogsSchema[] = {
|
||||||
{.name = "state", .bytes = 1536 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "state", .bytes = 1536 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "active", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "active", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "machine", .bytes = 9088 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "machine", .bytes = TSDB_GRANT_LOG_COL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema useMachinesSchema[] = {
|
static const SSysDbTableSchema useMachinesSchema[] = {
|
||||||
{.name = "id", .bytes = TSDB_CLUSTER_ID_LEN + 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "id", .bytes = TSDB_CLUSTER_ID_LEN + 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
{.name = "machine", .bytes = 6016 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
{.name = "machine", .bytes = 7552 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysTableMeta infosMeta[] = {
|
static const SSysTableMeta infosMeta[] = {
|
||||||
|
|
|
@ -1801,4 +1801,17 @@ void taosSetAllDebugFlag(int32_t flag) {
|
||||||
if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist
|
if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t taosGranted() { return atomic_load_8(&tsGrant); }
|
int8_t taosGranted(int8_t type) {
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_GRANT_ALL:
|
||||||
|
return atomic_load_8(&tsGrant) & GRANT_FLAG_ALL;
|
||||||
|
case TSDB_GRANT_AUDIT:
|
||||||
|
return atomic_load_8(&tsGrant) & GRANT_FLAG_AUDIT;
|
||||||
|
case TSDB_GRANT_VIEW:
|
||||||
|
return atomic_load_8(&tsGrant) & GRANT_FLAG_VIEW;
|
||||||
|
default:
|
||||||
|
ASSERTS(0, "undefined grant type:%" PRIi8, type);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -19,5 +19,6 @@
|
||||||
#ifndef _GRANT
|
#ifndef _GRANT
|
||||||
|
|
||||||
int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;}
|
int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;}
|
||||||
|
int32_t grantCheckLE(EGrantType grant) {return TSDB_CODE_SUCCESS;}
|
||||||
|
|
||||||
#endif
|
#endif
|
|
@ -556,7 +556,7 @@ typedef struct {
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||||
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete);
|
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted);
|
||||||
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
||||||
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
|
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
|
||||||
|
|
||||||
|
|
|
@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
|
||||||
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
void destroyStreamTaskIter(SStreamTaskIter *pIter);
|
||||||
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
|
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
|
||||||
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
|
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
|
||||||
|
void mndInitExecInfo();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -409,7 +409,7 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // audit
|
{ // audit
|
||||||
auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, cfgReq.sqlLen);
|
auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, TMIN(cfgReq.sqlLen, GRANT_ACTIVE_HEAD_LEN << 1));
|
||||||
}
|
}
|
||||||
_exit:
|
_exit:
|
||||||
tFreeSMCfgClusterReq(&cfgReq);
|
tFreeSMCfgClusterReq(&cfgReq);
|
||||||
|
|
|
@ -107,7 +107,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((terrno = grantCheck(TSDB_GRANT_SUBSCRIPTION)) < 0) {
|
if ((terrno = grantCheckLE(TSDB_GRANT_SUBSCRIPTION)) < 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
|
@ -240,9 +240,10 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbR
|
||||||
}
|
}
|
||||||
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
|
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
|
||||||
strcpy(data->topic, topic);
|
strcpy(data->topic, topic);
|
||||||
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIPTION) < 0) {
|
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
|
||||||
|
grantCheckLE(TSDB_GRANT_SUBSCRIPTION) < 0) {
|
||||||
data->noPrivilege = 1;
|
data->noPrivilege = 1;
|
||||||
} else{
|
} else {
|
||||||
data->noPrivilege = 0;
|
data->noPrivilege = 0;
|
||||||
}
|
}
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
|
|
@ -79,11 +79,6 @@ char *tGetMachineId() { return NULL; };
|
||||||
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
||||||
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
||||||
int32_t mndProcessConfigGrantReq(SMnode *pMnode, SRpcMsg *pReq, SMCfgClusterReq *pCfg) { return 0; }
|
int32_t mndProcessConfigGrantReq(SMnode *pMnode, SRpcMsg *pReq, SMCfgClusterReq *pCfg) { return 0; }
|
||||||
#else
|
|
||||||
#ifndef TD_UNIQ_GRANT
|
|
||||||
char *tGetMachineId() { return NULL; };
|
|
||||||
int32_t mndProcessConfigGrantReq(SMnode *pMnode, SRpcMsg *pReq, SMCfgClusterReq *pCfg) { return 0; }
|
|
||||||
#endif
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void mndGenerateMachineCode() { grantParseParameter(); }
|
void mndGenerateMachineCode() { grantParseParameter(); }
|
|
@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
||||||
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||||
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
|
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
|
||||||
static void freeCheckpointCandEntry(void *);
|
|
||||||
static void freeTaskList(void *param);
|
|
||||||
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
|
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
|
||||||
|
@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
||||||
|
|
||||||
taosThreadMutexInit(&execInfo.lock, NULL);
|
mndInitExecInfo();
|
||||||
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
|
||||||
|
|
||||||
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
|
||||||
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
|
|
||||||
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
|
||||||
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
|
||||||
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
|
||||||
|
|
||||||
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
|
|
||||||
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
|
||||||
|
|
||||||
if (sdbSetTable(pMnode->pSdb, table) != 0) {
|
if (sdbSetTable(pMnode->pSdb, table) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1628,7 +1616,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
|
||||||
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){
|
||||||
terrno = TSDB_CODE_GRANT_EXPIRED;
|
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -2117,16 +2105,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
||||||
}
|
}
|
||||||
|
|
||||||
void freeCheckpointCandEntry(void *param) {
|
|
||||||
SCheckpointCandEntry *pEntry = param;
|
|
||||||
taosMemoryFreeClear(pEntry->pName);
|
|
||||||
}
|
|
||||||
|
|
||||||
void freeTaskList(void* param) {
|
|
||||||
SArray** pList = (SArray **)param;
|
|
||||||
taosArrayDestroy(*pList);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
||||||
int32_t num = taosArrayGetSize(pList);
|
int32_t num = taosArrayGetSize(pList);
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
@ -2202,4 +2180,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||||
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
||||||
|
|
||||||
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
|
if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){
|
||||||
if(suspendAllStreams(pMnode, &pReq->info) < 0){
|
if(suspendAllStreams(pMnode, &pReq->info) < 0){
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
// current checkpoint is failed, rollback from the checkpoint trans
|
// current checkpoint is failed, rollback from the checkpoint trans
|
||||||
// kill the checkpoint trans and then set all tasks status to be normal
|
// kill the checkpoint trans and then set all tasks status to be normal
|
||||||
if (taosArrayGetSize(pFailedTasks) > 0) {
|
if (taosArrayGetSize(pFailedTasks) > 0) {
|
||||||
bool allReady = true;
|
bool allReady = true;
|
||||||
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
if (pMnode != NULL) {
|
||||||
taosArrayDestroy(p);
|
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
||||||
|
taosArrayDestroy(p);
|
||||||
|
} else {
|
||||||
|
allReady = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (allReady || snodeChanged) {
|
if (allReady || snodeChanged) {
|
||||||
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
|
||||||
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
|
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
|
||||||
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
||||||
pInfo->checkpointId, pInfo->transId);
|
pInfo->checkpointId, pInfo->transId);
|
||||||
|
|
||||||
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
|
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
||||||
taosWUnLockLatch(&pStream->lock);
|
taosWUnLockLatch(&pStream->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void freeCheckpointCandEntry(void *param) {
|
||||||
|
SCheckpointCandEntry *pEntry = param;
|
||||||
|
taosMemoryFreeClear(pEntry->pName);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void freeTaskList(void* param) {
|
||||||
|
SArray** pList = (SArray **)param;
|
||||||
|
taosArrayDestroy(*pList);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndInitExecInfo() {
|
||||||
|
taosThreadMutexInit(&execInfo.lock, NULL);
|
||||||
|
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
|
||||||
|
|
||||||
|
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
||||||
|
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
|
||||||
|
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||||
|
execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||||
|
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
|
||||||
|
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
||||||
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@ add_subdirectory(acct)
|
||||||
#add_subdirectory(db)
|
#add_subdirectory(db)
|
||||||
#add_subdirectory(dnode)
|
#add_subdirectory(dnode)
|
||||||
add_subdirectory(func)
|
add_subdirectory(func)
|
||||||
#add_subdirectory(mnode)
|
add_subdirectory(stream)
|
||||||
add_subdirectory(profile)
|
add_subdirectory(profile)
|
||||||
add_subdirectory(qnode)
|
add_subdirectory(qnode)
|
||||||
add_subdirectory(sdb)
|
add_subdirectory(sdb)
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
SET(CMAKE_CXX_STANDARD 11)
|
||||||
|
|
||||||
|
aux_source_directory(. MNODE_STREAM_TEST_SRC)
|
||||||
|
add_executable(streamTest ${MNODE_STREAM_TEST_SRC})
|
||||||
|
target_link_libraries(
|
||||||
|
streamTest
|
||||||
|
PRIVATE dnode gtest
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(
|
||||||
|
NAME streamTest
|
||||||
|
COMMAND streamTest
|
||||||
|
)
|
|
@ -0,0 +1,155 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#pragma GCC diagnostic push
|
||||||
|
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||||
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
|
|
||||||
|
#include <libs/stream/tstream.h>
|
||||||
|
#include <libs/transport/trpc.h>
|
||||||
|
#include "../../inc/mndStream.h"
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
SRpcMsg buildHbReq() {
|
||||||
|
SStreamHbMsg msg = {0};
|
||||||
|
msg.vgId = 1;
|
||||||
|
msg.numOfTasks = 5;
|
||||||
|
msg.pTaskStatus = taosArrayInit(4, sizeof(STaskStatusEntry));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < 4; ++i) {
|
||||||
|
STaskStatusEntry entry = {0};
|
||||||
|
entry.nodeId = i + 1;
|
||||||
|
entry.stage = 1;
|
||||||
|
entry.id.taskId = i + 1;
|
||||||
|
entry.id.streamId = 999;
|
||||||
|
|
||||||
|
if (i == 0) {
|
||||||
|
entry.stage = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(msg.pTaskStatus, &entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
// (p->checkpointId != 0) && p->checkpointFailed
|
||||||
|
// add failed checkpoint info
|
||||||
|
{
|
||||||
|
STaskStatusEntry entry = {0};
|
||||||
|
entry.nodeId = 5;
|
||||||
|
entry.stage = 1;
|
||||||
|
|
||||||
|
entry.id.taskId = 5;
|
||||||
|
entry.id.streamId = 999;
|
||||||
|
|
||||||
|
entry.checkpointId = 1;
|
||||||
|
entry.checkpointFailed = true;
|
||||||
|
|
||||||
|
taosArrayPush(msg.pTaskStatus, &entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tlen = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
SEncoder encoder;
|
||||||
|
void* buf = NULL;
|
||||||
|
SRpcMsg msg1 = {0};
|
||||||
|
msg1.info.noResp = 1;
|
||||||
|
|
||||||
|
tEncodeSize(tEncodeStreamHbMsg, &msg, tlen, code);
|
||||||
|
if (code < 0) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = rpcMallocCont(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEncoderInit(&encoder, (uint8_t*)buf, tlen);
|
||||||
|
if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) {
|
||||||
|
rpcFreeCont(buf);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
initRpcMsg(&msg1, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
|
||||||
|
|
||||||
|
taosArrayDestroy(msg.pTaskStatus);
|
||||||
|
return msg1;
|
||||||
|
|
||||||
|
_end:
|
||||||
|
return msg1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskId) {
|
||||||
|
SStreamExecInfo* pExecNode = &execInfo;
|
||||||
|
|
||||||
|
pTask->id.streamId = streamId;
|
||||||
|
pTask->id.taskId = taskId;
|
||||||
|
pTask->info.nodeId = nodeId;
|
||||||
|
|
||||||
|
STaskId id;
|
||||||
|
id.streamId = pTask->id.streamId;
|
||||||
|
id.taskId = pTask->id.taskId;
|
||||||
|
|
||||||
|
STaskStatusEntry entry;
|
||||||
|
streamTaskStatusInit(&entry, pTask);
|
||||||
|
|
||||||
|
entry.stage = 1;
|
||||||
|
entry.status = TASK_STATUS__READY;
|
||||||
|
|
||||||
|
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
|
||||||
|
taosArrayPush(pExecNode->pTaskList, &id);
|
||||||
|
}
|
||||||
|
void initStreamExecInfo() {
|
||||||
|
SStreamExecInfo* pExecNode = &execInfo;
|
||||||
|
|
||||||
|
SStreamTask task = {0};
|
||||||
|
setTask(&task, 1, 999, 1);
|
||||||
|
setTask(&task, 1, 999, 2);
|
||||||
|
setTask(&task, 1, 999, 3);
|
||||||
|
setTask(&task, 1, 999, 4);
|
||||||
|
setTask(&task, 2, 999, 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
void initNodeInfo() {
|
||||||
|
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
|
||||||
|
SNodeEntry entry = {0};
|
||||||
|
entry.nodeId = 2;
|
||||||
|
entry.stageUpdated = true;
|
||||||
|
taosArrayPush(execInfo.pNodeList, &entry);
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(mndHbTest, handle_error_in_hb) {
|
||||||
|
mndInitExecInfo();
|
||||||
|
initStreamExecInfo();
|
||||||
|
initNodeInfo();
|
||||||
|
|
||||||
|
SRpcMsg msg = buildHbReq();
|
||||||
|
int32_t code = mndProcessStreamHb(&msg);
|
||||||
|
|
||||||
|
rpcFreeCont(msg.pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
#pragma GCC diagnostic pop
|
|
@ -238,6 +238,11 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (submitTbData.flags & SUBMIT_REQ_FROM_FILE) {
|
||||||
|
code = grantCheck(TSDB_GRANT_CSV);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||||
code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);
|
code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);
|
||||||
|
|
|
@ -300,7 +300,3 @@ int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName,
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -349,6 +349,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
curWin.winInfo.pStatePos->beUpdated = true;
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
||||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||||
|
|
|
@ -2083,6 +2083,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
winInfo.pStatePos->beUpdated = true;
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
getSessionHashKey(&winInfo.sessionWin, &key);
|
getSessionHashKey(&winInfo.sessionWin, &key);
|
||||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
||||||
|
@ -2286,6 +2287,10 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||||
SResultWindowInfo* pWinInfo = pIte;
|
SResultWindowInfo* pWinInfo = pIte;
|
||||||
|
if (!pWinInfo->pStatePos->beUpdated) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pWinInfo->pStatePos->beUpdated = false;
|
||||||
saveResult(*pWinInfo, pStUpdated);
|
saveResult(*pWinInfo, pStUpdated);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3425,6 +3430,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
|
curWin.winInfo.pStatePos->beUpdated = true;
|
||||||
SSessionKey key = {0};
|
SSessionKey key = {0};
|
||||||
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
|
||||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||||
|
|
|
@ -3330,6 +3330,8 @@ static const char* jkSubplanTagIndexCond = "TagIndexCond";
|
||||||
static const char* jkSubplanShowRewrite = "ShowRewrite";
|
static const char* jkSubplanShowRewrite = "ShowRewrite";
|
||||||
static const char* jkSubplanRowsThreshold = "RowThreshold";
|
static const char* jkSubplanRowsThreshold = "RowThreshold";
|
||||||
static const char* jkSubplanDynamicRowsThreshold = "DyRowThreshold";
|
static const char* jkSubplanDynamicRowsThreshold = "DyRowThreshold";
|
||||||
|
static const char* jkSubplanIsView = "IsView";
|
||||||
|
static const char* jkSubplanIsAudit = "IsAudit";
|
||||||
|
|
||||||
static int32_t subplanToJson(const void* pObj, SJson* pJson) {
|
static int32_t subplanToJson(const void* pObj, SJson* pJson) {
|
||||||
const SSubplan* pNode = (const SSubplan*)pObj;
|
const SSubplan* pNode = (const SSubplan*)pObj;
|
||||||
|
@ -3368,6 +3370,12 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->showRewrite);
|
code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->showRewrite);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkSubplanIsView, pNode->isView);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkSubplanIsAudit, pNode->isAudit);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkSubplanRowsThreshold, pNode->rowsThreshold);
|
code = tjsonAddIntegerToObject(pJson, jkSubplanRowsThreshold, pNode->rowsThreshold);
|
||||||
}
|
}
|
||||||
|
@ -3415,6 +3423,12 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->showRewrite);
|
code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->showRewrite);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkSubplanIsView, &pNode->isView);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkSubplanIsAudit, &pNode->isAudit);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetIntValue(pJson, jkSubplanRowsThreshold, &pNode->rowsThreshold);
|
code = tjsonGetIntValue(pJson, jkSubplanRowsThreshold, &pNode->rowsThreshold);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3970,6 +3970,12 @@ static int32_t subplanInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeValueBool(pEncoder, pNode->dynamicRowThreshold);
|
code = tlvEncodeValueBool(pEncoder, pNode->dynamicRowThreshold);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeValueBool(pEncoder, pNode->isView);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeValueBool(pEncoder, pNode->isAudit);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4025,7 +4031,12 @@ static int32_t msgToSubplanInline(STlvDecoder* pDecoder, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvDecodeValueBool(pDecoder, &pNode->dynamicRowThreshold);
|
code = tlvDecodeValueBool(pDecoder, &pNode->dynamicRowThreshold);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvDecodeValueBool(pDecoder, &pNode->isView);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvDecodeValueBool(pDecoder, &pNode->isAudit);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2144,13 +2144,15 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
|
||||||
pStmt->pTableCxtHashObj =
|
pStmt->pTableCxtHashObj =
|
||||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
|
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pStmt->totalRowsNum += numOfRows;
|
pStmt->totalRowsNum += numOfRows;
|
||||||
pStmt->totalTbNum += 1;
|
pStmt->totalTbNum += 1;
|
||||||
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
|
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
|
||||||
|
if (rowsDataCxt.pTableDataCxt && rowsDataCxt.pTableDataCxt->pData) {
|
||||||
|
rowsDataCxt.pTableDataCxt->pData->flags |= SUBMIT_REQ_FROM_FILE;
|
||||||
|
}
|
||||||
if (!pStmt->fileProcessing) {
|
if (!pStmt->fileProcessing) {
|
||||||
taosCloseFile(&pStmt->fp);
|
taosCloseFile(&pStmt->fp);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -3165,6 +3165,19 @@ static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTabl
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateAudit(STranslateContext* pCxt, SRealTableNode* pRealTable, SName* pName) {
|
||||||
|
if (pRealTable->pMeta->tableType == TSDB_SUPER_TABLE) {
|
||||||
|
if (IS_AUDIT_DBNAME(pName->dbname) && IS_AUDIT_STB_NAME(pName->tname)) {
|
||||||
|
pCxt->pParseCxt->isAudit = true;
|
||||||
|
}
|
||||||
|
} else if (pRealTable->pMeta->tableType == TSDB_CHILD_TABLE) {
|
||||||
|
if (IS_AUDIT_DBNAME(pName->dbname) && IS_AUDIT_CTB_NAME(pName->tname)) {
|
||||||
|
pCxt->pParseCxt->isAudit = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t translateTable(STranslateContext* pCxt, SNode** pTable) {
|
int32_t translateTable(STranslateContext* pCxt, SNode** pTable) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
switch (nodeType(*pTable)) {
|
switch (nodeType(*pTable)) {
|
||||||
|
@ -3184,7 +3197,8 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable) {
|
||||||
if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType) {
|
if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType) {
|
||||||
return translateView(pCxt, pTable, &name);
|
return translateView(pCxt, pTable, &name);
|
||||||
}
|
}
|
||||||
#endif
|
translateAudit(pCxt, pRealTable, &name);
|
||||||
|
#endif
|
||||||
code = setTableVgroupList(pCxt, &name, pRealTable);
|
code = setTableVgroupList(pCxt, &name, pRealTable);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setTableIndex(pCxt, &name, pRealTable);
|
code = setTableIndex(pCxt, &name, pRealTable);
|
||||||
|
@ -8267,7 +8281,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1);
|
code = nodesListAppend((*pSelect1)->pGroupByList, nodesCloneNode((const SNode*)pNode1));
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -8280,18 +8294,17 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
||||||
pNode2->groupingSetType = GP_TYPE_NORMAL;
|
pNode2->groupingSetType = GP_TYPE_NORMAL;
|
||||||
pNode2->pParameterList = nodesMakeList();
|
pNode2->pParameterList = nodesMakeList();
|
||||||
if (NULL == pNode2->pParameterList) {
|
if (NULL == pNode2->pParameterList) {
|
||||||
nodesDestroyNode((SNode*)pNode1);
|
nodesDestroyNode((SNode*)pNode2);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2);
|
code = nodesListAppend(pNode2->pParameterList, nodesCloneNode((const SNode*)pFunc2));
|
||||||
if (code) {
|
if (code) {
|
||||||
nodesDestroyNode((SNode*)pNode2);
|
nodesDestroyNode((SNode*)pNode2);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
|
return nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2);
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||||
|
|
|
@ -2187,6 +2187,8 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
|
||||||
pSubplan->level = pLogicSubplan->level;
|
pSubplan->level = pLogicSubplan->level;
|
||||||
pSubplan->rowsThreshold = 4096;
|
pSubplan->rowsThreshold = 4096;
|
||||||
pSubplan->dynamicRowThreshold = false;
|
pSubplan->dynamicRowThreshold = false;
|
||||||
|
pSubplan->isView = pCxt->pPlanCxt->isView;
|
||||||
|
pSubplan->isAudit = pCxt->pPlanCxt->isAudit;
|
||||||
if (NULL != pCxt->pPlanCxt->pUser) {
|
if (NULL != pCxt->pPlanCxt->pUser) {
|
||||||
snprintf(pSubplan->user, sizeof(pSubplan->user), "%s", pCxt->pPlanCxt->pUser);
|
snprintf(pSubplan->user, sizeof(pSubplan->user), "%s", pCxt->pPlanCxt->pUser);
|
||||||
}
|
}
|
||||||
|
|
|
@ -360,10 +360,24 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && !taosGranted()) {
|
if (chkGrant) {
|
||||||
QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
|
if ((!TEST_SHOW_REWRITE_MASK(msg.msgMask))) {
|
||||||
tFreeSSubQueryMsg(&msg);
|
if (!taosGranted(TSDB_GRANT_ALL)) {
|
||||||
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
|
QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
|
||||||
|
tFreeSSubQueryMsg(&msg);
|
||||||
|
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
|
||||||
|
}
|
||||||
|
if ((TEST_VIEW_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_VIEW)) {
|
||||||
|
QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask);
|
||||||
|
tFreeSSubQueryMsg(&msg);
|
||||||
|
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
|
||||||
|
}
|
||||||
|
if ((TEST_AUDIT_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_AUDIT)) {
|
||||||
|
QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask);
|
||||||
|
tFreeSSubQueryMsg(&msg);
|
||||||
|
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t sId = msg.sId;
|
uint64_t sId = msg.sId;
|
||||||
|
|
|
@ -1109,6 +1109,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
qMsg.refId = pJob->refId;
|
qMsg.refId = pJob->refId;
|
||||||
qMsg.execId = pTask->execId;
|
qMsg.execId = pTask->execId;
|
||||||
qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
|
qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
|
||||||
|
qMsg.msgMask |= (pTask->plan->isView) ? QUERY_MSG_MASK_VIEW() : 0;
|
||||||
|
qMsg.msgMask |= (pTask->plan->isAudit) ? QUERY_MSG_MASK_AUDIT() : 0;
|
||||||
qMsg.taskType = TASK_TYPE_TEMP;
|
qMsg.taskType = TASK_TYPE_TEMP;
|
||||||
qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
|
qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
|
||||||
qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);
|
qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);
|
||||||
|
|
|
@ -15,8 +15,6 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tbase64.h"
|
#include "tbase64.h"
|
||||||
#include <math.h>
|
|
||||||
#include <stdbool.h>
|
|
||||||
|
|
||||||
static char basis_64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
|
static char basis_64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
|
||||||
|
|
||||||
|
|
|
@ -462,8 +462,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mism
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large")
|
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly")
|
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit")
|
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CSV_LIMITED, "Csv has reached the licensed upper limit")
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_AUDIT_LIMITED, "Audit has reached the licensed upper limit")
|
|
||||||
|
|
||||||
// sync
|
// sync
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")
|
||||||
|
|
|
@ -350,6 +350,7 @@ fi
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/ts-4272.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/ts-4272.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py
|
||||||
|
@ -567,7 +568,7 @@ fi
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4382.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4382.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4403.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4403.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_td28163.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py
|
||||||
|
|
|
@ -72,7 +72,7 @@ python_error=$(cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l)
|
||||||
|
|
||||||
#0 0x7f2d64f5a808 in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cc:144
|
#0 0x7f2d64f5a808 in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cc:144
|
||||||
#1 0x7f2d63fcf459 in strerror /build/glibc-SzIz7B/glibc-2.31/string/strerror.c:38
|
#1 0x7f2d63fcf459 in strerror /build/glibc-SzIz7B/glibc-2.31/string/strerror.c:38
|
||||||
runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | wc -l)
|
runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cpp" | grep -v "sclvector.c" | wc -l)
|
||||||
|
|
||||||
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
|
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
|
||||||
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
|
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
|
||||||
|
|
|
@ -290,6 +290,213 @@ if $data32 != $now32 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 2 max delay 2s
|
||||||
|
sql create database test15 vgroups 4;
|
||||||
|
sql use test15;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
|
sql create stream stream15 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791233001,2,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
$now02 = $data02
|
||||||
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print session max delay over
|
||||||
|
|
||||||
|
print step 3 max delay 2s
|
||||||
|
sql create database test16 vgroups 4;
|
||||||
|
sql use test16;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
|
sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791233001,2,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop5:
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
$now02 = $data02
|
||||||
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print state max delay over
|
||||||
|
|
||||||
|
print step 4 max delay 2s
|
||||||
|
sql create database test17 vgroups 4;
|
||||||
|
sql use test17;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
|
sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9;
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213001,9,2,3,1.0);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791233001,1,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791233009,9,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop6:
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
if $rows != 2 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
$now02 = $data02
|
||||||
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step1 max delay 2s......... sleep 3s
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
||||||
|
if $data02 != $now02 then
|
||||||
|
print ======data02=$data02
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != $now12 then
|
||||||
|
print ======data12=$data12
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print event max delay over
|
||||||
|
|
||||||
print ======over
|
print ======over
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
|
@ -0,0 +1,75 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.dnodes import tdDnodes
|
||||||
|
from math import inf
|
||||||
|
import taos
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
"""Verify inserting varbinary type data of ts-4479
|
||||||
|
"""
|
||||||
|
def init(self, conn, logSql, replicaVer=1):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), True)
|
||||||
|
self.conn = conn
|
||||||
|
self.db_name = "db"
|
||||||
|
self.stable_name = "st"
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.execute("create database if not exists %s" % self.db_name)
|
||||||
|
tdSql.execute("use %s" % self.db_name)
|
||||||
|
# create super table
|
||||||
|
tdSql.execute("create table %s (ts timestamp, c1 varbinary(65517)) tags (t1 varbinary(16382))" % self.stable_name)
|
||||||
|
|
||||||
|
# varbinary tag length is more than 16382
|
||||||
|
tag = os.urandom(16383).hex()
|
||||||
|
tdSql.error("create table ct using st tags(%s);" % ('\\x' + tag))
|
||||||
|
|
||||||
|
# create child table with max column and tag length
|
||||||
|
child_table_list = []
|
||||||
|
for i in range(2):
|
||||||
|
child_table_name = "ct_" + str(i+1)
|
||||||
|
child_table_list.append(child_table_name)
|
||||||
|
tag = os.urandom(16382).hex()
|
||||||
|
tdSql.execute("create table %s using st tags('%s');" % (child_table_name, '\\x' + tag))
|
||||||
|
tdLog.info("create table %s successfully" % child_table_name)
|
||||||
|
|
||||||
|
# varbinary column length is more than 65517
|
||||||
|
value = os.urandom(65518).hex()
|
||||||
|
tdSql.error("insert into ct_1 values(now, '\\x%s');" % value)
|
||||||
|
|
||||||
|
# insert data
|
||||||
|
for i in range(10):
|
||||||
|
sql = "insert into table_name values"
|
||||||
|
for j in range(5):
|
||||||
|
value = os.urandom(65517).hex()
|
||||||
|
sql += "(now+%ss, '%s')," % (str(j+1), '\\x' + value)
|
||||||
|
for child_table in child_table_list:
|
||||||
|
tdSql.execute(sql.replace("table_name", child_table))
|
||||||
|
tdLog.info("Insert data into %s successfully" % child_table)
|
||||||
|
tdLog.info("Insert data round %s successfully" % str(i+1))
|
||||||
|
tdSql.execute("flush database %s" % self.db_name)
|
||||||
|
|
||||||
|
# insert \\x to varbinary column
|
||||||
|
tdSql.execute("insert into ct_1 values(now, '\\x');")
|
||||||
|
tdSql.query("select * from ct_1 where c1 = '\\x';")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 1, b'')
|
||||||
|
|
||||||
|
# insert \\x to varbinary tag
|
||||||
|
tdSql.execute("create table ct_3 using st tags('\\x');")
|
||||||
|
tdSql.execute("insert into ct_3 values(now, '\\x45');")
|
||||||
|
tdSql.query("select * from st where t1='';")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 2, b'')
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.execute("drop database if exists %s" % self.db_name)
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -0,0 +1,265 @@
|
||||||
|
import random
|
||||||
|
import itertools
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.sqlset import *
|
||||||
|
from util import constant
|
||||||
|
from util.common import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
"""Verify the jira TD-28163
|
||||||
|
"""
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
def prepareData(self):
|
||||||
|
# db
|
||||||
|
tdSql.execute("create database if not exists db")
|
||||||
|
tdSql.execute("use db")
|
||||||
|
|
||||||
|
# super table
|
||||||
|
tdSql.execute("create stable st(ts timestamp, c_ts_empty timestamp, c_int int, c_int_empty int, c_unsigned_int int unsigned, \
|
||||||
|
c_unsigned_int_empty int unsigned, c_bigint bigint, c_bigint_empty bigint, c_unsigned_bigint bigint unsigned, \
|
||||||
|
c_unsigned_bigint_empty bigint unsigned, c_float float, c_float_empty float, c_double double, c_double_empty double, \
|
||||||
|
c_binary binary(16), c_binary_empty binary(16), c_smallint smallint, c_smallint_empty smallint, \
|
||||||
|
c_smallint_unsigned smallint unsigned, c_smallint_unsigned_empty smallint unsigned, c_tinyint tinyint, \
|
||||||
|
c_tinyint_empty tinyint, c_tinyint_unsigned tinyint unsigned, c_tinyint_unsigned_empty tinyint unsigned, \
|
||||||
|
c_bool bool, c_bool_empty bool, c_nchar nchar(16), c_nchar_empty nchar(16), c_varchar varchar(16), \
|
||||||
|
c_varchar_empty varchar(16), c_varbinary varbinary(16), c_varbinary_empty varbinary(16)) \
|
||||||
|
tags(t_timestamp timestamp, t_timestamp_empty timestamp, t_int int, t_int_empty int, \
|
||||||
|
t_unsigned_int int unsigned, t_unsigned_int_empty int unsigned, t_bigint bigint, t_bigint_empty bigint, \
|
||||||
|
t_unsigned_bigint bigint unsigned, t_unsigned_bigint_empty bigint unsigned, t_float float, t_float_empty float, \
|
||||||
|
t_double double, t_double_empty double, t_binary binary(16), t_binary_empty binary(16), t_smallint smallint, \
|
||||||
|
t_smallint_empty smallint, t_smallint_unsigned smallint unsigned, t_smallint_unsigned_empty smallint unsigned, \
|
||||||
|
t_tinyint tinyint, t_tinyint_empty tinyint, t_tinyint_unsigned tinyint unsigned, t_tinyint_unsigned_empty tinyint unsigned, \
|
||||||
|
t_bool bool, t_bool_empty bool, t_nchar nchar(16), t_nchar_empty nchar(16), t_varchar varchar(16), \
|
||||||
|
t_varchar_empty varchar(16), t_varbinary varbinary(16), t_varbinary_empty varbinary(16));")
|
||||||
|
|
||||||
|
# child tables
|
||||||
|
start_ts = 1704085200000
|
||||||
|
tags = [
|
||||||
|
"'2024-01-01 13:00:01', null, 1, null, 1, null, 1111111111111111, null, 1111111111111111, null, 1.1, null, 1.11, null, 'aaaaaaaa', '', 1, null, 1, null, 1, null, 1, null, True, null, 'ncharaa', null, 'varcharaa', null, '0x7661726331', null",
|
||||||
|
"'2024-01-01 13:00:02', null, 2, null, 2, null, 2222222222222222, null, 2222222222222222, null, 2.2, null, 2.22, null, 'bbbbbbbb', '', 2, null, 2, null, 2, null, 2, null, False, null, 'ncharbb', null, 'varcharbb', null, '0x7661726332', null",
|
||||||
|
"'2024-01-01 13:00:03', null, 3, null, 3, null, 3333333333333333, null, 3333333333333333, null, 3.3, null, 3.33, null, 'cccccccc', '', 3, null, 3, null, 3, null, 3, null, True, null, 'ncharcc', null, 'varcharcc', null, '0x7661726333', null",
|
||||||
|
"'2024-01-01 13:00:04', null, 4, null, 4, null, 4444444444444444, null, 4444444444444444, null, 4.4, null, 4.44, null, 'dddddddd', '', 4, null, 4, null, 4, null, 4, null, False, null, 'nchardd', null, 'varchardd', null, '0x7661726334', null",
|
||||||
|
"'2024-01-01 13:00:05', null, 5, null, 5, null, 5555555555555555, null, 5555555555555555, null, 5.5, null, 5.55, null, 'eeeeeeee', '', 5, null, 5, null, 5, null, 5, null, True, null, 'ncharee', null, 'varcharee', null, '0x7661726335', null",
|
||||||
|
]
|
||||||
|
for i in range(5):
|
||||||
|
tdSql.execute(f"create table ct{i+1} using st tags({tags[i]});")
|
||||||
|
|
||||||
|
# insert data
|
||||||
|
data = "null, 1, null, 1, null, 1111111111111111, null, 1111111111111111, null, 1.1, null, 1.11, null, 'aaaaaaaa', null, 1, null, 1, null, 1, null, 1, null, True, null, 'ncharaa', null, 'varcharaa', null, '0x7661726331', null"
|
||||||
|
for round in range(100):
|
||||||
|
sql = f"insert into ct{i+1} values"
|
||||||
|
for j in range(100):
|
||||||
|
sql += f"({start_ts + (round * 100 + j + 1) * 1000}, {data})"
|
||||||
|
sql += ";"
|
||||||
|
tdSql.execute(sql)
|
||||||
|
tdLog.debug("Prepare data successfully")
|
||||||
|
|
||||||
|
def test_query_with_filter(self):
|
||||||
|
# total row number
|
||||||
|
tdSql.query("select count(*) from st;")
|
||||||
|
total_rows = tdSql.queryResult[0][0]
|
||||||
|
tdLog.debug("Total row number is %s" % total_rows)
|
||||||
|
|
||||||
|
# start_ts and end_ts
|
||||||
|
tdSql.query("select first(ts), last(ts) from st;")
|
||||||
|
start_ts = tdSql.queryResult[0][0]
|
||||||
|
end_ts = tdSql.queryResult[0][1]
|
||||||
|
tdLog.debug("start_ts is %s, end_ts is %s" % (start_ts, end_ts))
|
||||||
|
|
||||||
|
filter_dic = {
|
||||||
|
"all_filter_list": ["ts <= now", "t_timestamp <= now", f"ts between '{start_ts}' and '{end_ts}'",
|
||||||
|
f"t_timestamp between '{start_ts}' and '{end_ts}'", "c_ts_empty is null",
|
||||||
|
"t_timestamp_empty is null", "ts > '1970-01-01 00:00:00'", "t_int in (1, 2, 3, 4, 5)",
|
||||||
|
"c_int=1", "c_int_empty is null", "c_unsigned_int=1", "c_unsigned_int_empty is null",
|
||||||
|
"c_unsigned_int in (1, 2, 3, 4, 5)", "c_unsigned_int_empty is null", "c_bigint=1111111111111111",
|
||||||
|
"c_bigint_empty is null", "c_unsigned_bigint in (1111111111111111)", "c_unsigned_bigint_empty is null",
|
||||||
|
"c_float=1.1", "c_float_empty is null", "c_double=1.11", "c_double_empty is null", "c_binary='aaaaaaaa'",
|
||||||
|
"c_binary_empty is null", "c_smallint=1", "c_smallint_empty is null", "c_smallint_unsigned=1",
|
||||||
|
"c_smallint_unsigned_empty is null", "c_tinyint=1", "c_tinyint_empty is null", "c_tinyint_unsigned=1",
|
||||||
|
"c_tinyint_unsigned_empty is null", "c_bool=True", "c_bool_empty is null", "c_nchar='ncharaa'",
|
||||||
|
"c_nchar_empty is null", "c_varchar='varcharaa'", "c_varchar_empty is null", "c_varbinary='0x7661726331'",
|
||||||
|
"c_varbinary_empty is null"],
|
||||||
|
"empty_filter_list": ["ts > now", "t_timestamp > now", "c_ts_empty is not null","t_timestamp_empty is not null",
|
||||||
|
"ts <= '1970-01-01 00:00:00'", "c_ts_empty < '1970-01-01 00:00:00'", "c_int <> 1", "c_int_empty is not null",
|
||||||
|
"t_int in (10, 11)", "t_int_empty is not null"]
|
||||||
|
}
|
||||||
|
for filter in filter_dic["all_filter_list"]:
|
||||||
|
tdLog.debug("Execute query with filter '%s'" % filter)
|
||||||
|
tdSql.query(f"select * from st where {filter};")
|
||||||
|
tdSql.checkRows(total_rows)
|
||||||
|
|
||||||
|
for filter in filter_dic["empty_filter_list"]:
|
||||||
|
tdLog.debug("Execute query with filter '%s'" % filter)
|
||||||
|
tdSql.query(f"select * from st where {filter};")
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
def test_query_with_groupby(self):
|
||||||
|
tdSql.query("select count(*) from st group by tbname;")
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0, 0, 10000)
|
||||||
|
|
||||||
|
tdSql.query("select count(c_unsigned_int_empty + c_int_empty * c_float_empty - c_double_empty + c_smallint_empty / c_tinyint_empty) from st where c_int_empty is null group by tbname;")
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0, 0, 0)
|
||||||
|
|
||||||
|
tdSql.query("select sum(t_unsigned_int_empty + t_int_empty * t_float_empty - t_double_empty + t_smallint_empty / t_tinyint_empty) from st where t_int_empty is null group by tbname;")
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0, 0, None)
|
||||||
|
|
||||||
|
tdSql.query("select max(c_bigint_empty) from st group by tbname, t_bigint_empty, t_float_empty, t_double_empty;")
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0, 0, None)
|
||||||
|
|
||||||
|
tdSql.query("select min(t_double) as v from st where c_nchar like '%aa%' and t_double is not null group by tbname, t_bigint_empty, t_float_empty, t_double_empty order by v limit 1;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 1.11)
|
||||||
|
|
||||||
|
tdSql.query("select top(c_float, 1) as v from st where c_nchar like '%aa%' group by tbname order by v desc slimit 1 limit 1;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 1.1)
|
||||||
|
|
||||||
|
tdSql.query("select first(ts) from st where c_varchar is not null partition by tbname order by ts slimit 1;")
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0, 0, '2024-01-01 13:00:01.000')
|
||||||
|
|
||||||
|
tdSql.query("select first(c_nchar_empty) from st group by tbname;")
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
tdSql.query("select first(ts), first(c_nchar_empty) from st group by tbname, ts order by ts slimit 1 limit 1;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2024-01-01 13:00:01.000')
|
||||||
|
tdSql.checkData(0, 1, None)
|
||||||
|
|
||||||
|
tdSql.query("select first(c_nchar_empty) from st group by t_timestamp_empty order by t_timestamp;")
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
tdSql.query("select last(ts), last(c_nchar_empty) from st group by tbname, ts order by ts slimit 1 limit 1;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2024-01-01 13:00:01.000')
|
||||||
|
tdSql.checkData(0, 1, None)
|
||||||
|
|
||||||
|
tdSql.query("select elapsed(ts, 1s) t from st where c_int = 1 and c_nchar like '%aa%' group by tbname order by t desc slimit 1 limit 1;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 9999)
|
||||||
|
|
||||||
|
tdSql.query("select elapsed(ts, 1s) t from st where c_int_empty is not null and c_nchar like '%aa%' group by tbname order by t desc slimit 1 limit 1;")
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
def test_query_with_join(self):
|
||||||
|
tdSql.query("select count(*) from st as t1 join st as t2 on t1.ts = t2.ts and t1.c_float_empty is not null;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 0)
|
||||||
|
|
||||||
|
tdSql.query("select count(t1.c_ts_empty) as v from st as t1 join st as t2 on t1.ts = t2.ts and t1.c_float_empty is null order by v desc;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 0)
|
||||||
|
|
||||||
|
tdSql.query("select avg(t1.c_tinyint), sum(t2.c_bigint) from st t1, st t2 where t1.ts=t2.ts and t1.c_int > t2.c_int;")
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
tdSql.query("select avg(t1.c_tinyint), sum(t2.c_bigint) from st t1, st t2 where t1.ts=t2.ts and t1.c_int <= t2.c_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 1)
|
||||||
|
tdSql.checkData(0, 1, 1076616672134475760)
|
||||||
|
|
||||||
|
tdSql.query("select count(t1.c_float_empty) from st t1, st t2 where t1.ts=t2.ts and t1.c_int = t2.c_int and t1.t_int_empty=t2.t_int_empty;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 0)
|
||||||
|
|
||||||
|
def test_query_with_window(self):
|
||||||
|
# time window
|
||||||
|
tdSql.query("select sum(c_int_empty) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m) fill(value, 10);")
|
||||||
|
tdSql.checkRows(841)
|
||||||
|
tdSql.checkData(0, 0, 10)
|
||||||
|
|
||||||
|
tdSql.query("select _wstart, _wend, sum(c_int) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m);")
|
||||||
|
tdSql.checkRows(65)
|
||||||
|
|
||||||
|
# status window
|
||||||
|
tdSql.error("select _wstart, count(*) from st state_window(t_bool);")
|
||||||
|
tdSql.query("select _wstart, count(*) from st partition by tbname state_window(c_bool);")
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0, 1, 10000)
|
||||||
|
|
||||||
|
# session window
|
||||||
|
tdSql.query("select _wstart, count(*) from st partition by tbname, t_int session(ts, 1m);")
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0, 1, 10000)
|
||||||
|
|
||||||
|
# event window
|
||||||
|
tdSql.query("select _wstart, _wend, count(*) from (select * from st order by ts, tbname) event_window start with t_bool=true end with t_bool=false;")
|
||||||
|
tdSql.checkRows(20000)
|
||||||
|
|
||||||
|
def test_query_with_union(self):
|
||||||
|
tdSql.query("select count(ts) from (select * from ct1 union select * from ct2 union select * from ct3);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 10000)
|
||||||
|
|
||||||
|
tdSql.query("select count(ts) from (select * from ct1 union all select * from ct2 union all select * from ct3);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 30000)
|
||||||
|
|
||||||
|
tdSql.query("select count(*) from (select * from ct1 union select * from ct2 union select * from ct3);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 10000)
|
||||||
|
|
||||||
|
tdSql.query("select count(c_ts_empty) from (select * from ct1 union select * from ct2 union select * from ct3);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 0)
|
||||||
|
|
||||||
|
tdSql.query("select count(*) from (select ts from st union select c_ts_empty from st);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 10001)
|
||||||
|
|
||||||
|
tdSql.query("select count(*) from (select ts from st union all select c_ts_empty from st);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 100000)
|
||||||
|
|
||||||
|
tdSql.query("select count(ts) from (select ts from st union select c_ts_empty from st);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 10000)
|
||||||
|
|
||||||
|
tdSql.query("select count(ts) from (select ts from st union all select c_ts_empty from st);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 50000)
|
||||||
|
|
||||||
|
def test_nested_query(self):
|
||||||
|
tdSql.query("select elapsed(ts, 1s) from (select * from (select * from st where c_int = 1) where c_int_empty is null);")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 9999)
|
||||||
|
|
||||||
|
tdSql.query("select first(ts) as t, avg(c_int) as v from (select * from (select * from st where c_int = 1) where c_int_empty is null) group by t_timestamp order by t_timestamp desc slimit 1 limit 1;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2024-01-01 13:00:01.000')
|
||||||
|
tdSql.checkData(0, 1, 1)
|
||||||
|
|
||||||
|
tdSql.query("select max(c_tinyint) from (select c_tinyint, tbname from st where c_float_empty is null or t_int_empty is null) group by tbname order by c_tinyint desc slimit 1 limit 1;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, 1)
|
||||||
|
|
||||||
|
tdSql.query("select top(c_int, 3) from (select c_int, tbname from st where t_int in (2, 3)) group by tbname slimit 3;")
|
||||||
|
tdSql.checkRows(6)
|
||||||
|
tdSql.checkData(0, 0, 1)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.prepareData()
|
||||||
|
self.test_query_with_filter()
|
||||||
|
self.test_query_with_groupby()
|
||||||
|
self.test_query_with_join()
|
||||||
|
self.test_query_with_window()
|
||||||
|
self.test_query_with_union()
|
||||||
|
self.test_nested_query()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue