From e9831674b231110dae66b486c851f547a4b4aa09 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 23 Mar 2022 17:53:00 +0800 Subject: [PATCH] sma --- include/common/tmsg.h | 14 +- include/util/taoserror.h | 2 + source/common/src/tmsg.c | 38 ++- source/dnode/mgmt/mnode/src/mmMsg.c | 2 + source/dnode/mgmt/vnode/src/vmMsg.c | 3 + source/dnode/mnode/impl/inc/mndDef.h | 10 +- source/dnode/mnode/impl/inc/mndInt.h | 6 +- source/dnode/mnode/impl/src/mndShow.c | 2 + source/dnode/mnode/impl/src/mndSma.c | 177 +++++++------ source/dnode/mnode/impl/src/mndStb.c | 19 +- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mnode.c | 10 +- source/dnode/mnode/impl/test/CMakeLists.txt | 1 + .../dnode/mnode/impl/test/sma/CMakeLists.txt | 11 + source/dnode/mnode/impl/test/sma/sma.cpp | 236 ++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeWrite.c | 4 + source/util/src/terror.c | 2 + 17 files changed, 444 insertions(+), 95 deletions(-) create mode 100644 source/dnode/mnode/impl/test/sma/CMakeLists.txt create mode 100644 source/dnode/mnode/impl/test/sma/sma.cpp diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4fc3f6ce8f..0df8cff670 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -270,9 +270,10 @@ typedef struct { int8_t igExists; int32_t numOfColumns; int32_t numOfTags; + int32_t commentLen; SArray* pColumns; SArray* pTags; - char comment[TSDB_STB_COMMENT_LEN]; + char *comment; } SMCreateStbReq; int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); @@ -1906,13 +1907,18 @@ typedef struct { int8_t intervalUnit; int8_t slidingUnit; int8_t timezone; + int32_t dstVgId; // for stream int64_t interval; int64_t offset; int64_t sliding; - int32_t exprLen; - int32_t tagsFilterLen; - char* expr; + int32_t exprLen; // strlen + 1 + int32_t tagsFilterLen; // strlen + 1 + int32_t sqlLen; // strlen + 1 + int32_t astLen; // strlen + 1 + char* expr; char* tagsFilter; + char* sql; + char* ast; } SMCreateSmaReq; int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 09b9ac7121..6d394aa2f2 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -277,6 +277,7 @@ int32_t* taosGetErrno(); // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0401) +#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0402) // dnode #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x04A0) @@ -313,6 +314,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) #define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515) +#define TSDB_CODE_VND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0516) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d4e9dfed73..4692a77ff9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -430,6 +430,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1; + if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1; for (int32_t i = 0; i < pReq->numOfColumns; ++i) { SField *pField = taosArrayGet(pReq->pColumns, i); @@ -445,7 +446,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq if (tEncodeCStr(&encoder, pField->name) < 0) return -1; } - if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1; + if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -462,6 +463,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1; pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField)); pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField)); @@ -492,6 +494,12 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR } } + if (pReq->commentLen > 0) { + pReq->comment = malloc(pReq->commentLen); + if (pReq->comment == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; + } + if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; tEndDecode(&decoder); @@ -600,17 +608,26 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq if (tEncodeI8(&encoder, pReq->intervalUnit) < 0) return -1; if (tEncodeI8(&encoder, pReq->slidingUnit) < 0) return -1; if (tEncodeI8(&encoder, pReq->timezone) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1; if (tEncodeI64(&encoder, pReq->interval) < 0) return -1; if (tEncodeI64(&encoder, pReq->offset) < 0) return -1; if (tEncodeI64(&encoder, pReq->sliding) < 0) return -1; if (tEncodeI32(&encoder, pReq->exprLen) < 0) return -1; + if (tEncodeI32(&encoder, pReq->tagsFilterLen) < 0) return -1; + if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; + if (tEncodeI32(&encoder, pReq->astLen) < 0) return -1; if (pReq->exprLen > 0) { if (tEncodeBinary(&encoder, pReq->expr, pReq->exprLen) < 0) return -1; } - if (tEncodeI32(&encoder, pReq->tagsFilterLen) < 0) return -1; if (pReq->tagsFilterLen > 0) { if (tEncodeBinary(&encoder, pReq->tagsFilter, pReq->tagsFilterLen) < 0) return -1; } + if (pReq->sqlLen > 0) { + if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; + } + if (pReq->astLen > 0) { + if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1; + } tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -629,21 +646,34 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR if (tDecodeI8(&decoder, &pReq->intervalUnit) < 0) return -1; if (tDecodeI8(&decoder, &pReq->slidingUnit) < 0) return -1; if (tDecodeI8(&decoder, &pReq->timezone) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->interval) < 0) return -1; if (tDecodeI64(&decoder, &pReq->offset) < 0) return -1; if (tDecodeI64(&decoder, &pReq->sliding) < 0) return -1; if (tDecodeI32(&decoder, &pReq->exprLen) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->tagsFilterLen) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->sqlLen) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->astLen) < 0) return -1; if (pReq->exprLen > 0) { pReq->expr = malloc(pReq->exprLen); if (pReq->expr == NULL) return -1; if (tDecodeCStrTo(&decoder, pReq->expr) < 0) return -1; } - if (tDecodeI32(&decoder, &pReq->tagsFilterLen) < 0) return -1; if (pReq->tagsFilterLen > 0) { pReq->tagsFilter = malloc(pReq->tagsFilterLen); if (pReq->tagsFilter == NULL) return -1; if (tDecodeCStrTo(&decoder, pReq->tagsFilter) < 0) return -1; } + if (pReq->sqlLen > 0) { + pReq->sql = malloc(pReq->sqlLen); + if (pReq->sql == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1; + } + if (pReq->astLen > 0) { + pReq->ast = malloc(pReq->astLen); + if (pReq->ast == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; + } tEndDecode(&decoder); tCoderClear(&decoder); @@ -653,6 +683,8 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR void tFreeSMCreateSmaReq(SMCreateSmaReq *pReq) { tfree(pReq->expr); tfree(pReq->tagsFilter); + tfree(pReq->sql); + tfree(pReq->ast); } int32_t tSerializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropSmaReq *pReq) { diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index b6ff4f3ae1..d6d65be41c 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -150,5 +150,7 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg, 0); } diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index e4a4cfcd9f..08901504f4 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -268,6 +268,9 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, 0); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1be5279d17..c5372f6875 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -318,13 +318,18 @@ typedef struct { int8_t intervalUnit; int8_t slidingUnit; int8_t timezone; + int32_t dstVgId; // for stream int64_t interval; int64_t offset; int64_t sliding; - int32_t exprLen; + int32_t exprLen; // strlen + 1 int32_t tagsFilterLen; + int32_t sqlLen; + int32_t astLen; char* expr; char* tagsFilter; + char* sql; + char* ast; } SSmaObj; typedef struct { @@ -338,10 +343,11 @@ typedef struct { int32_t nextColId; int32_t numOfColumns; int32_t numOfTags; + int32_t commentLen; SSchema* pColumns; SSchema* pTags; + char* comment; SRWLatch lock; - char comment[TSDB_STB_COMMENT_LEN]; } SStbObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index d5bffada6a..20e85973be 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -122,9 +122,9 @@ typedef struct SMnode { SMsgCb msgCb; } SMnode; -void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); -uint64_t mndGenerateUid(char *name, int32_t len); -void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); +void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); +int64_t mndGenerateUid(char *name, int32_t len); +void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 33a8b7fbe5..d675d441ba 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -411,6 +411,8 @@ char *mndShowStr(int32_t showType) { return "show topics"; case TSDB_MGMT_TABLE_FUNC: return "show functions"; + case TSDB_MGMT_TABLE_INDEX: + return "show indexes"; default: return "undefined"; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index bd459b873b..2bd13f395f 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -84,17 +84,29 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { SDB_SET_INT8(pRaw, dataPos, pSma->intervalUnit, _OVER) SDB_SET_INT8(pRaw, dataPos, pSma->slidingUnit, _OVER) SDB_SET_INT8(pRaw, dataPos, pSma->timezone, _OVER) + SDB_SET_INT32(pRaw, dataPos, pSma->dstVgId, _OVER) SDB_SET_INT64(pRaw, dataPos, pSma->interval, _OVER) SDB_SET_INT64(pRaw, dataPos, pSma->offset, _OVER) SDB_SET_INT64(pRaw, dataPos, pSma->sliding, _OVER) SDB_SET_INT32(pRaw, dataPos, pSma->exprLen, _OVER) SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER) + SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER) + SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER) + if (pSma->exprLen > 0) { + SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) + } + if (pSma->tagsFilterLen > 0) { + SDB_SET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER) + } + if (pSma->sqlLen > 0) { + SDB_SET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER) + } + if (pSma->astLen > 0) { + SDB_SET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER) + } SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) - terrno = 0; _OVER: @@ -137,22 +149,40 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pSma->intervalUnit, _OVER) SDB_GET_INT8(pRaw, dataPos, &pSma->slidingUnit, _OVER) SDB_GET_INT8(pRaw, dataPos, &pSma->timezone, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pSma->dstVgId, _OVER) SDB_GET_INT64(pRaw, dataPos, &pSma->interval, _OVER) SDB_GET_INT64(pRaw, dataPos, &pSma->offset, _OVER) SDB_GET_INT64(pRaw, dataPos, &pSma->sliding, _OVER) SDB_GET_INT32(pRaw, dataPos, &pSma->exprLen, _OVER) SDB_GET_INT32(pRaw, dataPos, &pSma->tagsFilterLen, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pSma->sqlLen, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pSma->astLen, _OVER) - pSma->expr = calloc(pSma->exprLen, 1); - pSma->tagsFilter = calloc(pSma->tagsFilterLen, 1); - if (pSma->expr == NULL || pSma->tagsFilter == NULL) { - goto _OVER; + if (pSma->exprLen > 0) { + pSma->expr = calloc(pSma->exprLen, 1); + if (pSma->expr == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) } - SDB_GET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) - SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER) - SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) + if (pSma->tagsFilterLen > 0) { + pSma->tagsFilter = calloc(pSma->tagsFilterLen, 1); + if (pSma->tagsFilter == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pSma->tagsFilter, pSma->tagsFilterLen, _OVER) + } + if (pSma->sqlLen > 0) { + pSma->sql = calloc(pSma->sqlLen, 1); + if (pSma->sql == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pSma->sql, pSma->sqlLen, _OVER) + } + + if (pSma->astLen > 0) { + pSma->ast = calloc(pSma->astLen, 1); + if (pSma->ast == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, pSma->ast, pSma->astLen, _OVER) + } + + SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) terrno = 0; _OVER: @@ -217,7 +247,7 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm req.tSma.version = 0; req.tSma.intervalUnit = pSma->intervalUnit; req.tSma.slidingUnit = pSma->slidingUnit; - req.tSma.slidingUnit = pSma->timezone; + req.tSma.timezoneInt = pSma->timezone; tstrncpy(req.tSma.indexName, (char *)tNameGetTableName(&name), TSDB_INDEX_NAME_LEN); req.tSma.exprLen = pSma->exprLen; req.tSma.tagsFilterLen = pSma->tagsFilterLen; @@ -281,15 +311,6 @@ static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj * return 0; } -static int32_t mndSetCreateSmaUndoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { - SSdbRaw *pUndoRaw = mndSmaActionEncode(pSma); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; - - return 0; -} - static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma); if (pCommitRaw == NULL) return -1; @@ -338,77 +359,61 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndSetCreateSmaUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) { - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) { - sdbRelease(pSdb, pVgroup); - continue; - } - - int32_t contLen = 0; - void *pReq = mndBuildVDropSmaReq(pMnode, pVgroup, pSma, &contLen); - if (pReq == NULL) { - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = pReq; - action.contLen = contLen; - action.msgType = TDMT_VND_DROP_SMA; - if (mndTransAppendUndoAction(pTrans, &action) != 0) { - free(pReq); - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - return -1; - } - sdbRelease(pSdb, pVgroup); - } - - return 0; -} - -static int32_t mndCreateTSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) { +static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) { SSmaObj smaObj = {0}; memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); memcpy(smaObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN); + memcpy(smaObj.db, pDb->name, TSDB_DB_FNAME_LEN); smaObj.createdTime = taosGetTimestampMs(); smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); smaObj.stbUid = pStb->uid; + smaObj.dbUid = pStb->dbUid; smaObj.intervalUnit = pCreate->intervalUnit; smaObj.slidingUnit = pCreate->slidingUnit; smaObj.timezone = pCreate->timezone; + smaObj.dstVgId = pCreate->dstVgId; smaObj.interval = pCreate->interval; smaObj.offset = pCreate->offset; smaObj.sliding = pCreate->sliding; smaObj.exprLen = pCreate->exprLen; smaObj.tagsFilterLen = pCreate->tagsFilterLen; - smaObj.expr = pCreate->expr; - pCreate->expr = NULL; - smaObj.tagsFilter = pCreate->tagsFilter; - pCreate->tagsFilter = NULL; + smaObj.sqlLen = pCreate->sqlLen; + smaObj.astLen = pCreate->astLen; + + if (smaObj.exprLen > 0) { + smaObj.expr = malloc(smaObj.exprLen); + if (smaObj.expr == NULL) goto _OVER; + memcpy(smaObj.expr, pCreate->expr, smaObj.exprLen); + } + + if (smaObj.tagsFilterLen > 0) { + smaObj.tagsFilter = malloc(smaObj.tagsFilterLen); + if (smaObj.tagsFilter == NULL) goto _OVER; + memcpy(smaObj.tagsFilter, pCreate->tagsFilter, smaObj.tagsFilterLen); + } + + if (smaObj.sqlLen > 0) { + smaObj.sql = malloc(smaObj.sqlLen); + if (smaObj.sql == NULL) goto _OVER; + memcpy(smaObj.sql, pCreate->sql, smaObj.sqlLen); + } + + if (smaObj.astLen > 0) { + smaObj.ast = malloc(smaObj.astLen); + if (smaObj.ast == NULL) goto _OVER; + memcpy(smaObj.ast, pCreate->ast, smaObj.astLen); + } int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, &pReq->rpcMsg); if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); mndTransSetDbInfo(pTrans, pDb); if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; - if (mndSetCreateSmaUndoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndSetCreateSmaUndoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -419,11 +424,31 @@ _OVER: } static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) { - if (pCreate->igExists < 0 || pCreate->igExists > 1) { - terrno = TSDB_CODE_MND_INVALID_STB_OPTION; - return -1; - } + terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; + if (pCreate->name[0] == 0) return -1; + if (pCreate->stb[0] == 0) return -1; + if (pCreate->igExists < 0 || pCreate->igExists > 1) return -1; + if (pCreate->intervalUnit < 0) return -1; + if (pCreate->slidingUnit < 0) return -1; + if (pCreate->timezone < 0) return -1; + if (pCreate->dstVgId < 0) return -1; + if (pCreate->interval < 0) return -1; + if (pCreate->offset < 0) return -1; + if (pCreate->sliding < 0) return -1; + if (pCreate->exprLen < 0) return -1; + if (pCreate->tagsFilterLen < 0) return -1; + if (pCreate->sqlLen < 0) return -1; + if (pCreate->astLen < 0) return -1; + if (pCreate->exprLen != 0 && strlen(pCreate->expr) + 1 != pCreate->exprLen) return -1; + if (pCreate->tagsFilterLen != 0 && strlen(pCreate->tagsFilter) + 1 != pCreate->tagsFilterLen) return -1; + if (pCreate->sqlLen != 0 && strlen(pCreate->sql) + 1 != pCreate->sqlLen) return -1; + if (pCreate->astLen != 0 && strlen(pCreate->ast) + 1 != pCreate->astLen) return -1; + SName smaName = {0}; + if (tNameFromString(&smaName, pCreate->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE) < 0) return -1; + if (*(char *)tNameGetTableName(&smaName) == 0) return -1; + + terrno = 0; return 0; } @@ -479,7 +504,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { goto _OVER; } - code = mndCreateTSma(pMnode, pReq, &createReq, pDb, pStb); + code = mndCreateSma(pMnode, pReq, &createReq, pDb, pStb); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; _OVER: @@ -547,7 +572,7 @@ static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * action.pCont = pReq; action.contLen = contLen; action.msgType = TDMT_VND_DROP_SMA; - action.acceptableCode = TSDB_CODE_VND_TB_NOT_EXIST; + action.acceptableCode = TSDB_CODE_VND_SMA_NOT_EXIST; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pReq); sdbCancelFetch(pSdb, pIter); @@ -562,7 +587,7 @@ static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * static int32_t mndDropSma(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb, SSmaObj *pSma) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_STB, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, &pReq->rpcMsg); if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); @@ -649,7 +674,7 @@ static int32_t mndGetSmaMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe int32_t cols = 0; SSchema *pSchema = pMeta->pSchemas; - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pShow->bytes[cols] = TSDB_INDEX_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "name"); pSchema[cols].bytes = pShow->bytes[cols]; @@ -663,7 +688,7 @@ static int32_t mndGetSmaMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "name"); + strcpy(pSchema[cols].name, "stb"); pSchema[cols].bytes = pShow->bytes[cols]; cols++; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6bf0b383ef..47565bec8b 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -87,6 +87,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, STB_ENCODE_OVER) for (int32_t i = 0; i < pStb->numOfColumns; ++i) { SSchema *pSchema = &pStb->pColumns[i]; @@ -104,7 +105,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) } - SDB_SET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, STB_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER) @@ -149,6 +150,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, STB_DECODE_OVER) pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema)); pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema)); @@ -172,7 +174,11 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) } - SDB_GET_BINARY(pRaw, dataPos, pStb->comment, TSDB_STB_COMMENT_LEN, STB_DECODE_OVER) + if (pStb->commentLen > 0) { + pStb->comment = calloc(pStb->commentLen, 1); + if (pStb->comment == NULL) goto STB_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, STB_DECODE_OVER) + } SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_DECODE_OVER) terrno = 0; @@ -182,6 +188,7 @@ STB_DECODE_OVER: mError("stb:%s, failed to decode from raw:%p since %s", pStb->name, pRaw, terrstr()); tfree(pStb->pColumns); tfree(pStb->pTags); + tfree(pStb->comment); tfree(pRow); return NULL; } @@ -199,6 +206,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb); tfree(pStb->pColumns); tfree(pStb->pTags); + tfree(pStb->comment); return 0; } @@ -501,6 +509,13 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre stbObj.nextColId = 1; stbObj.numOfColumns = pCreate->numOfColumns; stbObj.numOfTags = pCreate->numOfTags; + stbObj.commentLen = pCreate->commentLen; + stbObj.comment = calloc(stbObj.commentLen, 1); + if (stbObj.comment == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen); stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema)); stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema)); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 99aded0292..7a48d81d31 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -223,7 +223,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.physicalPlan = ""; streamObj.logicalPlan = ""; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 6400bf69f1..76687fc5cc 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -28,6 +28,7 @@ #include "mndProfile.h" #include "mndQnode.h" #include "mndShow.h" +#include "mndSma.h" #include "mndSnode.h" #include "mndStb.h" #include "mndStream.h" @@ -204,6 +205,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-stb", mndInitSma, mndCleanupSma) != 0) return -1; if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; @@ -409,15 +411,15 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } // Note: uid 0 is reserved -uint64_t mndGenerateUid(char *name, int32_t len) { +int64_t mndGenerateUid(char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); do { int64_t us = taosGetTimestampUs(); - uint64_t x = (us & 0x000000FFFFFFFFFF) << 24; - uint64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); + int64_t x = (us & 0x000000FFFFFFFFFF) << 24; + int64_t uuid = x + ((hashval & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); if (uuid) { - return uuid; + return abs(uuid); } } while (true); } diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index df0510f783..61201f33c3 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -12,5 +12,6 @@ add_subdirectory(dnode) add_subdirectory(mnode) add_subdirectory(db) add_subdirectory(stb) +add_subdirectory(sma) add_subdirectory(func) add_subdirectory(topic) diff --git a/source/dnode/mnode/impl/test/sma/CMakeLists.txt b/source/dnode/mnode/impl/test/sma/CMakeLists.txt new file mode 100644 index 0000000000..943695abf3 --- /dev/null +++ b/source/dnode/mnode/impl/test/sma/CMakeLists.txt @@ -0,0 +1,11 @@ +aux_source_directory(. SMA_SRC) +add_executable(mnode_test_sma ${SMA_SRC}) +target_link_libraries( + mnode_test_sma + PUBLIC sut +) + +add_test( + NAME mnode_test_sma + COMMAND mnode_test_sma +) diff --git a/source/dnode/mnode/impl/test/sma/sma.cpp b/source/dnode/mnode/impl/test/sma/sma.cpp new file mode 100644 index 0000000000..5b48906681 --- /dev/null +++ b/source/dnode/mnode/impl/test/sma/sma.cpp @@ -0,0 +1,236 @@ +/** + * @file sma.cpp + * @author slguan (slguan@taosdata.com) + * @brief MNODE module sma tests + * @version 1.0 + * @date 2022-03-23 + * + * @copyright Copyright (c) 2022 + * + */ + +#include "sut.h" + +class MndTestSma : public ::testing::Test { + protected: + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_sma", 9035); } + static void TearDownTestSuite() { test.Cleanup(); } + + static Testbase test; + + public: + void SetUp() override {} + void TearDown() override {} + + void* BuildCreateDbReq(const char* dbname, int32_t* pContLen); + void* BuildDropDbReq(const char* dbname, int32_t* pContLen); + void* BuildCreateStbReq(const char* stbname, int32_t* pContLen); + void* BuildDropStbReq(const char* stbname, int32_t* pContLen); + void* BuildCreateSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr, + const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen); + void* BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen); +}; + +Testbase MndTestSma::test; + +void* MndTestSma::BuildCreateDbReq(const char* dbname, int32_t* pContLen) { + SCreateDbReq createReq = {0}; + strcpy(createReq.db, dbname); + createReq.numOfVgroups = 2; + createReq.cacheBlockSize = 16; + createReq.totalBlocks = 10; + createReq.daysPerFile = 10; + createReq.daysToKeep0 = 3650; + createReq.daysToKeep1 = 3650; + createReq.daysToKeep2 = 3650; + createReq.minRows = 100; + createReq.maxRows = 4096; + createReq.commitTime = 3600; + createReq.fsyncPeriod = 3000; + createReq.walLevel = 1; + createReq.precision = 0; + createReq.compression = 2; + createReq.replications = 1; + createReq.quorum = 1; + createReq.update = 0; + createReq.cacheLastRow = 0; + createReq.ignoreExist = 1; + + int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSCreateDbReq(pReq, contLen, &createReq); + + *pContLen = contLen; + return pReq; +} + +void* MndTestSma::BuildDropDbReq(const char* dbname, int32_t* pContLen) { + SDropDbReq dropdbReq = {0}; + strcpy(dropdbReq.db, dbname); + + int32_t contLen = tSerializeSDropDbReq(NULL, 0, &dropdbReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSDropDbReq(pReq, contLen, &dropdbReq); + + *pContLen = contLen; + return pReq; +} + +void* MndTestSma::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { + SMCreateStbReq createReq = {0}; + createReq.numOfColumns = 3; + createReq.numOfTags = 1; + createReq.igExists = 0; + createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField)); + createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField)); + strcpy(createReq.name, stbname); + + { + SField field = {0}; + field.bytes = 8; + field.type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(field.name, "ts"); + taosArrayPush(createReq.pColumns, &field); + } + + { + SField field = {0}; + field.bytes = 2; + field.type = TSDB_DATA_TYPE_TINYINT; + strcpy(field.name, "col1"); + taosArrayPush(createReq.pColumns, &field); + } + + { + SField field = {0}; + field.bytes = 8; + field.type = TSDB_DATA_TYPE_BIGINT; + strcpy(field.name, "col2"); + taosArrayPush(createReq.pColumns, &field); + } + + { + SField field = {0}; + field.bytes = 2; + field.type = TSDB_DATA_TYPE_TINYINT; + strcpy(field.name, "tag1"); + taosArrayPush(createReq.pTags, &field); + } + + int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq); + void* pHead = rpcMallocCont(tlen); + tSerializeSMCreateStbReq(pHead, tlen, &createReq); + tFreeSMCreateStbReq(&createReq); + *pContLen = tlen; + return pHead; +} + +void* MndTestSma::BuildDropStbReq(const char* stbname, int32_t* pContLen) { + SMDropStbReq dropstbReq = {0}; + strcpy(dropstbReq.name, stbname); + + int32_t contLen = tSerializeSMDropStbReq(NULL, 0, &dropstbReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMDropStbReq(pReq, contLen, &dropstbReq); + + *pContLen = contLen; + return pReq; +} + +void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr, + const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen) { + SMCreateSmaReq createReq = {0}; + strcpy(createReq.name, smaname); + strcpy(createReq.stb, stbname); + createReq.igExists = igExists; + createReq.intervalUnit = 1; + createReq.slidingUnit = 2; + createReq.timezone = 3; + createReq.dstVgId = 4; + createReq.interval = 10; + createReq.offset = 5; + createReq.sliding = 6; + createReq.expr = (char*)expr; + createReq.exprLen = strlen(createReq.expr) + 1; + createReq.tagsFilter = (char*)tagsFilter; + createReq.tagsFilterLen = strlen(createReq.tagsFilter) + 1; + createReq.sql = (char*)sql; + createReq.sqlLen = strlen(createReq.sql) + 1; + createReq.ast = (char*)expr; + createReq.astLen = strlen(createReq.ast) + 1; + + int32_t tlen = tSerializeSMCreateSmaReq(NULL, 0, &createReq); + void* pHead = rpcMallocCont(tlen); + tSerializeSMCreateSmaReq(pHead, tlen, &createReq); + *pContLen = tlen; + return pHead; +} + +void* MndTestSma::BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen) { + SMDropSmaReq dropsmaReq = {0}; + dropsmaReq.igNotExists = igNotExists; + strcpy(dropsmaReq.name, smaname); + + int32_t contLen = tSerializeSMDropSmaReq(NULL, 0, &dropsmaReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMDropSmaReq(pReq, contLen, &dropsmaReq); + + *pContLen = contLen; + return pReq; +} + +TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { + const char* dbname = "1.d1"; + const char* stbname = "1.d1.stb"; + const char* smaname = "1.d1.sma"; + int32_t contLen = 0; + void* pReq; + SRpcMsg* pRsp; + + { + pReq = BuildCreateDbReq(dbname, &contLen); + pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + } + + { + pReq = BuildCreateStbReq(stbname, &contLen); + pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + } + + { + pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen); + pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + } + + // restart + test.Restart(); + + { + test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname); + CHECK_META("show indexes", 3); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + + CheckBinary("sma", TSDB_INDEX_NAME_LEN); + CheckTimestamp(); + CheckBinary("stb", TSDB_TABLE_NAME_LEN); + } + + { + pReq = BuildDropSmaReq(smaname, 0, &contLen); + pRsp = test.SendReq(TDMT_MND_DROP_SMA, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 0); + } +} diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index d3769b8a30..06e9c1c5db 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -141,6 +141,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA +#if 0 SSmaCfg vCreateSmaReq = {0}; if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -162,10 +163,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // } tdDestroyTSma(&vCreateSmaReq.tSma); // TODO: return directly or go on follow steps? +#endif } break; case TDMT_VND_CANCEL_SMA: { // timeRangeSMA } break; case TDMT_VND_DROP_SMA: { // timeRangeSMA +#if 0 SVDropTSmaReq vDropSmaReq = {0}; if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -182,6 +185,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // return -1; // } // TODO: return directly or go on follow steps? +#endif } break; default: ASSERT(0); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 04da9a4d09..a0eddd3a0d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -273,6 +273,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregatio // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "SMA does not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma option") // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") @@ -309,6 +310,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operat TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_SMA_NOT_EXIST, "SMA not exists") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")