diff --git a/include/common/systable.h b/include/common/systable.h index 37593144d8..f0f8ac8cf6 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -65,6 +65,11 @@ extern "C" { #define TSDB_PERFS_TABLE_TRANS "perf_trans" #define TSDB_PERFS_TABLE_APPS "perf_apps" +#define TSDB_AUDIT_DB "audit" +#define TSDB_AUDIT_STB_OPERATION "operations" +#define TSDB_AUDIT_CTB_OPERATION "t_operations_" +#define TSDB_AUDIT_CTB_OPERATION_LEN 13 + typedef struct SSysDbTableSchema { const char* name; const int32_t type; diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f23bb4d51b..04e9b5a380 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -232,7 +232,7 @@ struct SConfig *taosGetCfg(); void taosSetAllDebugFlag(int32_t flag); void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal); void taosLocalCfgForbiddenToChange(char *name, bool *forbidden); -int8_t taosGranted(); +int8_t taosGranted(int8_t type); #ifdef __cplusplus } diff --git a/include/common/tgrant.h b/include/common/tgrant.h index dbca2ac90c..2fa6dde8f6 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -30,6 +30,9 @@ extern "C" { #define GRANT_HEART_BEAT_MIN 2 #define GRANT_ACTIVE_CODE "activeCode" +#define GRANT_FLAG_ALL (0x01) +#define GRANT_FLAG_AUDIT (0x02) +#define GRANT_FLAG_VIEW (0x04) typedef enum { TSDB_GRANT_ALL, @@ -50,11 +53,13 @@ typedef enum { TSDB_GRANT_SUBSCRIPTION, TSDB_GRANT_AUDIT, TSDB_GRANT_CSV, + TSDB_GRANT_VIEW, TSDB_GRANT_MULTI_TIER, TSDB_GRANT_BACKUP_RESTORE, } EGrantType; -int32_t grantCheck(EGrantType grant); +int32_t grantCheck(EGrantType grant); // less +int32_t grantCheckLE(EGrantType grant); // less or equal char* tGetMachineId(); #ifndef TD_UNIQ_GRANT int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type); @@ -69,7 +74,7 @@ int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, ch {.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ - {.name = "state", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ + {.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ca09033f5d..0aea284fa5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3918,6 +3918,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); #define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 +#define SUBMIT_REQ_FROM_FILE 0x4 typedef struct { int32_t flags; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index b34b998d76..1957bcbb24 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -358,7 +358,7 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pAuth, SUserAuthRes* pRes); -int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo *pAuth, SUserAuthRes* pRes, bool* exists); +int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo *pAuth, SUserAuthRes* pRes, bool* exists); int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index bb50208f67..e66ffe5521 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -211,6 +211,7 @@ typedef struct SStoreTqReader { bool (*tqNextBlockImpl)(); // todo remove it SSDataBlock* (*tqGetResultBlock)(); int64_t (*tqGetResultBlockTime)(); + int32_t (*tqGetStreamExecProgress)(); void (*tqReaderSetColIdList)(); int32_t (*tqReaderSetQueryTableList)(); @@ -266,16 +267,11 @@ typedef struct SStoreMeta { // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] int32_t (*getChildTableList)(void* pVnode, int64_t suid, SArray* list); int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); - void* storeGetVersionRange; - void* storeGetLastTimestamp; - - int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema + int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); - int64_t (*getNumOfRowsInMem)(void* pVnode); - SMCtbCursor* (*openCtbCursor)(void* pVnode, tb_uid_t uid, int lock); int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 24fd79d3d7..7a1b655931 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -724,8 +724,10 @@ typedef struct SSubplan { SNode* pTagCond; SNode* pTagIndexCond; bool showRewrite; - int32_t rowsThreshold; + bool isView; + bool isAudit; bool dynamicRowThreshold; + int32_t rowsThreshold; } SSubplan; typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 6a41f4607b..2ac2c3ccbd 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -86,8 +86,10 @@ typedef struct SParseContext { bool enableSysInfo; bool async; bool hasInvisibleCol; - const char* svrVer; + bool isView; + bool isAudit; bool nodeOffline; + const char* svrVer; SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos int64_t allocatorId; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 1b523c0323..707d70b71b 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -32,6 +32,8 @@ typedef struct SPlanContext { bool streamQuery; bool rSmaQuery; bool showRewrite; + bool isView; + bool isAudit; int8_t triggerType; int64_t watermark; int64_t deleteMark; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index f79a0a0718..8d6cd6a3c0 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -66,7 +66,11 @@ typedef enum { #define QUERY_RSP_POLICY_QUICK 1 #define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0) -#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0) +#define QUERY_MSG_MASK_AUDIT() (1 << 1) +#define QUERY_MSG_MASK_VIEW() (1 << 2) +#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0) +#define TEST_AUDIT_MASK(m) (((m) & QUERY_MSG_MASK_AUDIT()) != 0) +#define TEST_VIEW_MASK(m) (((m) & QUERY_MSG_MASK_VIEW()) != 0) typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema @@ -338,6 +342,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname)) +#define IS_AUDIT_DBNAME(_dbname) ((*(_dbname) == 'a') && (0 == strcmp(_dbname, TSDB_AUDIT_DB))) +#define IS_AUDIT_STB_NAME(_stbname) ((*(_stbname) == 'o') && (0 == strcmp(_stbname, TSDB_AUDIT_STB_OPERATION))) +#define IS_AUDIT_CTB_NAME(_ctbname) \ + ((*(_ctbname) == 't') && (0 == strncmp(_ctbname, TSDB_AUDIT_CTB_OPERATION, TSDB_AUDIT_CTB_OPERATION_LEN))) + #define qFatal(...) \ do { \ if (qDebugFlag & DEBUG_FATAL) { \ diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index fed4081ccb..c603f9f5ac 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -13,6 +13,9 @@ * along with this program. If not, see . */ +#ifndef _STREAM_STATE_H_ +#define _STREAM_STATE_H_ + #include "tdatablock.h" #include "rocksdb/c.h" @@ -20,9 +23,6 @@ #include "tsimplehash.h" #include "tstreamFileState.h" -#ifndef _STREAM_STATE_H_ -#define _STREAM_STATE_H_ - #ifdef __cplusplus extern "C" { #endif diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 67ee4620b6..282433066e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -13,6 +13,9 @@ * along with this program. If not, see . */ +#ifndef _STREAM_H_ +#define _STREAM_H_ + #include "os.h" #include "streamState.h" #include "tdatablock.h" @@ -26,9 +29,6 @@ extern "C" { #endif -#ifndef _STREAM_H_ -#define _STREAM_H_ - #define ONE_MiB_F (1048576.0) #define ONE_KiB_F (1024.0) #define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F) @@ -313,7 +313,7 @@ typedef struct SCheckpointInfo { int64_t failedId; // record the latest failed checkpoint id int64_t checkpointingId; int32_t downstreamAlignNum; - int32_t checkpointNotReadyTasks; + int32_t numOfNotReady; bool dispatchCheckpointTrigger; int64_t msgVer; int32_t transId; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 94fe80b901..c0c20a0fde 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -575,8 +575,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821) #define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822) #define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823) -#define TSDB_CODE_GRANT_CSV_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0824) -#define TSDB_CODE_GRANT_AUDIT_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0825) // sync // #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x diff --git a/include/util/tdef.h b/include/util/tdef.h index 4698d50e67..f136005026 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -287,6 +287,7 @@ typedef enum ELogicConditionType { #define TSDB_DNODE_VALUE_LEN 256 #define TSDB_CLUSTER_VALUE_LEN 1000 +#define TSDB_GRANT_LOG_COL_LEN 15072 #define TSDB_ACTIVE_KEY_LEN 109 #define TSDB_CONN_ACTIVE_KEY_LEN 255 diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 63a65d7c95..a146712cab 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -843,7 +843,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S view->version = htonl(view->version); } - tscDebug("hb got %d expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum); + tscDebug("hb got %u expired view, valueLen:%lu", viewNum, sizeof(SViewVersion) * viewNum); if (NULL == req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c1e1da617d..d3d8ee1dc1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1154,6 +1154,8 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), .pAstRoot = pQuery->pRoot, .showRewrite = pQuery->showRewrite, + .isView = pWrapper->pParseCtx->isView, + .isAudit = pWrapper->pParseCtx->isAudit, .pMsg = pRequest->msgBuf, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .pUser = pRequest->pTscObj->user, diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 77083d0425..47eac317ec 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -349,21 +349,21 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = { }; static const SSysDbTableSchema useGrantsFullSchema[] = { - {.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, - {.name = "display_name", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, - {.name = "expire", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, - {.name = "limits", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "grant_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "display_name", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "expire", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "limits", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema useGrantsLogsSchema[] = { - {.name = "state", .bytes = 1536 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, - {.name = "active", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, - {.name = "machine", .bytes = 9088 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "state", .bytes = 1536 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "active", .bytes = 512 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "machine", .bytes = TSDB_GRANT_LOG_COL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema useMachinesSchema[] = { {.name = "id", .bytes = TSDB_CLUSTER_ID_LEN + 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "machine", .bytes = 6016 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "machine", .bytes = 7552 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysTableMeta infosMeta[] = { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 46c4d613fb..444a4c0ccc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1801,4 +1801,17 @@ void taosSetAllDebugFlag(int32_t flag) { if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist } -int8_t taosGranted() { return atomic_load_8(&tsGrant); } +int8_t taosGranted(int8_t type) { + switch (type) { + case TSDB_GRANT_ALL: + return atomic_load_8(&tsGrant) & GRANT_FLAG_ALL; + case TSDB_GRANT_AUDIT: + return atomic_load_8(&tsGrant) & GRANT_FLAG_AUDIT; + case TSDB_GRANT_VIEW: + return atomic_load_8(&tsGrant) & GRANT_FLAG_VIEW; + default: + ASSERTS(0, "undefined grant type:%" PRIi8, type); + break; + } + return 0; +} \ No newline at end of file diff --git a/source/common/src/tgrant.c b/source/common/src/tgrant.c index f212d71362..8e4fe9febb 100644 --- a/source/common/src/tgrant.c +++ b/source/common/src/tgrant.c @@ -19,5 +19,6 @@ #ifndef _GRANT int32_t grantCheck(EGrantType grant) {return TSDB_CODE_SUCCESS;} +int32_t grantCheckLE(EGrantType grant) {return TSDB_CODE_SUCCESS;} #endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index bedd72759d..554205decb 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -556,7 +556,7 @@ typedef struct { } SMqConsumerObj; SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); -void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete); +void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 372612274f..4d1125a340 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -124,6 +124,7 @@ SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); void destroyStreamTaskIter(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); +void mndInitExecInfo(); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 1a55a161bf..f2b279276e 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -409,7 +409,7 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) { } { // audit - auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, cfgReq.sqlLen); + auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, TMIN(cfgReq.sqlLen, GRANT_ACTIVE_HEAD_LEN << 1)); } _exit: tFreeSMCfgClusterReq(&cfgReq); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 3ce548a4f6..753076f1f3 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -107,7 +107,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * goto FAILED; } - if ((terrno = grantCheck(TSDB_GRANT_SUBSCRIPTION)) < 0) { + if ((terrno = grantCheckLE(TSDB_GRANT_SUBSCRIPTION)) < 0) { code = terrno; goto FAILED; } @@ -240,9 +240,10 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbR } STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1); strcpy(data->topic, topic); - if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || grantCheck(TSDB_GRANT_SUBSCRIPTION) < 0) { + if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || + grantCheckLE(TSDB_GRANT_SUBSCRIPTION) < 0) { data->noPrivilege = 1; - } else{ + } else { data->noPrivilege = 0; } mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c index 0b85f8fd5a..2ec6e09d12 100644 --- a/source/dnode/mnode/impl/src/mndGrant.c +++ b/source/dnode/mnode/impl/src/mndGrant.c @@ -79,11 +79,6 @@ char *tGetMachineId() { return NULL; }; int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } int32_t mndProcessConfigGrantReq(SMnode *pMnode, SRpcMsg *pReq, SMCfgClusterReq *pCfg) { return 0; } -#else -#ifndef TD_UNIQ_GRANT -char *tGetMachineId() { return NULL; }; -int32_t mndProcessConfigGrantReq(SMnode *pMnode, SRpcMsg *pReq, SMCfgClusterReq *pCfg) { return 0; } -#endif #endif void mndGenerateMachineCode() { grantParseParameter(); } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fc460ac67c..0a78914011 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -62,8 +62,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); -static void freeCheckpointCandEntry(void *); -static void freeTaskList(void *param); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -121,17 +119,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); - taosThreadMutexInit(&execInfo.lock, NULL); - _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); - - execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); - execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); - execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); - execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK); - execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); - - taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); - taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); + mndInitExecInfo(); if (sdbSetTable(pMnode->pSdb, table) != 0) { return -1; @@ -1628,7 +1616,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - if(grantCheck(TSDB_GRANT_STREAMS) < 0){ + if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){ terrno = TSDB_CODE_GRANT_EXPIRED; return -1; } @@ -2117,16 +2105,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -void freeCheckpointCandEntry(void *param) { - SCheckpointCandEntry *pEntry = param; - taosMemoryFreeClear(pEntry->pName); -} - -void freeTaskList(void* param) { - SArray** pList = (SArray **)param; - taosArrayDestroy(*pList); -} - static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { int32_t num = taosArrayGetSize(pList); for(int32_t i = 0; i < num; ++i) { @@ -2202,4 +2180,4 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 4426ab0672..97474fa851 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); - if(grantCheck(TSDB_GRANT_STREAMS) < 0){ + if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){ if(suspendAllStreams(pMnode, &pReq->info) < 0){ return -1; } @@ -316,16 +316,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal if (taosArrayGetSize(pFailedTasks) > 0) { - bool allReady = true; - SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); - taosArrayDestroy(p); + bool allReady = true; + if (pMnode != NULL) { + SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); + taosArrayDestroy(p); + } else { + allReady = false; + } if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", - pInfo->checkpointId, pInfo->transId); + pInfo->checkpointId, pInfo->transId); mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 235c604b27..3cabce2201 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -543,3 +543,27 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj * taosWUnLockLatch(&pStream->lock); return 0; } + +static void freeCheckpointCandEntry(void *param) { + SCheckpointCandEntry *pEntry = param; + taosMemoryFreeClear(pEntry->pName); +} + +static void freeTaskList(void* param) { + SArray** pList = (SArray **)param; + taosArrayDestroy(*pList); +} + +void mndInitExecInfo() { + taosThreadMutexInit(&execInfo.lock, NULL); + _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); + + execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); + execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); + execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.transMgmt.pWaitingList = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); + + taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry); + taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); +} diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index a002b20bde..bc5b5125f1 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -4,7 +4,7 @@ add_subdirectory(acct) #add_subdirectory(db) #add_subdirectory(dnode) add_subdirectory(func) -#add_subdirectory(mnode) +add_subdirectory(stream) add_subdirectory(profile) add_subdirectory(qnode) add_subdirectory(sdb) diff --git a/source/dnode/mnode/impl/test/stream/CMakeLists.txt b/source/dnode/mnode/impl/test/stream/CMakeLists.txt new file mode 100644 index 0000000000..b1bb62735f --- /dev/null +++ b/source/dnode/mnode/impl/test/stream/CMakeLists.txt @@ -0,0 +1,13 @@ +SET(CMAKE_CXX_STANDARD 11) + +aux_source_directory(. MNODE_STREAM_TEST_SRC) +add_executable(streamTest ${MNODE_STREAM_TEST_SRC}) +target_link_libraries( + streamTest + PRIVATE dnode gtest +) + +add_test( + NAME streamTest + COMMAND streamTest +) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp new file mode 100644 index 0000000000..a3babad80c --- /dev/null +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include +#include +#include "../../inc/mndStream.h" + +namespace { +SRpcMsg buildHbReq() { + SStreamHbMsg msg = {0}; + msg.vgId = 1; + msg.numOfTasks = 5; + msg.pTaskStatus = taosArrayInit(4, sizeof(STaskStatusEntry)); + + for (int32_t i = 0; i < 4; ++i) { + STaskStatusEntry entry = {0}; + entry.nodeId = i + 1; + entry.stage = 1; + entry.id.taskId = i + 1; + entry.id.streamId = 999; + + if (i == 0) { + entry.stage = 4; + } + + taosArrayPush(msg.pTaskStatus, &entry); + } + + // (p->checkpointId != 0) && p->checkpointFailed + // add failed checkpoint info + { + STaskStatusEntry entry = {0}; + entry.nodeId = 5; + entry.stage = 1; + + entry.id.taskId = 5; + entry.id.streamId = 999; + + entry.checkpointId = 1; + entry.checkpointFailed = true; + + taosArrayPush(msg.pTaskStatus, &entry); + } + + int32_t tlen = 0; + int32_t code = 0; + SEncoder encoder; + void* buf = NULL; + SRpcMsg msg1 = {0}; + msg1.info.noResp = 1; + + tEncodeSize(tEncodeStreamHbMsg, &msg, tlen, code); + if (code < 0) { + goto _end; + } + + buf = rpcMallocCont(tlen); + if (buf == NULL) { + goto _end; + } + + tEncoderInit(&encoder, (uint8_t*)buf, tlen); + if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) { + rpcFreeCont(buf); + goto _end; + } + tEncoderClear(&encoder); + + initRpcMsg(&msg1, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); + + taosArrayDestroy(msg.pTaskStatus); + return msg1; + +_end: + return msg1; +} + +void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskId) { + SStreamExecInfo* pExecNode = &execInfo; + + pTask->id.streamId = streamId; + pTask->id.taskId = taskId; + pTask->info.nodeId = nodeId; + + STaskId id; + id.streamId = pTask->id.streamId; + id.taskId = pTask->id.taskId; + + STaskStatusEntry entry; + streamTaskStatusInit(&entry, pTask); + + entry.stage = 1; + entry.status = TASK_STATUS__READY; + + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); + taosArrayPush(pExecNode->pTaskList, &id); +} +void initStreamExecInfo() { + SStreamExecInfo* pExecNode = &execInfo; + + SStreamTask task = {0}; + setTask(&task, 1, 999, 1); + setTask(&task, 1, 999, 2); + setTask(&task, 1, 999, 3); + setTask(&task, 1, 999, 4); + setTask(&task, 2, 999, 5); +} + +void initNodeInfo() { + execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); + SNodeEntry entry = {0}; + entry.nodeId = 2; + entry.stageUpdated = true; + taosArrayPush(execInfo.pNodeList, &entry); +} +} // namespace + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +TEST(mndHbTest, handle_error_in_hb) { + mndInitExecInfo(); + initStreamExecInfo(); + initNodeInfo(); + + SRpcMsg msg = buildHbReq(); + int32_t code = mndProcessStreamHb(&msg); + + rpcFreeCont(msg.pCont); +} + +#pragma GCC diagnostic pop \ No newline at end of file diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 97cf0ffebc..3c334be2f2 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -90,6 +90,8 @@ int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num); int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num); +int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); + void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad); @@ -180,7 +182,6 @@ int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); -int32_t tsdbGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); size_t tsdbCacheGetCapacity(SVnode *pVnode); @@ -233,6 +234,7 @@ int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, i bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); +int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished); // sma int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index cded4ddd7c..475a26aff5 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -97,7 +97,6 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; - int64_t walLogLastVer; SRWLatch lock; SHashObj* pPushMgr; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle @@ -153,14 +152,14 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tq util -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); +int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); void tqUpdateNodeStage(STQ* pTq, bool isLeader); -int32_t setDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData, const char* id); +int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, + SSubmitTbData* pTableData, const char* id); int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9d8d5013fa..cac3be9ee3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -279,6 +279,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); +int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbMerge.c ============================================================================================== typedef struct { @@ -970,8 +971,6 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { return pIter->pRow; } -int32_t tRowInfoCmprFn(const void *p1, const void *p2); - typedef struct { int64_t suid; int64_t uid; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 138bcbb133..621651507e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1554,7 +1554,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA } _resume_delete: version = RSMA_EXEC_MSG_VER(msg); - if ((terrno = extractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, + if ((terrno = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, &packData.pDataBlock, 1))) { taosFreeQitem(msg); goto _err; diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index f537ede8c1..767ea47e21 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -203,7 +203,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t *index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); if (index == NULL) { // no data yet, append it - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -213,7 +213,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t size = (int32_t)taosArrayGetSize(pReq->aSubmitTbData) - 1; taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); } else { - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, ""); if (code != TSDB_CODE_SUCCESS) { continue; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0d04304f97..7d02d4cc17 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -66,7 +66,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->path = taosStrdup(path); pTq->pVnode = pVnode; - pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle); @@ -1057,7 +1056,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); // let's continue scan data in the wal files - if(code == 0 && pReq->reqType >= 0){ + if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { tqScanWalAsync(pTq, false); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 383a636f71..8392f4c479 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -344,7 +344,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); int32_t len = pCont->bodyLen - sizeof(SMsgHead); - code = extractDelDataBlock(pBody, len, ver, (void**)pItem, 0); + code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0); if (code == TSDB_CODE_SUCCESS) { if (*pItem == NULL) { tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7fcb86d84a..7050870c57 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -746,7 +746,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat return TDB_CODE_SUCCESS; } -int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, +int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, SSubmitTbData* pTableData, const char* id) { int32_t numOfRows = pDataBlock->info.rows; @@ -821,7 +821,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -868,7 +868,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { continue; } - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } @@ -878,7 +878,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); } else { - code = setDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d18455d221..b9f578a74b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -388,7 +388,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* return 0; } -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { +int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { SDecoder* pCoder = &(SDecoder){0}; SDeleteRes* pRes = &(SDeleteRes){0}; @@ -449,3 +449,73 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** return TSDB_CODE_SUCCESS; } + +int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) { + SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + int32_t code = TSDB_CODE_SUCCESS; + + if (pDelay != NULL) { + *pDelay = 0; + } + + *fhFinished = false; + + if (numOfTasks <= 0) { + return code; + } + + // extract the required source task for a given stream, identified by streamId + for (int32_t i = 0; i < numOfTasks; ++i) { + STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + if (pId->streamId != streamId) { + continue; + } + + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + if (ppTask == NULL) { + tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId); + continue; + } + + if ((*ppTask)->info.taskLevel != TASK_LEVEL__SOURCE) { + continue; + } + + // here we get the required stream source task + SStreamTask* pTask = *ppTask; + *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask); + + int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); + + SVersionRange verRange = {0}; + walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer); + + SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0); + if (pReader == NULL) { + tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId); + continue; + } + + int64_t cur = 0; + int64_t latest = 0; + + code = walFetchHead(pReader, ver); + if (code != TSDB_CODE_SUCCESS) { + cur = pReader->pHead->head.ingestTs; + } + + code = walFetchHead(pReader, verRange.maxVer); + if (code != TSDB_CODE_SUCCESS) { + latest = pReader->pHead->head.ingestTs; + } + + if (pDelay != NULL) { // delay in ms + *pDelay = (latest - cur) / 1000; + } + + walCloseReader(pReader); + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9d158668d2..86f58717e2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2628,6 +2628,58 @@ static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) { return (pStatus->pProcMemTableIter != NULL); } +static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo) { + SReaderStatus* pStatus = &pReader->status; + SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; + + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + + SDataBlockInfo* pInfo = &pResBlock->info; + blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt); + + pInfo->rows = pScanInfo->numOfRowsInStt; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 1; + pInfo->window = pScanInfo->sttWindow; + + setComposedBlockFlag(pReader, true); + + pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; + pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; + pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; + pScanInfo->sttBlockReturned = true; + + pSttBlockReader->mergeTree.pIter = NULL; + + tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s", + pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, + pResBlock->info.rows, pReader->idStr); +} + +static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, + SFileDataBlockInfo* pBlockInfo, int32_t blockIndex) { + // whole block is required, return it directly + SReaderStatus* pStatus = &pReader->status; + SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + + pInfo->rows = pBlockInfo->numRow; + pInfo->id.uid = pScanInfo->uid; + pInfo->dataLoad = 0; + pInfo->version = pReader->info.verRange.maxVer; + pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; + setComposedBlockFlag(pReader, false); + setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); + + // update the last key for the corresponding table + pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey; + tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " + "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", + pReader, pScanInfo->uid, blockIndex, pBlockInfo->tbBlockIdx, pBlockInfo->numRow, pBlockInfo->firstKey, + pBlockInfo->lastKey, pReader->idStr); +} + static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; @@ -2680,28 +2732,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { // if only require the total rows, no need to load data from stt file if it is clean stt blocks if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) { - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - - SDataBlockInfo* pInfo = &pResBlock->info; - blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt); - - pInfo->rows = pScanInfo->numOfRowsInStt; - pInfo->id.uid = pScanInfo->uid; - pInfo->dataLoad = 1; - pInfo->window = pScanInfo->sttWindow; - - setComposedBlockFlag(pReader, true); - - pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; - pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; - pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; - pScanInfo->sttBlockReturned = true; - - pSttBlockReader->mergeTree.pIter = NULL; - - tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s", - pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, - pResBlock->info.rows, pReader->idStr); + buildCleanBlockFromSttFiles(pReader, pScanInfo); return TSDB_CODE_SUCCESS; } @@ -2741,10 +2772,11 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { } } -static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) { +// current active data block not overlap with the stt-files/stt-blocks +static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) { ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); - if (pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) { + if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) { return true; } else { int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; @@ -2794,24 +2826,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) { - // whole block is required, return it directly - SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; - pInfo->rows = pBlockInfo->numRow; - pInfo->id.uid = pScanInfo->uid; - pInfo->dataLoad = 0; - pInfo->version = pReader->info.verRange.maxVer; - pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; - setComposedBlockFlag(pReader, false); - setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); + if (notOverlapWithFiles(pBlockInfo, pScanInfo, asc)) { + int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; - // update the last key for the corresponding table - pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 - " clean file block retrieved from file, global index:%d, " - "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", - pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->numRow, - pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr); + if ((!hasDataInSttBlock(pScanInfo)) || (asc && pBlockInfo->lastKey < keyInStt) || + (!asc && pBlockInfo->firstKey > keyInStt)) { + if (pScanInfo->cleanSttBlocks && hasDataInSttBlock(pScanInfo)) { + if (asc) { // file block is located before the stt block + ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey); + } else { // stt block is before the file block + ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey); + } + } + + buildCleanBlockFromDataFiles(pReader, pScanInfo, pBlockInfo, pBlockIter->index); + } else { // clean stt block + if (asc) { + ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey); + } else { + ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey); + } + + // return the stt file block + ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL); + buildCleanBlockFromSttFiles(pReader, pScanInfo); + return TSDB_CODE_SUCCESS; + } } else { SBlockData* pBData = &pReader->status.fileBlockData; tBlockDataReset(pBData); @@ -2822,7 +2862,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); // let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files - pScanInfo->cleanSttBlocks = false; initSttBlockReader(pSttBlockReader, pScanInfo, pReader); // no data in stt block, no need to proceed. @@ -2840,8 +2879,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // data in stt now overlaps with current active file data block, need to composed with file data block. int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader); - if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || - (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) { + if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) { tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader, lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr); break; @@ -4995,9 +5033,9 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { return rows; } -int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { +int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid) { SMetaReader mr = {0}; - metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, pMeta, 0); int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; @@ -5027,7 +5065,7 @@ int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_ metaReaderClear(&mr); // get the newest table schema version - code = metaGetTbTSchemaEx(((SVnode*)pVnode)->pMeta, *suid, uid, -1, pSchema); + code = metaGetTbTSchemaEx(pMeta, *suid, uid, -1, pSchema); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index a160cd26e0..aff7e4642b 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -91,7 +91,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->getTableTypeByName = metaGetTableTypeByName; pMeta->getTableNameByUid = metaGetTableNameByUid; - pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor + pMeta->getTableSchema = vnodeGetTableSchema; pMeta->storeGetTableList = vnodeGetTableList; pMeta->getCachedTableList = metaGetCachedTableUidList; @@ -135,7 +135,9 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; pTq->tqGetResultBlockTime = tqGetResultBlockTime; -} + + pTq->tqGetStreamExecProgress = tqGetStreamExecInfo; + } void initStateStoreAPI(SStateStore* pStore) { pStore->streamFileStateInit = streamFileStateInit; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index b6a9360afd..4fc7a88494 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -14,6 +14,7 @@ */ #include "vnd.h" +#include "tsdb.h" #define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ do { \ @@ -703,3 +704,7 @@ void *vnodeGetIvtIdx(void *pVnode) { } return metaGetIvtIdx(((SVnode *)pVnode)->pMeta); } + +int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) { + return tsdbGetTableSchema(((SVnode*)pVnode)->pMeta, uid, pSchema, suid); +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3ec6adee41..e4d7b11176 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -238,6 +238,11 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int TSDB_CHECK_CODE(code, lino, _exit); } + if (submitTbData.flags & SUBMIT_REQ_FROM_FILE) { + code = grantCheck(TSDB_GRANT_CSV); + TSDB_CHECK_CODE(code, lino, _exit); + } + int64_t uid; if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid); diff --git a/source/libs/catalog/src/ctgRent.c b/source/libs/catalog/src/ctgRent.c index 457285b147..a19eb19c02 100755 --- a/source/libs/catalog/src/ctgRent.c +++ b/source/libs/catalog/src/ctgRent.c @@ -300,7 +300,3 @@ int32_t ctgUpdateRentViewVersion(SCatalog *pCtg, char *dbFName, char *viewName, return TSDB_CODE_SUCCESS; } - - - - diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index e91afb2e38..ffed6a7788 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -66,7 +66,7 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pBlock->info.rows; i++) { - // 1.如果group id发生变化,获取新group id上一次的window的缓存,并把旧group id的信息存入缓存。 + // todo(liuyao) 1.如果group id发生变化,获取新group id上一次的window的缓存,并把旧group id的信息存入缓存。 // 2.计算 当前需要合并的行数 // 3.做聚集计算。 // 4.达到行数,将结果存入pInfo->res中。 diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 3cfd0ab582..2cba6e3241 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -220,7 +220,6 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi (*pResult)->win = *win; - clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } @@ -262,6 +261,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p } else if (pInfo->groupId != gid) { // this is a new group, reset the info pInfo->inWindow = false; + pInfo->groupId = gid; } SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; @@ -319,6 +319,9 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid); pInfo->inWindow = true; startIndex = rowIndex; + if (pInfo->pRow != NULL) { + clearResultRowInitFlag(pSup->pCtx, pSup->numOfExprs); + } break; } } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 4c4e83fced..bf5d1b2019 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -541,6 +541,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT == type) { pOptr = createStreamCountAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) { + pOptr = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else { terrno = TSDB_CODE_INVALID_PARA; pTaskInfo->code = terrno; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c056f33504..8747082c08 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1284,7 +1284,7 @@ static bool isSlidingWindow(SStreamScanInfo* pInfo) { } static bool isCountSlidingWindow(SStreamScanInfo* pInfo) { - return pInfo->windowSup.pStreamAggSup->windowCount != pInfo->windowSup.pStreamAggSup->windowSliding; + return pInfo->windowSup.pStreamAggSup && (pInfo->windowSup.pStreamAggSup->windowCount != pInfo->windowSup.pStreamAggSup->windowSliding); } static bool isCountWindow(SStreamScanInfo* pInfo) { diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 4a1a6aa2ad..1045480e7e 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -220,7 +220,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); TSKEY* startTsCols = (int64_t*)pStartTsCol->pData; - blockDataEnsureCapacity(pAggSup->pScanBlock, rows); + blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2); SStreamStateCur* pCur = NULL; COUNT_TYPE slidingRows = 0; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 8aca76597b..4a2fe2416f 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -349,6 +349,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); @@ -725,7 +726,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys } if (pInfo->isHistoryOp) { - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pAllUpdated = tSimpleHashInit(64, hashFn); } else { pInfo->pAllUpdated = NULL; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 93f8aff377..6b689264b5 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2083,6 +2083,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); @@ -2286,6 +2287,10 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { SResultWindowInfo* pWinInfo = pIte; + if (!pWinInfo->pStatePos->beUpdated) { + continue; + } + pWinInfo->pStatePos->beUpdated = false; saveResult(*pWinInfo, pStUpdated); } return TSDB_CODE_SUCCESS; @@ -3425,6 +3430,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 00cd82847d..8c27515542 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -3330,6 +3330,8 @@ static const char* jkSubplanTagIndexCond = "TagIndexCond"; static const char* jkSubplanShowRewrite = "ShowRewrite"; static const char* jkSubplanRowsThreshold = "RowThreshold"; static const char* jkSubplanDynamicRowsThreshold = "DyRowThreshold"; +static const char* jkSubplanIsView = "IsView"; +static const char* jkSubplanIsAudit = "IsAudit"; static int32_t subplanToJson(const void* pObj, SJson* pJson) { const SSubplan* pNode = (const SSubplan*)pObj; @@ -3368,6 +3370,12 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->showRewrite); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSubplanIsView, pNode->isView); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSubplanIsAudit, pNode->isAudit); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkSubplanRowsThreshold, pNode->rowsThreshold); } @@ -3415,6 +3423,12 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->showRewrite); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSubplanIsView, &pNode->isView); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSubplanIsAudit, &pNode->isAudit); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkSubplanRowsThreshold, &pNode->rowsThreshold); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index ad0cb6a169..9fa63df667 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3970,6 +3970,12 @@ static int32_t subplanInlineToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeValueBool(pEncoder, pNode->dynamicRowThreshold); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueBool(pEncoder, pNode->isView); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueBool(pEncoder, pNode->isAudit); + } return code; } @@ -4025,7 +4031,12 @@ static int32_t msgToSubplanInline(STlvDecoder* pDecoder, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueBool(pDecoder, &pNode->dynamicRowThreshold); } - + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueBool(pDecoder, &pNode->isView); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueBool(pDecoder, &pNode->isAudit); + } return code; } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 512dfdaef2..7d10d1f2df 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -2144,13 +2144,15 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt pStmt->pTableCxtHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } - int32_t numOfRows = 0; int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows); if (TSDB_CODE_SUCCESS == code) { pStmt->totalRowsNum += numOfRows; pStmt->totalTbNum += 1; TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT); + if (rowsDataCxt.pTableDataCxt && rowsDataCxt.pTableDataCxt->pData) { + rowsDataCxt.pTableDataCxt->pData->flags |= SUBMIT_REQ_FROM_FILE; + } if (!pStmt->fileProcessing) { taosCloseFile(&pStmt->fp); } else { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 02aa2a8b80..63891fc78b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3165,6 +3165,19 @@ static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTabl return TSDB_CODE_SUCCESS; } +static int32_t translateAudit(STranslateContext* pCxt, SRealTableNode* pRealTable, SName* pName) { + if (pRealTable->pMeta->tableType == TSDB_SUPER_TABLE) { + if (IS_AUDIT_DBNAME(pName->dbname) && IS_AUDIT_STB_NAME(pName->tname)) { + pCxt->pParseCxt->isAudit = true; + } + } else if (pRealTable->pMeta->tableType == TSDB_CHILD_TABLE) { + if (IS_AUDIT_DBNAME(pName->dbname) && IS_AUDIT_CTB_NAME(pName->tname)) { + pCxt->pParseCxt->isAudit = true; + } + } + return 0; +} + int32_t translateTable(STranslateContext* pCxt, SNode** pTable) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(*pTable)) { @@ -3184,7 +3197,8 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable) { if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType) { return translateView(pCxt, pTable, &name); } -#endif + translateAudit(pCxt, pRealTable, &name); +#endif code = setTableVgroupList(pCxt, &name, pRealTable); if (TSDB_CODE_SUCCESS == code) { code = setTableIndex(pCxt, &name, pRealTable); @@ -8181,27 +8195,27 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias)); tstrncpy(col->colName, pMeta->schema[0].name, tListLen(col->colName)); - SNodeList* pParamterList = nodesMakeList(); - if (NULL == pParamterList) { + SNodeList* pParameterList = nodesMakeList(); + if (NULL == pParameterList) { nodesDestroyNode((SNode*)col); return TSDB_CODE_OUT_OF_MEMORY; } - int32_t code = nodesListStrictAppend(pParamterList, (SNode*)col); + int32_t code = nodesListStrictAppend(pParameterList, (SNode*)col); if (code) { - nodesDestroyList(pParamterList); + nodesDestroyList(pParameterList); return code; } - SNode* pFunc = (SNode*)createFunction("last", pParamterList); + SNode* pFunc = (SNode*)createFunction("last", pParameterList); if (NULL == pFunc) { - nodesDestroyList(pParamterList); + nodesDestroyList(pParameterList); return TSDB_CODE_OUT_OF_MEMORY; } SNodeList* pProjectionList = nodesMakeList(); if (NULL == pProjectionList) { - nodesDestroyList(pParamterList); + nodesDestroyNode(pFunc); return TSDB_CODE_OUT_OF_MEMORY; } @@ -8213,7 +8227,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta SFunctionNode* pFunc1 = createFunction("_vgid", NULL); if (NULL == pFunc1) { - nodesDestroyList(pParamterList); + nodesDestroyList(pProjectionList); return TSDB_CODE_OUT_OF_MEMORY; } @@ -8226,7 +8240,7 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta SFunctionNode* pFunc2 = createFunction("_vgver", NULL); if (NULL == pFunc2) { - nodesDestroyList(pParamterList); + nodesDestroyList(pProjectionList); return TSDB_CODE_OUT_OF_MEMORY; } @@ -8243,25 +8257,54 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta return code; } - // todo add the group by statement SSelectStmt** pSelect1 = (SSelectStmt**)pQuery; (*pSelect1)->pGroupByList = nodesMakeList(); + if (NULL == (*pSelect1)->pGroupByList) { + return TSDB_CODE_OUT_OF_MEMORY; + } SGroupingSetNode* pNode1 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + if (NULL == pNode1) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pNode1->groupingSetType = GP_TYPE_NORMAL; pNode1->pParameterList = nodesMakeList(); - nodesListAppend(pNode1->pParameterList, (SNode*)pFunc1); + if (NULL == pNode1->pParameterList) { + nodesDestroyNode((SNode*)pNode1); + return TSDB_CODE_OUT_OF_MEMORY; + } - nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode1); + code = nodesListAppend(pNode1->pParameterList, (SNode*)pFunc1); + if (code) { + nodesDestroyNode((SNode*)pNode1); + return code; + } + + code = nodesListAppend((*pSelect1)->pGroupByList, nodesCloneNode((const SNode*)pNode1)); + if (code) { + return code; + } SGroupingSetNode* pNode2 = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + if (NULL == pNode2) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pNode2->groupingSetType = GP_TYPE_NORMAL; pNode2->pParameterList = nodesMakeList(); - nodesListAppend(pNode2->pParameterList, (SNode*)pFunc2); + if (NULL == pNode2->pParameterList) { + nodesDestroyNode((SNode*)pNode2); + return TSDB_CODE_OUT_OF_MEMORY; + } - nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2); + code = nodesListAppend(pNode2->pParameterList, nodesCloneNode((const SNode*)pFunc2)); + if (code) { + nodesDestroyNode((SNode*)pNode2); + return code; + } - return code; + return nodesListAppend((*pSelect1)->pGroupByList, (SNode*)pNode2); } static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 6cd2b0f972..11f51fec83 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -2187,6 +2187,8 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl pSubplan->level = pLogicSubplan->level; pSubplan->rowsThreshold = 4096; pSubplan->dynamicRowThreshold = false; + pSubplan->isView = pCxt->pPlanCxt->isView; + pSubplan->isAudit = pCxt->pPlanCxt->isAudit; if (NULL != pCxt->pPlanCxt->pUser) { snprintf(pSubplan->user, sizeof(pSubplan->user), "%s", pCxt->pPlanCxt->pUser); } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 9a1c309ab0..66ec460861 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -360,10 +360,24 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg.msgMask)) && !taosGranted()) { - QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask); - tFreeSSubQueryMsg(&msg); - QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); + if (chkGrant) { + if ((!TEST_SHOW_REWRITE_MASK(msg.msgMask))) { + if (!taosGranted(TSDB_GRANT_ALL)) { + QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask); + tFreeSSubQueryMsg(&msg); + QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); + } + if ((TEST_VIEW_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_VIEW)) { + QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask); + tFreeSSubQueryMsg(&msg); + QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); + } + if ((TEST_AUDIT_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_AUDIT)) { + QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask); + tFreeSSubQueryMsg(&msg); + QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); + } + } } uint64_t sId = msg.sId; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5246eab115..594bb205d8 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -737,6 +737,13 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(code); } +#if 0 + SReadHandle* pReadHandle = qwMsg->node; + int64_t delay = 0; + bool fhFinish = false; + pReadHandle->api.tqReaderFn.tqGetStreamExecProgress(pReadHandle->vnode, 0, &delay, &fhFinish); +#endif + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH); sql = NULL; if (code) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 5c67c7974f..1c0b31109e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1109,6 +1109,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.refId = pJob->refId; qMsg.execId = pTask->execId; qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0; + qMsg.msgMask |= (pTask->plan->isView) ? QUERY_MSG_MASK_VIEW() : 0; + qMsg.msgMask |= (pTask->plan->isAudit) ? QUERY_MSG_MASK_AUDIT() : 0; qMsg.taskType = TASK_TYPE_TEMP; qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob); qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2e576f7ec7..4614cb0cee 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -158,7 +158,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->chkInfo.transId = pReq->transId; pTask->chkInfo.checkpointingId = pReq->checkpointId; - pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; @@ -214,7 +214,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info - atomic_add_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1); + atomic_add_fetch_32(&pTask->chkInfo.numOfNotReady, 1); streamProcessCheckpointReadyMsg(pTask); streamFreeQitem((SStreamQueueItem*)pBlock); } @@ -249,7 +249,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure - pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask); // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks @@ -268,7 +268,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task - int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.checkpointNotReadyTasks, 1); + int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.numOfNotReady, 1); ASSERT(notReady >= 0); if (notReady == 0) { @@ -287,7 +287,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id pTask->chkInfo.failedId = 0; pTask->chkInfo.startTs = 0; // clear the recorded start time - pTask->chkInfo.checkpointNotReadyTasks = 0; + pTask->chkInfo.numOfNotReady = 0; pTask->chkInfo.transId = 0; pTask->chkInfo.dispatchCheckpointTrigger = false; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 78929c365e..0936d410bf 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -155,14 +155,14 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *blockSize = 0; // no available token in bucket for sink task, let's wait for a little bit - if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { + if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); return TSDB_CODE_SUCCESS; } while (1) { if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) { - stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); + stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 9d5a594f46..69a4a6b5a3 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -103,7 +103,12 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; - memcpy(pNewPos->pRowBuff, p, *pVLen); + if(p) { + memcpy(pNewPos->pRowBuff, p, *pVLen); + } else { + int32_t len = getRowStateRowSize(pFileState); + memset(pNewPos->pRowBuff, 0, len); + } taosMemoryFree(p); return pNewPos; } @@ -727,21 +732,30 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C int32_t size = taosArrayGetSize(pWinStates); if (size == 0) { - void* pFileStore = getStateFileStore(pFileState); - void* p = NULL; - + void* pFileStore = getStateFileStore(pFileState); + void* pRockVal = NULL; SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); - int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen); - if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { - (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); - code = code_file; - qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen); + if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); + if (code == TSDB_CODE_SUCCESS) { + int32_t valSize = *pVLen; + COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); + if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { + (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); + streamStateFreeCur(pCur); + goto _end; + } + } + pWinKey->win.skey = startTs; + pWinKey->win.ekey = endTs; + (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL); + taosMemoryFree(pRockVal); + streamStateFreeCur(pCur); } else { (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); code = TSDB_CODE_FAILED; - taosMemoryFree(p); } - streamStateFreeCur(pCur); goto _end; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 3854e90901..d491b00e73 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -70,10 +70,9 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); - wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64, + wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); - if (fetchVer > appliedVer){ + if (fetchVer > appliedVer) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } @@ -86,10 +85,8 @@ int32_t walNextValidMsg(SWalReader *pReader) { int32_t type = pReader->pHead->head.msgType; if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || (IS_META_MSG(type) && pReader->cond.scanMeta)) { - if (walFetchBody(pReader) < 0) { - return -1; - } - return 0; + int32_t code = walFetchBody(pReader); + return (code == TSDB_CODE_SUCCESS)? 0:-1; } else { if (walSkipFetchBody(pReader) < 0) { return -1; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 341d989f8f..9783705bad 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -498,7 +498,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.head.version = index; pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; - pWal->writeHead.head.ingestTs = 0; + pWal->writeHead.head.ingestTs = taosGetTimestampUs(); // sync info for sync module pWal->writeHead.head.syncMeta = syncMeta; diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index dda4b14901..7d1cc746ff 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -56,6 +56,8 @@ int32_t taosGetAppName(char* name, int32_t* len) { char* end = strrchr(filepath, TD_DIRSEP[0]); if (end == NULL) { end = filepath; + } else { + end += 1; } tstrncpy(name, end, TSDB_APP_NAME_LEN); diff --git a/source/util/src/tbase64.c b/source/util/src/tbase64.c index f6f12fef97..a2f4ddbc51 100644 --- a/source/util/src/tbase64.c +++ b/source/util/src/tbase64.c @@ -15,8 +15,6 @@ #define _DEFAULT_SOURCE #include "tbase64.h" -#include -#include static char basis_64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 3cc00ddc7f..656e2706f2 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -977,6 +977,7 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co } #endif +#ifdef BUILD_NO_CALL /************************************************************************* * STREAM COMPRESSION *************************************************************************/ @@ -2120,7 +2121,7 @@ int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData) { return DATA_TYPE_INFO[pCmprsor->type].cmprFn(pCmprsor, pData, nData); } - +#endif /************************************************************************* * REGULAR COMPRESSION *************************************************************************/ diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9fcca86744..7510b89736 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -462,8 +462,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mism TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CSV_LIMITED, "Csv has reached the licensed upper limit") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_AUDIT_LIMITED, "Audit has reached the licensed upper limit") // sync TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout") diff --git a/tests/army/community/cluster/snapshot.json b/tests/army/community/cluster/snapshot.json index d4f6f00d37..4855c23260 100644 --- a/tests/army/community/cluster/snapshot.json +++ b/tests/army/community/cluster/snapshot.json @@ -35,8 +35,8 @@ "start_timestamp":"now-12d", "columns": [ { "type": "bool", "name": "bc"}, - { "type": "float", "name": "fc" }, - { "type": "double", "name": "dc"}, + { "type": "float", "name": "fc", "min": 100, "max": 100}, + { "type": "double", "name": "dc", "min": 200, "max": 200}, { "type": "tinyint", "name": "ti"}, { "type": "smallint", "name": "si" }, { "type": "int", "name": "ic" }, diff --git a/tests/army/community/cluster/snapshot.py b/tests/army/community/cluster/snapshot.py index 5b5457be75..b4c4d3c4c8 100644 --- a/tests/army/community/cluster/snapshot.py +++ b/tests/army/community/cluster/snapshot.py @@ -29,7 +29,11 @@ from frame import * class TDTestCase(TBase): updatecfgDict = { - "countAlwaysReturnValue" : "0" + "countAlwaysReturnValue" : "0", + "lossyColumns" : "float,double", + "fPrecision" : "0.000000001", + "dPrecision" : "0.00000000000000001", + "ifAdtFse" : "1" } def insertData(self): @@ -48,6 +52,18 @@ class TDTestCase(TBase): sql = f"create table {self.db}.ta(ts timestamp, age int) tags(area int)" tdSql.execute(sql) + def checkFloatDouble(self): + sql = f"select * from {self.db}.{self.stb} where fc!=100" + tdSql.query(sql) + tdSql.checkRows(0) + sql = f"select count(*) from {self.db}.{self.stb} where dc!=200" + tdSql.query(sql) + tdSql.checkRows(0) + sql = f"select avg(fc) from {self.db}.{self.stb}" + tdSql.checkFirstValue(sql, 100) + sql = f"select avg(dc) from {self.db}.{self.stb}" + tdSql.checkFirstValue(sql, 200) + def doAction(self): tdLog.info(f"do action.") self.flushDb() @@ -85,6 +101,9 @@ class TDTestCase(TBase): # check insert data correct self.checkInsertCorrect() + # check float double value ok + self.checkFloatDouble() + # save self.snapshotAgg() @@ -97,6 +116,10 @@ class TDTestCase(TBase): # check insert correct again self.checkInsertCorrect() + # check float double value ok + self.checkFloatDouble() + + tdLog.success(f"{__file__} successfully executed") diff --git a/tests/army/community/storage/oneStageComp.json b/tests/army/community/storage/oneStageComp.json new file mode 100644 index 0000000000..f64fda3824 --- /dev/null +++ b/tests/army/community/storage/oneStageComp.json @@ -0,0 +1,66 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "connection_pool_size": 8, + "num_of_records_per_req": 3000, + "prepared_rand": 3000, + "thread_count": 2, + "create_table_thread_count": 1, + "confirm_parameter_prompt": "no", + "databases": [ + { + "dbinfo": { + "name": "db", + "drop": "yes", + "vgroups": 2, + "replica": 3, + "wal_retention_period": 10, + "wal_retention_size": 100, + "keep": "60d,120d,365d", + "stt_trigger": 1, + "wal_level": 2, + "WAL_FSYNC_PERIOD": 3300, + "cachemodel": "'last_value'", + "TABLE_PREFIX":1, + "comp": 1 + }, + "super_tables": [ + { + "name": "stb", + "child_table_exists": "no", + "childtable_count": 10, + "insert_rows": 100000, + "childtable_prefix": "d", + "insert_mode": "taosc", + "timestamp_step": 1000, + "start_timestamp":"now-360d", + "columns": [ + { "type": "bool", "name": "bc","max": 1,"min": 1}, + { "type": "float", "name": "fc" ,"max": 101,"min": 101}, + { "type": "double", "name": "dc" ,"max": 102,"min": 102}, + { "type": "tinyint", "name": "ti" ,"max": 103,"min": 103}, + { "type": "smallint", "name": "si" ,"max": 104,"min": 104}, + { "type": "int", "name": "ic" ,"max": 105,"min": 105}, + { "type": "bigint", "name": "bi" ,"max": 106,"min": 106}, + { "type": "utinyint", "name": "uti","max": 107,"min": 107}, + { "type": "usmallint", "name": "usi","max": 108,"min": 108}, + { "type": "uint", "name": "ui" ,"max": 109,"min": 109}, + { "type": "ubigint", "name": "ubi","max": 110,"min": 110}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 32} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 100,"min": 100}, + {"name": "location","type": "binary", "len": 16, "values": + ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/tests/army/community/storage/oneStageComp.py b/tests/army/community/storage/oneStageComp.py new file mode 100644 index 0000000000..9a2c7cfcd6 --- /dev/null +++ b/tests/army/community/storage/oneStageComp.py @@ -0,0 +1,140 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import random + +import taos +import frame +import frame.etool + + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * + + +class TDTestCase(TBase): + updatecfgDict = { + "compressMsgSize" : "100", + } + + def insertData(self): + tdLog.info(f"insert data.") + # taosBenchmark run + jfile = etool.curFile(__file__, "oneStageComp.json") + etool.benchMark(json=jfile) + + tdSql.execute(f"use {self.db}") + # set insert data information + self.childtable_count = 10 + self.insert_rows = 100000 + self.timestamp_step = 1000 + + + + def checkColValueCorrect(self): + tdLog.info(f"do action.") + self.flushDb() + + # check all columns correct + cnt = self.insert_rows * self.childtable_count + sql = "select * from stb where bc!=1" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where fc=101" + tdSql.query(sql) + tdSql.checkRows(cnt) + sql = "select * from stb where dc!=102" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where ti!=103" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where si!=104" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where ic!=105" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where bi!=106" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where uti!=107" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where usi!=108" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where ui!=109" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where ubi!=110" + tdSql.query(sql) + tdSql.checkRows(0) + + def insertNull(self): + # insert 6 lines + sql = "insert into d0(ts) values(now) (now + 1s) (now + 2s) (now + 3s) (now + 4s) (now + 5s)" + tdSql.execute(sql) + + self.flushDb() + self.trimDb() + + # check all columns correct + cnt = self.insert_rows * self.childtable_count + sql = "select * from stb where bc!=1" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "select * from stb where bc is null" + tdSql.query(sql) + tdSql.checkRows(6) + sql = "select * from stb where bc=1" + tdSql.query(sql) + tdSql.checkRows(cnt) + sql = "select * from stb where usi is null" + tdSql.query(sql) + tdSql.checkRows(6) + + # run + def run(self): + tdLog.debug(f"start to excute {__file__}") + + # insert data + self.insertData() + + # check insert data correct + self.checkInsertCorrect() + + # save + self.snapshotAgg() + + # do action + self.checkColValueCorrect() + + # check save agg result correct + self.checkAggCorrect() + + # insert null + self.insertNull() + + + tdLog.success(f"{__file__} successfully executed") + + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 91a0ac46e5..103e67be46 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -23,7 +23,7 @@ fi ,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3 ,,n,army,python3 ./test.py -f community/cmdline/fullopt.py - +,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1 # @@ -350,6 +350,7 @@ fi ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/ts-4272.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4479.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_timestamp.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py @@ -567,7 +568,7 @@ fi ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4382.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4403.py - +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_td28163.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py diff --git a/tests/script/sh/checkAsan.sh b/tests/script/sh/checkAsan.sh index 9f67d437e2..d2f1e13e8f 100755 --- a/tests/script/sh/checkAsan.sh +++ b/tests/script/sh/checkAsan.sh @@ -72,7 +72,7 @@ python_error=$(cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l) #0 0x7f2d64f5a808 in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cc:144 #1 0x7f2d63fcf459 in strerror /build/glibc-SzIz7B/glibc-2.31/string/strerror.c:38 -runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | wc -l) +runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cpp" | grep -v "sclvector.c" | wc -l) echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m" echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m" diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 67678963ea..ce5c57572e 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -290,6 +290,213 @@ if $data32 != $now32 then return -1 endi +print step 2 max delay 2s +sql create database test15 vgroups 4; +sql use test15; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream15 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791233001,2,2,3,1.1); + +$loop_count = 0 + +loop4: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop4 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print session max delay over + +print step 3 max delay 2s +sql create database test16 vgroups 4; +sql use test16; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791233001,2,2,3,1.1); + +$loop_count = 0 + +loop5: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop5 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print state max delay over + +print step 4 max delay 2s +sql create database test17 vgroups 4; +sql use test17; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9; + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213001,9,2,3,1.0); + +sql insert into t1 values(1648791233001,1,2,3,1.1); +sql insert into t1 values(1648791233009,9,2,3,1.1); + +$loop_count = 0 + +loop6: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop6 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print event max delay over + print ======over system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/1-insert/test_ts4479.py b/tests/system-test/1-insert/test_ts4479.py new file mode 100644 index 0000000000..be9789b5fc --- /dev/null +++ b/tests/system-test/1-insert/test_ts4479.py @@ -0,0 +1,75 @@ +import os +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf +import taos + + +class TDTestCase: + """Verify inserting varbinary type data of ts-4479 + """ + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + self.conn = conn + self.db_name = "db" + self.stable_name = "st" + + def run(self): + tdSql.execute("create database if not exists %s" % self.db_name) + tdSql.execute("use %s" % self.db_name) + # create super table + tdSql.execute("create table %s (ts timestamp, c1 varbinary(65517)) tags (t1 varbinary(16382))" % self.stable_name) + + # varbinary tag length is more than 16382 + tag = os.urandom(16383).hex() + tdSql.error("create table ct using st tags(%s);" % ('\\x' + tag)) + + # create child table with max column and tag length + child_table_list = [] + for i in range(2): + child_table_name = "ct_" + str(i+1) + child_table_list.append(child_table_name) + tag = os.urandom(16382).hex() + tdSql.execute("create table %s using st tags('%s');" % (child_table_name, '\\x' + tag)) + tdLog.info("create table %s successfully" % child_table_name) + + # varbinary column length is more than 65517 + value = os.urandom(65518).hex() + tdSql.error("insert into ct_1 values(now, '\\x%s');" % value) + + # insert data + for i in range(10): + sql = "insert into table_name values" + for j in range(5): + value = os.urandom(65517).hex() + sql += "(now+%ss, '%s')," % (str(j+1), '\\x' + value) + for child_table in child_table_list: + tdSql.execute(sql.replace("table_name", child_table)) + tdLog.info("Insert data into %s successfully" % child_table) + tdLog.info("Insert data round %s successfully" % str(i+1)) + tdSql.execute("flush database %s" % self.db_name) + + # insert \\x to varbinary column + tdSql.execute("insert into ct_1 values(now, '\\x');") + tdSql.query("select * from ct_1 where c1 = '\\x';") + tdSql.checkRows(1) + tdSql.checkData(0, 1, b'') + + # insert \\x to varbinary tag + tdSql.execute("create table ct_3 using st tags('\\x');") + tdSql.execute("insert into ct_3 values(now, '\\x45');") + tdSql.query("select * from st where t1='';") + tdSql.checkRows(1) + tdSql.checkData(0, 2, b'') + + def stop(self): + tdSql.execute("drop database if exists %s" % self.db_name) + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/test_td28163.py b/tests/system-test/2-query/test_td28163.py new file mode 100644 index 0000000000..df727f6c5a --- /dev/null +++ b/tests/system-test/2-query/test_td28163.py @@ -0,0 +1,265 @@ +import random +import itertools +from util.log import * +from util.cases import * +from util.sql import * +from util.sqlset import * +from util import constant +from util.common import * + + +class TDTestCase: + """Verify the jira TD-28163 + """ + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def prepareData(self): + # db + tdSql.execute("create database if not exists db") + tdSql.execute("use db") + + # super table + tdSql.execute("create stable st(ts timestamp, c_ts_empty timestamp, c_int int, c_int_empty int, c_unsigned_int int unsigned, \ + c_unsigned_int_empty int unsigned, c_bigint bigint, c_bigint_empty bigint, c_unsigned_bigint bigint unsigned, \ + c_unsigned_bigint_empty bigint unsigned, c_float float, c_float_empty float, c_double double, c_double_empty double, \ + c_binary binary(16), c_binary_empty binary(16), c_smallint smallint, c_smallint_empty smallint, \ + c_smallint_unsigned smallint unsigned, c_smallint_unsigned_empty smallint unsigned, c_tinyint tinyint, \ + c_tinyint_empty tinyint, c_tinyint_unsigned tinyint unsigned, c_tinyint_unsigned_empty tinyint unsigned, \ + c_bool bool, c_bool_empty bool, c_nchar nchar(16), c_nchar_empty nchar(16), c_varchar varchar(16), \ + c_varchar_empty varchar(16), c_varbinary varbinary(16), c_varbinary_empty varbinary(16)) \ + tags(t_timestamp timestamp, t_timestamp_empty timestamp, t_int int, t_int_empty int, \ + t_unsigned_int int unsigned, t_unsigned_int_empty int unsigned, t_bigint bigint, t_bigint_empty bigint, \ + t_unsigned_bigint bigint unsigned, t_unsigned_bigint_empty bigint unsigned, t_float float, t_float_empty float, \ + t_double double, t_double_empty double, t_binary binary(16), t_binary_empty binary(16), t_smallint smallint, \ + t_smallint_empty smallint, t_smallint_unsigned smallint unsigned, t_smallint_unsigned_empty smallint unsigned, \ + t_tinyint tinyint, t_tinyint_empty tinyint, t_tinyint_unsigned tinyint unsigned, t_tinyint_unsigned_empty tinyint unsigned, \ + t_bool bool, t_bool_empty bool, t_nchar nchar(16), t_nchar_empty nchar(16), t_varchar varchar(16), \ + t_varchar_empty varchar(16), t_varbinary varbinary(16), t_varbinary_empty varbinary(16));") + + # child tables + start_ts = 1704085200000 + tags = [ + "'2024-01-01 13:00:01', null, 1, null, 1, null, 1111111111111111, null, 1111111111111111, null, 1.1, null, 1.11, null, 'aaaaaaaa', '', 1, null, 1, null, 1, null, 1, null, True, null, 'ncharaa', null, 'varcharaa', null, '0x7661726331', null", + "'2024-01-01 13:00:02', null, 2, null, 2, null, 2222222222222222, null, 2222222222222222, null, 2.2, null, 2.22, null, 'bbbbbbbb', '', 2, null, 2, null, 2, null, 2, null, False, null, 'ncharbb', null, 'varcharbb', null, '0x7661726332', null", + "'2024-01-01 13:00:03', null, 3, null, 3, null, 3333333333333333, null, 3333333333333333, null, 3.3, null, 3.33, null, 'cccccccc', '', 3, null, 3, null, 3, null, 3, null, True, null, 'ncharcc', null, 'varcharcc', null, '0x7661726333', null", + "'2024-01-01 13:00:04', null, 4, null, 4, null, 4444444444444444, null, 4444444444444444, null, 4.4, null, 4.44, null, 'dddddddd', '', 4, null, 4, null, 4, null, 4, null, False, null, 'nchardd', null, 'varchardd', null, '0x7661726334', null", + "'2024-01-01 13:00:05', null, 5, null, 5, null, 5555555555555555, null, 5555555555555555, null, 5.5, null, 5.55, null, 'eeeeeeee', '', 5, null, 5, null, 5, null, 5, null, True, null, 'ncharee', null, 'varcharee', null, '0x7661726335', null", + ] + for i in range(5): + tdSql.execute(f"create table ct{i+1} using st tags({tags[i]});") + + # insert data + data = "null, 1, null, 1, null, 1111111111111111, null, 1111111111111111, null, 1.1, null, 1.11, null, 'aaaaaaaa', null, 1, null, 1, null, 1, null, 1, null, True, null, 'ncharaa', null, 'varcharaa', null, '0x7661726331', null" + for round in range(100): + sql = f"insert into ct{i+1} values" + for j in range(100): + sql += f"({start_ts + (round * 100 + j + 1) * 1000}, {data})" + sql += ";" + tdSql.execute(sql) + tdLog.debug("Prepare data successfully") + + def test_query_with_filter(self): + # total row number + tdSql.query("select count(*) from st;") + total_rows = tdSql.queryResult[0][0] + tdLog.debug("Total row number is %s" % total_rows) + + # start_ts and end_ts + tdSql.query("select first(ts), last(ts) from st;") + start_ts = tdSql.queryResult[0][0] + end_ts = tdSql.queryResult[0][1] + tdLog.debug("start_ts is %s, end_ts is %s" % (start_ts, end_ts)) + + filter_dic = { + "all_filter_list": ["ts <= now", "t_timestamp <= now", f"ts between '{start_ts}' and '{end_ts}'", + f"t_timestamp between '{start_ts}' and '{end_ts}'", "c_ts_empty is null", + "t_timestamp_empty is null", "ts > '1970-01-01 00:00:00'", "t_int in (1, 2, 3, 4, 5)", + "c_int=1", "c_int_empty is null", "c_unsigned_int=1", "c_unsigned_int_empty is null", + "c_unsigned_int in (1, 2, 3, 4, 5)", "c_unsigned_int_empty is null", "c_bigint=1111111111111111", + "c_bigint_empty is null", "c_unsigned_bigint in (1111111111111111)", "c_unsigned_bigint_empty is null", + "c_float=1.1", "c_float_empty is null", "c_double=1.11", "c_double_empty is null", "c_binary='aaaaaaaa'", + "c_binary_empty is null", "c_smallint=1", "c_smallint_empty is null", "c_smallint_unsigned=1", + "c_smallint_unsigned_empty is null", "c_tinyint=1", "c_tinyint_empty is null", "c_tinyint_unsigned=1", + "c_tinyint_unsigned_empty is null", "c_bool=True", "c_bool_empty is null", "c_nchar='ncharaa'", + "c_nchar_empty is null", "c_varchar='varcharaa'", "c_varchar_empty is null", "c_varbinary='0x7661726331'", + "c_varbinary_empty is null"], + "empty_filter_list": ["ts > now", "t_timestamp > now", "c_ts_empty is not null","t_timestamp_empty is not null", + "ts <= '1970-01-01 00:00:00'", "c_ts_empty < '1970-01-01 00:00:00'", "c_int <> 1", "c_int_empty is not null", + "t_int in (10, 11)", "t_int_empty is not null"] + } + for filter in filter_dic["all_filter_list"]: + tdLog.debug("Execute query with filter '%s'" % filter) + tdSql.query(f"select * from st where {filter};") + tdSql.checkRows(total_rows) + + for filter in filter_dic["empty_filter_list"]: + tdLog.debug("Execute query with filter '%s'" % filter) + tdSql.query(f"select * from st where {filter};") + tdSql.checkRows(0) + + def test_query_with_groupby(self): + tdSql.query("select count(*) from st group by tbname;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, 10000) + + tdSql.query("select count(c_unsigned_int_empty + c_int_empty * c_float_empty - c_double_empty + c_smallint_empty / c_tinyint_empty) from st where c_int_empty is null group by tbname;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, 0) + + tdSql.query("select sum(t_unsigned_int_empty + t_int_empty * t_float_empty - t_double_empty + t_smallint_empty / t_tinyint_empty) from st where t_int_empty is null group by tbname;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, None) + + tdSql.query("select max(c_bigint_empty) from st group by tbname, t_bigint_empty, t_float_empty, t_double_empty;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, None) + + tdSql.query("select min(t_double) as v from st where c_nchar like '%aa%' and t_double is not null group by tbname, t_bigint_empty, t_float_empty, t_double_empty order by v limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1.11) + + tdSql.query("select top(c_float, 1) as v from st where c_nchar like '%aa%' group by tbname order by v desc slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1.1) + + tdSql.query("select first(ts) from st where c_varchar is not null partition by tbname order by ts slimit 1;") + tdSql.checkRows(5) + tdSql.checkData(0, 0, '2024-01-01 13:00:01.000') + + tdSql.query("select first(c_nchar_empty) from st group by tbname;") + tdSql.checkRows(0) + + tdSql.query("select first(ts), first(c_nchar_empty) from st group by tbname, ts order by ts slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '2024-01-01 13:00:01.000') + tdSql.checkData(0, 1, None) + + tdSql.query("select first(c_nchar_empty) from st group by t_timestamp_empty order by t_timestamp;") + tdSql.checkRows(0) + + tdSql.query("select last(ts), last(c_nchar_empty) from st group by tbname, ts order by ts slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '2024-01-01 13:00:01.000') + tdSql.checkData(0, 1, None) + + tdSql.query("select elapsed(ts, 1s) t from st where c_int = 1 and c_nchar like '%aa%' group by tbname order by t desc slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 9999) + + tdSql.query("select elapsed(ts, 1s) t from st where c_int_empty is not null and c_nchar like '%aa%' group by tbname order by t desc slimit 1 limit 1;") + tdSql.checkRows(0) + + def test_query_with_join(self): + tdSql.query("select count(*) from st as t1 join st as t2 on t1.ts = t2.ts and t1.c_float_empty is not null;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 0) + + tdSql.query("select count(t1.c_ts_empty) as v from st as t1 join st as t2 on t1.ts = t2.ts and t1.c_float_empty is null order by v desc;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 0) + + tdSql.query("select avg(t1.c_tinyint), sum(t2.c_bigint) from st t1, st t2 where t1.ts=t2.ts and t1.c_int > t2.c_int;") + tdSql.checkRows(0) + + tdSql.query("select avg(t1.c_tinyint), sum(t2.c_bigint) from st t1, st t2 where t1.ts=t2.ts and t1.c_int <= t2.c_int;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 1076616672134475760) + + tdSql.query("select count(t1.c_float_empty) from st t1, st t2 where t1.ts=t2.ts and t1.c_int = t2.c_int and t1.t_int_empty=t2.t_int_empty;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 0) + + def test_query_with_window(self): + # time window + tdSql.query("select sum(c_int_empty) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m) fill(value, 10);") + tdSql.checkRows(841) + tdSql.checkData(0, 0, 10) + + tdSql.query("select _wstart, _wend, sum(c_int) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m);") + tdSql.checkRows(65) + + # status window + tdSql.error("select _wstart, count(*) from st state_window(t_bool);") + tdSql.query("select _wstart, count(*) from st partition by tbname state_window(c_bool);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, 10000) + + # session window + tdSql.query("select _wstart, count(*) from st partition by tbname, t_int session(ts, 1m);") + tdSql.checkRows(5) + tdSql.checkData(0, 1, 10000) + + # event window + tdSql.query("select _wstart, _wend, count(*) from (select * from st order by ts, tbname) event_window start with t_bool=true end with t_bool=false;") + tdSql.checkRows(20000) + + def test_query_with_union(self): + tdSql.query("select count(ts) from (select * from ct1 union select * from ct2 union select * from ct3);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 10000) + + tdSql.query("select count(ts) from (select * from ct1 union all select * from ct2 union all select * from ct3);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 30000) + + tdSql.query("select count(*) from (select * from ct1 union select * from ct2 union select * from ct3);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 10000) + + tdSql.query("select count(c_ts_empty) from (select * from ct1 union select * from ct2 union select * from ct3);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 0) + + tdSql.query("select count(*) from (select ts from st union select c_ts_empty from st);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 10001) + + tdSql.query("select count(*) from (select ts from st union all select c_ts_empty from st);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 100000) + + tdSql.query("select count(ts) from (select ts from st union select c_ts_empty from st);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 10000) + + tdSql.query("select count(ts) from (select ts from st union all select c_ts_empty from st);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 50000) + + def test_nested_query(self): + tdSql.query("select elapsed(ts, 1s) from (select * from (select * from st where c_int = 1) where c_int_empty is null);") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 9999) + + tdSql.query("select first(ts) as t, avg(c_int) as v from (select * from (select * from st where c_int = 1) where c_int_empty is null) group by t_timestamp order by t_timestamp desc slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, '2024-01-01 13:00:01.000') + tdSql.checkData(0, 1, 1) + + tdSql.query("select max(c_tinyint) from (select c_tinyint, tbname from st where c_float_empty is null or t_int_empty is null) group by tbname order by c_tinyint desc slimit 1 limit 1;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + + tdSql.query("select top(c_int, 3) from (select c_int, tbname from st where t_int in (2, 3)) group by tbname slimit 3;") + tdSql.checkRows(6) + tdSql.checkData(0, 0, 1) + + def run(self): + self.prepareData() + self.test_query_with_filter() + self.test_query_with_groupby() + self.test_query_with_join() + self.test_query_with_window() + self.test_query_with_union() + self.test_nested_query() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/utils/TSZ/sz/src/sz_double.c b/utils/TSZ/sz/src/sz_double.c index 1adfdf3b56..0510fc612d 100644 --- a/utils/TSZ/sz/src/sz_double.c +++ b/utils/TSZ/sz/src/sz_double.c @@ -385,9 +385,11 @@ unsigned int optimize_intervals_double_1D_opt(double *oriData, size_t dataLength totalSampleSize++; pred_value = data_pos[-1]; pred_err = fabs(pred_value - *data_pos); - radiusIndex = (unsigned long)((pred_err/realPrecision+1)/2); - if(radiusIndex>=confparams_cpr->maxRangeRadius) - radiusIndex = confparams_cpr->maxRangeRadius - 1; + double dbri = (unsigned long)((pred_err/realPrecision+1)/2); + if(dbri >= (double)confparams_cpr->maxRangeRadius) + radiusIndex = confparams_cpr->maxRangeRadius - 1; + else + radiusIndex = dbri; intervals[radiusIndex]++; data_pos += confparams_cpr->sampleDistance;