diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index c641fbb1a3..70e76517c6 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -113,13 +113,13 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1
#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2
-#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3
+#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4
#define TSDB_ALTER_TABLE_ADD_COLUMN 5
#define TSDB_ALTER_TABLE_DROP_COLUMN 6
-#define TSDB_ALTER_TABLE_CHANGE_COLUMN 7
-#define TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN 8
+#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7
+#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
@@ -254,7 +254,7 @@ typedef struct {
int8_t igExists;
int32_t numOfTags;
int32_t numOfColumns;
- SSchema pSchema[];
+ SSchema pSchemas[];
} SMCreateStbReq;
typedef struct {
@@ -265,8 +265,9 @@ typedef struct {
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
- SSchema schema;
-} SMAlterStbReq;
+ int32_t numOfSchemas;
+ SSchema pSchemas[];
+} SMAltertbReq;
typedef struct {
int32_t pid;
@@ -1177,31 +1178,28 @@ typedef struct SVCreateTbReq {
SSchema* pSchema;
} ntbCfg;
};
-} SVCreateTbReq;
+} SVCreateTbReq, SVUpdateTbReq;
+
+typedef struct {
+} SVCreateTbRsp, SVUpdateTbRsp;
+
+int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
+void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
+int32_t tSerializeSVCreateTbRsp(void** buf, SVCreateTbRsp* pRsp);
+void* tDeserializeSVCreateTbRsp(void* buf, SVCreateTbRsp* pRsp);
typedef struct {
uint64_t ver; // use a general definition
SArray* pArray;
} SVCreateTbBatchReq;
-int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
-void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
-int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
-void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq);
-
typedef struct {
- SMsgHead head;
-} SVCreateTbRsp;
+} SVCreateTbBatchRsp;
-typedef struct {
- SMsgHead head;
- char name[TSDB_TABLE_FNAME_LEN];
- int8_t ignoreNotExists;
-} SVAlterTbReq;
-
-typedef struct {
- SMsgHead head;
-} SVAlterTbRsp;
+int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
+void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
+int32_t tSerializeSVCreateTbBatchReqp(void** buf, SVCreateTbBatchReq* pRsp);
+void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pRsp);
typedef struct {
uint64_t ver;
diff --git a/include/nodes/nodes.h b/include/nodes/nodes.h
index ccb135aa0d..73082825ef 100644
--- a/include/nodes/nodes.h
+++ b/include/nodes/nodes.h
@@ -54,6 +54,9 @@ typedef enum ENodeType {
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
+ // only for parser
+ QUERY_NODE_TARGET_EXPR,
+
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT,
QUERY_NODE_SHOW_STMT
@@ -78,11 +81,6 @@ typedef struct SNodeList {
SListCell* pTail;
} SNodeList;
-typedef struct SNameStr {
- int32_t len;
- char* pName;
-} SNameStr;
-
typedef struct SDataType {
uint8_t type;
uint8_t precision;
@@ -114,7 +112,7 @@ typedef struct SColumnNode {
} SColumnNode;
typedef struct SValueNode {
- SExprNode type; // QUERY_NODE_VALUE
+ SExprNode node; // QUERY_NODE_VALUE
char* literal;
} SValueNode;
@@ -146,7 +144,7 @@ typedef enum EOperatorType {
} EOperatorType;
typedef struct SOperatorNode {
- SExprNode type; // QUERY_NODE_OPERATOR
+ SExprNode node; // QUERY_NODE_OPERATOR
EOperatorType opType;
SNode* pLeft;
SNode* pRight;
@@ -332,6 +330,10 @@ void nodesCloneNode(const SNode* pNode);
int32_t nodesNodeToString(const SNode* pNode, char** pStr, int32_t* pLen);
int32_t nodesStringToNode(const char* pStr, SNode** pNode);
+bool nodesIsArithmeticOp(const SOperatorNode* pOp);
+bool nodesIsComparisonOp(const SOperatorNode* pOp);
+bool nodesIsJsonOp(const SOperatorNode* pOp);
+
bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index b5740b0118..b19215c79d 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -228,7 +228,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03A9)
#define TSDB_CODE_MND_COLUMN_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA)
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB)
-#define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
+#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AD)
// mnode-func
@@ -444,6 +444,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PARSER_INVALID_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2601) //invalid column name
#define TSDB_CODE_PARSER_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x2602) //table not exist
#define TSDB_CODE_PARSER_AMBIGUOUS_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2603) //ambiguous column
+#define TSDB_CODE_PARSER_WRONG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x2604) //wrong value type
#ifdef __cplusplus
}
diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c
index 3fabd2ce0d..e45b61554c 100644
--- a/source/common/src/tmsg.c
+++ b/source/common/src/tmsg.c
@@ -293,7 +293,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
return buf;
}
-int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
+int tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) {
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
@@ -306,7 +306,7 @@ int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
return tlen;
}
-void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
+void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
uint32_t nsize = 0;
buf = taosDecodeFixedU64(buf, &pReq->ver);
diff --git a/source/dnode/mgmt/impl/test/vnode/vnode.cpp b/source/dnode/mgmt/impl/test/vnode/vnode.cpp
index 11b32fbf0f..9451608653 100644
--- a/source/dnode/mgmt/impl/test/vnode/vnode.cpp
+++ b/source/dnode/mgmt/impl/test/vnode/vnode.cpp
@@ -220,15 +220,69 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
}
TEST_F(DndTestVnode, 04_ALTER_Stb) {
-#if 0
- {
- for (int i = 0; i < 3; ++i) {
- SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, pReq, contLen);
- ASSERT_NE(pRsp, nullptr);
- ASSERT_EQ(pRsp->code, 0);
+ for (int i = 0; i < 1; ++i) {
+ SVCreateTbReq req = {0};
+ req.ver = 0;
+ req.name = (char*)"stb1";
+ req.ttl = 0;
+ req.keep = 0;
+ req.type = TD_SUPER_TABLE;
+
+ SSchema schemas[5] = {0};
+ {
+ SSchema* pSchema = &schemas[0];
+ pSchema->bytes = htonl(8);
+ pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
+ strcpy(pSchema->name, "ts");
}
+
+ {
+ SSchema* pSchema = &schemas[1];
+ pSchema->bytes = htonl(4);
+ pSchema->type = TSDB_DATA_TYPE_INT;
+ strcpy(pSchema->name, "col1");
+ }
+
+ {
+ SSchema* pSchema = &schemas[2];
+ pSchema->bytes = htonl(2);
+ pSchema->type = TSDB_DATA_TYPE_TINYINT;
+ strcpy(pSchema->name, "_tag1");
+ }
+
+ {
+ SSchema* pSchema = &schemas[3];
+ pSchema->bytes = htonl(8);
+ pSchema->type = TSDB_DATA_TYPE_BIGINT;
+ strcpy(pSchema->name, "_tag2");
+ }
+
+ {
+ SSchema* pSchema = &schemas[4];
+ pSchema->bytes = htonl(16);
+ pSchema->type = TSDB_DATA_TYPE_BINARY;
+ strcpy(pSchema->name, "_tag3");
+ }
+
+ req.stbCfg.suid = 9527;
+ req.stbCfg.nCols = 2;
+ req.stbCfg.pSchema = &schemas[0];
+ req.stbCfg.nTagCols = 3;
+ req.stbCfg.pTagSchema = &schemas[2];
+
+ int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
+ SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
+
+ pHead->contLen = htonl(contLen);
+ pHead->vgId = htonl(2);
+
+ void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
+ tSerializeSVCreateTbReq(&pBuf, &req);
+
+ SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, (void*)pHead, contLen);
+ ASSERT_NE(pRsp, nullptr);
+ ASSERT_EQ(pRsp->code, 0);
}
-#endif
}
TEST_F(DndTestVnode, 05_DROP_Stb) {
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index e8a9a68466..c9228ae785 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -301,10 +301,12 @@ typedef struct {
uint64_t uid;
uint64_t dbUid;
int32_t version;
+ int32_t nextColId;
int32_t numOfColumns;
int32_t numOfTags;
+ SSchema* pTags;
+ SSchema* pColumns;
SRWLatch lock;
- SSchema* pSchema;
} SStbObj;
typedef struct {
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index 4ccd4b63c4..006abcd8e2 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -84,12 +84,20 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT64(pRaw, dataPos, pStb->uid, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER)
+ SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER)
- int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
- for (int32_t i = 0; i < totalCols; ++i) {
- SSchema *pSchema = &pStb->pSchema[i];
+ for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
+ SSchema *pSchema = &pStb->pColumns[i];
+ SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
+ SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
+ SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
+ SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
+ }
+
+ for (int32_t i = 0; i < pStb->numOfTags; ++i) {
+ SSchema *pSchema = &pStb->pTags[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
@@ -137,17 +145,26 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pStb->uid, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER)
+ SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER)
- int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
- pStb->pSchema = calloc(totalCols, sizeof(SSchema));
- if (pStb->pSchema == NULL) {
+ pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema));
+ pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema));
+ if (pStb->pColumns == NULL || pStb->pTags == NULL) {
goto STB_DECODE_OVER;
}
- for (int32_t i = 0; i < totalCols; ++i) {
- SSchema *pSchema = &pStb->pSchema[i];
+ for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
+ SSchema *pSchema = &pStb->pColumns[i];
+ SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
+ SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
+ SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
+ SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
+ }
+
+ for (int32_t i = 0; i < pStb->numOfTags; ++i) {
+ SSchema *pSchema = &pStb->pTags[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
@@ -176,6 +193,8 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
+ tfree(pStb->pColumns);
+ tfree(pStb->pTags);
return 0;
}
@@ -183,13 +202,24 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
taosWLockLatch(&pOld->lock);
- int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
- int32_t totalSize = totalCols * sizeof(SSchema);
- if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
- void *pSchema = malloc(totalSize);
- if (pSchema != NULL) {
- free(pOld->pSchema);
- pOld->pSchema = pSchema;
+
+ if (pOld->numOfColumns < pNew->numOfColumns) {
+ void *pColumns = malloc(pNew->numOfColumns * sizeof(SSchema));
+ if (pColumns != NULL) {
+ free(pOld->pColumns);
+ pOld->pColumns = pColumns;
+ } else {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
+ taosWUnLockLatch(&pOld->lock);
+ }
+ }
+
+ if (pOld->numOfTags < pNew->numOfTags) {
+ void *pTags = malloc(pNew->numOfTags * sizeof(SSchema));
+ if (pTags != NULL) {
+ free(pOld->pTags);
+ pOld->pTags = pTags;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
@@ -199,9 +229,11 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
+ pOld->nextColId = pNew->nextColId;
pOld->numOfColumns = pNew->numOfColumns;
pOld->numOfTags = pNew->numOfTags;
- memcpy(pOld->pSchema, pNew->pSchema, totalSize);
+ memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
+ memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
taosWUnLockLatch(&pOld->lock);
return 0;
}
@@ -242,9 +274,9 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
req.type = TD_SUPER_TABLE;
req.stbCfg.suid = pStb->uid;
req.stbCfg.nCols = pStb->numOfColumns;
- req.stbCfg.pSchema = pStb->pSchema;
+ req.stbCfg.pSchema = pStb->pColumns;
req.stbCfg.nTagCols = pStb->numOfTags;
- req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns;
+ req.stbCfg.pTagSchema = pStb->pTags;
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = malloc(contLen);
@@ -295,7 +327,7 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
pCreate->numOfTags = htonl(pCreate->numOfTags);
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
for (int32_t i = 0; i < totalCols; ++i) {
- SSchema *pSchema = &pCreate->pSchema[i];
+ SSchema *pSchema = &pCreate->pSchemas[i];
pSchema->bytes = htonl(pSchema->bytes);
}
@@ -316,7 +348,7 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
int32_t maxColId = (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS);
for (int32_t i = 0; i < totalCols; ++i) {
- SSchema *pSchema = &pCreate->pSchema[i];
+ SSchema *pSchema = &pCreate->pSchemas[i];
if (pSchema->type < 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
@@ -435,27 +467,35 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0};
- tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
- tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
+ memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
+ memcpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
stbObj.createdTime = taosGetTimestampMs();
stbObj.updateTime = stbObj.createdTime;
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
stbObj.dbUid = pDb->uid;
stbObj.version = 1;
+ stbObj.nextColId = 1;
stbObj.numOfColumns = pCreate->numOfColumns;
stbObj.numOfTags = pCreate->numOfTags;
- int32_t totalCols = stbObj.numOfColumns + stbObj.numOfTags;
- int32_t totalSize = totalCols * sizeof(SSchema);
- stbObj.pSchema = malloc(totalSize);
- if (stbObj.pSchema == NULL) {
+ stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema));
+ stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema));
+ if (stbObj.pColumns == NULL || stbObj.pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
- memcpy(stbObj.pSchema, pCreate->pSchema, totalSize);
- for (int32_t i = 0; i < totalCols; ++i) {
- stbObj.pSchema[i].colId = i + 1;
+ memcpy(stbObj.pColumns, pCreate->pSchemas, stbObj.numOfColumns * sizeof(SSchema));
+ memcpy(stbObj.pTags, pCreate->pSchemas + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema));
+
+ for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
+ stbObj.pColumns[i].colId = stbObj.nextColId;
+ stbObj.nextColId++;
+ }
+
+ for (int32_t i = 0; i < stbObj.numOfTags; ++i) {
+ stbObj.pTags[i].colId = stbObj.nextColId;
+ stbObj.nextColId++;
}
int32_t code = -1;
@@ -537,36 +577,396 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
return 0;
}
-static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
- SSchema *pSchema = &pAlter->schema;
- pSchema->colId = htonl(pSchema->colId);
- pSchema->bytes = htonl(pSchema->bytes);
+static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) {
+ pAlter->numOfSchemas = htonl(pAlter->numOfSchemas);
- if (pSchema->type <= 0) {
- terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
- return -1;
- }
- if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
- terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
- return -1;
- }
- if (pSchema->bytes <= 0) {
- terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
- return -1;
- }
- if (pSchema->name[0] == 0) {
- terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
- return -1;
+ for (int32_t i = 0; i < pAlter->numOfSchemas; ++i) {
+ SSchema *pSchema = &pAlter->pSchemas[i];
+ pSchema->colId = htonl(pSchema->colId);
+ pSchema->bytes = htonl(pSchema->bytes);
+
+ if (pSchema->type <= 0) {
+ terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
+ return -1;
+ }
+ if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
+ terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
+ return -1;
+ }
+ if (pSchema->bytes <= 0) {
+ terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
+ return -1;
+ }
+ if (pSchema->name[0] == 0) {
+ terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
+ return -1;
+ }
}
return 0;
}
-static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pOld, SStbObj *pNew) { return 0; }
+static int32_t mndFindSuperTableTagIndex(const SStbObj *pStb, const char *tagName) {
+ for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
+ if (strcasecmp(pStb->pTags[tag].name, tagName) == 0) {
+ return tag;
+ }
+ }
+
+ return -1;
+}
+
+static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *colName) {
+ for (int32_t col = 0; col < pStb->numOfColumns; col++) {
+ if (strcasecmp(pStb->pColumns[col].name, colName) == 0) {
+ return col;
+ }
+ }
+
+ return -1;
+}
+
+static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
+ pNew->pTags = calloc(pNew->numOfTags, sizeof(SSchema));
+ pNew->pColumns = calloc(pNew->numOfColumns, sizeof(SSchema));
+ if (pNew->pTags == NULL || pNew->pColumns == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
+
+ memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns);
+ memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags);
+ return 0;
+}
+
+static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchemas, int32_t ntags) {
+ if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) {
+ terrno = TSDB_CODE_MND_TOO_MANY_TAGS;
+ return -1;
+ }
+
+ if (pOld->numOfColumns + ntags + pOld->numOfTags > TSDB_MAX_COLUMNS) {
+ terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS;
+ return -1;
+ }
+
+ for (int32_t i = 0; i < ntags; i++) {
+ if (mndFindSuperTableColumnIndex(pOld, pSchemas[i].name) > 0) {
+ terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
+ return -1;
+ }
+
+ if (mndFindSuperTableTagIndex(pOld, pSchemas[i].name) > 0) {
+ terrno = TSDB_CODE_MND_COLUMN_ALREAY_EXIST;
+ return -1;
+ }
+ }
+
+ pNew->numOfTags = pNew->numOfTags + ntags;
+ if (mndAllocStbSchemas(pOld, pNew) != 0) {
+ return -1;
+ }
+
+ memcpy(pNew->pTags + pOld->numOfTags, pSchemas, sizeof(SSchema) * ntags);
+
+ for (int32_t i = pOld->numOfTags; i < pNew->numOfTags; i++) {
+ SSchema *pSchema = &pNew->pTags[i];
+ pSchema->colId = pNew->nextColId;
+ pNew->nextColId++;
+ }
+
+ pNew->version++;
+ mDebug("stb:%s, start to add tag %s", pNew->name, pSchemas[0].name);
+ return 0;
+}
+
+static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
+ int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
+ if (tag < 0) {
+ terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
+ return -1;
+ }
+
+ if (mndAllocStbSchemas(pOld, pNew) != 0) {
+ return -1;
+ }
+
+ memmove(pNew->pTags + tag, pNew->pTags + tag + 1, sizeof(SSchema) * (pNew->numOfTags - tag - 1));
+
+ pNew->version++;
+ mDebug("stb:%s, start to drop tag %s", pNew->name, tagName);
+ return 0;
+}
+
+static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName, const char *newTagName) {
+ int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
+ if (tag < 0) {
+ terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
+ return -1;
+ }
+
+ if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
+ terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
+ return -1;
+ }
+
+ int32_t len = (int32_t)strlen(newTagName);
+ if (len >= TSDB_COL_NAME_LEN) {
+ terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
+ return -1;
+ }
+
+ if (mndAllocStbSchemas(pOld, pNew) != 0) {
+ return -1;
+ }
+
+ SSchema *pSchema = (SSchema *)(pNew->pTags + tag);
+ memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN);
+
+ pNew->version++;
+ mDebug("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
+ return 0;
+}
+
+static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
+ int32_t tag = mndFindSuperTableTagIndex(pOld, pSchema->name);
+ if (tag < 0) {
+ terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
+ return -1;
+ }
+
+ if (!(pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR)) {
+ terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
+ return -1;
+ }
+
+ if (mndAllocStbSchemas(pOld, pNew) != 0) {
+ return -1;
+ }
+
+ SSchema *pTag = pNew->pTags + tag;
+ if (pSchema->bytes <= pTag->bytes) {
+ terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
+ return -1;
+ }
+
+ pTag->bytes = pSchema->bytes;
+ pNew->version++;
+
+ mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pSchema->name, pSchema->bytes);
+ return 0;
+}
+
+static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchemas, int32_t ncols) {
+ if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
+ terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS;
+ return -1;
+ }
+
+ for (int32_t i = 0; i < ncols; i++) {
+ if (mndFindSuperTableColumnIndex(pOld, pSchemas[i].name) > 0) {
+ terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
+ return -1;
+ }
+
+ if (mndFindSuperTableTagIndex(pOld, pSchemas[i].name) > 0) {
+ terrno = TSDB_CODE_MND_COLUMN_ALREAY_EXIST;
+ return -1;
+ }
+ }
+
+ pNew->numOfColumns = pNew->numOfColumns + ncols;
+ if (mndAllocStbSchemas(pOld, pNew) != 0) {
+ return -1;
+ }
+
+ memcpy(pNew->pColumns + pOld->numOfColumns, pSchemas, sizeof(SSchema) * ncols);
+
+ for (int32_t i = pOld->numOfColumns; i < pNew->numOfColumns; i++) {
+ SSchema *pSchema = &pNew->pColumns[i];
+ pSchema->colId = pNew->nextColId;
+ pNew->nextColId++;
+ }
+
+ pNew->version++;
+ mDebug("stb:%s, start to add column %s", pNew->name, pSchemas[0].name);
+ return 0;
+}
+
+static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const char *colName) {
+ int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
+ if (col <= 0) {
+ terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
+ return -1;
+ }
+
+ if (mndAllocStbSchemas(pOld, pNew) != 0) {
+ return -1;
+ }
+
+ memmove(pNew->pColumns + col, pNew->pColumns + col + 1, sizeof(SSchema) * (pNew->numOfColumns - col - 1));
+
+ pNew->version++;
+ mDebug("stb:%s, start to drop col %s", pNew->name, colName);
+ return 0;
+}
+
+static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
+ int32_t col = mndFindSuperTableColumnIndex(pOld, pSchema->name);
+ if (col < 0) {
+ terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
+ return -1;
+ }
+
+ if (!(pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR)) {
+ terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
+ return -1;
+ }
+
+ uint32_t nLen = 0;
+ for (int32_t i = 0; i < pOld->numOfColumns; ++i) {
+ nLen += (pOld->pColumns[i].colId == col) ? pSchema->bytes : pOld->pColumns[i].bytes;
+ }
+
+ if (nLen > TSDB_MAX_BYTES_PER_ROW) {
+ terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
+ return -1;
+ }
+
+ if (mndAllocStbSchemas(pOld, pNew) != 0) {
+ return -1;
+ }
+
+ SSchema *pCol = pNew->pColumns + col;
+ if (pSchema->bytes <= pCol->bytes) {
+ terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
+ return -1;
+ }
+
+ pCol->bytes = pSchema->bytes;
+ pNew->version++;
+
+ mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pSchema->name, pSchema->bytes);
+ return 0;
+}
+
+static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
+ SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
+ if (pRedoRaw == NULL) return -1;
+ if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
+ if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1;
+
+ return 0;
+}
+
+static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
+ SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
+ if (pCommitRaw == NULL) return -1;
+ if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
+ if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
+
+ return 0;
+}
+
+static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
+ SSdb *pSdb = pMnode->pSdb;
+ SVgObj *pVgroup = NULL;
+ void *pIter = NULL;
+ int32_t contLen;
+
+ while (1) {
+ pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
+ if (pIter == NULL) break;
+ if (pVgroup->dbUid != pDb->uid) continue;
+
+ void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen);
+ if (pReq == NULL) {
+ sdbCancelFetch(pSdb, pIter);
+ sdbRelease(pSdb, pVgroup);
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
+
+ STransAction action = {0};
+ action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
+ action.pCont = pReq;
+ action.contLen = contLen;
+ action.msgType = TDMT_VND_ALTER_STB;
+ if (mndTransAppendRedoAction(pTrans, &action) != 0) {
+ free(pReq);
+ sdbCancelFetch(pSdb, pIter);
+ sdbRelease(pSdb, pVgroup);
+ return -1;
+ }
+ sdbRelease(pSdb, pVgroup);
+ }
+
+ return 0;
+}
+
+static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
+ SStbObj stbObj = {0};
+ taosRLockLatch(&pOld->lock);
+ memcpy(&stbObj, pOld, sizeof(SStbObj));
+ stbObj.pColumns = NULL;
+ stbObj.pTags = NULL;
+ stbObj.updateTime = taosGetTimestampMs();
+ taosRUnLockLatch(&pOld->lock);
+
+ int32_t code = -1;
+
+ switch (pAlter->alterType) {
+ case TSDB_ALTER_TABLE_ADD_TAG_COLUMN:
+ code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pSchemas, 1);
+ break;
+ case TSDB_ALTER_TABLE_DROP_TAG_COLUMN:
+ code = mndDropSuperTableTag(pOld, &stbObj, pAlter->pSchemas[0].name);
+ break;
+ case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
+ code = mndAlterStbTagName(pOld, &stbObj, pAlter->pSchemas[0].name, pAlter->pSchemas[1].name);
+ break;
+ case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
+ code = mndAlterStbTagBytes(pOld, &stbObj, &pAlter->pSchemas[0]);
+ break;
+ case TSDB_ALTER_TABLE_ADD_COLUMN:
+ code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pSchemas, 1);
+ break;
+ case TSDB_ALTER_TABLE_DROP_COLUMN:
+ code = mndDropSuperTableColumn(pOld, &stbObj, pAlter->pSchemas[0].name);
+ break;
+ case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
+ code = mndAlterStbColumnBytes(pOld, &stbObj, &pAlter->pSchemas[0]);
+ break;
+ default:
+ terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
+ break;
+ }
+
+ if (code != 0) goto ALTER_STB_OVER;
+
+ code = -1;
+ STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
+ if (pTrans == NULL) goto ALTER_STB_OVER;
+
+ mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
+
+ if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
+ if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
+ if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
+ if (mndTransPrepare(pMnode, pTrans) != 0) goto ALTER_STB_OVER;
+
+ code = 0;
+
+ALTER_STB_OVER:
+ mndTransDrop(pTrans);
+ tfree(stbObj.pTags);
+ tfree(stbObj.pColumns);
+ return code;
+}
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
- SMnode *pMnode = pReq->pMnode;
- SMAlterStbReq *pAlter = pReq->rpcMsg.pCont;
+ SMnode *pMnode = pReq->pMnode;
+ SMAltertbReq *pAlter = pReq->rpcMsg.pCont;
mDebug("stb:%s, start to alter", pAlter->name);
@@ -582,10 +982,15 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
return -1;
}
- SStbObj stbObj = {0};
- memcpy(&stbObj, pStb, sizeof(SStbObj));
+ SDbObj *pDb = mndAcquireDbByStb(pMnode, pAlter->name);
+ if (pDb == NULL) {
+ mndReleaseStb(pMnode, pStb);
+ terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
+ mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
+ return -1;
+ }
- int32_t code = mndUpdateStb(pMnode, pReq, pStb, &stbObj);
+ int32_t code = mndAlterStb(pMnode, pReq, pAlter, pDb, pStb);
mndReleaseStb(pMnode, pStb);
if (code != 0) {
@@ -769,14 +1174,24 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
pMeta->suid = htobe64(pStb->uid);
pMeta->tuid = htobe64(pStb->uid);
- for (int32_t i = 0; i < totalCols; ++i) {
+ for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
- SSchema *pSrcSchema = &pStb->pSchema[i];
+ SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
+
+ for (int32_t i = 0; i < pStb->numOfTags; ++i) {
+ SSchema *pSchema = &pMeta->pSchema[i + pStb->numOfColumns];
+ SSchema *pSrcSchema = &pStb->pTags[i];
+ memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
+ pSchema->type = pSrcSchema->type;
+ pSchema->colId = htonl(pSrcSchema->colId);
+ pSchema->bytes = htonl(pSrcSchema->bytes);
+ }
+
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
@@ -789,11 +1204,11 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
}
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) {
- SSdb *pSdb = pMnode->pSdb;
- int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
- void *buf = malloc(bufSize);
- int32_t len = 0;
- int32_t contLen = 0;
+ SSdb *pSdb = pMnode->pSdb;
+ int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
+ void *buf = malloc(bufSize);
+ int32_t len = 0;
+ int32_t contLen = 0;
STableMetaRsp *pRsp = NULL;
for (int32_t i = 0; i < num; ++i) {
@@ -803,7 +1218,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
stb->tversion = ntohs(stb->tversion);
if ((contLen + sizeof(STableMetaRsp)) > bufSize) {
- bufSize = contLen + (num -i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
+ bufSize = contLen + (num - i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
@@ -812,9 +1227,9 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
strcpy(pRsp->dbFName, stb->dbFName);
strcpy(pRsp->tbName, stb->stbName);
strcpy(pRsp->stbName, stb->stbName);
-
+
mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName);
-
+
SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName);
if (pDb == NULL) {
pRsp->numOfColumns = -1;
@@ -826,7 +1241,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName);
-
+
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
@@ -836,7 +1251,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr());
continue;
}
-
+
taosRLockLatch(&pStb->lock);
if (stb->suid == pStb->uid && stb->sversion == pStb->version) {
@@ -845,17 +1260,17 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
mndReleaseStb(pMnode, pStb);
continue;
}
-
+
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t len = totalCols * sizeof(SSchema);
-
+
contLen += sizeof(STableMetaRsp) + len;
-
+
if (contLen > bufSize) {
- bufSize = contLen + (num -i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
+ bufSize = contLen + (num - i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
-
+
pRsp->numOfTags = htonl(pStb->numOfTags);
pRsp->numOfColumns = htonl(pStb->numOfColumns);
pRsp->precision = pDb->cfg.precision;
@@ -864,15 +1279,25 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
pRsp->sversion = htonl(pStb->version);
pRsp->suid = htobe64(pStb->uid);
pRsp->tuid = htobe64(pStb->uid);
-
- for (int32_t i = 0; i < totalCols; ++i) {
+
+ for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pRsp->pSchema[i];
- SSchema *pSrcSchema = &pStb->pSchema[i];
+ SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
+
+ for (int32_t i = 0; i < pStb->numOfTags; ++i) {
+ SSchema *pSchema = &pRsp->pSchema[i + pStb->numOfColumns];
+ SSchema *pSrcSchema = &pStb->pTags[i];
+ memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
+ pSchema->type = pSrcSchema->type;
+ pSchema->colId = htonl(pSrcSchema->colId);
+ pSchema->bytes = htonl(pSrcSchema->bytes);
+ }
+
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
@@ -890,7 +1315,6 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
return 0;
}
-
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
SSdb *pSdb = pMnode->pSdb;
diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp
index a0e2460334..ed0beb50a4 100644
--- a/source/dnode/mnode/impl/test/stb/stb.cpp
+++ b/source/dnode/mnode/impl/test/stb/stb.cpp
@@ -21,213 +21,289 @@ class MndTestStb : public ::testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
+
+ SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen);
+ SMCreateStbReq* BuildCreateStbReq(const char* stbname, int32_t* pContLen);
+ SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, int32_t* pContLen);
};
Testbase MndTestStb::test;
-TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
- {
- int32_t contLen = sizeof(SCreateDbReq);
+SCreateDbReq* MndTestStb::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
+ int32_t contLen = sizeof(SCreateDbReq);
- SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
- strcpy(pReq->db, "1.d1");
- pReq->numOfVgroups = htonl(2);
- pReq->cacheBlockSize = htonl(16);
- pReq->totalBlocks = htonl(10);
- pReq->daysPerFile = htonl(10);
- pReq->daysToKeep0 = htonl(3650);
- pReq->daysToKeep1 = htonl(3650);
- pReq->daysToKeep2 = htonl(3650);
- pReq->minRows = htonl(100);
- pReq->maxRows = htonl(4096);
- pReq->commitTime = htonl(3600);
- pReq->fsyncPeriod = htonl(3000);
- pReq->walLevel = 1;
- pReq->precision = 0;
- pReq->compression = 2;
- pReq->replications = 1;
- pReq->quorum = 1;
- pReq->update = 0;
- pReq->cacheLastRow = 0;
- pReq->ignoreExist = 1;
+ SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
+ strcpy(pReq->db, dbname);
+ pReq->numOfVgroups = htonl(2);
+ pReq->cacheBlockSize = htonl(16);
+ pReq->totalBlocks = htonl(10);
+ pReq->daysPerFile = htonl(10);
+ pReq->daysToKeep0 = htonl(3650);
+ pReq->daysToKeep1 = htonl(3650);
+ pReq->daysToKeep2 = htonl(3650);
+ pReq->minRows = htonl(100);
+ pReq->maxRows = htonl(4096);
+ pReq->commitTime = htonl(3600);
+ pReq->fsyncPeriod = htonl(3000);
+ pReq->walLevel = 1;
+ pReq->precision = 0;
+ pReq->compression = 2;
+ pReq->replications = 1;
+ pReq->quorum = 1;
+ pReq->update = 0;
+ pReq->cacheLastRow = 0;
+ pReq->ignoreExist = 1;
- SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
- ASSERT_NE(pRsp, nullptr);
- ASSERT_EQ(pRsp->code, 0);
- }
-
- {
- int32_t cols = 2;
- int32_t tags = 3;
- int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
-
- SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
- strcpy(pReq->name, "1.d1.stb");
- pReq->numOfTags = htonl(tags);
- pReq->numOfColumns = htonl(cols);
-
- {
- SSchema* pSchema = &pReq->pSchema[0];
- pSchema->bytes = htonl(8);
- pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
- strcpy(pSchema->name, "ts");
- }
-
- {
- SSchema* pSchema = &pReq->pSchema[1];
- pSchema->bytes = htonl(4);
- pSchema->type = TSDB_DATA_TYPE_INT;
- strcpy(pSchema->name, "col1");
- }
-
- {
- SSchema* pSchema = &pReq->pSchema[2];
- pSchema->bytes = htonl(2);
- pSchema->type = TSDB_DATA_TYPE_TINYINT;
- strcpy(pSchema->name, "tag1");
- }
-
- {
- SSchema* pSchema = &pReq->pSchema[3];
- pSchema->bytes = htonl(8);
- pSchema->type = TSDB_DATA_TYPE_BIGINT;
- strcpy(pSchema->name, "tag2");
- }
-
- {
- SSchema* pSchema = &pReq->pSchema[4];
- pSchema->bytes = htonl(16);
- pSchema->type = TSDB_DATA_TYPE_BINARY;
- strcpy(pSchema->name, "tag3");
- }
-
- SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
- ASSERT_NE(pRsp, nullptr);
- ASSERT_EQ(pRsp->code, 0);
- }
-
- test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
- CHECK_META("show stables", 4);
-
- CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
- CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
- CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns");
- CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags");
-
- test.SendShowRetrieveReq();
- EXPECT_EQ(test.GetShowRows(), 1);
- CheckBinary("stb", TSDB_TABLE_NAME_LEN);
- CheckTimestamp();
- CheckInt32(2);
- CheckInt32(3);
-
- // ----- meta ------
- {
- int32_t contLen = sizeof(STableInfoReq);
-
- STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
- strcpy(pReq->dbFName, "1.d1");
- strcpy(pReq->tbName, "stb");
-
- SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
- ASSERT_NE(pMsg, nullptr);
- ASSERT_EQ(pMsg->code, 0);
-
- STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont;
- pRsp->numOfTags = htonl(pRsp->numOfTags);
- pRsp->numOfColumns = htonl(pRsp->numOfColumns);
- pRsp->sversion = htonl(pRsp->sversion);
- pRsp->tversion = htonl(pRsp->tversion);
- pRsp->suid = be64toh(pRsp->suid);
- pRsp->tuid = be64toh(pRsp->tuid);
- pRsp->vgId = be64toh(pRsp->vgId);
- for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
- SSchema* pSchema = &pRsp->pSchema[i];
- pSchema->colId = htonl(pSchema->colId);
- pSchema->bytes = htonl(pSchema->bytes);
- }
-
- EXPECT_STREQ(pRsp->dbFName, "1.d1");
- EXPECT_STREQ(pRsp->tbName, "stb");
- EXPECT_STREQ(pRsp->stbName, "stb");
- EXPECT_EQ(pRsp->numOfColumns, 2);
- EXPECT_EQ(pRsp->numOfTags, 3);
- EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
- EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE);
- EXPECT_EQ(pRsp->update, 0);
- EXPECT_EQ(pRsp->sversion, 1);
- EXPECT_EQ(pRsp->tversion, 0);
- EXPECT_GT(pRsp->suid, 0);
- EXPECT_GT(pRsp->tuid, 0);
- EXPECT_EQ(pRsp->vgId, 0);
-
- {
- SSchema* pSchema = &pRsp->pSchema[0];
- EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
- EXPECT_EQ(pSchema->colId, 1);
- EXPECT_EQ(pSchema->bytes, 8);
- EXPECT_STREQ(pSchema->name, "ts");
- }
-
- {
- SSchema* pSchema = &pRsp->pSchema[1];
- EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
- EXPECT_EQ(pSchema->colId, 2);
- EXPECT_EQ(pSchema->bytes, 4);
- EXPECT_STREQ(pSchema->name, "col1");
- }
-
- {
- SSchema* pSchema = &pRsp->pSchema[2];
- EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TINYINT);
- EXPECT_EQ(pSchema->colId, 3);
- EXPECT_EQ(pSchema->bytes, 2);
- EXPECT_STREQ(pSchema->name, "tag1");
- }
-
- {
- SSchema* pSchema = &pRsp->pSchema[3];
- EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BIGINT);
- EXPECT_EQ(pSchema->colId, 4);
- EXPECT_EQ(pSchema->bytes, 8);
- EXPECT_STREQ(pSchema->name, "tag2");
- }
-
- {
- SSchema* pSchema = &pRsp->pSchema[4];
- EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
- EXPECT_EQ(pSchema->colId, 5);
- EXPECT_EQ(pSchema->bytes, 16);
- EXPECT_STREQ(pSchema->name, "tag3");
- }
- }
-
- // restart
- test.Restart();
-
- test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
- CHECK_META("show stables", 4);
- test.SendShowRetrieveReq();
- EXPECT_EQ(test.GetShowRows(), 1);
-
- CheckBinary("stb", TSDB_TABLE_NAME_LEN);
- CheckTimestamp();
- CheckInt32(2);
- CheckInt32(3);
-
- {
- int32_t contLen = sizeof(SMDropStbReq);
-
- SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen);
- strcpy(pReq->name, "1.d1.stb");
-
- SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
- ASSERT_NE(pRsp, nullptr);
- ASSERT_EQ(pRsp->code, 0);
- }
-
- test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
- CHECK_META("show stables", 4);
- test.SendShowRetrieveReq();
- EXPECT_EQ(test.GetShowRows(), 0);
+ *pContLen = contLen;
+ return pReq;
+}
+
+SMCreateStbReq* MndTestStb::BuildCreateStbReq(const char *stbname, int32_t* pContLen) {
+ int32_t cols = 2;
+ int32_t tags = 3;
+ int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
+
+ SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
+ strcpy(pReq->name, stbname);
+ pReq->numOfTags = htonl(tags);
+ pReq->numOfColumns = htonl(cols);
+
+ {
+ SSchema* pSchema = &pReq->pSchemas[0];
+ pSchema->bytes = htonl(8);
+ pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
+ strcpy(pSchema->name, "ts");
+ }
+
+ {
+ SSchema* pSchema = &pReq->pSchemas[1];
+ pSchema->bytes = htonl(4);
+ pSchema->type = TSDB_DATA_TYPE_INT;
+ strcpy(pSchema->name, "col1");
+ }
+
+ {
+ SSchema* pSchema = &pReq->pSchemas[2];
+ pSchema->bytes = htonl(2);
+ pSchema->type = TSDB_DATA_TYPE_TINYINT;
+ strcpy(pSchema->name, "tag1");
+ }
+
+ {
+ SSchema* pSchema = &pReq->pSchemas[3];
+ pSchema->bytes = htonl(8);
+ pSchema->type = TSDB_DATA_TYPE_BIGINT;
+ strcpy(pSchema->name, "tag2");
+ }
+
+ {
+ SSchema* pSchema = &pReq->pSchemas[4];
+ pSchema->bytes = htonl(16);
+ pSchema->type = TSDB_DATA_TYPE_BINARY;
+ strcpy(pSchema->name, "tag3");
+ }
+
+ *pContLen = contLen;
+ return pReq;
+}
+
+// TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
+// const char *dbname = "1.d1";
+// const char *stbname = "1.d1.stb";
+
+// {
+// int32_t contLen = 0;
+// SCreateDbReq* pReq = BuildCreateDbReq(dbname, &contLen);
+// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
+// ASSERT_NE(pRsp, nullptr);
+// ASSERT_EQ(pRsp->code, 0);
+// }
+
+// {
+// int32_t contLen = 0;
+// SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
+// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
+// ASSERT_NE(pRsp, nullptr);
+// ASSERT_EQ(pRsp->code, 0);
+// }
+
+// {
+// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
+// CHECK_META("show stables", 4);
+// CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
+// CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
+// CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns");
+// CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags");
+
+// test.SendShowRetrieveReq();
+// EXPECT_EQ(test.GetShowRows(), 1);
+// CheckBinary("stb", TSDB_TABLE_NAME_LEN);
+// CheckTimestamp();
+// CheckInt32(2);
+// CheckInt32(3);
+// }
+
+// // ----- meta ------
+// {
+// int32_t contLen = sizeof(STableInfoReq);
+// STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
+// strcpy(pReq->dbFName, dbname);
+// strcpy(pReq->tbName, "stb");
+
+// SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
+// ASSERT_NE(pMsg, nullptr);
+// ASSERT_EQ(pMsg->code, 0);
+
+// STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont;
+// pRsp->numOfTags = htonl(pRsp->numOfTags);
+// pRsp->numOfColumns = htonl(pRsp->numOfColumns);
+// pRsp->sversion = htonl(pRsp->sversion);
+// pRsp->tversion = htonl(pRsp->tversion);
+// pRsp->suid = be64toh(pRsp->suid);
+// pRsp->tuid = be64toh(pRsp->tuid);
+// pRsp->vgId = be64toh(pRsp->vgId);
+// for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
+// SSchema* pSchema = &pRsp->pSchema[i];
+// pSchema->colId = htonl(pSchema->colId);
+// pSchema->bytes = htonl(pSchema->bytes);
+// }
+
+// EXPECT_STREQ(pRsp->dbFName, dbname);
+// EXPECT_STREQ(pRsp->tbName, "stb");
+// EXPECT_STREQ(pRsp->stbName, "stb");
+// EXPECT_EQ(pRsp->numOfColumns, 2);
+// EXPECT_EQ(pRsp->numOfTags, 3);
+// EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
+// EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE);
+// EXPECT_EQ(pRsp->update, 0);
+// EXPECT_EQ(pRsp->sversion, 1);
+// EXPECT_EQ(pRsp->tversion, 0);
+// EXPECT_GT(pRsp->suid, 0);
+// EXPECT_GT(pRsp->tuid, 0);
+// EXPECT_EQ(pRsp->vgId, 0);
+
+// {
+// SSchema* pSchema = &pRsp->pSchema[0];
+// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
+// EXPECT_EQ(pSchema->colId, 1);
+// EXPECT_EQ(pSchema->bytes, 8);
+// EXPECT_STREQ(pSchema->name, "ts");
+// }
+
+// {
+// SSchema* pSchema = &pRsp->pSchema[1];
+// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
+// EXPECT_EQ(pSchema->colId, 2);
+// EXPECT_EQ(pSchema->bytes, 4);
+// EXPECT_STREQ(pSchema->name, "col1");
+// }
+
+// {
+// SSchema* pSchema = &pRsp->pSchema[2];
+// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TINYINT);
+// EXPECT_EQ(pSchema->colId, 3);
+// EXPECT_EQ(pSchema->bytes, 2);
+// EXPECT_STREQ(pSchema->name, "tag1");
+// }
+
+// {
+// SSchema* pSchema = &pRsp->pSchema[3];
+// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BIGINT);
+// EXPECT_EQ(pSchema->colId, 4);
+// EXPECT_EQ(pSchema->bytes, 8);
+// EXPECT_STREQ(pSchema->name, "tag2");
+// }
+
+// {
+// SSchema* pSchema = &pRsp->pSchema[4];
+// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
+// EXPECT_EQ(pSchema->colId, 5);
+// EXPECT_EQ(pSchema->bytes, 16);
+// EXPECT_STREQ(pSchema->name, "tag3");
+// }
+// }
+
+// // restart
+// test.Restart();
+
+// {
+// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
+// CHECK_META("show stables", 4);
+// test.SendShowRetrieveReq();
+// EXPECT_EQ(test.GetShowRows(), 1);
+
+// CheckBinary("stb", TSDB_TABLE_NAME_LEN);
+// CheckTimestamp();
+// CheckInt32(2);
+// CheckInt32(3);
+// }
+
+// {
+// int32_t contLen = sizeof(SMDropStbReq);
+
+// SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen);
+// strcpy(pReq->name, stbname);
+
+// SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
+// ASSERT_NE(pRsp, nullptr);
+// ASSERT_EQ(pRsp->code, 0);
+// }
+
+// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
+// CHECK_META("show stables", 4);
+// test.SendShowRetrieveReq();
+// EXPECT_EQ(test.GetShowRows(), 0);
+// }
+
+SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, int32_t* pContLen) {
+ int32_t contLen = sizeof(SMAltertbReq) + sizeof(SSchema);
+ SMAltertbReq* pReq = (SMAltertbReq*)rpcMallocCont(contLen);
+ strcpy(pReq->name, stbname);
+ pReq->numOfSchemas = htonl(1);
+ pReq->alterType = TSDB_ALTER_TABLE_ADD_TAG_COLUMN;
+
+ SSchema* pSchema = &pReq->pSchemas[0];
+ pSchema->bytes = htonl(4);
+ pSchema->type = TSDB_DATA_TYPE_INT;
+ strcpy(pSchema->name, "tag4");
+
+ *pContLen = contLen;
+ return pReq;
+}
+
+TEST_F(MndTestStb, 01_Alter_Stb) {
+ const char *dbname = "1.d2";
+ const char *stbname = "1.d2.stb";
+
+ {
+ int32_t contLen = 0;
+ SCreateDbReq* pReq = BuildCreateDbReq(dbname, &contLen);
+ SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
+ ASSERT_NE(pRsp, nullptr);
+ ASSERT_EQ(pRsp->code, 0);
+ }
+
+ {
+ int32_t contLen = 0;
+ SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
+ SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
+ ASSERT_NE(pRsp, nullptr);
+ ASSERT_EQ(pRsp->code, 0);
+ }
+
+ {
+ int32_t contLen = 0;
+ SMAltertbReq* pReq = BuildAlterStbAddTagReq(stbname, &contLen);
+ SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
+ ASSERT_NE(pRsp, nullptr);
+ ASSERT_EQ(pRsp->code, 0);
+
+ test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
+ test.SendShowRetrieveReq();
+ EXPECT_EQ(test.GetShowRows(), 1);
+ CheckBinary("stb", TSDB_TABLE_NAME_LEN);
+ CheckTimestamp();
+ CheckInt32(2);
+ CheckInt32(4);
+ }
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c
index 6064865bc6..441ff7230a 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCommit.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c
@@ -1330,7 +1330,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
TSKEY keyLimit;
- int16_t colId = 0;
+ int16_t colId = PRIMARYKEY_TIMESTAMP_COL_ID;
SMergeInfo mInfo;
SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
SBlock block, supBlock;
diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c
index 3dcbb7888b..24c71fdc7e 100644
--- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c
@@ -472,7 +472,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
continue;
}
- int16_t tcolId = 0;
+ int16_t tcolId = PRIMARYKEY_TIMESTAMP_COL_ID;
uint32_t toffset = TSDB_KEY_COL_OFFSET;
int32_t tlen = pBlock->keyLen;
diff --git a/source/dnode/vnode/src/vnd/vnodeBufferPool.c b/source/dnode/vnode/src/vnd/vnodeBufferPool.c
index 434498eef5..f7a72353eb 100644
--- a/source/dnode/vnode/src/vnd/vnodeBufferPool.c
+++ b/source/dnode/vnode/src/vnd/vnodeBufferPool.c
@@ -185,6 +185,7 @@ static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) {
free(pMA);
if (--pVMA->_ref.val == 0) {
TD_DLIST_POP(&(pVnode->pBufPool->incycle), pVMA);
+ vmaReset(pVMA);
TD_DLIST_APPEND(&(pVnode->pBufPool->free), pVMA);
}
}
\ No newline at end of file
diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c
index 40cb02176b..28487821e6 100644
--- a/source/dnode/vnode/src/vnd/vnodeWrite.c
+++ b/source/dnode/vnode/src/vnd/vnodeWrite.c
@@ -83,7 +83,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
free(vCreateTbReq.name);
break;
case TDMT_VND_CREATE_TABLE:
- tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
+ tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
@@ -106,7 +106,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
break;
case TDMT_VND_ALTER_STB:
- vTrace("vgId:%d, process drop stb req", pVnode->vgId);
+ vTrace("vgId:%d, process alter stb req", pVnode->vgId);
+ tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
+ free(vCreateTbReq.stbCfg.pSchema);
+ free(vCreateTbReq.stbCfg.pTagSchema);
+ free(vCreateTbReq.name);
break;
case TDMT_VND_DROP_STB:
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
diff --git a/source/libs/parser/inc/astCreateContext.h b/source/libs/parser/inc/astCreateContext.h
index 5458500a82..a0bac9ea7b 100644
--- a/source/libs/parser/inc/astCreateContext.h
+++ b/source/libs/parser/inc/astCreateContext.h
@@ -28,7 +28,6 @@ typedef struct SAstCreateContext {
bool notSupport;
bool valid;
SNode* pRootNode;
- SHashObj* pResourceHash;
} SAstCreateContext;
int32_t createAstCreateContext(SParseContext* pQueryCxt, SAstCreateContext* pCxt);
diff --git a/source/libs/parser/inc/astCreateFuncs.h b/source/libs/parser/inc/astCreateFuncs.h
index 15f0792d5c..7cd7e1932d 100644
--- a/source/libs/parser/inc/astCreateFuncs.h
+++ b/source/libs/parser/inc/astCreateFuncs.h
@@ -13,11 +13,6 @@
* along with this program. If not, see .
*/
-#include "nodes.h"
-#include "nodesShowStmts.h"
-#include "astCreateContext.h"
-#include "ttoken.h"
-
#ifndef _TD_AST_CREATE_FUNCS_H_
#define _TD_AST_CREATE_FUNCS_H_
@@ -25,15 +20,26 @@
extern "C" {
#endif
+#include "nodes.h"
+#include "nodesShowStmts.h"
+#include "astCreateContext.h"
+#include "ttoken.h"
+
extern SToken nil_token;
+typedef struct STargetExprNode {
+ ENodeType nodeType;
+ char* p;
+ uint32_t n;
+ SNode* pNode;
+} STargetExprNode;
+
SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode);
SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode);
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName);
SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral);
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
-SNode* addMinusSign(SAstCreateContext* pCxt, SNode* pNode);
SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias);
SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2);
SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight);
diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y
index 410e4fb187..6d3e9e729f 100644
--- a/source/libs/parser/inc/sql.y
+++ b/source/libs/parser/inc/sql.y
@@ -813,7 +813,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
cmd ::= ALTER TABLE ids(X) cpxName(F) MODIFY COLUMN columnlist(A). {
X.n += F.n;
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, -1);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
@@ -842,7 +842,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
toTSDBType(Z.type);
A = tListItemAppendToken(A, &Z, -1);
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
@@ -859,7 +859,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
cmd ::= ALTER TABLE ids(X) cpxName(F) MODIFY TAG columnlist(A). {
X.n += F.n;
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, -1);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
@@ -882,7 +882,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
cmd ::= ALTER STABLE ids(X) cpxName(F) MODIFY COLUMN columnlist(A). {
X.n += F.n;
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, TSDB_SUPER_TABLE);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
@@ -911,7 +911,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
toTSDBType(Z.type);
A = tListItemAppendToken(A, &Z, -1);
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
@@ -928,7 +928,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
cmd ::= ALTER STABLE ids(X) cpxName(F) MODIFY TAG columnlist(A). {
X.n += F.n;
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, TSDB_SUPER_TABLE);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
diff --git a/source/libs/parser/src/astCreateFuncs.c b/source/libs/parser/src/astCreateFuncs.c
index 6091961ed5..e8b8b42f74 100644
--- a/source/libs/parser/src/astCreateFuncs.c
+++ b/source/libs/parser/src/astCreateFuncs.c
@@ -76,7 +76,10 @@ SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, cons
SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral) {
SValueNode* val = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
CHECK_OUT_OF_MEM(val);
- // todo
+ val->literal = strndup(pLiteral->z, pLiteral->n);
+ CHECK_OUT_OF_MEM(val->literal);
+ val->node.resType.type = dataType;
+ val->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
return (SNode*)val;
}
@@ -87,10 +90,6 @@ SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral)
return (SNode*)val;
}
-SNode* addMinusSign(SAstCreateContext* pCxt, SNode* pNode) {
- // todo
-}
-
SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2) {
SLogicConditionNode* cond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
CHECK_OUT_OF_MEM(cond);
diff --git a/source/libs/parser/src/astGenerator.c b/source/libs/parser/src/astGenerator.c
index 8e8da92bf5..9f0efbcf0e 100644
--- a/source/libs/parser/src/astGenerator.c
+++ b/source/libs/parser/src/astGenerator.c
@@ -610,7 +610,7 @@ SAlterTableInfo *tSetAlterTableInfo(SToken *pTableName, SArray *pCols, SArray *p
pAlterTable->type = type;
pAlterTable->tableType = tableType;
- if (type == TSDB_ALTER_TABLE_ADD_COLUMN || type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || type == TSDB_ALTER_TABLE_CHANGE_COLUMN || type == TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN) {
+ if (type == TSDB_ALTER_TABLE_ADD_COLUMN || type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || type == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES || type == TSDB_ALTER_TABLE_UPDATE_TAG_BYTES) {
pAlterTable->pAddColumns = pCols;
assert(pVals == NULL);
} else {
diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c
index 697fd0c4cb..c7a1fd26a0 100644
--- a/source/libs/parser/src/astToMsg.c
+++ b/source/libs/parser/src/astToMsg.c
@@ -310,7 +310,7 @@ SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len
pCreateStbMsg->numOfColumns = htonl(numOfCols);
pCreateStbMsg->numOfTags = htonl(numOfTags);
- pSchema = (SSchema*)pCreateStbMsg->pSchema;
+ pSchema = (SSchema*)pCreateStbMsg->pSchemas;
for (int i = 0; i < numOfCols; ++i) {
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
pSchema->type = pField->type;
diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c
index 3ae89bca0a..50ae3bfd26 100644
--- a/source/libs/parser/src/dCDAstProcess.c
+++ b/source/libs/parser/src/dCDAstProcess.c
@@ -598,7 +598,7 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
}
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
- int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
+ int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
@@ -608,7 +608,7 @@ static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArr
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
- tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
+ tSerializeSVCreateTbBatchReq(&pBuf, &(pTbBatch->req));
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
diff --git a/source/libs/parser/src/parserImpl.c b/source/libs/parser/src/parserImpl.c
index 7182bcfedf..1682a1cb9d 100644
--- a/source/libs/parser/src/parserImpl.c
+++ b/source/libs/parser/src/parserImpl.c
@@ -28,82 +28,11 @@ extern void NewParseFree(void*, FFree);
extern void NewParseTrace(FILE*, char*);
static uint32_t toNewTokenId(uint32_t tokenId) {
-// #define 1
-// #define NEW_TK_AND 2
-// #define NEW_TK_UNION 3
-// #define NEW_TK_ALL 4
-// #define NEW_TK_MINUS 5
-// #define NEW_TK_EXCEPT 6
-// #define NEW_TK_INTERSECT 7
-// #define NEW_TK_NK_PLUS 8
-// #define NEW_TK_NK_MINUS 9
-// #define NEW_TK_NK_STAR 10
-// #define NEW_TK_NK_SLASH 11
-// #define NEW_TK_NK_REM 12
-// #define NEW_TK_SHOW 13
-// #define NEW_TK_DATABASES 14
-// #define NEW_TK_NK_INTEGER 15
-// #define NEW_TK_NK_FLOAT 16
-// #define NEW_TK_NK_STRING 17
-// #define NEW_TK_NK_BOOL 18
-// #define NEW_TK_TIMESTAMP 19
-// #define NEW_TK_NK_VARIABLE 20
-// #define NEW_TK_NK_COMMA 21
-// #define NEW_TK_NK_ID 22
-// #define NEW_TK_NK_LP 23
-// #define NEW_TK_NK_RP 24
-// #define NEW_TK_NK_DOT 25
-// #define NEW_TK_BETWEEN 26
-// #define NEW_TK_NOT 27
-// #define NEW_TK_IS 28
-// #define NEW_TK_NULL 29
-// #define NEW_TK_NK_LT 30
-// #define NEW_TK_NK_GT 31
-// #define NEW_TK_NK_LE 32
-// #define NEW_TK_NK_GE 33
-// #define NEW_TK_NK_NE 34
-// #define 35
-// #define NEW_TK_LIKE 36
-// #define NEW_TK_MATCH 37
-// #define NEW_TK_NMATCH 38
-// #define NEW_TK_IN 39
-// #define NEW_TK_FROM 40
-// #define NEW_TK_AS 41
-// #define NEW_TK_JOIN 42
-// #define NEW_TK_ON 43
-// #define NEW_TK_INNER 44
-// #define NEW_TK_SELECT 45
-// #define NEW_TK_DISTINCT 46
-// #define 47
-// #define NEW_TK_PARTITION 48
-// #define NEW_TK_BY 49
-// #define NEW_TK_SESSION 50
-// #define NEW_TK_STATE_WINDOW 51
-// #define NEW_TK_INTERVAL 52
-// #define NEW_TK_SLIDING 53
-// #define NEW_TK_FILL 54
-// #define NEW_TK_VALUE 55
-// #define NEW_TK_NONE 56
-// #define NEW_TK_PREV 57
-// #define NEW_TK_LINEAR 58
-// #define NEW_TK_NEXT 59
-// #define NEW_TK_GROUP 60
-// #define NEW_TK_HAVING 61
-// #define NEW_TK_ORDER 62
-// #define NEW_TK_SLIMIT 63
-// #define NEW_TK_SOFFSET 64
-// #define NEW_TK_LIMIT 65
-// #define NEW_TK_OFFSET 66
-// #define NEW_TK_NK_LR 67
-// #define NEW_TK_ASC 68
-// #define NEW_TK_DESC 69
-// #define NEW_TK_NULLS 70
-// #define NEW_TK_FIRST 71
-// #define NEW_TK_LAST 72
-
switch (tokenId) {
case TK_OR:
return NEW_TK_OR;
+ case TK_AND:
+ return NEW_TK_AND;
case TK_UNION:
return NEW_TK_UNION;
case TK_ALL:
@@ -116,22 +45,62 @@ static uint32_t toNewTokenId(uint32_t tokenId) {
return NEW_TK_NK_STAR;
case TK_SLASH:
return NEW_TK_NK_SLASH;
+ case TK_REM:
+ return NEW_TK_NK_REM;
case TK_SHOW:
return NEW_TK_SHOW;
case TK_DATABASES:
return NEW_TK_DATABASES;
+ case TK_INTEGER:
+ return NEW_TK_NK_INTEGER;
+ case TK_FLOAT:
+ return NEW_TK_NK_FLOAT;
+ case TK_STRING:
+ return NEW_TK_NK_STRING;
+ case TK_BOOL:
+ return NEW_TK_NK_BOOL;
+ case TK_TIMESTAMP:
+ return NEW_TK_TIMESTAMP;
+ case TK_VARIABLE:
+ return NEW_TK_NK_VARIABLE;
+ case TK_COMMA:
+ return NEW_TK_NK_COMMA;
case TK_ID:
return NEW_TK_NK_ID;
case TK_LP:
return NEW_TK_NK_LP;
case TK_RP:
return NEW_TK_NK_RP;
- case TK_COMMA:
- return NEW_TK_NK_COMMA;
case TK_DOT:
return NEW_TK_NK_DOT;
+ case TK_BETWEEN:
+ return NEW_TK_BETWEEN;
+ case TK_NOT:
+ return NEW_TK_NOT;
+ case TK_IS:
+ return NEW_TK_IS;
+ case TK_NULL:
+ return NEW_TK_NULL;
+ case TK_LT:
+ return NEW_TK_NK_LT;
+ case TK_GT:
+ return NEW_TK_NK_GT;
+ case TK_LE:
+ return NEW_TK_NK_LE;
+ case TK_GE:
+ return NEW_TK_NK_GE;
+ case TK_NE:
+ return NEW_TK_NK_NE;
case TK_EQ:
return NEW_TK_NK_EQ;
+ case TK_LIKE:
+ return NEW_TK_LIKE;
+ case TK_MATCH:
+ return NEW_TK_MATCH;
+ case TK_NMATCH:
+ return NEW_TK_NMATCH;
+ case TK_IN:
+ return NEW_TK_IN;
case TK_SELECT:
return NEW_TK_SELECT;
case TK_DISTINCT:
@@ -142,6 +111,38 @@ static uint32_t toNewTokenId(uint32_t tokenId) {
return NEW_TK_AS;
case TK_FROM:
return NEW_TK_FROM;
+ case TK_JOIN:
+ return NEW_TK_JOIN;
+ // case TK_ON:
+ // return NEW_TK_ON;
+ // case TK_INNER:
+ // return NEW_TK_INNER;
+ // case TK_PARTITION:
+ // return NEW_TK_PARTITION;
+ case TK_SESSION:
+ return NEW_TK_SESSION;
+ case TK_STATE_WINDOW:
+ return NEW_TK_STATE_WINDOW;
+ case TK_INTERVAL:
+ return NEW_TK_INTERVAL;
+ case TK_SLIDING:
+ return NEW_TK_SLIDING;
+ case TK_FILL:
+ return NEW_TK_FILL;
+ // case TK_VALUE:
+ // return NEW_TK_VALUE;
+ case TK_NONE:
+ return NEW_TK_NONE;
+ case TK_PREV:
+ return NEW_TK_PREV;
+ case TK_LINEAR:
+ return NEW_TK_LINEAR;
+ // case TK_NEXT:
+ // return NEW_TK_NEXT;
+ case TK_GROUP:
+ return NEW_TK_GROUP;
+ case TK_HAVING:
+ return NEW_TK_HAVING;
case TK_ORDER:
return NEW_TK_ORDER;
case TK_BY:
@@ -150,6 +151,14 @@ static uint32_t toNewTokenId(uint32_t tokenId) {
return NEW_TK_ASC;
case TK_DESC:
return NEW_TK_DESC;
+ case TK_SLIMIT:
+ return NEW_TK_SLIMIT;
+ case TK_SOFFSET:
+ return NEW_TK_SOFFSET;
+ case TK_LIMIT:
+ return NEW_TK_LIMIT;
+ case TK_OFFSET:
+ return NEW_TK_OFFSET;
case TK_SPACE:
break;
default:
@@ -224,14 +233,6 @@ abort_parse:
return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}
-// typedef struct SNamespace {
-// int16_t level; // todo for correlated subquery
-// char dbName[TSDB_DB_NAME_LEN];
-// char tableAlias[TSDB_TABLE_NAME_LEN];
-// SHashObj* pColHash; // key is colname, value is index of STableMeta.schema
-// STableMeta* pMeta;
-// } SNamespace;
-
typedef enum ESqlClause {
SQL_CLAUSE_FROM = 1,
SQL_CLAUSE_WHERE
@@ -256,6 +257,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Table does not exist : %s";
case TSDB_CODE_PARSER_AMBIGUOUS_COLUMN:
return "Column ambiguously defined : %s";
+ case TSDB_CODE_PARSER_WRONG_VALUE_TYPE:
+ return "Invalid value type : %s";
default:
return "Unknown error";
}
@@ -322,7 +325,8 @@ static void setColumnInfoBySchema(const STableNode* pTable, const SSchema* pColS
strcpy(pCol->node.aliasName, pColSchema->name);
}
pCol->colId = pColSchema->colId;
- pCol->colType = pColSchema->type;
+ // pCol->colType = pColSchema->type;
+ pCol->node.resType.type = pColSchema->type;
pCol->node.resType.bytes = pColSchema->bytes;
}
@@ -431,6 +435,30 @@ static bool translateValue(STranslateContext* pCxt, SValueNode* pVal) {
}
static bool translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
+ SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
+ SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
+ if (nodesIsArithmeticOp(pOp)) {
+ if (TSDB_DATA_TYPE_JSON == ldt.type || TSDB_DATA_TYPE_BLOB == ldt.type ||
+ TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
+ generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
+ return false;
+ }
+ pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
+ pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
+ return true;
+ } else if (nodesIsComparisonOp(pOp)) {
+ if (TSDB_DATA_TYPE_JSON == ldt.type || TSDB_DATA_TYPE_BLOB == ldt.type ||
+ TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
+ generateSyntaxErrMsg(pCxt, TSDB_CODE_PARSER_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
+ return false;
+ }
+ pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
+ pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
+ return true;
+ } else {
+ // todo json operator
+ return true;
+ }
return true;
}
diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c
index 6f63feb3c4..2fae10d17e 100644
--- a/source/libs/parser/src/sql.c
+++ b/source/libs/parser/src/sql.c
@@ -3199,7 +3199,7 @@ static void yy_reduce(
case 285: /* cmd ::= ALTER TABLE ids cpxName MODIFY COLUMN columnlist */
{
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, -1);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
break;
@@ -3231,7 +3231,7 @@ static void yy_reduce(
toTSDBType(yymsp[0].minor.yy0.type);
A = tListItemAppendToken(A, &yymsp[0].minor.yy0, -1);
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
break;
@@ -3250,7 +3250,7 @@ static void yy_reduce(
case 290: /* cmd ::= ALTER TABLE ids cpxName MODIFY TAG columnlist */
{
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, -1);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
break;
@@ -3275,7 +3275,7 @@ static void yy_reduce(
case 293: /* cmd ::= ALTER STABLE ids cpxName MODIFY COLUMN columnlist */
{
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, TSDB_SUPER_TABLE);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
break;
@@ -3307,7 +3307,7 @@ static void yy_reduce(
toTSDBType(yymsp[0].minor.yy0.type);
A = tListItemAppendToken(A, &yymsp[0].minor.yy0, -1);
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
break;
@@ -3326,7 +3326,7 @@ static void yy_reduce(
case 298: /* cmd ::= ALTER STABLE ids cpxName MODIFY TAG columnlist */
{
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
- SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, TSDB_SUPER_TABLE);
+ SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
break;
diff --git a/source/libs/parser/test/newParserTest.cpp b/source/libs/parser/test/newParserTest.cpp
index 973a6aff1e..16fd9f26d5 100644
--- a/source/libs/parser/test/newParserTest.cpp
+++ b/source/libs/parser/test/newParserTest.cpp
@@ -55,10 +55,13 @@ protected:
return (TSDB_CODE_SUCCESS != translateCode);
}
if (NULL != query_.pRoot && QUERY_NODE_SELECT_STMT == nodeType(query_.pRoot)) {
- string sql;
- selectToSql(query_.pRoot, sql);
cout << "input sql : [" << cxt_.pSql << "]" << endl;
- cout << "output sql : [" << sql << "]" << endl;
+ // string sql;
+ // selectToSql(query_.pRoot, sql);
+ // cout << "output sql : [" << sql << "]" << endl;
+ string str;
+ selectToStr(query_.pRoot, str);
+ cout << "translate str : \n" << str << endl;
}
return (TSDB_CODE_SUCCESS == translateCode);
}
@@ -67,6 +70,162 @@ private:
static const int max_err_len = 1024;
static const int max_sql_len = 1024 * 1024;
+ string dataTypeToStr(const SDataType& dt) {
+ switch (dt.type) {
+ case TSDB_DATA_TYPE_NULL:
+ return "NULL";
+ case TSDB_DATA_TYPE_BOOL:
+ return "BOOL";
+ case TSDB_DATA_TYPE_TINYINT:
+ return "TINYINT";
+ case TSDB_DATA_TYPE_SMALLINT:
+ return "SMALLINT";
+ case TSDB_DATA_TYPE_INT:
+ return "INT";
+ case TSDB_DATA_TYPE_BIGINT:
+ return "BIGINT";
+ case TSDB_DATA_TYPE_FLOAT:
+ return "FLOAT";
+ case TSDB_DATA_TYPE_DOUBLE:
+ return "DOUBLE";
+ case TSDB_DATA_TYPE_BINARY:
+ return "BINART(" + to_string(dt.bytes) + ")";
+ case TSDB_DATA_TYPE_TIMESTAMP:
+ return "TIMESTAMP";
+ case TSDB_DATA_TYPE_NCHAR:
+ return "NCHAR(" + to_string(dt.bytes) + ")";
+ case TSDB_DATA_TYPE_UTINYINT:
+ return "UTINYINT";
+ case TSDB_DATA_TYPE_USMALLINT:
+ return "USMALLINT";
+ case TSDB_DATA_TYPE_UINT:
+ return "UINT";
+ case TSDB_DATA_TYPE_UBIGINT:
+ return "UBIGINT";
+ case TSDB_DATA_TYPE_VARCHAR:
+ return "VARCHAR(" + to_string(dt.bytes) + ")";
+ case TSDB_DATA_TYPE_VARBINARY:
+ return "VARBINARY(" + to_string(dt.bytes) + ")";
+ case TSDB_DATA_TYPE_JSON:
+ return "JSON";
+ case TSDB_DATA_TYPE_DECIMAL:
+ return "DECIMAL(" + to_string(dt.precision) + ", " + to_string(dt.scale) + ")";
+ case TSDB_DATA_TYPE_BLOB:
+ return "BLOB";
+ default:
+ break;
+ }
+ return "Unknown Data Type " + to_string(dt.type);
+ }
+
+ void nodeToStr(const SNode* node, string& str, bool isProject) {
+ if (nullptr == node) {
+ return;
+ }
+
+ switch (nodeType(node)) {
+ case QUERY_NODE_COLUMN: {
+ SColumnNode* pCol = (SColumnNode*)node;
+ if ('\0' != pCol->dbName[0]) {
+ str.append(pCol->dbName);
+ str.append(".");
+ }
+ if ('\0' != pCol->tableAlias[0]) {
+ str.append(pCol->tableAlias);
+ str.append(".");
+ }
+ str.append(pCol->colName);
+ str.append(" [" + dataTypeToStr(pCol->node.resType) + "]");
+ if (isProject) {
+ str.append(" AS " + string(pCol->node.aliasName));
+ }
+ break;
+ }
+ case QUERY_NODE_VALUE: {
+ SValueNode* pVal = (SValueNode*)node;
+ str.append(pVal->literal);
+ str.append(" [" + dataTypeToStr(pVal->node.resType) + "]");
+ if (isProject) {
+ str.append(" AS " + string(pVal->node.aliasName));
+ }
+ break;
+ }
+ case QUERY_NODE_OPERATOR: {
+ SOperatorNode* pOp = (SOperatorNode*)node;
+ nodeToStr(pOp->pLeft, str, false);
+ str.append(opTypeToStr(pOp->opType));
+ nodeToStr(pOp->pRight, str, false);
+ str.append(" [" + dataTypeToStr(pOp->node.resType) + "]");
+ if (isProject) {
+ str.append(" AS " + string(pOp->node.aliasName));
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
+ void nodeListToStr(const SNodeList* nodelist, const string& prefix, string& str, bool isProject = false) {
+ SNode* node = nullptr;
+ FOREACH(node, nodelist) {
+ str.append(prefix);
+ nodeToStr(node, str, isProject);
+ str.append("\n");
+ }
+ }
+
+ void tableToStr(const SNode* node, const string& prefix, string& str) {
+ const STableNode* table = (const STableNode*)node;
+ switch (nodeType(node)) {
+ case QUERY_NODE_REAL_TABLE: {
+ SRealTableNode* realTable = (SRealTableNode*)table;
+ str.append(prefix);
+ if ('\0' != realTable->table.dbName[0]) {
+ str.append(realTable->table.dbName);
+ str.append(".");
+ }
+ str.append(realTable->table.tableName);
+ str.append(string(" ") + realTable->table.tableAlias);
+ break;
+ }
+ case QUERY_NODE_TEMP_TABLE: {
+ STempTableNode* tempTable = (STempTableNode*)table;
+ str.append(prefix + "(\n");
+ selectToStr(tempTable->pSubquery, str, prefix + "\t");
+ str.append("\n");
+ str.append(prefix + ") ");
+ str.append(tempTable->table.tableAlias);
+ break;
+ }
+ case QUERY_NODE_JOIN_TABLE: {
+ SJoinTableNode* joinTable = (SJoinTableNode*)table;
+ tableToStr(joinTable->pLeft, prefix, str);
+ str.append("\n" + prefix + "JOIN\n");
+ tableToStr(joinTable->pRight, prefix, str);
+ if (nullptr != joinTable->pOnCond) {
+ str.append("\n" + prefix + "\tON ");
+ nodeToStr(joinTable->pOnCond, str, false);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
+ void selectToStr(const SNode* node, string& str, const string& prefix = "") {
+ SSelectStmt* select = (SSelectStmt*)node;
+ str.append(prefix + "SELECT ");
+ if (select->isDistinct) {
+ str.append("DISTINCT");
+ }
+ str.append("\n");
+ nodeListToStr(select->pProjectionList, prefix + "\t", str, true);
+ str.append("\n" + prefix + "FROM\n");
+ tableToStr(select->pFromTable, prefix + "\t", str);
+ }
+
void selectToSql(const SNode* node, string& sql) {
SSelectStmt* select = (SSelectStmt*)node;
sql.append("SELECT ");
@@ -123,7 +282,7 @@ private:
}
}
- string opTypeToSql(EOperatorType type) {
+ string opTypeToStr(EOperatorType type) {
switch (type) {
case OP_TYPE_ADD:
return " + ";
@@ -177,7 +336,7 @@ private:
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOp = (SOperatorNode*)node;
nodeToSql(pOp->pLeft, sql);
- sql.append(opTypeToSql(pOp->opType));
+ sql.append(opTypeToStr(pOp->opType));
nodeToSql(pOp->pRight, sql);
break;
}
@@ -213,8 +372,7 @@ private:
SQuery query_;
};
-// SELECT * FROM t1
-TEST_F(NewParserTest, selectStar) {
+TEST_F(NewParserTest, selectSimple) {
setDatabase("root", "test");
bind("SELECT * FROM t1");
@@ -233,7 +391,14 @@ TEST_F(NewParserTest, selectStar) {
ASSERT_TRUE(run());
}
-TEST_F(NewParserTest, syntaxError) {
+TEST_F(NewParserTest, selectExpression) {
+ setDatabase("root", "test");
+
+ bind("SELECT c1 + 10, c2 FROM t1");
+ ASSERT_TRUE(run());
+}
+
+TEST_F(NewParserTest, selectSyntaxError) {
setDatabase("root", "test");
bind("SELECTT * FROM t1");
@@ -249,7 +414,7 @@ TEST_F(NewParserTest, syntaxError) {
ASSERT_TRUE(run(TSDB_CODE_FAILED));
}
-TEST_F(NewParserTest, semanticError) {
+TEST_F(NewParserTest, selectSemanticError) {
setDatabase("root", "test");
bind("SELECT * FROM t10");
diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
index 082c89fed4..846f2d5099 100644
--- a/source/libs/transport/inc/transComm.h
+++ b/source/libs/transport/inc/transComm.h
@@ -134,10 +134,12 @@ typedef struct {
// int16_t numOfTry; // number of try for different servers
// int8_t oldInUse; // server EP inUse passed by app
// int8_t redirect; // flag to indicate redirect
- int8_t connType; // connection type
- int64_t rid; // refId returned by taosAddRef
- SRpcMsg* pRsp; // for synchronous API
- tsem_t* pSem; // for synchronous API
+ int8_t connType; // connection type
+ int64_t rid; // refId returned by taosAddRef
+
+ SRpcMsg* pRsp; // for synchronous API
+ tsem_t* pSem; // for synchronous API
+
char* ip;
uint32_t port;
// SEpSet* pSet; // for synchronous API
diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c
index a286482fc1..d8ef0462fb 100644
--- a/source/libs/transport/src/rpcMain.c
+++ b/source/libs/transport/src/rpcMain.c
@@ -813,8 +813,8 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SRpcInfo *pRpc = pContext->pRpc;
SEpSet * pEpSet = &pContext->epSet;
- pConn =
- rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
+ pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port,
+ pContext->connType);
if (pConn == NULL || pConn->user[0] == 0) {
pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
}
diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c
index 91f9a8ead2..a6040a3873 100644
--- a/source/libs/transport/src/trans.c
+++ b/source/libs/transport/src/trans.c
@@ -63,17 +63,41 @@ void rpcFreeCont(void* cont) {
}
free((char*)cont - TRANS_MSG_OVERHEAD);
}
-void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
+void* rpcReallocCont(void* ptr, int contLen) {
+ if (ptr == NULL) {
+ return rpcMallocCont(contLen);
+ }
+ char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
+ int sz = contLen + TRANS_MSG_OVERHEAD;
+ st = realloc(st, sz);
+ if (st == NULL) {
+ return NULL;
+ }
+ return st + TRANS_MSG_OVERHEAD;
+}
+
+void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
+ SRpcMsg rpcMsg;
+ memset(&rpcMsg, 0, sizeof(rpcMsg));
+
+ rpcMsg.contLen = sizeof(SEpSet);
+ rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
+ if (rpcMsg.pCont == NULL) return;
+
+ memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
+
+ rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
+ rpcMsg.handle = thandle;
+
+ rpcSendResponse(&rpcMsg);
+}
-void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
-int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
-void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
int32_t rpcInit(void) {
// impl later
- return -1;
+ return 0;
}
void rpcCleanup(void) {
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index 24ff5e956a..3d93049c6a 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -123,9 +123,14 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.code = pHead->code;
rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle;
-
- tDebug("conn %p handle resp", conn);
- (pRpc->cfp)(NULL, &rpcMsg, NULL);
+ if (pCtx->pSem == NULL) {
+ tDebug("conn %p handle resp", conn);
+ (pRpc->cfp)(NULL, &rpcMsg, NULL);
+ } else {
+ tDebug("conn %p handle resp", conn);
+ memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
+ tsem_post(pCtx->pSem);
+ }
conn->notifyCount += 1;
// buf's mem alread translated to rpcMsg.pCont
@@ -159,14 +164,20 @@ static void clientHandleExcept(SCliConn* pConn) {
SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
- // SRpcInfo* pRpc = pMsg->ctx->pRpc;
- (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
- pConn->notifyCount += 1;
+ if (pCtx->pSem == NULL) {
+ // SRpcInfo* pRpc = pMsg->ctx->pRpc;
+ (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
+ } else {
+ memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
+ // SRpcMsg rpcMsg
+ tsem_post(pCtx->pSem);
+ }
destroyCmsg(pMsg);
pConn->data = NULL;
// transDestroyConnCtx(pCtx);
clientConnDestroy(pConn, true);
+ pConn->notifyCount += 1;
}
static void clientTimeoutCb(uv_timer_t* handle) {
@@ -463,6 +474,7 @@ static void clientAsyncCb(uv_async_t* handle) {
static void* clientThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
+ setThreadName("trans-client-work");
uv_run(pThrd->loop, UV_RUN_DEFAULT);
}
@@ -568,8 +580,8 @@ void taosCloseClient(void* arg) {
}
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later
- char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
- uint32_t port = pEpSet->port[pEpSet->inUse];
+ char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
+ uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pRpc = (SRpcInfo*)shandle;
@@ -609,4 +621,45 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
// int end = taosGetTimestampUs() - start;
// tError("client sent to rpc, time cost: %d", (int)end);
}
+void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
+ char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
+ uint32_t port = pEpSet->eps[pEpSet->inUse].port;
+
+ SRpcInfo* pRpc = (SRpcInfo*)shandle;
+
+ STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
+ pCtx->pTransInst = (SRpcInfo*)shandle;
+ pCtx->ahandle = pReq->ahandle;
+ pCtx->msgType = pReq->msgType;
+ pCtx->ip = strdup(ip);
+ pCtx->port = port;
+ pCtx->pSem = calloc(1, sizeof(tsem_t));
+ pCtx->pRsp = pRsp;
+ tsem_init(pCtx->pSem, 0, 0);
+
+ int64_t index = pRpc->index;
+ if (pRpc->index++ >= pRpc->numOfThreads) {
+ pRpc->index = 0;
+ }
+ SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
+ cliMsg->ctx = pCtx;
+ cliMsg->msg = *pReq;
+ cliMsg->st = taosGetTimestampUs();
+
+ SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
+
+ // pthread_mutex_lock(&thrd->msgMtx);
+ // QUEUE_PUSH(&thrd->msg, &cliMsg->q);
+ // pthread_mutex_unlock(&thrd->msgMtx);
+
+ // int start = taosGetTimestampUs();
+ transSendAsync(thrd->asyncPool, &(cliMsg->q));
+
+ tsem_t* pSem = pCtx->pSem;
+ tsem_wait(pSem);
+ tsem_destroy(pSem);
+ free(pSem);
+
+ return;
+}
#endif
diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
index a005b31fe4..4d2ac434dd 100644
--- a/source/libs/transport/src/transSrv.c
+++ b/source/libs/transport/src/transSrv.c
@@ -33,6 +33,8 @@ typedef struct SSrvConn {
void* hostThrd;
void* pSrvMsg;
+ struct sockaddr peername;
+
// SRpcMsg sendMsg;
// del later
char secured;
@@ -487,7 +489,13 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tDebug("conn %p created, fd: %d", pConn, fd);
- uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
+ int namelen = sizeof(pConn->peername);
+ if (0 != uv_tcp_getpeername(pConn->pTcp, &pConn->peername, &namelen)) {
+ tError("failed to get peer name");
+ destroyConn(pConn, true);
+ } else {
+ uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
+ }
} else {
tDebug("failed to create new connection");
destroyConn(pConn, true);
@@ -496,6 +504,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
void* acceptThread(void* arg) {
// opt
+ setThreadName("trans-accept");
SServerObj* srv = (SServerObj*)arg;
uv_run(srv->loop, UV_RUN_DEFAULT);
}
@@ -548,6 +557,7 @@ static bool addHandleToAcceptloop(void* arg) {
return true;
}
void* workerThread(void* arg) {
+ setThreadName("trans-worker");
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
uv_run(pThrd->loop, UV_RUN_DEFAULT);
}
@@ -723,4 +733,16 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
// uv_async_send(pThrd->workerAsync);
}
+int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
+ SSrvConn* pConn = thandle;
+ struct sockaddr* pPeerName = &pConn->peername;
+
+ struct sockaddr_in caddr = *(struct sockaddr_in*)(pPeerName);
+ pInfo->clientIp = (uint32_t)(caddr.sin_addr.s_addr);
+ pInfo->clientPort = ntohs(caddr.sin_port);
+
+ tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
+ return 0;
+}
+
#endif
diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt
index 3d9c396336..3c9c40f46a 100644
--- a/source/libs/transport/test/CMakeLists.txt
+++ b/source/libs/transport/test/CMakeLists.txt
@@ -2,6 +2,7 @@ add_executable(transportTest "")
add_executable(client "")
add_executable(server "")
add_executable(transUT "")
+add_executable(syncClient "")
target_sources(transUT
PRIVATE
@@ -20,6 +21,10 @@ target_sources (server
PRIVATE
"rserver.c"
)
+target_sources (syncClient
+ PRIVATE
+ "syncClient.c"
+)
target_include_directories(transportTest
PUBLIC
@@ -67,7 +72,6 @@ target_include_directories(transUT
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
-
target_link_libraries (server
os
util
@@ -75,4 +79,17 @@ target_link_libraries (server
gtest_main
transport
)
+target_include_directories(syncClient
+ PUBLIC
+ "${CMAKE_SOURCE_DIR}/include/libs/transport"
+ "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
+)
+target_link_libraries (syncClient
+ os
+ util
+ common
+ gtest_main
+ transport
+)
+
diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c
index 308b7b54bd..4e29c02508 100644
--- a/source/libs/transport/test/rclient.c
+++ b/source/libs/transport/test/rclient.c
@@ -33,7 +33,6 @@ typedef struct {
pthread_t thread;
void * pRpc;
} SInfo;
-
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
diff --git a/source/libs/transport/test/syncClient.c b/source/libs/transport/test/syncClient.c
new file mode 100644
index 0000000000..c5d7f5664a
--- /dev/null
+++ b/source/libs/transport/test/syncClient.c
@@ -0,0 +1,220 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#include
+
+#include
+#include "os.h"
+#include "rpcLog.h"
+#include "taoserror.h"
+#include "tglobal.h"
+#include "trpc.h"
+#include "tutil.h"
+
+typedef struct {
+ int index;
+ SEpSet epSet;
+ int num;
+ int numOfReqs;
+ int msgSize;
+ tsem_t rspSem;
+ tsem_t * pOverSem;
+ pthread_t thread;
+ void * pRpc;
+} SInfo;
+static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
+ SInfo *pInfo = (SInfo *)pMsg->ahandle;
+ tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
+ pMsg->code);
+
+ if (pEpSet) pInfo->epSet = *pEpSet;
+
+ rpcFreeCont(pMsg->pCont);
+ // tsem_post(&pInfo->rspSem);
+ tsem_post(&pInfo->rspSem);
+}
+
+static int tcount = 0;
+
+static void *sendRequest(void *param) {
+ SInfo * pInfo = (SInfo *)param;
+ SRpcMsg rpcMsg = {0};
+
+ tDebug("thread:%d, start to send request", pInfo->index);
+
+ tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
+ int u100 = 0;
+ int u500 = 0;
+ int u1000 = 0;
+ int u10000 = 0;
+ SRpcMsg respMsg = {0};
+ while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
+ pInfo->num++;
+ rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
+ rpcMsg.contLen = pInfo->msgSize;
+ rpcMsg.ahandle = pInfo;
+ rpcMsg.msgType = 1;
+ // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
+ int64_t start = taosGetTimestampUs();
+ rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &respMsg);
+ // rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
+ if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
+ // tsem_wait(&pInfo->rspSem);
+ // wtsem_wait(&pInfo->rspSem);
+ int64_t end = taosGetTimestampUs() - start;
+ if (end <= 100) {
+ u100++;
+ } else if (end > 100 && end <= 500) {
+ u500++;
+ } else if (end > 500 && end < 1000) {
+ u1000++;
+ } else {
+ u10000++;
+ }
+
+ tDebug("recv response succefully");
+
+ // usleep(100000000);
+ }
+
+ tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
+ tDebug("thread:%d, it is over", pInfo->index);
+ tcount++;
+
+ return NULL;
+}
+
+int main(int argc, char *argv[]) {
+ SRpcInit rpcInit;
+ SEpSet epSet = {0};
+ int msgSize = 128;
+ int numOfReqs = 0;
+ int appThreads = 1;
+ char serverIp[40] = "127.0.0.1";
+ char secret[20] = "mypassword";
+ struct timeval systemTime;
+ int64_t startTime, endTime;
+ pthread_attr_t thattr;
+
+ // server info
+ epSet.inUse = 0;
+ addEpIntoEpSet(&epSet, serverIp, 7000);
+ addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
+
+ // client info
+ memset(&rpcInit, 0, sizeof(rpcInit));
+ rpcInit.localPort = 0;
+ rpcInit.label = "APP";
+ rpcInit.numOfThreads = 1;
+ rpcInit.cfp = processResponse;
+ rpcInit.sessions = 100;
+ rpcInit.idleTime = 100;
+ rpcInit.user = "michael";
+ rpcInit.secret = secret;
+ rpcInit.ckey = "key";
+ rpcInit.spi = 1;
+ rpcInit.connType = TAOS_CONN_CLIENT;
+
+ for (int i = 1; i < argc; ++i) {
+ if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
+ epSet.eps[0].port = atoi(argv[++i]);
+ } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
+ tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
+ } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
+ rpcInit.numOfThreads = atoi(argv[++i]);
+ } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
+ msgSize = atoi(argv[++i]);
+ } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
+ rpcInit.sessions = atoi(argv[++i]);
+ } else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
+ numOfReqs = atoi(argv[++i]);
+ } else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
+ appThreads = atoi(argv[++i]);
+ } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
+ tsCompressMsgSize = atoi(argv[++i]);
+ } else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
+ rpcInit.user = argv[++i];
+ } else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
+ rpcInit.secret = argv[++i];
+ } else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
+ rpcInit.spi = atoi(argv[++i]);
+ } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
+ rpcDebugFlag = atoi(argv[++i]);
+ } else {
+ printf("\nusage: %s [options] \n", argv[0]);
+ printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
+ printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
+ printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
+ printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
+ printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
+ printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
+ printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
+ printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
+ printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
+ printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
+ printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
+ printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
+ printf(" [-h help]: print out this help\n\n");
+ exit(0);
+ }
+ }
+
+ taosInitLog("client.log", 100000, 10);
+
+ void *pRpc = rpcOpen(&rpcInit);
+ if (pRpc == NULL) {
+ tError("failed to initialize RPC");
+ return -1;
+ }
+
+ tInfo("client is initialized");
+ tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
+
+ gettimeofday(&systemTime, NULL);
+ startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
+
+ SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
+
+ pthread_attr_init(&thattr);
+ pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
+
+ for (int i = 0; i < appThreads; ++i) {
+ pInfo->index = i;
+ pInfo->epSet = epSet;
+ pInfo->numOfReqs = numOfReqs;
+ pInfo->msgSize = msgSize;
+ tsem_init(&pInfo->rspSem, 0, 0);
+ pInfo->pRpc = pRpc;
+ pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
+ pInfo++;
+ }
+
+ do {
+ usleep(1);
+ } while (tcount < appThreads);
+
+ gettimeofday(&systemTime, NULL);
+ endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
+ float usedTime = (endTime - startTime) / 1000.0f; // mseconds
+
+ tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
+ tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
+
+ int ch = getchar();
+ UNUSED(ch);
+
+ taosCloseLog();
+
+ return 0;
+}
diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc
index 08c683590b..6f80ea42ac 100644
--- a/source/libs/transport/test/transUT.cc
+++ b/source/libs/transport/test/transUT.cc
@@ -15,6 +15,7 @@
#include
#include
#include
+#include "tep.h"
#include "trpc.h"
using namespace std;
@@ -50,6 +51,25 @@ class TransObj {
trans = rpcOpen(&rpcInit);
return trans != NULL ? true : false;
}
+
+ bool sendAndRecv() {
+ SEpSet epSet = {0};
+ epSet.inUse = 0;
+ addEpIntoEpSet(&epSet, "192.168.1.1", 7000);
+ addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
+
+ if (trans == NULL) {
+ return false;
+ }
+ SRpcMsg rpcMsg = {0}, reqMsg = {0};
+ reqMsg.pCont = rpcMallocCont(10);
+ reqMsg.contLen = 10;
+ reqMsg.ahandle = NULL;
+ rpcSendRecv(trans, &epSet, &reqMsg, &rpcMsg);
+ int code = rpcMsg.code;
+ std::cout << tstrerror(code) << std::endl;
+ return true;
+ }
bool stop() {
rpcClose(trans);
trans = NULL;
@@ -75,6 +95,7 @@ class TransEnv : public ::testing::Test {
};
TEST_F(TransEnv, test_start_stop) {
assert(tr->startCli());
+ assert(tr->sendAndRecv());
assert(tr->stop());
assert(tr->startSrv());
diff --git a/source/nodes/src/nodesTraverseFuncs.c b/source/nodes/src/nodesTraverseFuncs.c
index 444ff7cbcf..0702254b5f 100644
--- a/source/nodes/src/nodesTraverseFuncs.c
+++ b/source/nodes/src/nodesTraverseFuncs.c
@@ -109,7 +109,7 @@ void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContex
}
void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext) {
- (void)walkList(pList, TRAVERSAL_PREORDER, walker, pContext);
+ (void)walkList(pList, TRAVERSAL_POSTORDER, walker, pContext);
}
bool nodesWalkStmt(SNode* pNode, FQueryNodeWalker walker, void* pContext) {
diff --git a/source/nodes/src/nodesUtilFuncs.c b/source/nodes/src/nodesUtilFuncs.c
index bf4c4a83cc..af6cec755d 100644
--- a/source/nodes/src/nodesUtilFuncs.c
+++ b/source/nodes/src/nodesUtilFuncs.c
@@ -70,8 +70,19 @@ SNode* nodesMakeNode(ENodeType type) {
return NULL;
}
-void nodesDestroyNode(SNode* pNode) {
+static bool destroyNode(SNode* pNode, void* pContext) {
+ switch (nodeType(pNode)) {
+ case QUERY_NODE_VALUE:
+ tfree(((SValueNode*)pNode)->literal);
+ break;
+ default:
+ break;
+ }
+ tfree(pNode);
+}
+void nodesDestroyNode(SNode* pNode) {
+ nodesWalkNodePostOrder(pNode, destroyNode, NULL);
}
SNodeList* nodesMakeList() {
@@ -103,13 +114,63 @@ SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode) {
}
void nodesDestroyList(SNodeList* pList) {
+ SNode* node;
+ FOREACH(node, pList) {
+ nodesDestroyNode(node);
+ }
+ tfree(pList);
+}
+bool nodesIsArithmeticOp(const SOperatorNode* pOp) {
+ switch (pOp->opType) {
+ case OP_TYPE_ADD:
+ case OP_TYPE_SUB:
+ case OP_TYPE_MULTI:
+ case OP_TYPE_DIV:
+ case OP_TYPE_MOD:
+ return true;
+ default:
+ break;
+ }
+ return false;
+}
+
+bool nodesIsComparisonOp(const SOperatorNode* pOp) {
+ switch (pOp->opType) {
+ case OP_TYPE_GREATER_THAN:
+ case OP_TYPE_GREATER_EQUAL:
+ case OP_TYPE_LOWER_THAN:
+ case OP_TYPE_LOWER_EQUAL:
+ case OP_TYPE_EQUAL:
+ case OP_TYPE_NOT_EQUAL:
+ case OP_TYPE_IN:
+ case OP_TYPE_NOT_IN:
+ case OP_TYPE_LIKE:
+ case OP_TYPE_NOT_LIKE:
+ case OP_TYPE_MATCH:
+ case OP_TYPE_NMATCH:
+ return true;
+ default:
+ break;
+ }
+ return false;
+}
+
+bool nodesIsJsonOp(const SOperatorNode* pOp) {
+ switch (pOp->opType) {
+ case OP_TYPE_JSON_GET_VALUE:
+ case OP_TYPE_JSON_CONTAINS:
+ return true;
+ default:
+ break;
+ }
+ return false;
}
bool nodesIsTimeorderQuery(const SNode* pQuery) {
-
+ return false;
}
bool nodesIsTimelineQuery(const SNode* pQuery) {
-
+ return false;
}
\ No newline at end of file
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index ee5bea0ab7..ca0401113c 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -236,9 +236,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, "Tag already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
-TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREAY_EXIST, "Column already exists")
-TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
-TAOS_DEFINE_ERROR(TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES, "Exceed max row bytes")
+TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREAY_EXIST, "Column already exists")
+TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
+TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
// mnode-func
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_ALREADY_EXIST, "Func already exists")