diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6a7edb481f..5fc6131a2e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -109,6 +109,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_STREAMTABLES, TSDB_MGMT_TABLE_TP, TSDB_MGMT_TABLE_FUNC, + TSDB_MGMT_TABLE_INDEX, TSDB_MGMT_TABLE_MAX, } EShowType; @@ -270,9 +271,10 @@ typedef struct { int8_t igExists; int32_t numOfColumns; int32_t numOfTags; + int32_t commentLen; SArray* pColumns; SArray* pTags; - char comment[TSDB_STB_COMMENT_LEN]; + char *comment; } SMCreateStbReq; int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); @@ -1938,12 +1940,47 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) } return buf; } + +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + char stb[TSDB_TABLE_FNAME_LEN]; + int8_t igExists; + int8_t intervalUnit; + int8_t slidingUnit; + int8_t timezone; + int32_t dstVgId; // for stream + int64_t interval; + int64_t offset; + int64_t sliding; + int32_t exprLen; // strlen + 1 + int32_t tagsFilterLen; // strlen + 1 + int32_t sqlLen; // strlen + 1 + int32_t astLen; // strlen + 1 + char* expr; + char* tagsFilter; + char* sql; + char* ast; +} SMCreateSmaReq; + +int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); +int32_t tDeserializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); +void tFreeSMCreateSmaReq(SMCreateSmaReq* pReq); + +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + int8_t igNotExists; +} SMDropSmaReq; + +int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq); +int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq); + typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX int8_t slidingUnit; // MACRO: TIME_UNIT_XXX + int8_t timezoneInt; // sma data expired if timezone changes. char indexName[TSDB_INDEX_NAME_LEN]; - char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes. + char timezone[TD_TIMEZONE_LEN]; int32_t exprLen; int32_t tagsFilterLen; int64_t indexUid; @@ -2050,8 +2087,8 @@ static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { tlen += taosEncodeFixedI8(buf, pSma->version); tlen += taosEncodeFixedI8(buf, pSma->intervalUnit); tlen += taosEncodeFixedI8(buf, pSma->slidingUnit); + tlen += taosEncodeFixedI8(buf, pSma->timezoneInt); tlen += taosEncodeString(buf, pSma->indexName); - tlen += taosEncodeString(buf, pSma->timezone); tlen += taosEncodeFixedI32(buf, pSma->exprLen); tlen += taosEncodeFixedI32(buf, pSma->tagsFilterLen); tlen += taosEncodeFixedI64(buf, pSma->indexUid); @@ -2085,8 +2122,8 @@ static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { buf = taosDecodeFixedI8(buf, &pSma->version); buf = taosDecodeFixedI8(buf, &pSma->intervalUnit); buf = taosDecodeFixedI8(buf, &pSma->slidingUnit); + buf = taosDecodeFixedI8(buf, &pSma->timezoneInt); buf = taosDecodeStringTo(buf, pSma->indexName); - buf = taosDecodeStringTo(buf, pSma->timezone); buf = taosDecodeFixedI32(buf, &pSma->exprLen); buf = taosDecodeFixedI32(buf, &pSma->tagsFilterLen); buf = taosDecodeFixedI64(buf, &pSma->indexUid); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index ad8a839095..f9ce69925b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -127,6 +127,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "mnode-create-sma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "mnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "mnode-table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "mnode-qnode-list", NULL, NULL) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index d04e9f817e..a28c093e85 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -119,10 +119,11 @@ typedef enum { SDB_CONSUMER = 13, SDB_TOPIC = 14, SDB_VGROUP = 15, - SDB_STB = 16, - SDB_DB = 17, - SDB_FUNC = 18, - SDB_MAX = 19 + SDB_SMA = 16, + SDB_STB = 17, + SDB_DB = 18, + SDB_FUNC = 19, + SDB_MAX = 20 } ESdbType; typedef struct SSdb SSdb; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9e1e1c53dc..7661b3c2cc 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -274,18 +274,23 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1) #define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2) +// mnode-sma +#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0400) +#define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0401) +#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0402) + // dnode -#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) -#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401) -#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402) -#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403) -#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404) -#define TSDB_CODE_NODE_PARSE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0405) -#define TSDB_CODE_NODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0406) -#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0410) -#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411) -#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0412) -#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0413) +#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x04A0) +#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x04A1) +#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x04A2) +#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04A3) +#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04A4) +#define TSDB_CODE_NODE_PARSE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x04A5) +#define TSDB_CODE_NODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x04A6) +#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04A7) +#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04A8) +#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x04A9) +#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x04AA) // vnode #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) @@ -309,7 +314,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) #define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515) -#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0516) +#define TSDB_CODE_VND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0516) +#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0517) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b2fb92600b..e5a0f2a28e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -510,6 +510,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1; + if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1; for (int32_t i = 0; i < pReq->numOfColumns; ++i) { SField *pField = taosArrayGet(pReq->pColumns, i); @@ -525,7 +526,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq if (tEncodeCStr(&encoder, pField->name) < 0) return -1; } - if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1; + if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -542,6 +543,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1; pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField)); pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField)); @@ -572,6 +574,12 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR } } + if (pReq->commentLen > 0) { + pReq->comment = malloc(pReq->commentLen); + if (pReq->comment == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; + } + if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; tEndDecode(&decoder); @@ -669,6 +677,123 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { pReq->pFields = NULL; } +int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->stb) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; + if (tEncodeI8(&encoder, pReq->intervalUnit) < 0) return -1; + if (tEncodeI8(&encoder, pReq->slidingUnit) < 0) return -1; + if (tEncodeI8(&encoder, pReq->timezone) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1; + if (tEncodeI64(&encoder, pReq->interval) < 0) return -1; + if (tEncodeI64(&encoder, pReq->offset) < 0) return -1; + if (tEncodeI64(&encoder, pReq->sliding) < 0) return -1; + if (tEncodeI32(&encoder, pReq->exprLen) < 0) return -1; + if (tEncodeI32(&encoder, pReq->tagsFilterLen) < 0) return -1; + if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; + if (tEncodeI32(&encoder, pReq->astLen) < 0) return -1; + if (pReq->exprLen > 0) { + if (tEncodeBinary(&encoder, pReq->expr, pReq->exprLen) < 0) return -1; + } + if (pReq->tagsFilterLen > 0) { + if (tEncodeBinary(&encoder, pReq->tagsFilter, pReq->tagsFilterLen) < 0) return -1; + } + if (pReq->sqlLen > 0) { + if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; + } + if (pReq->astLen > 0) { + if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->stb) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->intervalUnit) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->slidingUnit) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->timezone) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->interval) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->offset) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->sliding) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->exprLen) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->tagsFilterLen) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->sqlLen) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->astLen) < 0) return -1; + if (pReq->exprLen > 0) { + pReq->expr = malloc(pReq->exprLen); + if (pReq->expr == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->expr) < 0) return -1; + } + if (pReq->tagsFilterLen > 0) { + pReq->tagsFilter = malloc(pReq->tagsFilterLen); + if (pReq->tagsFilter == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->tagsFilter) < 0) return -1; + } + if (pReq->sqlLen > 0) { + pReq->sql = malloc(pReq->sqlLen); + if (pReq->sql == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1; + } + if (pReq->astLen > 0) { + pReq->ast = malloc(pReq->astLen); + if (pReq->ast == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; + } + + tEndDecode(&decoder); + tCoderClear(&decoder); + return 0; +} + +void tFreeSMCreateSmaReq(SMCreateSmaReq *pReq) { + tfree(pReq->expr); + tfree(pReq->tagsFilter); + tfree(pReq->sql); + tfree(pReq->ast); +} + +int32_t tSerializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropSmaReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropSmaReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index fee568d20b..6f719f3d8d 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -123,6 +123,8 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, 0); @@ -151,4 +153,6 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); } diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 7bbf730744..a98dccbca3 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -268,6 +268,9 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, 0); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 30b4c923d9..d4f5fa1e3b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -103,6 +103,8 @@ typedef enum { TRN_TYPE_CREATE_STB = 4001, TRN_TYPE_ALTER_STB = 4002, TRN_TYPE_DROP_STB = 4003, + TRN_TYPE_CREATE_SMA = 4004, + TRN_TYPE_DROP_SMA = 4005, TRN_TYPE_STB_SCOPE_END, } ETrnType; @@ -305,6 +307,31 @@ typedef struct { SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; } SVgObj; +typedef struct { + char name[TSDB_TABLE_FNAME_LEN]; + char stb[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createdTime; + int64_t uid; + int64_t stbUid; + int64_t dbUid; + int8_t intervalUnit; + int8_t slidingUnit; + int8_t timezone; + int32_t dstVgId; // for stream + int64_t interval; + int64_t offset; + int64_t sliding; + int32_t exprLen; // strlen + 1 + int32_t tagsFilterLen; + int32_t sqlLen; + int32_t astLen; + char* expr; + char* tagsFilter; + char* sql; + char* ast; +} SSmaObj; + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; @@ -316,10 +343,11 @@ typedef struct { int32_t nextColId; int32_t numOfColumns; int32_t numOfTags; + int32_t commentLen; SSchema* pColumns; SSchema* pTags; + char* comment; SRWLatch lock; - char comment[TSDB_STB_COMMENT_LEN]; } SStbObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index d5bffada6a..20e85973be 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -122,9 +122,9 @@ typedef struct SMnode { SMsgCb msgCb; } SMnode; -void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); -uint64_t mndGenerateUid(char *name, int32_t len); -void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); +void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); +int64_t mndGenerateUid(char *name, int32_t len); +void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndSma.h b/source/dnode/mnode/impl/inc/mndSma.h new file mode 100644 index 0000000000..4a80f619d3 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndSma.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_SMA_H_ +#define _TD_MND_SMA_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitSma(SMnode *pMnode); +void mndCleanupSma(SMnode *pMnode); +SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName); +void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_SMA_H_*/ diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 8536ee8981..eb12347e64 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -427,6 +427,8 @@ char *mndShowStr(int32_t showType) { return "show topics"; case TSDB_MGMT_TABLE_FUNC: return "show functions"; + case TSDB_MGMT_TABLE_INDEX: + return "show indexes"; default: return "undefined"; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c new file mode 100644 index 0000000000..2bd13f395f --- /dev/null +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -0,0 +1,762 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndSma.h" +#include "mndAuth.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndInfoSchema.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndStb.c" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tname.h" + +#define TSDB_SMA_VER_NUMBER 1 +#define TSDB_SMA_RESERVE_SIZE 64 + +static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma); +static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw); +static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma); +static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb); +static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew); +static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq); +static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq); +static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp); +static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp); +static int32_t mndGetSmaMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); + +int32_t mndInitSma(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_SMA, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndSmaActionEncode, + .decodeFp = (SdbDecodeFp)mndSmaActionDecode, + .insertFp = (SdbInsertFp)mndSmaActionInsert, + .updateFp = (SdbUpdateFp)mndSmaActionUpdate, + .deleteFp = (SdbDeleteFp)mndSmaActionDelete}; + + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessMCreateSmaReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq); + mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp); + + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndGetSmaMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma); + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupSma(SMnode *pMnode) {} + +static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + + int32_t size = sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + TSDB_SMA_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size); + if (pRaw == NULL) goto _OVER; + + int32_t dataPos = 0; + + SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_INT64(pRaw, dataPos, pSma->createdTime, _OVER) + SDB_SET_INT64(pRaw, dataPos, pSma->uid, _OVER) + SDB_SET_INT64(pRaw, dataPos, pSma->stbUid, _OVER) + SDB_SET_INT64(pRaw, dataPos, pSma->dbUid, _OVER) + SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER) + SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER) + SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER) + SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER) + SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER) + SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER) + SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER) + SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER) + SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER) + SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER) + SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER) + if (pSma->exprLen > 0) { + SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) + } + if (pSma->tagsFilterLen > 0) { + SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER) + } + if (pSma->sqlLen > 0) { + SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER) + } + if (pSma->astLen > 0) { + SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER) + } + + SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) + SDB_SET_DATALEN(pRaw, dataPos, _OVER) + terrno = 0; + +_OVER: + if (terrno != 0) { + mError("sma:%s, failed to encode to raw:%p since %s", pSma->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("sma:%s, encode to raw:%p, row:%p", pSma->name, pRaw, pSma); + return pRaw; +} + +static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; + + if (sver != TSDB_SMA_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + goto _OVER; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SSmaObj)); + if (pRow == NULL) goto _OVER; + + SSmaObj *pSma = sdbGetRowObj(pRow); + if (pSma == NULL) goto _OVER; + + int32_t dataPos = 0; + + SDB_GET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER) + SDB_GET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pSma->createdTime, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pSma->uid, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pSma->stbUid, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pSma->dbUid, _OVER) + SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER) + SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER) + SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER) + + if (pSma->exprLen > 0) { + pSma->expr = calloc(pSma->exprLen, 1); + if (pSma->expr == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) + } + + if (pSma->tagsFilterLen > 0) { + pSma->tagsFilter = calloc(pSma->tagsFilterLen, 1); + if (pSma->tagsFilter == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER) + } + + if (pSma->sqlLen > 0) { + pSma->sql = calloc(pSma->sqlLen, 1); + if (pSma->sql == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER) + } + + if (pSma->astLen > 0) { + pSma->ast = calloc(pSma->astLen, 1); + if (pSma->ast == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER) + } + + SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) + terrno = 0; + +_OVER: + if (terrno != 0) { + mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr()); + tfree(pSma->expr); + tfree(pSma->tagsFilter); + tfree(pRow); + return NULL; + } + + mTrace("sma:%s, decode from raw:%p, row:%p", pSma->name, pRaw, pSma); + return pRow; +} + +static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma) { + mTrace("sma:%s, perform insert action, row:%p", pSma->name, pSma); + return 0; +} + +static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSma) { + mTrace("sma:%s, perform delete action, row:%p", pSma->name, pSma); + tfree(pSma->tagsFilter); + tfree(pSma->expr); + return 0; +} + +static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew) { + mTrace("sma:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); + return 0; +} + +SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName) { + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = sdbAcquire(pSdb, SDB_SMA, smaName); + if (pSma == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_MND_SMA_NOT_EXIST; + } + return pSma; +} + +void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pSma); +} + +SDbObj *mndAcquireDbBySma(SMnode *pMnode, const char *smaName) { + SName name = {0}; + tNameFromString(&name, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + + char db[TSDB_TABLE_FNAME_LEN] = {0}; + tNameGetFullDbName(&name, db); + + return mndAcquireDb(pMnode, db); +} + +static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) { + SName name = {0}; + tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + + SVCreateTSmaReq req = {0}; + req.tSma.version = 0; + req.tSma.intervalUnit = pSma->intervalUnit; + req.tSma.slidingUnit = pSma->slidingUnit; + req.tSma.timezoneInt = pSma->timezone; + tstrncpy(req.tSma.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN); + req.tSma.exprLen = pSma->exprLen; + req.tSma.tagsFilterLen = pSma->tagsFilterLen; + req.tSma.indexUid = pSma->uid; + req.tSma.tableUid = pSma->stbUid; + req.tSma.interval = pSma->interval; + req.tSma.offset = pSma->offset; + req.tSma.sliding = pSma->sliding; + req.tSma.expr = pSma->expr; + req.tSma.tagsFilter = pSma->tagsFilter; + + int32_t contLen = tSerializeSVCreateTSmaReq(NULL, &req) + sizeof(SMsgHead); + SMsgHead *pHead = malloc(contLen); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); + tSerializeSVCreateTSmaReq(&pBuf, &req); + + *pContLen = contLen; + return pHead; +} + +static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) { + SName name = {0}; + tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + + SVDropTSmaReq req = {0}; + req.ver = 0; + req.indexUid = pSma->uid; + tstrncpy(req.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN); + + int32_t contLen = tSerializeSVDropTSmaReq(NULL, &req) + sizeof(SMsgHead); + SMsgHead *pHead = malloc(contLen); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); + tDeserializeSVDropTSmaReq(&pBuf, &req); + + *pContLen = contLen; + return pHead; +} + +static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { + SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; + + return 0; +} + +static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { + SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + +static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + int32_t contLen; + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } + + void *pReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &contLen); + if (pReq == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_CREATE_SMA; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pReq); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) { + SSmaObj smaObj = {0}; + memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); + memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN); + memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN); + smaObj.createdTime = taosGetTimestampMs(); + smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + smaObj.stbUid = pStb->uid; + smaObj.dbUid = pStb->dbUid; + smaObj.intervalUnit = pCreate->intervalUnit; + smaObj.slidingUnit = pCreate->slidingUnit; + smaObj.timezone = pCreate->timezone; + smaObj.dstVgId = pCreate->dstVgId; + smaObj.interval = pCreate->interval; + smaObj.offset = pCreate->offset; + smaObj.sliding = pCreate->sliding; + smaObj.exprLen = pCreate->exprLen; + smaObj.tagsFilterLen = pCreate->tagsFilterLen; + smaObj.sqlLen = pCreate->sqlLen; + smaObj.astLen = pCreate->astLen; + + if (smaObj.exprLen > 0) { + smaObj.expr = malloc(smaObj.exprLen); + if (smaObj.expr == NULL) goto _OVER; + memcpy(smaObj.expr, pCreate->expr, smaObj.exprLen); + } + + if (smaObj.tagsFilterLen > 0) { + smaObj.tagsFilter = malloc(smaObj.tagsFilterLen); + if (smaObj.tagsFilter == NULL) goto _OVER; + memcpy(smaObj.tagsFilter, pCreate->tagsFilter, smaObj.tagsFilterLen); + } + + if (smaObj.sqlLen > 0) { + smaObj.sql = malloc(smaObj.sqlLen); + if (smaObj.sql == NULL) goto _OVER; + memcpy(smaObj.sql, pCreate->sql, smaObj.sqlLen); + } + + if (smaObj.astLen > 0) { + smaObj.ast = malloc(smaObj.astLen); + if (smaObj.ast == NULL) goto _OVER; + memcpy(smaObj.ast, pCreate->ast, smaObj.astLen); + } + + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg); + if (pTrans == NULL) goto _OVER; + + mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); + mndTransSetDbInfo(pTrans, pDb); + + if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; + if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; + if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + + code = 0; + +_OVER: + mndTransDrop(pTrans); + return code; +} + +static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) { + terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; + if (pCreate->name[0] == 0) return -1; + if (pCreate->stb[0] == 0) return -1; + if (pCreate->igExists < 0 || pCreate->igExists > 1) return -1; + if (pCreate->intervalUnit < 0) return -1; + if (pCreate->slidingUnit < 0) return -1; + if (pCreate->timezone < 0) return -1; + if (pCreate->dstVgId < 0) return -1; + if (pCreate->interval < 0) return -1; + if (pCreate->offset < 0) return -1; + if (pCreate->sliding < 0) return -1; + if (pCreate->exprLen < 0) return -1; + if (pCreate->tagsFilterLen < 0) return -1; + if (pCreate->sqlLen < 0) return -1; + if (pCreate->astLen < 0) return -1; + if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) return -1; + if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) return -1; + if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) return -1; + if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) return -1; + + SName smaName = {0}; + if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) return -1; + if (*(char *)tNameGetTableName(&smaName) == 0) return -1; + + terrno = 0; + return 0; +} + +static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; + int32_t code = -1; + SStbObj *pStb = NULL; + SSmaObj *pSma = NULL; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + SMCreateSmaReq createReq = {0}; + + if (tDeserializeSMCreateSmaReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + mDebug("sma:%s, start to create", createReq.name); + if (mndCheckCreateSmaReq(&createReq) != 0) { + goto _OVER; + } + + pStb = mndAcquireStb(pMnode, createReq.stb); + if (pStb == NULL) { + mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); + goto _OVER; + } + + pSma = mndAcquireSma(pMnode, createReq.name); + if (pSma != NULL) { + if (createReq.igExists) { + mDebug("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name); + code = 0; + goto _OVER; + } else { + terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST; + goto _OVER; + } + } + + pDb = mndAcquireDbBySma(pMnode, createReq.name); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + goto _OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto _OVER; + } + + if (mndCheckWriteAuth(pUser, pDb) != 0) { + goto _OVER; + } + + code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("sma:%s, failed to create since %s", createReq.name, terrstr()); + } + + mndReleaseStb(pMnode, pStb); + mndReleaseSma(pMnode, pSma); + mndReleaseDb(pMnode, pDb); + mndReleaseUser(pMnode, pUser); + tFreeSMCreateSmaReq(&createReq); + + return code; +} + +static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp) { + mndTransProcessRsp(pRsp); + return 0; +} + +static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { + SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { + SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + + return 0; +} + +static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + int32_t contLen; + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } + + int32_t contLen = 0; + void *pReq = mndBuildVDropSmaReq(pMnode, pVgroup, pSma, &contLen); + if (pReq == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_DROP_SMA; + action.acceptableCode = TSDB_CODE_VND_SMA_NOT_EXIST; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pReq); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndDropSma(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb, SSmaObj *pSma) { + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, &pReq->rpcMsg); + if (pTrans == NULL) goto _OVER; + + mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); + mndTransSetDbInfo(pTrans, pDb); + + if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + + code = 0; + +_OVER: + mndTransDrop(pTrans); + return code; +} + +static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; + int32_t code = -1; + SUserObj *pUser = NULL; + SDbObj *pDb = NULL; + SSmaObj *pSma = NULL; + SMDropSmaReq dropReq = {0}; + + if (tDeserializeSMDropSmaReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + mDebug("sma:%s, start to drop", dropReq.name); + + pSma = mndAcquireSma(pMnode, dropReq.name); + if (pSma == NULL) { + if (dropReq.igNotExists) { + mDebug("sma:%s, not exist, ignore not exist is set", dropReq.name); + code = 0; + goto _OVER; + } else { + terrno = TSDB_CODE_MND_SMA_NOT_EXIST; + goto _OVER; + } + } + + pDb = mndAcquireDbBySma(pMnode, dropReq.name); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + goto _OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto _OVER; + } + + if (mndCheckWriteAuth(pUser, pDb) != 0) { + goto _OVER; + } + + code = mndDropSma(pMnode, pReq, pDb, pSma); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("sma:%s, failed to drop since %s", dropReq.name, terrstr()); + } + + mndReleaseDb(pMnode, pDb); + mndReleaseSma(pMnode, pSma); + mndReleaseUser(pMnode, pUser); + + return code; +} + +static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) { + mndTransProcessRsp(pRsp); + return 0; +} + +static int32_t mndGetSmaMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchemas; + + pShow->bytes[cols] = TSDB_INDEX_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "stb"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + + pMeta->numOfColumns = cols; + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_SMA); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbName, mndShowStr(pShow->type)); + + return 0; +} + +static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SSmaObj *pSma = NULL; + int32_t cols = 0; + char *pWrite; + char prefix[TSDB_DB_FNAME_LEN] = {0}; + + SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); + if (pDb == NULL) return 0; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_SMA, pShow->pIter, (void **)&pSma); + if (pShow->pIter == NULL) break; + + if (pSma->dbUid != pDb->uid) { + sdbRelease(pSdb, pSma); + continue; + } + + cols = 0; + + SName smaName = {0}; + tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, (char *)tNameGetTableName(&smaName)); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pSma->createdTime; + cols++; + + SName stbName = {0}; + tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, (char *)tNameGetTableName(&stbName)); + cols++; + + numOfRows++; + sdbRelease(pSdb, pSma); + } + + mndReleaseDb(pMnode, pDb); + pShow->numOfReads += numOfRows; + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + return numOfRows; +} + +static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7b0de9d8bd..e8d6157661 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -13,20 +13,19 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "mndStb.h" #include "mndAuth.h" #include "mndDb.h" #include "mndDnode.h" +#include "mndInfoSchema.h" #include "mndMnode.h" #include "mndShow.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" -#include "mndInfoSchema.h" #include "tname.h" -#define TSDB_STB_VER_NUMBER 1 +#define TSDB_STB_VER_NUMBER 1 #define TSDB_STB_RESERVE_SIZE 64 static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); @@ -88,6 +87,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, STB_ENCODE_OVER) for (int32_t i = 0; i < pStb->numOfColumns; ++i) { SSchema *pSchema = &pStb->pColumns[i]; @@ -105,7 +105,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) } - SDB_SET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, STB_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER) @@ -150,6 +150,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, STB_DECODE_OVER) pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema)); pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema)); @@ -173,7 +174,11 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) } - SDB_GET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_DECODE_OVER) + if (pStb->commentLen > 0) { + pStb->comment = calloc(pStb->commentLen, 1); + if (pStb->comment == NULL) goto STB_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, STB_DECODE_OVER) + } SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_DECODE_OVER) terrno = 0; @@ -183,6 +188,7 @@ STB_DECODE_OVER: mError("stb:%s, failed to decode from raw:%p since %s", pStb->name, pRaw, terrstr()); tfree(pStb->pColumns); tfree(pStb->pTags); + tfree(pStb->comment); tfree(pRow); return NULL; } @@ -200,6 +206,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb); tfree(pStb->pColumns); tfree(pStb->pTags); + tfree(pStb->comment); return 0; } @@ -502,6 +509,13 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre stbObj.nextColId = 1; stbObj.numOfColumns = pCreate->numOfColumns; stbObj.numOfTags = pCreate->numOfTags; + stbObj.commentLen = pCreate->commentLen; + stbObj.comment = calloc(stbObj.commentLen, 1); + if (stbObj.comment == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen); stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema)); stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema)); @@ -1162,7 +1176,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * static int32_t mndDropStb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_STB, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_STB_OVER; mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); @@ -1568,7 +1582,11 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, pStb->comment); + if (pStb->commentLen != 0) { + STR_TO_VARSTR(pWrite, pStb->comment); + } else { + STR_TO_VARSTR(pWrite, ""); + } cols++; numOfRows++; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d9b5341ba1..fb6906b5c9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -299,7 +299,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index e2a6e49b56..aa39c740ad 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -406,6 +406,10 @@ static const char *mndTransType(ETrnType type) { return "alter-stb"; case TRN_TYPE_DROP_STB: return "drop-stb"; + case TRN_TYPE_CREATE_SMA: + return "create-sma"; + case TRN_TYPE_DROP_SMA: + return "drop-sma"; default: return "invalid"; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 6400bf69f1..76687fc5cc 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -28,6 +28,7 @@ #include "mndProfile.h" #include "mndQnode.h" #include "mndShow.h" +#include "mndSma.h" #include "mndSnode.h" #include "mndStb.h" #include "mndStream.h" @@ -204,6 +205,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-stb", mndInitSma, mndCleanupSma) != 0) return -1; if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; @@ -409,15 +411,15 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } // Note: uid 0 is reserved -uint64_t mndGenerateUid(char *name, int32_t len) { +int64_t mndGenerateUid(char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); do { int64_t us = taosGetTimestampUs(); - uint64_t x = (us & 0x000000FFFFFFFFFF) << 24; - uint64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); + int64_t x = (us & 0x000000FFFFFFFFFF) << 24; + int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); if (uuid) { - return uuid; + return abs(uuid); } } while (true); } diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index df0510f783..61201f33c3 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -12,5 +12,6 @@ add_subdirectory(dnode) add_subdirectory(mnode) add_subdirectory(db) add_subdirectory(stb) +add_subdirectory(sma) add_subdirectory(func) add_subdirectory(topic) diff --git a/source/dnode/mnode/impl/test/sma/CMakeLists.txt b/source/dnode/mnode/impl/test/sma/CMakeLists.txt new file mode 100644 index 0000000000..943695abf3 --- /dev/null +++ b/source/dnode/mnode/impl/test/sma/CMakeLists.txt @@ -0,0 +1,11 @@ +aux_source_directory(. SMA_SRC) +add_executable(mnode_test_sma ${SMA_SRC}) +target_link_libraries( + mnode_test_sma + PUBLIC sut +) + +add_test( + NAME mnode_test_sma + COMMAND mnode_test_sma +) diff --git a/source/dnode/mnode/impl/test/sma/sma.cpp b/source/dnode/mnode/impl/test/sma/sma.cpp new file mode 100644 index 0000000000..5b48906681 --- /dev/null +++ b/source/dnode/mnode/impl/test/sma/sma.cpp @@ -0,0 +1,236 @@ +/** + * @file sma.cpp + * @author slguan (slguan@taosdata.com) + * @brief MNODE module sma tests + * @version 1.0 + * @date 2022-03-23 + * + * @copyright Copyright (c) 2022 + * + */ + +#include "sut.h" + +class MndTestSma : public ::testing::Test { + protected: + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_sma", 9035); } + static void TearDownTestSuite() { test.Cleanup(); } + + static Testbase test; + + public: + void SetUp() override {} + void TearDown() override {} + + void* BuildCreateDbReq(const char* dbname, int32_t* pContLen); + void* BuildDropDbReq(const char* dbname, int32_t* pContLen); + void* BuildCreateStbReq(const char* stbname, int32_t* pContLen); + void* BuildDropStbReq(const char* stbname, int32_t* pContLen); + void* BuildCreateSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr, + const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen); + void* BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen); +}; + +Testbase MndTestSma::test; + +void* MndTestSma::BuildCreateDbReq(const char* dbname, int32_t* pContLen) { + SCreateDbReq createReq = {0}; + strcpy(createReq.db, dbname); + createReq.numOfVgroups = 2; + createReq.cacheBlockSize = 16; + createReq.totalBlocks = 10; + createReq.daysPerFile = 10; + createReq.daysToKeep0 = 3650; + createReq.daysToKeep1 = 3650; + createReq.daysToKeep2 = 3650; + createReq.minRows = 100; + createReq.maxRows = 4096; + createReq.commitTime = 3600; + createReq.fsyncPeriod = 3000; + createReq.walLevel = 1; + createReq.precision = 0; + createReq.compression = 2; + createReq.replications = 1; + createReq.quorum = 1; + createReq.update = 0; + createReq.cacheLastRow = 0; + createReq.ignoreExist = 1; + + int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSCreateDbReq(pReq, contLen, &createReq); + + *pContLen = contLen; + return pReq; +} + +void* MndTestSma::BuildDropDbReq(const char* dbname, int32_t* pContLen) { + SDropDbReq dropdbReq = {0}; + strcpy(dropdbReq.db, dbname); + + int32_t contLen = tSerializeSDropDbReq(NULL, 0, &dropdbReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSDropDbReq(pReq, contLen, &dropdbReq); + + *pContLen = contLen; + return pReq; +} + +void* MndTestSma::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { + SMCreateStbReq createReq = {0}; + createReq.numOfColumns = 3; + createReq.numOfTags = 1; + createReq.igExists = 0; + createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField)); + createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField)); + strcpy(createReq.name, stbname); + + { + SField field = {0}; + field.bytes = 8; + field.type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(field.name, "ts"); + taosArrayPush(createReq.pColumns, &field); + } + + { + SField field = {0}; + field.bytes = 2; + field.type = TSDB_DATA_TYPE_TINYINT; + strcpy(field.name, "col1"); + taosArrayPush(createReq.pColumns, &field); + } + + { + SField field = {0}; + field.bytes = 8; + field.type = TSDB_DATA_TYPE_BIGINT; + strcpy(field.name, "col2"); + taosArrayPush(createReq.pColumns, &field); + } + + { + SField field = {0}; + field.bytes = 2; + field.type = TSDB_DATA_TYPE_TINYINT; + strcpy(field.name, "tag1"); + taosArrayPush(createReq.pTags, &field); + } + + int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq); + void* pHead = rpcMallocCont(tlen); + tSerializeSMCreateStbReq(pHead, tlen, &createReq); + tFreeSMCreateStbReq(&createReq); + *pContLen = tlen; + return pHead; +} + +void* MndTestSma::BuildDropStbReq(const char* stbname, int32_t* pContLen) { + SMDropStbReq dropstbReq = {0}; + strcpy(dropstbReq.name, stbname); + + int32_t contLen = tSerializeSMDropStbReq(NULL, 0, &dropstbReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMDropStbReq(pReq, contLen, &dropstbReq); + + *pContLen = contLen; + return pReq; +} + +void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr, + const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen) { + SMCreateSmaReq createReq = {0}; + strcpy(createReq.name, smaname); + strcpy(createReq.stb, stbname); + createReq.igExists = igExists; + createReq.intervalUnit = 1; + createReq.slidingUnit = 2; + createReq.timezone = 3; + createReq.dstVgId = 4; + createReq.interval = 10; + createReq.offset = 5; + createReq.sliding = 6; + createReq.expr = (char*)expr; + createReq.exprLen = strlen(createReq.expr) + 1; + createReq.tagsFilter = (char*)tagsFilter; + createReq.tagsFilterLen = strlen(createReq.tagsFilter) + 1; + createReq.sql = (char*)sql; + createReq.sqlLen = strlen(createReq.sql) + 1; + createReq.ast = (char*)expr; + createReq.astLen = strlen(createReq.ast) + 1; + + int32_t tlen = tSerializeSMCreateSmaReq(NULL, 0, &createReq); + void* pHead = rpcMallocCont(tlen); + tSerializeSMCreateSmaReq(pHead, tlen, &createReq); + *pContLen = tlen; + return pHead; +} + +void* MndTestSma::BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen) { + SMDropSmaReq dropsmaReq = {0}; + dropsmaReq.igNotExists = igNotExists; + strcpy(dropsmaReq.name, smaname); + + int32_t contLen = tSerializeSMDropSmaReq(NULL, 0, &dropsmaReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMDropSmaReq(pReq, contLen, &dropsmaReq); + + *pContLen = contLen; + return pReq; +} + +TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { + const char* dbname = "1.d1"; + const char* stbname = "1.d1.stb"; + const char* smaname = "1.d1.sma"; + int32_t contLen = 0; + void* pReq; + SRpcMsg* pRsp; + + { + pReq = BuildCreateDbReq(dbname, &contLen); + pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + } + + { + pReq = BuildCreateStbReq(stbname, &contLen); + pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + } + + { + pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen); + pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + } + + // restart + test.Restart(); + + { + test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname); + CHECK_META("show indexes", 3); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + + CheckBinary("sma", TSDB_INDEX_NAME_LEN); + CheckTimestamp(); + CheckBinary("stb", TSDB_TABLE_NAME_LEN); + } + + { + pReq = BuildDropSmaReq(smaname, 0, &contLen); + pRsp = test.SendReq(TDMT_MND_DROP_SMA, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 0); + } +} diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 5eeecc5d24..ede2e2aa0e 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -168,6 +168,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA +#if 0 SSmaCfg vCreateSmaReq = {0}; if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -189,10 +190,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // } tdDestroyTSma(&vCreateSmaReq.tSma); // TODO: return directly or go on follow steps? +#endif } break; case TDMT_VND_CANCEL_SMA: { // timeRangeSMA } break; case TDMT_VND_DROP_SMA: { // timeRangeSMA +#if 0 SVDropTSmaReq vDropSmaReq = {0}; if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -209,6 +212,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // return -1; // } // TODO: return directly or go on follow steps? +#endif } break; default: ASSERT(0); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b7ab007cf5..1dad55d99e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -270,6 +270,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill // mnode-topic TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported") +// mnode-sma +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "SMA does not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma option") + // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline") @@ -305,6 +310,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operat TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_SMA_NOT_EXIST, "SMA not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch") // tsdb