Merge remote-tracking branch 'origin/3.0' into test/chr/TD-14699
This commit is contained in:
commit
fe9f8bfe98
|
@ -1,6 +1,6 @@
|
|||
---
|
||||
title: Get Started
|
||||
description: 'Install TDengine from Docker image, apt-get or package, and run TAOS CLI and taosBenchmark to experience the features'
|
||||
description: 'Install TDengine from Docker image, apt-get or package, and run TDengine CLI and taosBenchmark to experience the features'
|
||||
---
|
||||
|
||||
import Tabs from "@theme/Tabs";
|
||||
|
@ -120,7 +120,7 @@ select * from t;
|
|||
Query OK, 2 row(s) in set (0.003128s)
|
||||
```
|
||||
|
||||
Besides executing SQL commands, system administrators can check running status, add/drop user accounts and manage the running instances. TAOS CLI with client driver can be installed and run on either Linux or Windows machines. For more details on CLI, please [check here](../reference/taos-shell/).
|
||||
Besides executing SQL commands, system administrators can check running status, add/drop user accounts and manage the running instances. TDengine CLI with client driver can be installed and run on either Linux or Windows machines. For more details on CLI, please [check here](../reference/taos-shell/).
|
||||
|
||||
## Experience the blazing fast speed
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ USE power;
|
|||
|
||||
## Create STable
|
||||
|
||||
In a time-series application, there may be multiple kinds of data collection points. For example, in the electrical power system there are meters, transformers, bus bars, switches, etc. For easy and efficient aggregation of multiple tables, one STable needs to be created for each kind of data collection point. For example, for the meters in [table 1](/tdinternal/arch#model_table1), the SQL statement below can be used to create the super table.
|
||||
In a time-series application, there may be multiple kinds of data collection points. For example, in the electrical power system there are meters, transformers, bus bars, switches, etc. For easy and efficient aggregation of multiple tables, one STable needs to be created for each kind of data collection point. For example, for the meters in [table 1](/concept/#model_table1), the SQL statement below can be used to create the super table.
|
||||
|
||||
```sql
|
||||
CREATE STable meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
|
||||
|
|
|
@ -22,7 +22,7 @@ import CStmt from "./_c_stmt.mdx";
|
|||
|
||||
## Introduction
|
||||
|
||||
Application programs can execute `INSERT` statement through connectors to insert rows. The TAOS CLI can also be used to manually insert data.
|
||||
Application programs can execute `INSERT` statement through connectors to insert rows. The TDengine CLI can also be used to manually insert data.
|
||||
|
||||
### Insert Single Row
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
---
|
||||
title: 立即开始
|
||||
description: '从 Docker,安装包或使用 apt-get 快速安装 TDengine, 通过命令行程序TAOS CLI和工具 taosdemo 快速体验 TDengine 功能'
|
||||
description: '从 Docker,安装包或使用 apt-get 快速安装 TDengine, 通过命令行程序TDengine CLI和工具 taosdemo 快速体验 TDengine 功能'
|
||||
---
|
||||
|
||||
import Tabs from "@theme/Tabs";
|
||||
|
@ -122,7 +122,7 @@ select * from t;
|
|||
Query OK, 2 row(s) in set (0.003128s)
|
||||
```
|
||||
|
||||
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TAOS CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/)
|
||||
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/)
|
||||
|
||||
## 使用 taosBenchmark 体验写入速度
|
||||
|
||||
|
|
|
@ -1519,7 +1519,7 @@ typedef struct {
|
|||
#define STREAM_TRIGGER_MAX_DELAY 3
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
char sourceDB[TSDB_DB_FNAME_LEN];
|
||||
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
|
@ -1539,7 +1539,7 @@ int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStrea
|
|||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
int64_t streamId;
|
||||
char* sql;
|
||||
char* executorMsg;
|
||||
|
@ -2008,7 +2008,7 @@ typedef struct {
|
|||
char sql[TSDB_SHOW_SQL_LEN];
|
||||
uint64_t queryId;
|
||||
int64_t useconds;
|
||||
int64_t stime; // timestamp precision ms
|
||||
int64_t stime; // timestamp precision ms
|
||||
int64_t reqRid;
|
||||
int32_t pid;
|
||||
bool stableQuery;
|
||||
|
@ -2257,13 +2257,13 @@ typedef struct {
|
|||
} SMqVDeleteRsp;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
int64_t streamId;
|
||||
} SMDropStreamTaskReq;
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
int8_t igNotExists;
|
||||
} SMDropStreamReq;
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
} SMDropStreamTaskRsp;
|
||||
} SMDropStreamRsp;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
|
|
|
@ -95,7 +95,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
* @return
|
||||
*/
|
||||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
|
||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model);
|
||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -159,13 +159,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
|
|||
*/
|
||||
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
|
||||
|
||||
/**
|
||||
* Update the table id list of a given query.
|
||||
* @param uid child table uid
|
||||
* @param type operation type: ADD|DROP
|
||||
* @return
|
||||
*/
|
||||
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
|
||||
|
||||
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
||||
|
||||
|
|
|
@ -150,12 +150,8 @@ typedef struct SqlFunctionCtx {
|
|||
} SqlFunctionCtx;
|
||||
|
||||
enum {
|
||||
TEXPR_NODE_DUMMY = 0x0,
|
||||
TEXPR_BINARYEXPR_NODE= 0x1,
|
||||
TEXPR_UNARYEXPR_NODE = 0x2,
|
||||
TEXPR_FUNCTION_NODE = 0x3,
|
||||
TEXPR_COL_NODE = 0x4,
|
||||
TEXPR_VALUE_NODE = 0x8,
|
||||
};
|
||||
|
||||
typedef struct tExprNode {
|
||||
|
|
|
@ -410,10 +410,10 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_SYN_INVALID_CHECKSUM TAOS_DEF_ERROR_CODE(0, 0x0908)
|
||||
#define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909)
|
||||
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
|
||||
|
||||
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910)
|
||||
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911)
|
||||
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912)
|
||||
#define TSDB_CODE_SYN_IS_LEADER TAOS_DEF_ERROR_CODE(0, 0x090B)
|
||||
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C)
|
||||
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D)
|
||||
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E)
|
||||
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
||||
|
||||
// tq
|
||||
|
|
|
@ -210,7 +210,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_STREAM_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
||||
#define TSDB_COL_NAME_LEN 65
|
||||
|
|
|
@ -44,6 +44,7 @@ void showDB(TAOS* pConn) {
|
|||
}
|
||||
|
||||
void fetchCallback(void* param, void* res, int32_t numOfRow) {
|
||||
#if 0
|
||||
printf("numOfRow = %d \n", numOfRow);
|
||||
int numFields = taos_num_fields(res);
|
||||
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||
|
@ -63,6 +64,13 @@ void fetchCallback(void* param, void* res, int32_t numOfRow) {
|
|||
// taos_close(_taos);
|
||||
// taos_cleanup();
|
||||
}
|
||||
#endif
|
||||
if (numOfRow == 0) {
|
||||
printf("completed\n");
|
||||
return;
|
||||
}
|
||||
|
||||
taos_fetch_raw_block_a(res, fetchCallback, param);
|
||||
}
|
||||
|
||||
void queryCallback(void* param, void* res, int32_t code) {
|
||||
|
@ -70,7 +78,7 @@ void queryCallback(void* param, void* res, int32_t code) {
|
|||
printf("failed to execute, reason:%s\n", taos_errstr(res));
|
||||
}
|
||||
printf("start to fetch data\n");
|
||||
taos_fetch_rows_a(res, fetchCallback, param);
|
||||
taos_fetch_raw_block_a(res, fetchCallback, param);
|
||||
}
|
||||
|
||||
void queryCallback1(void* param, void* res, int32_t code) {
|
||||
|
@ -778,9 +786,9 @@ TEST(testCase, async_api_test) {
|
|||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
taos_query(pConn, "use nest");
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;");
|
||||
taos_query(pConn, "use table_alltype_hyperloglog");
|
||||
#if 0
|
||||
TAOS_RES* pRes = taos_query(pConn, "insert into tu(ts) values('2022-02-27 12:12:61')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
@ -802,8 +810,9 @@ TEST(testCase, async_api_test) {
|
|||
printf("%s\n", str);
|
||||
memset(str, 0, sizeof(str));
|
||||
}
|
||||
#endif
|
||||
|
||||
taos_query_a(pConn, "alter table test.m1 comment 'abcde' ", queryCallback, pConn);
|
||||
taos_query_a(pConn, "select HYPERLOGLOG(q_ts) from stable_1_2 where ts between 1630000001000 and 1630100001000 interval(19d) Fill(NONE);", queryCallback, pConn);
|
||||
getchar();
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
|
|
@ -1226,6 +1226,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
|||
SColumnInfoData colInfo = {0};
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
colInfo.info = p->info;
|
||||
colInfo.hasNull = true;
|
||||
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -539,27 +539,36 @@ typedef struct {
|
|||
} SMqRebOutputObj;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||
char targetDb[TSDB_DB_FNAME_LEN];
|
||||
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||
int64_t targetStbUid;
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
int64_t dbUid;
|
||||
int32_t version;
|
||||
int32_t vgNum;
|
||||
SRWLatch lock;
|
||||
int8_t status;
|
||||
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
|
||||
int32_t fixedSinkVgId; // 0 for shuffle
|
||||
SVgObj fixedSinkVg;
|
||||
int64_t smaId; // 0 for unused
|
||||
int8_t trigger;
|
||||
int64_t triggerParam;
|
||||
int64_t watermark;
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
// ctl
|
||||
SRWLatch lock;
|
||||
// create info
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
int32_t version;
|
||||
int64_t smaId; // 0 for unused
|
||||
// info
|
||||
int64_t uid;
|
||||
int8_t status;
|
||||
// config
|
||||
int8_t dropPolicy;
|
||||
int8_t trigger;
|
||||
int64_t triggerParam;
|
||||
int64_t watermark;
|
||||
// source and target
|
||||
int64_t sourceDbUid;
|
||||
int64_t targetDbUid;
|
||||
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||
char targetDb[TSDB_DB_FNAME_LEN];
|
||||
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||
int64_t targetStbUid;
|
||||
int32_t fixedSinkVgId; // 0 for shuffle
|
||||
// fixedSinkVg is not applicable for encode and decode
|
||||
SVgObj fixedSinkVg;
|
||||
|
||||
// transformation
|
||||
char* sql;
|
||||
char* ast;
|
||||
char* physicalPlan;
|
||||
SArray* tasks; // SArray<SArray<SStreamTask>>
|
||||
SSchemaWrapper outputSchema;
|
||||
|
|
|
@ -32,6 +32,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
|
|||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||
int64_t watermark, double filesFactor);
|
||||
|
||||
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -27,8 +27,7 @@ void mndCleanupStb(SMnode *pMnode);
|
|||
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
|
||||
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
|
||||
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp,
|
||||
int32_t *pRspLen);
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t *pRspLen);
|
||||
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);
|
||||
|
||||
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
|
||||
|
@ -36,6 +35,9 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
|
|||
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
||||
|
||||
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
||||
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -31,7 +31,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
|||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
|
||||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -18,34 +18,35 @@
|
|||
#include "mndConsumer.h"
|
||||
|
||||
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||
int32_t sz = 0;
|
||||
/*int32_t outputNameSz = 0;*/
|
||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||
|
||||
if (tEncodeI8(pEncoder, pObj->dropPolicy) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
||||
|
||||
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
|
||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||
// TODO encode tasks
|
||||
if (pObj->tasks) {
|
||||
sz = taosArrayGetSize(pObj->tasks);
|
||||
}
|
||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||
|
||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->ast) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||
|
||||
int32_t sz = taosArrayGetSize(pObj->tasks);
|
||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pArray = taosArrayGetP(pObj->tasks, i);
|
||||
int32_t innerSz = taosArrayGetSize(pArray);
|
||||
|
@ -58,40 +59,37 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
|||
|
||||
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
||||
|
||||
#if 0
|
||||
if (pObj->ColAlias != NULL) {
|
||||
outputNameSz = taosArrayGetSize(pObj->ColAlias);
|
||||
}
|
||||
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
|
||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
||||
char *name = taosArrayGetP(pObj->ColAlias, i);
|
||||
if (tEncodeCStr(pEncoder, name) < 0) return -1;
|
||||
}
|
||||
#endif
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||
|
||||
if (tDecodeI8(pDecoder, &pObj->dropPolicy) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
||||
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||
|
||||
pObj->tasks = NULL;
|
||||
int32_t sz;
|
||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||
|
@ -112,21 +110,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
|||
}
|
||||
|
||||
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
||||
#if 0
|
||||
int32_t outputNameSz;
|
||||
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
|
||||
if (outputNameSz != 0) {
|
||||
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
|
||||
if (pObj->ColAlias == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
||||
char *name;
|
||||
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
|
||||
taosArrayPush(pObj->ColAlias, &name);
|
||||
}
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -397,7 +397,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
|
||||
char logBuf[512] = {0};
|
||||
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
static int64_t mndTick = 0;
|
||||
if (++mndTick % 10 == 1) {
|
||||
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
|
|
|
@ -131,7 +131,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
|
|||
int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) {
|
||||
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||
// sink
|
||||
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
||||
if (pStream->smaId != 0) {
|
||||
pTask->sinkType = TASK_SINK__SMA;
|
||||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
|
@ -275,7 +275,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
|
|||
pTask->execType = TASK_EXEC__NONE;
|
||||
|
||||
// sink
|
||||
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
||||
if (pStream->smaId != 0) {
|
||||
pTask->sinkType = TASK_SINK__SMA;
|
||||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
|
@ -321,7 +321,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
|
|||
pTask->execType = TASK_EXEC__NONE;
|
||||
|
||||
// sink
|
||||
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
||||
if (pStream->smaId != 0) {
|
||||
pTask->sinkType = TASK_SINK__SMA;
|
||||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
|
@ -346,8 +346,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pStream->vgNum == 0);
|
||||
|
||||
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||
ASSERT(totLevel <= 2);
|
||||
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
|
||||
|
@ -399,7 +397,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
|
||||
// exec
|
||||
pFinalTask->execType = TASK_EXEC__PIPE;
|
||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
|
||||
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
|
||||
if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
qDestroyQueryPlan(pPlan);
|
||||
|
@ -420,7 +418,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
SVgObj* pVgroup;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pStream->dbUid) {
|
||||
if (pVgroup->dbUid != pStream->sourceDbUid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
@ -463,7 +461,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
SVgObj* pVgroup;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pStream->dbUid) {
|
||||
if (pVgroup->dbUid != pStream->sourceDbUid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "mndDnode.h"
|
||||
#include "mndInfoSchema.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndScheduler.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndStream.h"
|
||||
|
@ -520,6 +521,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
smaObj.createdTime = taosGetTimestampMs();
|
||||
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
ASSERT(smaObj.uid != 0);
|
||||
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
||||
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name);
|
||||
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -556,13 +558,13 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
streamObj.createTime = taosGetTimestampMs();
|
||||
streamObj.updateTime = streamObj.createTime;
|
||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||
streamObj.dbUid = pDb->uid;
|
||||
streamObj.sourceDbUid = pDb->uid;
|
||||
streamObj.version = 1;
|
||||
streamObj.sql = pCreate->sql;
|
||||
streamObj.createdBy = STREAM_CREATED_BY__SMA;
|
||||
streamObj.smaId = smaObj.uid;
|
||||
streamObj.watermark = 0;
|
||||
streamObj.trigger = STREAM_TRIGGER_AT_ONCE;
|
||||
streamObj.ast = strdup(smaObj.ast);
|
||||
|
||||
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
|
||||
mError("sma:%s, failed to create since %s", smaObj.name, terrstr());
|
||||
|
@ -571,6 +573,39 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
|
||||
streamObj.fixedSinkVgId = smaObj.dstVgId;
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(streamObj.ast, &pAst) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// extract output schema from ast
|
||||
if (qExtractResultSchema(pAst, (int32_t *)&streamObj.outputSchema.nCols, &streamObj.outputSchema.pSchema) != 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SQueryPlan *pPlan = NULL;
|
||||
SPlanContext cxt = {
|
||||
.pAstRoot = pAst,
|
||||
.topicQuery = false,
|
||||
.streamQuery = true,
|
||||
.triggerType = streamObj.trigger,
|
||||
.watermark = streamObj.watermark,
|
||||
};
|
||||
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// save physcial plan
|
||||
if (nodesNodeToString((SNode *)pPlan, false, &streamObj.physicalPlan, NULL) != 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
if (pAst != NULL) nodesDestroyNode(pAst);
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
@ -585,7 +620,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
// if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
|
||||
if (mndScheduleStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
|
||||
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
|
|
@ -28,7 +28,6 @@
|
|||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "mndSma.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define STB_VER_NUMBER 1
|
||||
|
@ -1271,7 +1270,8 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, int32_t *smaVer) {
|
||||
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp,
|
||||
int32_t *smaVer) {
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
||||
|
||||
|
@ -1672,7 +1672,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
for (int32_t i = 0; i < numOfStbs; ++i) {
|
||||
SSTableVersion *pStbVersion = &pStbVersions[i];
|
||||
pStbVersion->suid = be64toh(pStbVersion->suid);
|
||||
|
@ -1681,7 +1681,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
|
|||
pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
|
||||
|
||||
STableMetaRsp metaRsp = {0};
|
||||
int32_t smaVer = 0;
|
||||
int32_t smaVer = 0;
|
||||
mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
|
||||
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
|
||||
metaRsp.numOfColumns = -1;
|
||||
|
@ -1697,15 +1697,15 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
|
|||
}
|
||||
|
||||
if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) {
|
||||
bool exist = false;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
bool exist = false;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
STableIndexRsp indexRsp = {0};
|
||||
indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
|
||||
if (NULL == indexRsp.pIndex) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
|
||||
int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
|
||||
if (code || !exist) {
|
||||
|
@ -1769,16 +1769,23 @@ int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void mndExtractTableName(char *tableId, char *name) {
|
||||
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
|
||||
SName name = {0};
|
||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
tNameGetFullDbName(&name, dst);
|
||||
}
|
||||
|
||||
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
|
||||
int32_t pos = -1;
|
||||
int32_t num = 0;
|
||||
for (pos = 0; tableId[pos] != 0; ++pos) {
|
||||
if (tableId[pos] == TS_PATH_DELIMITER[0]) num++;
|
||||
for (pos = 0; stbFullName[pos] != 0; ++pos) {
|
||||
if (stbFullName[pos] == TS_PATH_DELIMITER[0]) num++;
|
||||
if (num == 2) break;
|
||||
}
|
||||
|
||||
if (num == 2) {
|
||||
strcpy(name, tableId + pos + 1);
|
||||
tstrncpy(dst, stbFullName + pos + 1, dstSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1808,7 +1815,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
|||
|
||||
SName name = {0};
|
||||
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]);
|
||||
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
|
||||
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
||||
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
|
|
@ -196,7 +196,8 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
|
|||
}
|
||||
|
||||
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
|
||||
pCreate->targetStbFullName[0] == 0) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
||||
return -1;
|
||||
}
|
||||
|
@ -232,6 +233,114 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
||||
SNode *pAst = NULL;
|
||||
|
||||
mDebug("stream:%s to create", pCreate->name);
|
||||
memcpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||
pObj->createTime = taosGetTimestampMs();
|
||||
pObj->updateTime = pObj->createTime;
|
||||
pObj->version = 1;
|
||||
pObj->smaId = 0;
|
||||
|
||||
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
||||
pObj->status = 0;
|
||||
|
||||
// TODO
|
||||
pObj->dropPolicy = 0;
|
||||
pObj->trigger = pCreate->triggerType;
|
||||
pObj->triggerParam = pCreate->maxDelay;
|
||||
pObj->watermark = pCreate->watermark;
|
||||
|
||||
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
|
||||
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
||||
if (pSourceDb == NULL) {
|
||||
/*ASSERT(0);*/
|
||||
mDebug("stream:%s failed to create, source db %s not exist", pCreate->name, pObj->sourceDb);
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
pObj->sourceDbUid = pSourceDb->uid;
|
||||
|
||||
memcpy(pObj->targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
|
||||
if (pTargetDb == NULL) {
|
||||
mDebug("stream:%s failed to create, target db %s not exist", pCreate->name, pObj->targetDb);
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
|
||||
|
||||
pObj->targetStbUid = mndGenerateUid(pObj->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
pObj->targetDbUid = pTargetDb->uid;
|
||||
|
||||
pObj->sql = pCreate->sql;
|
||||
pObj->ast = pCreate->ast;
|
||||
|
||||
pCreate->sql = NULL;
|
||||
pCreate->ast = NULL;
|
||||
|
||||
// deserialize ast
|
||||
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
// extract output schema from ast
|
||||
if (qExtractResultSchema(pAst, (int32_t *)&pObj->outputSchema.nCols, &pObj->outputSchema.pSchema) != 0) {
|
||||
/*ASSERT(0);*/
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
SQueryPlan *pPlan = NULL;
|
||||
SPlanContext cxt = {
|
||||
.pAstRoot = pAst,
|
||||
.topicQuery = false,
|
||||
.streamQuery = true,
|
||||
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
|
||||
.watermark = pObj->watermark,
|
||||
};
|
||||
|
||||
// using ast and param to build physical plan
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
// save physcial plan
|
||||
if (nodesNodeToString((SNode *)pPlan, false, &pObj->physicalPlan, NULL) != 0) {
|
||||
/*ASSERT(0);*/
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
FAIL:
|
||||
if (pAst != NULL) nodesDestroyNode(pAst);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
|
||||
SNode *pAst = NULL;
|
||||
|
||||
|
@ -278,8 +387,8 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
|||
}
|
||||
|
||||
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
||||
SStbObj *pStb = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
SStbObj *pStb = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
|
||||
SMCreateStbReq createReq = {0};
|
||||
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -321,7 +430,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
|
||||
if (mndCheckDbAuth(pMnode, user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -357,24 +465,24 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
ASSERT(pTask->nodeId != 0);
|
||||
|
||||
// vnode
|
||||
if (pTask->nodeId > 0) {
|
||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pReq->head.vgId = htonl(pTask->nodeId);
|
||||
pReq->taskId = pTask->taskId;
|
||||
STransAction action = {0};
|
||||
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SVDropStreamTaskReq);
|
||||
action.msgType = TDMT_VND_STREAM_TASK_DROP;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
/*if (pTask->nodeId > 0) {*/
|
||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pReq->head.vgId = htonl(pTask->nodeId);
|
||||
pReq->taskId = pTask->taskId;
|
||||
STransAction action = {0};
|
||||
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SVDropStreamTaskReq);
|
||||
action.msgType = TDMT_VND_STREAM_TASK_DROP;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
/*}*/
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -404,10 +512,9 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
|
|||
streamObj.updateTime = streamObj.createTime;
|
||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||
streamObj.dbUid = pDb->uid;
|
||||
streamObj.sourceDbUid = pDb->uid;
|
||||
streamObj.version = 1;
|
||||
streamObj.sql = pCreate->sql;
|
||||
streamObj.createdBy = STREAM_CREATED_BY__USER;
|
||||
// TODO
|
||||
streamObj.fixedSinkVgId = 0;
|
||||
streamObj.smaId = 0;
|
||||
|
@ -487,7 +594,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
// TODO check auth
|
||||
// TODO check read auth for source and write auth for target
|
||||
#if 0
|
||||
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
|
@ -497,9 +605,60 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
#endif
|
||||
|
||||
code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mndTransSetDbName(pTrans, createStreamReq.sourceDB);
|
||||
// TODO
|
||||
/*mndTransSetDbName(pTrans, streamObj.targetDb);*/
|
||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
||||
|
||||
// build stream obj from request
|
||||
SStreamObj streamObj = {0};
|
||||
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
|
||||
ASSERT(0);
|
||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
// create stb for stream
|
||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
|
||||
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
// schedule stream task for stream obj
|
||||
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
|
||||
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
// add stream to trans
|
||||
if (mndPersistStream(pMnode, pTrans, &streamObj) < 0) {
|
||||
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
// execute creation
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
/*code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);*/
|
||||
/*if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;*/
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
|
@ -520,13 +679,19 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
/*SDbObj *pDb = NULL;*/
|
||||
/*SUserObj *pUser = NULL;*/
|
||||
|
||||
SMDropStreamTaskReq dropStreamReq = *(SMDropStreamTaskReq *)pReq->pCont;
|
||||
SMDropStreamReq dropReq = *(SMDropStreamReq *)pReq->pCont;
|
||||
|
||||
pStream = mndAcquireStream(pMnode, dropStreamReq.name);
|
||||
pStream = mndAcquireStream(pMnode, dropReq.name);
|
||||
|
||||
if (pStream == NULL) {
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
return -1;
|
||||
if (dropReq.igNotExists) {
|
||||
mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
|
||||
code = 0;
|
||||
goto DROP_STREAM_OVER;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
@ -539,18 +704,62 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to drop since %s", dropStreamReq.name, terrstr());
|
||||
return -1;
|
||||
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||
return code;
|
||||
}
|
||||
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropStreamReq.name);
|
||||
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
|
||||
|
||||
// drop all tasks
|
||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", dropStreamReq.name, terrstr());
|
||||
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
|
||||
return code;
|
||||
}
|
||||
|
||||
// drop stream
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
||||
DROP_STREAM_OVER:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
void *pIter = NULL;
|
||||
SStreamObj *pStream = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pStream->sourceDbUid == pDb->uid || pStream->targetDbUid == pDb->uid) {
|
||||
if (pStream->sourceDbUid != pStream->targetDbUid) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
return -1;
|
||||
} else {
|
||||
// TODO drop all task on snode
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
sdbRelease(pSdb, pStream);
|
||||
continue;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (mndSetDropOffsetStreamLogs(pMnode, pTrans, pStream) < 0) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
goto END;
|
||||
}
|
||||
#endif
|
||||
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -569,7 +778,7 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS
|
|||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pStream->dbUid == pDb->uid) {
|
||||
if (pStream->sourceDbUid == pDb->uid) {
|
||||
numOfStreams++;
|
||||
}
|
||||
|
||||
|
|
|
@ -258,14 +258,6 @@ void mndSyncStart(SMnode *pMnode) {
|
|||
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
||||
syncStart(pMgmt->sync);
|
||||
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
|
||||
|
||||
/*
|
||||
if (pMgmt->standby) {
|
||||
syncStartStandBy(pMgmt->sync);
|
||||
} else {
|
||||
syncStart(pMgmt->sync);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
void mndSyncStop(SMnode *pMnode) {
|
||||
|
|
|
@ -300,108 +300,6 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
|||
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
||||
}
|
||||
|
||||
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
int32_t ret = 0;
|
||||
|
||||
if (syncEnvIsStart()) {
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
ESyncState state = syncGetMyRole(pVnode->sync);
|
||||
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
|
||||
char logBuf[512] = {0};
|
||||
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
static int64_t vndTick = 0;
|
||||
if (++vndTick % 10 == 1) {
|
||||
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
}
|
||||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
taosMemoryFree(syncNodeStr);
|
||||
|
||||
SRpcMsg *pRpcMsg = pMsg;
|
||||
|
||||
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
||||
ret = syncSetStandby(pVnode->sync);
|
||||
vInfo("vgId:%d, set standby result:0x%x rid:%" PRId64, pVnode->config.vgId, ret, pVnode->sync);
|
||||
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
|
||||
tmsgSendRsp(&rsp);
|
||||
} else {
|
||||
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
} else {
|
||||
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
if (ret != 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SVCreateStbReq req = {0};
|
||||
SDecoder coder;
|
||||
|
|
|
@ -50,6 +50,33 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t vnodeSetStandBy(SVnode *pVnode) {
|
||||
vInfo("vgId:%d, start to set standby", TD_VID(pVnode));
|
||||
|
||||
if (syncSetStandby(pVnode->sync) == 0) {
|
||||
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
|
||||
return 0;
|
||||
} else if (terrno != TSDB_CODE_SYN_IS_LEADER) {
|
||||
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (syncLeaderTransfer(pVnode->sync) != 0) {
|
||||
vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
|
||||
return -1;
|
||||
} else {
|
||||
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
|
||||
}
|
||||
|
||||
if (syncSetStandby(pVnode->sync) == 0) {
|
||||
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
|
||||
return 0;
|
||||
} else {
|
||||
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
SAlterVnodeReq req = {0};
|
||||
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
|
||||
|
@ -74,14 +101,19 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
|
||||
if (code != 0) {
|
||||
vDebug("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
|
||||
if (syncLeaderTransfer(pVnode->sync) != 0) {
|
||||
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
|
||||
} else {
|
||||
vDebug("vgId:%d, transfer leader success, propose reconfig config again", TD_VID(pVnode));
|
||||
if (terrno != 0) code = terrno;
|
||||
|
||||
vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
|
||||
if (terrno == TSDB_CODE_SYN_IS_LEADER) {
|
||||
if (syncLeaderTransfer(pVnode->sync) != 0) {
|
||||
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
|
||||
} else {
|
||||
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -108,7 +140,6 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
|
||||
if (code == 0) {
|
||||
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
||||
|
||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
SEpSet newEpSet = {0};
|
||||
syncGetEpSet(pVnode->sync, &newEpSet);
|
||||
|
@ -170,6 +201,108 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
int32_t ret = 0;
|
||||
|
||||
if (syncEnvIsStart()) {
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
ESyncState state = syncGetMyRole(pVnode->sync);
|
||||
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
|
||||
char logBuf[512] = {0};
|
||||
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
static int64_t vndTick = 0;
|
||||
if (++vndTick % 10 == 1) {
|
||||
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
}
|
||||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
taosMemoryFree(syncNodeStr);
|
||||
|
||||
SRpcMsg *pRpcMsg = pMsg;
|
||||
|
||||
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
||||
ret = vnodeSetStandBy(pVnode);
|
||||
if (ret != 0 && terrno != 0) ret = terrno;
|
||||
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
|
||||
tmsgSendRsp(&rsp);
|
||||
} else {
|
||||
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
} else {
|
||||
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
if (ret != 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||
if (code != 0) {
|
||||
|
@ -258,17 +391,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
|
|||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
|
||||
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
|
||||
|
||||
int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
|
||||
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
|
||||
|
||||
int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
|
||||
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
|
||||
|
||||
int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
|
||||
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
|
||||
|
||||
int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
|
||||
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
|
||||
|
||||
int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
|
||||
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
|
||||
|
||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||
|
@ -279,7 +412,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
|||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
||||
pFsm->FpRestoreFinishCb = NULL;
|
||||
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
||||
|
||||
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
||||
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
|
||||
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
|
||||
|
@ -292,7 +424,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
|||
|
||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||
SSyncInfo syncInfo = {
|
||||
.isStandBy = false,
|
||||
.snapshotEnable = false,
|
||||
.vgId = pVnode->config.vgId,
|
||||
.isStandBy = pVnode->config.standby,
|
||||
|
@ -321,13 +452,6 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
|||
void vnodeSyncStart(SVnode *pVnode) {
|
||||
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
||||
syncStart(pVnode->sync);
|
||||
/*
|
||||
if (pVnode->config.standby) {
|
||||
syncStartStandBy(pVnode->sync);
|
||||
} else {
|
||||
syncStart(pVnode->sync);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||
|
|
|
@ -55,7 +55,6 @@ typedef struct SResultRow {
|
|||
uint32_t numOfRows; // number of rows of current time window
|
||||
STimeWindow win;
|
||||
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
|
||||
// char *key; // start key of current result row
|
||||
} SResultRow;
|
||||
|
||||
typedef struct SResultRowPosition {
|
||||
|
|
|
@ -189,7 +189,7 @@ typedef struct SExecTaskInfo {
|
|||
} schemaVer;
|
||||
|
||||
STableListInfo tableqinfoList; // this is a table list
|
||||
char* sql; // query sql string
|
||||
const char* sql; // query sql string
|
||||
jmp_buf env; // jump to this position when error happens.
|
||||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||
struct SOperatorInfo* pRoot;
|
||||
|
@ -544,6 +544,13 @@ typedef struct SFillOperatorInfo {
|
|||
bool multigroupResult;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SScalarSupp {
|
||||
SExprInfo* pScalarExprInfo;
|
||||
int32_t numOfScalarExpr; // the number of scalar expression in group operator
|
||||
SqlFunctionCtx* pScalarFuncCtx;
|
||||
int32_t* rowCellInfoOffset; // offset value for each row result cell info
|
||||
} SScalarSupp;
|
||||
|
||||
typedef struct SGroupbyOperatorInfo {
|
||||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||
SOptrBasicInfo binfo;
|
||||
|
@ -556,10 +563,7 @@ typedef struct SGroupbyOperatorInfo {
|
|||
char* keyBuf; // group by keys for hash
|
||||
int32_t groupKeyLen; // total group by column width
|
||||
SGroupResInfo groupResInfo;
|
||||
SExprInfo* pScalarExprInfo;
|
||||
int32_t numOfScalarExpr; // the number of scalar expression in group operator
|
||||
SqlFunctionCtx* pScalarFuncCtx;
|
||||
int32_t* rowCellInfoOffset; // offset value for each row result cell info
|
||||
SScalarSupp scalarSup;
|
||||
} SGroupbyOperatorInfo;
|
||||
|
||||
typedef struct SDataGroupInfo {
|
||||
|
@ -583,6 +587,7 @@ typedef struct SPartitionOperatorInfo {
|
|||
void* pGroupIter; // group iterator
|
||||
int32_t pageIndex; // page index of current group
|
||||
SSDataBlock* pUpdateRes;
|
||||
SScalarSupp scalarSupp;
|
||||
} SPartitionOperatorInfo;
|
||||
|
||||
typedef struct SWindowRowsSup {
|
||||
|
@ -760,6 +765,8 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t
|
|||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
||||
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
||||
|
||||
SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||
|
||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
||||
|
@ -839,20 +846,17 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
|
||||
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
|
||||
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream,
|
||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
#if 0
|
||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||
|
@ -890,7 +894,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length);
|
|||
|
||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||
EOPTR_EXEC_MODEL model);
|
||||
const char* sql, EOPTR_EXEC_MODEL model);
|
||||
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo);
|
||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
|
||||
int32_t* resNum);
|
||||
|
|
|
@ -151,6 +151,13 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
|
|||
*/
|
||||
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
|
||||
|
||||
/**
|
||||
* get proper sort buffer pages according to the row size
|
||||
* @param rowSize
|
||||
* @return
|
||||
*/
|
||||
int32_t getProperSortPageSize(size_t rowSize);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -28,17 +28,6 @@ typedef struct SCompSupporter {
|
|||
int32_t order;
|
||||
} SCompSupporter;
|
||||
|
||||
int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
|
||||
int32_t size = 0;
|
||||
|
||||
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
||||
// size += pQueryAttr->pExpr1[i].base.interBytes;
|
||||
}
|
||||
|
||||
assert(size >= 0);
|
||||
return size;
|
||||
}
|
||||
|
||||
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
|
||||
pResultRowInfo->size = 0;
|
||||
pResultRowInfo->cur.pageId = -1;
|
||||
|
|
|
@ -121,7 +121,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
|||
}
|
||||
|
||||
qTaskInfo_t pTaskInfo = NULL;
|
||||
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// TODO: destroy SSubplan & pTaskInfo
|
||||
terrno = code;
|
||||
|
|
|
@ -31,13 +31,13 @@ static void initRefPool() {
|
|||
}
|
||||
|
||||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
|
||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
|
||||
assert(readHandle != NULL && pSubplan != NULL);
|
||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||
|
||||
taosThreadOnce(&initPoolOnce, initRefPool);
|
||||
|
||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
|
||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
|
|
@ -40,9 +40,6 @@
|
|||
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
|
||||
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
||||
|
||||
#define SDATA_BLOCK_INITIALIZER \
|
||||
(SDataBlockInfo) { {0}, 0 }
|
||||
|
||||
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
||||
|
||||
#if 0
|
||||
|
@ -95,8 +92,6 @@ static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlo
|
|||
|
||||
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
|
||||
|
||||
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
|
||||
|
||||
static void releaseQueryBuf(size_t numOfTables);
|
||||
|
||||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
||||
|
@ -454,75 +449,6 @@ static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo
|
|||
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
||||
}
|
||||
|
||||
static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey, bool ascQuery,
|
||||
bool timeWindowInterpo) {
|
||||
int64_t skey = TSKEY_INITIAL_VAL;
|
||||
#if 0
|
||||
int32_t i = 0;
|
||||
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
|
||||
SResultRow* pResult = pResultRowInfo->pResult[i];
|
||||
if (pResult->closed) {
|
||||
break;
|
||||
}
|
||||
|
||||
// new closed result rows
|
||||
if (timeWindowInterpo) {
|
||||
if (pResult->endInterp &&
|
||||
((pResult->win.skey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery))) {
|
||||
if (i > 0) { // the first time window, the startInterp is false.
|
||||
assert(pResult->startInterp);
|
||||
}
|
||||
|
||||
closeResultRow(pResultRowInfo, i);
|
||||
} else {
|
||||
skey = pResult->win.skey;
|
||||
}
|
||||
} else {
|
||||
if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
|
||||
closeResultRow(pResultRowInfo, i);
|
||||
} else {
|
||||
skey = pResult->win.skey;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// all result rows are closed, set the last one to be the skey
|
||||
if (skey == TSKEY_INITIAL_VAL) {
|
||||
if (pResultRowInfo->size == 0) {
|
||||
// assert(pResultRowInfo->current == NULL);
|
||||
assert(pResultRowInfo->curPos == -1);
|
||||
pResultRowInfo->curPos = -1;
|
||||
} else {
|
||||
pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
||||
}
|
||||
} else {
|
||||
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
|
||||
SResultRow* pResult = pResultRowInfo->pResult[i];
|
||||
if (pResult->closed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i == pResultRowInfo->size - 1) {
|
||||
pResultRowInfo->curPos = i;
|
||||
} else {
|
||||
pResultRowInfo->curPos = i + 1; // current not closed result object
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
//
|
||||
// static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey,
|
||||
// bool ascQuery, bool interp) {
|
||||
// if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) {
|
||||
// closeAllResultRows(pResultRowInfo);
|
||||
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
||||
// } else {
|
||||
// int32_t step = ascQuery ? 1 : -1;
|
||||
// doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp);
|
||||
// }
|
||||
//}
|
||||
|
||||
// query_range_start, query_range_end, window_duration, window_start, window_end
|
||||
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
|
||||
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
|
@ -886,7 +812,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
|
|||
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
|
||||
assert(pResultRow != NULL);
|
||||
|
||||
setResultRowKey(pResultRow, pData, type);
|
||||
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -908,21 +833,6 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
|||
return false;
|
||||
}
|
||||
|
||||
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
|
||||
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||
// }
|
||||
//
|
||||
// // denote the order type
|
||||
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
|
||||
// // return pCtx->param[0].i == pQueryAttr->order.order;
|
||||
// }
|
||||
|
||||
// in the reverse table scan, only the following functions need to be executed
|
||||
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
|
||||
// (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) {
|
||||
// return false;
|
||||
// }
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -3935,27 +3845,6 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
|
|||
}
|
||||
}
|
||||
|
||||
// static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) {
|
||||
// int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
// if (size == 0) {
|
||||
// return NULL;
|
||||
// }
|
||||
//
|
||||
// STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo));
|
||||
// if (pTableQueryInfo == NULL) {
|
||||
// return NULL;
|
||||
// }
|
||||
//
|
||||
// for (int32_t j = 0; j < size; ++j) {
|
||||
// STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j);
|
||||
// STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j];
|
||||
// pTQueryInfo->lastKey = pk->lastKey;
|
||||
// }
|
||||
//
|
||||
// pTableQueryInfo->lastKey = 0;
|
||||
// return pTableQueryInfo;
|
||||
//}
|
||||
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -4525,7 +4414,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
|
|||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||
|
||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||
|
||||
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
|
||||
SMetaReader mr = {0};
|
||||
|
@ -4645,12 +4533,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
tsdbReaderT pDataReader =
|
||||
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||
if (pDataReader == NULL && terrno != 0) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||
if (code) {
|
||||
tsdbCleanupReadHandle(pDataReader);
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -4659,6 +4549,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
taosArrayDestroy(groupKeys);
|
||||
if (code) {
|
||||
tsdbCleanupReadHandle(pDataReader);
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -4871,12 +4762,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
pTaskInfo);
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
|
||||
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
|
||||
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo);
|
||||
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
|
||||
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
|
||||
|
||||
|
@ -5314,7 +5200,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
|
|||
}
|
||||
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||
EOPTR_EXEC_MODEL model) {
|
||||
const char* sql, EOPTR_EXEC_MODEL model) {
|
||||
uint64_t queryId = pPlan->id.queryId;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -5324,6 +5210,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
goto _complete;
|
||||
}
|
||||
|
||||
(*pTaskInfo)->sql = sql;
|
||||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
|
||||
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
|
||||
if (NULL == (*pTaskInfo)->pRoot) {
|
||||
|
|
|
@ -320,8 +320,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
|
||||
|
||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||
if (pInfo->pScalarExprInfo != NULL) {
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
|
||||
if (pInfo->scalarSup.pScalarExprInfo != NULL) {
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
@ -385,9 +385,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
pInfo->pGroupCols = pGroupColList;
|
||||
pInfo->pCondition = pCondition;
|
||||
|
||||
pInfo->pScalarExprInfo = pScalarExprInfo;
|
||||
pInfo->numOfScalarExpr = numOfScalarExpr;
|
||||
pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
|
||||
pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo;
|
||||
pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr;
|
||||
pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset);
|
||||
|
||||
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -624,7 +624,9 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
|
@ -641,6 +643,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
|
||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||
if (pInfo->scalarSupp.pScalarExprInfo != NULL) {
|
||||
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
}
|
||||
|
||||
doHashPartition(pOperator, pBlock);
|
||||
}
|
||||
|
||||
|
@ -665,15 +675,26 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosMemoryFree(pInfo->columnOffset);
|
||||
}
|
||||
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
|
||||
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pGroupCols = pGroupColList;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
|
||||
|
||||
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
|
||||
if (pPartNode->pExprs != NULL) {
|
||||
pInfo->scalarSupp.numOfScalarExpr = 0;
|
||||
pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr);
|
||||
pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx(
|
||||
pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset);
|
||||
}
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||
|
@ -683,16 +704,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
uint32_t defaultPgsz = 0;
|
||||
uint32_t defaultBufsz = 0;
|
||||
getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
|
||||
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
|
||||
pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||
pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf));
|
||||
pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity);
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -701,7 +722,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||
pInfo->binfo.pRes = pResultBlock;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->info = pInfo;
|
||||
|
|
|
@ -1234,6 +1234,8 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
|||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, pInfo->pRes);
|
||||
#if 0
|
||||
SFilterInfo* filter = NULL;
|
||||
|
||||
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
|
||||
|
@ -1279,6 +1281,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
|||
|
||||
px->info.rows = numOfRow;
|
||||
pInfo->pRes = px;
|
||||
#endif
|
||||
|
||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
@ -1457,6 +1460,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
||||
doFilterResult(pInfo);
|
||||
|
||||
blockDataDestroy(p);
|
||||
|
||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
@ -1545,10 +1550,10 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
|||
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
||||
// blockDataDestroy(p); todo handle memory leak
|
||||
|
||||
pInfo->pRes->info.rows = p->info.rows;
|
||||
return p->info.rows;
|
||||
blockDataDestroy(p);
|
||||
|
||||
return pInfo->pRes->info.rows;
|
||||
}
|
||||
|
||||
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
||||
|
|
|
@ -420,24 +420,29 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pSortInfo = pSortInfo;
|
||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||
pInfo->pInputBlock = pInputBlock;
|
||||
pOperator->name = "MultiwaySortMerge";
|
||||
pInfo->pInputBlock = pInputBlock;
|
||||
pOperator->name = "MultiwaySortMerge";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
|
||||
pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
||||
pInfo->hasGroupId = false;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pInfo->hasGroupId = false;
|
||||
pInfo->prefetchedTuple = NULL;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||
|
||||
uint32_t numOfSources = taosArrayGetSize(pSortInfo);
|
||||
numOfSources = TMAX(2, numOfSources);
|
||||
|
||||
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
||||
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
|
||||
|
|
|
@ -333,11 +333,6 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
|
|||
continue;
|
||||
}
|
||||
|
||||
// if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) {
|
||||
// pCtx[k].start.key = INT64_MIN;
|
||||
// continue;
|
||||
// }
|
||||
|
||||
SFunctParam* pParam = &pCtx[k].param[0];
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
|
||||
|
||||
|
|
|
@ -204,7 +204,12 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
|
|||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
||||
SSortSource* pSource = cmpParam->pSources[i];
|
||||
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
|
||||
|
||||
if (taosArrayGetSize(pSource->pageIdList) == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
|
||||
|
||||
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
|
||||
code = blockDataFromBuf(pSource->src.pBlock, pPage);
|
||||
|
@ -532,6 +537,19 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t getProperSortPageSize(size_t rowSize) {
|
||||
uint32_t defaultPageSize = 4096;
|
||||
|
||||
uint32_t pgSize = 0;
|
||||
if (rowSize * 4 > defaultPageSize) {
|
||||
pgSize = rowSize * 4;
|
||||
} else {
|
||||
pgSize = defaultPageSize;
|
||||
}
|
||||
|
||||
return pgSize;
|
||||
}
|
||||
|
||||
static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
|
||||
|
@ -557,14 +575,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
|
||||
if (!hasGroupId) {
|
||||
// calculate the buffer pages according to the total available buffers.
|
||||
int32_t rowSize = blockDataGetRowSize(pBlock);
|
||||
if (rowSize * 4 > 4096) {
|
||||
pHandle->pageSize = rowSize * 4;
|
||||
} else {
|
||||
pHandle->pageSize = 4096;
|
||||
}
|
||||
|
||||
// todo!!
|
||||
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));
|
||||
|
||||
// todo, number of pages are set according to the total available sort buffer
|
||||
pHandle->numOfPages = 1024;
|
||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||
|
||||
|
@ -577,7 +590,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
|||
if (pHandle->beforeFp != NULL) {
|
||||
pHandle->beforeFp(pBlock, pHandle->param);
|
||||
}
|
||||
// todo relocate the columns
|
||||
|
||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
|
|
|
@ -945,7 +945,7 @@ TEST(testCase, build_executor_tree_Test) {
|
|||
int32_t code = qStringToSubplan(msg, &plan);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
|
||||
code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
|
||||
ASSERT_EQ(code, 0);
|
||||
}
|
||||
#if 0
|
||||
|
|
|
@ -1,67 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_COMMON_EXPR_H_
|
||||
#define _TD_COMMON_EXPR_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#include "tmsg.h"
|
||||
#include "taosdef.h"
|
||||
#include "tskiplist.h"
|
||||
#include "function.h"
|
||||
|
||||
struct tExprNode;
|
||||
struct SSchema;
|
||||
|
||||
#define QUERY_COND_REL_PREFIX_IN "IN|"
|
||||
#define QUERY_COND_REL_PREFIX_LIKE "LIKE|"
|
||||
#define QUERY_COND_REL_PREFIX_MATCH "MATCH|"
|
||||
#define QUERY_COND_REL_PREFIX_NMATCH "NMATCH|"
|
||||
|
||||
#define QUERY_COND_REL_PREFIX_IN_LEN 3
|
||||
#define QUERY_COND_REL_PREFIX_LIKE_LEN 5
|
||||
#define QUERY_COND_REL_PREFIX_MATCH_LEN 6
|
||||
#define QUERY_COND_REL_PREFIX_NMATCH_LEN 7
|
||||
|
||||
typedef bool (*__result_filter_fn_t)(const void *, void *);
|
||||
typedef void (*__do_filter_suppl_fn_t)(void *, void *);
|
||||
|
||||
/**
|
||||
* this structure is used to filter data in tags, so the offset of filtered tag column in tagdata string is required
|
||||
*/
|
||||
typedef struct tQueryInfo {
|
||||
uint8_t optr; // expression operator
|
||||
SSchema sch; // schema of tags
|
||||
char* q;
|
||||
__compar_fn_t compare; // filter function
|
||||
bool indexed; // indexed columns
|
||||
} tQueryInfo;
|
||||
|
||||
typedef struct SExprTraverseSupp {
|
||||
__result_filter_fn_t nodeFilterFn;
|
||||
__do_filter_suppl_fn_t setupInfoFn;
|
||||
void *pExtInfo;
|
||||
} SExprTraverseSupp;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_COMMON_EXPR_H_*/
|
|
@ -17,15 +17,7 @@
|
|||
#include "os.h"
|
||||
|
||||
#include "texception.h"
|
||||
#include "taosdef.h"
|
||||
#include "tmsg.h"
|
||||
#include "tarray.h"
|
||||
#include "tbuffer.h"
|
||||
#include "tcompare.h"
|
||||
#include "thash.h"
|
||||
#include "texpr.h"
|
||||
#include "tvariant.h"
|
||||
#include "tdef.h"
|
||||
|
||||
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
|||
#include "dataSinkMgt.h"
|
||||
|
||||
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql);
|
||||
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
|
|
|
@ -308,10 +308,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
|||
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
||||
char * sql = strndup(msg->msg, msg->sqlLen);
|
||||
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
|
||||
taosMemoryFreeClear(sql);
|
||||
|
||||
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
|
||||
|
||||
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql));
|
||||
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -510,7 +510,7 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql) {
|
||||
int32_t code = 0;
|
||||
bool queryRsped = false;
|
||||
SSubplan *plan = NULL;
|
||||
|
@ -537,7 +537,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
|
|||
|
||||
ctx->plan = plan;
|
||||
|
||||
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
|
||||
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
|
||||
if (code) {
|
||||
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
||||
QW_ERR_JRET(code);
|
||||
|
@ -938,7 +938,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes
|
|||
|
||||
ctx.plan = plan;
|
||||
|
||||
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
|
||||
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
|
||||
if (code) {
|
||||
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
||||
QW_ERR_JRET(code);
|
||||
|
|
|
@ -1488,13 +1488,15 @@ int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bo
|
|||
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
|
||||
*job = pJob->refId;
|
||||
|
||||
if (!sync) {
|
||||
pJob->userCb = SCH_EXEC_CB;
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||
|
||||
if (sync) {
|
||||
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
||||
tsem_wait(&pJob->rspSem);
|
||||
} else {
|
||||
pJob->userCb = SCH_EXEC_CB;
|
||||
}
|
||||
|
||||
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%"PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
|
||||
|
|
|
@ -156,7 +156,7 @@ int32_t syncSetStandby(int64_t rid) {
|
|||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
terrno = TSDB_CODE_SYN_IS_LEADER;
|
||||
sError("failed to set standby since it is leader, rid:%" PRId64, rid);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -324,9 +324,9 @@ void cliHandleResp(SCliConn* conn) {
|
|||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pTransInst->label, conn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen);
|
||||
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
|
||||
|
||||
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
tTrace("except, server continue send while cli ignore it");
|
||||
|
|
|
@ -285,13 +285,14 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
}
|
||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||
transRefSrvHandle(pConn);
|
||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
|
||||
ntohs(pConn->localAddr.sin_port), transMsg.contLen);
|
||||
} else {
|
||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
|
||||
tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d code:0x%x", pConn,
|
||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp);
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, transMsg.code);
|
||||
} else {
|
||||
tDebug("server conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d code:0x%x", pConn,
|
||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
|
||||
transMsg.code);
|
||||
// no ref here
|
||||
}
|
||||
|
||||
|
|
|
@ -412,7 +412,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, "Mismatched signature"
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
|
||||
|
|
|
@ -4,6 +4,11 @@ system sh/deploy.sh -n dnode2 -i 2
|
|||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
system sh/deploy.sh -n dnode5 -i 5
|
||||
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
|
||||
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
|
||||
system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
|
||||
system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
|
||||
system sh/cfg.sh -n dnode5 -c transPullupInterval -v 1
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
|
@ -128,9 +133,9 @@ if $rows != 1 then
|
|||
endi
|
||||
if $data(2)[4] == leader then
|
||||
$leaderExist = 1
|
||||
$leaderVnode = 4
|
||||
$follower1 = 2
|
||||
$follower2 = 3
|
||||
$leaderVnode = 2
|
||||
$follower1 = 3
|
||||
$follower2 = 4
|
||||
endi
|
||||
if $data(2)[6] == leader then
|
||||
$leaderExist = 1
|
||||
|
@ -140,9 +145,9 @@ if $data(2)[6] == leader then
|
|||
endi
|
||||
if $data(2)[8] == leader then
|
||||
$leaderExist = 1
|
||||
$leaderVnode = 2
|
||||
$follower1 = 3
|
||||
$follower2 = 4
|
||||
$leaderVnode = 4
|
||||
$follower1 = 2
|
||||
$follower2 = 3
|
||||
endi
|
||||
if $leaderExist != 1 then
|
||||
goto step3
|
||||
|
@ -160,110 +165,22 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD
|
||||
print =============== step32: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
=======
|
||||
print =============== step32: move leader
|
||||
print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
|
||||
sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD
|
||||
return
|
||||
|
||||
print =============== step33: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
=======
|
||||
print =============== step33: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD
|
||||
print =============== step34: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
=======
|
||||
print =============== step34: move follower1
|
||||
print redistribute vgroup 2 dnode $follower2 dnode 5 dnode $leaderVnode
|
||||
sql redistribute vgroup 2 dnode $follower2 dnode 5 dnode $leaderVnode
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD
|
||||
print =============== step35: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
=======
|
||||
print =============== step35: move 5
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode $follower2
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
>>>>>>> origin/3.0
|
||||
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD
|
||||
print =============== step36: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
=======
|
||||
print =============== step36: move follower1
|
||||
print redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
|
||||
sql redistribute vgroup 2 dnode $follower1 dnode $follower2 dnode 5
|
||||
>>>>>>> origin/3.0
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
|
@ -272,11 +189,40 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD
|
||||
print =============== step37: move follower1
|
||||
=======
|
||||
print =============== step34: move follower2
|
||||
print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $leaderVnode
|
||||
sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $leaderVnode
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step35: move leader
|
||||
print redistribute vgroup 2 dnode $follower2 dnode 5 dnode $follower1
|
||||
sql redistribute vgroup 2 dnode $follower2 dnode 5 dnode $follower1
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step36: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== step37: move follower2
|
||||
>>>>>>> origin/3.0
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
sql show d1.vgroups
|
||||
|
@ -287,17 +233,9 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
print =============== step38: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
=======
|
||||
print =============== step38: move leader
|
||||
print redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $follower1 dnode 5 dnode $follower2
|
||||
>>>>>>> origin/3.0
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
|
@ -305,18 +243,10 @@ sql show d1.tables
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
<<<<<<< HEAD
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
|
||||
print =============== step39: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
=======
|
||||
|
||||
print =============== step39: move 5
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode $follower1
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode $follower1
|
||||
>>>>>>> origin/3.0
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ python3 ./test.py -f 2-query/tail.py
|
|||
|
||||
python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||
python3 ./test.py -f 6-cluster/5dnode2mnode.py
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
|
||||
#python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
|
||||
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 3d5aa76f8c718dcffa100b45e4cbf313d499c356
|
||||
Subproject commit 0a81480420d6601bbdb57770ee64e40f24c4ea83
|
Loading…
Reference in New Issue