diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 9b2bb89793..da58f98e4c 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -20,36 +20,38 @@ extern "C" { #endif -#include "tdef.h" #include "taos.h" +#include "tdef.h" -#define TSWINDOW_INITIALIZER ((STimeWindow) {INT64_MIN, INT64_MAX}) -#define TSWINDOW_DESC_INITIALIZER ((STimeWindow) {INT64_MAX, INT64_MIN}) +typedef uint64_t tb_uid_t; + +#define TSWINDOW_INITIALIZER ((STimeWindow){INT64_MIN, INT64_MAX}) +#define TSWINDOW_DESC_INITIALIZER ((STimeWindow){INT64_MAX, INT64_MIN}) #define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX)) typedef enum { - TAOS_QTYPE_RPC = 1, - TAOS_QTYPE_FWD = 2, - TAOS_QTYPE_WAL = 3, - TAOS_QTYPE_CQ = 4, + TAOS_QTYPE_RPC = 1, + TAOS_QTYPE_FWD = 2, + TAOS_QTYPE_WAL = 3, + TAOS_QTYPE_CQ = 4, TAOS_QTYPE_QUERY = 5 } EQType; typedef enum { - TSDB_SUPER_TABLE = 1, // super table - TSDB_CHILD_TABLE = 2, // table created from super table - TSDB_NORMAL_TABLE = 3, // ordinary table - TSDB_STREAM_TABLE = 4, // table created from stream computing - TSDB_TEMP_TABLE = 5, // temp table created by nest query - TSDB_TABLE_MAX = 6 + TSDB_SUPER_TABLE = 1, // super table + TSDB_CHILD_TABLE = 2, // table created from super table + TSDB_NORMAL_TABLE = 3, // ordinary table + TSDB_STREAM_TABLE = 4, // table created from stream computing + TSDB_TEMP_TABLE = 5, // temp table created by nest query + TSDB_TABLE_MAX = 6 } ETableType; typedef enum { - TSDB_MOD_MNODE = 1, - TSDB_MOD_HTTP = 2, + TSDB_MOD_MNODE = 1, + TSDB_MOD_HTTP = 2, TSDB_MOD_MONITOR = 3, - TSDB_MOD_MQTT = 4, - TSDB_MOD_MAX = 5 + TSDB_MOD_MQTT = 4, + TSDB_MOD_MAX = 5 } EModuleType; typedef enum { @@ -64,11 +66,7 @@ typedef enum { TSDB_CHECK_ITEM_MAX } ECheckItemType; -typedef enum { - TD_ROW_DISCARD_UPDATE = 0, - TD_ROW_OVERWRITE_UPDATE = 1, - TD_ROW_PARTIAL_UPDATE = 2 -} TDUpdateConfig; +typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig; extern char *qtypeStr[]; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2c5f7346ca..92b453bc1f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -22,6 +22,7 @@ extern "C" { #include "taosdef.h" #include "taoserror.h" +#include "tcoding.h" #include "tdataformat.h" #define TD_MSG_NUMBER_ @@ -1096,6 +1097,131 @@ typedef struct { uint64_t tuid; } SDropTopicInternalMsg; +typedef struct SVCreateTbReq { + uint64_t ver; // use a general definition + char* name; + uint32_t ttl; + uint32_t keep; +#define TD_SUPER_TABLE 0 +#define TD_CHILD_TABLE 1 +#define TD_NORMAL_TABLE 2 + uint8_t type; + union { + struct { + tb_uid_t suid; + uint32_t nCols; + SSchema* pSchema; + uint32_t nTagCols; + SSchema* pTagSchema; + } stbCfg; + struct { + tb_uid_t suid; + SKVRow pTag; + } ctbCfg; + struct { + uint32_t nCols; + SSchema* pSchema; + } ntbCfg; + }; +} SVCreateTbReq; + +static FORCE_INLINE int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq) { + int tlen = 0; + + tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeFixedU32(buf, pReq->ttl); + tlen += taosEncodeFixedU32(buf, pReq->keep); + tlen += taosEncodeFixedU8(buf, pReq->type); + + switch (pReq->type) { + case TD_SUPER_TABLE: + tlen += taosEncodeFixedU64(buf, pReq->stbCfg.suid); + tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols); + for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { + tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); + tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].colId); + tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes); + tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name); + } + tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nTagCols); + for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { + tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type); + tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].colId); + tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); + tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); + } + break; + case TD_CHILD_TABLE: + tlen += taosEncodeFixedU64(buf, pReq->ctbCfg.suid); + tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag); + break; + case TD_NORMAL_TABLE: + tlen += taosEncodeFixedU32(buf, pReq->ntbCfg.nCols); + for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { + tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type); + tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].colId); + tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); + tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); + } + break; + default: + ASSERT(0); + } + + return tlen; +} + +static FORCE_INLINE void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq) { + buf = taosDecodeFixedU64(buf, &(pReq->ver)); + buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeFixedU32(buf, &(pReq->ttl)); + buf = taosDecodeFixedU32(buf, &(pReq->keep)); + buf = taosDecodeFixedU8(buf, &(pReq->type)); + + switch (pReq->type) { + case TD_SUPER_TABLE: + buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid)); + buf = taosDecodeFixedU32(buf, &(pReq->stbCfg.nCols)); + pReq->stbCfg.pSchema = (SSchema*)malloc(pReq->stbCfg.nCols * sizeof(SSchema)); + for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { + buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type)); + buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].colId)); + buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes)); + buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name); + } + buf = taosDecodeFixedU32(buf, &pReq->stbCfg.nTagCols); + pReq->stbCfg.pTagSchema = (SSchema*)malloc(pReq->stbCfg.nTagCols * sizeof(SSchema)); + for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { + buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type)); + buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].colId); + buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); + buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); + } + break; + case TD_CHILD_TABLE: + buf = taosDecodeFixedU64(buf, &pReq->ctbCfg.suid); + buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag); + break; + case TD_NORMAL_TABLE: + buf = taosDecodeFixedU32(buf, &pReq->ntbCfg.nCols); + pReq->ntbCfg.pSchema = (SSchema*)malloc(pReq->ntbCfg.nCols * sizeof(SSchema)); + for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { + buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type); + buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].colId); + buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); + buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); + } + break; + default: + ASSERT(0); + } + + return buf; +} +typedef struct SVCreateTbRsp { +} SVCreateTbRsp; + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 05373cda97..15b5b9da28 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -138,7 +138,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-super-table", SVCreateTbReq, SVCreateTbRsp) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) diff --git a/include/dnode/vnode/meta/meta.h b/include/dnode/vnode/meta/meta.h index 113a970548..3d618862b6 100644 --- a/include/dnode/vnode/meta/meta.h +++ b/include/dnode/vnode/meta/meta.h @@ -25,7 +25,6 @@ extern "C" { #endif // Types exported -typedef uint64_t tb_uid_t; typedef struct SMeta SMeta; #define META_SUPER_TABLE 0 diff --git a/include/util/tcoding.h b/include/util/tcoding.h index a2c91c5dbe..6e6a91130c 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -357,6 +357,17 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) { return POINTER_SHIFT(buf, size); } +static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) { + uint64_t size = 0; + + buf = taosDecodeVariantU64(buf, &size); + memcpy(value, buf, (size_t)size); + + value[size] = '\0'; + + return POINTER_SHIFT(buf, size); +} + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 0f577c3c3b..e5acb6024e 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -177,7 +177,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb } SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName); if (pStb == NULL) { terrno = TSDB_CODE_MND_STB_NOT_EXIST; @@ -200,7 +200,36 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) { return mndAcquireDb(pMnode, db); } -static SCreateStbInternalMsg *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { +static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *contLen) { + SVCreateTbReq req; + void * buf; + int bsize; + + req.ver = 0; + req.name = pStb->name; + req.ttl = 0; + req.keep = 0; + req.type = TD_SUPER_TABLE; + req.stbCfg.suid = pStb->uid; + req.stbCfg.nCols = pStb->numOfColumns; + req.stbCfg.pSchema = pStb->pSchema; + req.stbCfg.nTagCols = pStb->numOfTags; + req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns; + + bsize = tSerializeSVCreateTbReq(NULL, &req); + buf = malloc(bsize); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pBuf = buf; + tSerializeSVCreateTbReq(&pBuf, &req); + + *contLen = bsize; + return buf; + +#if 0 int32_t totalCols = pStb->numOfTags + pStb->numOfColumns; int32_t contLen = totalCols * sizeof(SSchema) + sizeof(SCreateStbInternalMsg); @@ -226,8 +255,8 @@ static SCreateStbInternalMsg *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgro pSchema->bytes = htonl(pSchema->bytes); pSchema->colId = htonl(pSchema->colId); } - - return pCreate; + return pCreate; +#endif } static SDropStbInternalMsg *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { @@ -324,16 +353,17 @@ static int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj } static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; - void *pIter = NULL; + void * pIter = NULL; + int contLen; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - SCreateStbInternalMsg *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb); + void *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb, &contLen); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -344,7 +374,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = pMsg; - action.contLen = htonl(pMsg->head.contLen); + action.contLen = htonl(contLen); action.msgType = TDMT_VND_CREATE_STB; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -359,9 +389,9 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj } static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; - void *pIter = NULL; + void * pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -461,7 +491,7 @@ CREATE_STB_OVER: } static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode * pMnode = pMsg->pMnode; SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to create", pCreate->name); @@ -484,7 +514,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { } } - //topic should have different name with stb + // topic should have different name with stb SStbObj *pTopic = mndAcquireStb(pMnode, pCreate->name); if (pTopic != NULL) { sdbRelease(pMnode->pSdb, pTopic); @@ -545,7 +575,7 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) { static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; } static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode * pMnode = pMsg->pMnode; SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to alter", pAlter->name); @@ -659,7 +689,7 @@ DROP_STB_OVER: } static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode * pMnode = pMsg->pMnode; SDropStbMsg *pDrop = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to drop", pDrop->name); @@ -694,7 +724,7 @@ static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { } static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode * pMnode = pMsg->pMnode; STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to retrieve meta", pInfo->tableFname); @@ -766,7 +796,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs } int32_t numOfStbs = 0; - void *pIter = NULL; + void * pIter = NULL; while (1) { SStbObj *pStb = NULL; pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb); @@ -785,7 +815,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; + SSdb * pSdb = pMnode->pSdb; if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) { return -1; @@ -847,12 +877,12 @@ static void mndExtractTableName(char *tableId, char *name) { } static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; + SMnode * pMnode = pMsg->pMnode; + SSdb * pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStbObj *pStb = NULL; int32_t cols = 0; - char *pWrite; + char * pWrite; char prefix[64] = {0}; tstrncpy(prefix, pShow->db, 64);