add tsma
This commit is contained in:
parent
20017d82b3
commit
9006b0f576
|
@ -1897,6 +1897,35 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
|
|||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
char stb[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
int8_t intervalUnit;
|
||||
int8_t slidingUnit;
|
||||
int8_t timezone;
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t sliding;
|
||||
int32_t exprLen;
|
||||
int32_t tagsFilterLen;
|
||||
char* expr;
|
||||
char* tagsFilter;
|
||||
} SMCreateSmaReq;
|
||||
|
||||
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
||||
int32_t tDeserializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
||||
void tFreeSMCreateSmaReq(SMCreateSmaReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t igNotExists;
|
||||
} SMDropSmaReq;
|
||||
|
||||
int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
|
||||
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int8_t version; // for compatibility(default 0)
|
||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||
|
|
|
@ -127,6 +127,8 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "mnode-create-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "mnode-drop-sma", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "mnode-table-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "mnode-qnode-list", NULL, NULL)
|
||||
|
|
|
@ -274,18 +274,22 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
|
||||
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
|
||||
|
||||
// mnode-sma
|
||||
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0400)
|
||||
#define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0401)
|
||||
|
||||
// dnode
|
||||
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
|
||||
#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401)
|
||||
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402)
|
||||
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403)
|
||||
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404)
|
||||
#define TSDB_CODE_NODE_PARSE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0405)
|
||||
#define TSDB_CODE_NODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0406)
|
||||
#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0410)
|
||||
#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411)
|
||||
#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0412)
|
||||
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0413)
|
||||
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x04A0)
|
||||
#define TSDB_CODE_DND_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x04A1)
|
||||
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x04A2)
|
||||
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04A3)
|
||||
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04A4)
|
||||
#define TSDB_CODE_NODE_PARSE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x04A5)
|
||||
#define TSDB_CODE_NODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x04A6)
|
||||
#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04A7)
|
||||
#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x04A8)
|
||||
#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x04A9)
|
||||
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x04AA)
|
||||
|
||||
// vnode
|
||||
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500)
|
||||
|
|
|
@ -589,6 +589,99 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) {
|
|||
pReq->pFields = NULL;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->stb) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->intervalUnit) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->slidingUnit) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->timezone) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->interval) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->offset) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->sliding) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->exprLen) < 0) return -1;
|
||||
if (pReq->exprLen > 0) {
|
||||
if (tEncodeBinary(&encoder, pReq->expr, pReq->exprLen) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pReq->tagsFilterLen) < 0) return -1;
|
||||
if (pReq->tagsFilterLen > 0) {
|
||||
if (tEncodeBinary(&encoder, pReq->tagsFilter, pReq->tagsFilterLen) < 0) return -1;
|
||||
}
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->stb) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->intervalUnit) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->slidingUnit) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->timezone) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->interval) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->offset) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->sliding) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->exprLen) < 0) return -1;
|
||||
if (pReq->exprLen > 0) {
|
||||
pReq->expr = malloc(pReq->exprLen);
|
||||
if (pReq->expr == NULL) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->expr) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI32(&decoder, &pReq->tagsFilterLen) < 0) return -1;
|
||||
if (pReq->tagsFilterLen > 0) {
|
||||
pReq->tagsFilter = malloc(pReq->tagsFilterLen);
|
||||
if (pReq->tagsFilter == NULL) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->tagsFilter) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSMCreateSmaReq(SMCreateSmaReq *pReq) {
|
||||
tfree(pReq->expr);
|
||||
tfree(pReq->tagsFilter);
|
||||
}
|
||||
|
||||
int32_t tSerializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropSmaReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropSmaReq *pReq) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
|
|
@ -123,6 +123,8 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
|||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
|
|
|
@ -103,6 +103,8 @@ typedef enum {
|
|||
TRN_TYPE_CREATE_STB = 4001,
|
||||
TRN_TYPE_ALTER_STB = 4002,
|
||||
TRN_TYPE_DROP_STB = 4003,
|
||||
TRN_TYPE_CREATE_SMA = 4004,
|
||||
TRN_TYPE_DROP_SMA = 4005,
|
||||
TRN_TYPE_STB_SCOPE_END,
|
||||
} ETrnType;
|
||||
|
||||
|
@ -306,20 +308,38 @@ typedef struct {
|
|||
} SVgObj;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
int64_t dbUid;
|
||||
int32_t version;
|
||||
int32_t nextColId;
|
||||
int32_t numOfColumns;
|
||||
int32_t numOfTags;
|
||||
SSchema* pColumns;
|
||||
SSchema* pTags;
|
||||
SRWLatch lock;
|
||||
char comment[TSDB_STB_COMMENT_LEN];
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
char stb[TSDB_DB_FNAME_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t uid;
|
||||
int64_t stbUid;
|
||||
int8_t intervalUnit;
|
||||
int8_t slidingUnit;
|
||||
int8_t timezone;
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t sliding;
|
||||
int32_t exprLen;
|
||||
int32_t tagsFilterLen;
|
||||
char* expr;
|
||||
char* tagsFilter;
|
||||
} SSmaObj;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
int64_t dbUid;
|
||||
int32_t version;
|
||||
int32_t nextColId;
|
||||
int32_t numOfColumns;
|
||||
int32_t numOfTags;
|
||||
SSchema* pColumns;
|
||||
SSchema* pTags;
|
||||
SRWLatch lock;
|
||||
char comment[TSDB_STB_COMMENT_LEN];
|
||||
} SStbObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_MND_SMA_H_
|
||||
#define _TD_MND_SMA_H_
|
||||
|
||||
#include "mndInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mndInitSma(SMnode *pMnode);
|
||||
void mndCleanupSma(SMnode *pMnode);
|
||||
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName);
|
||||
void mndReleaseSma(SMnode *pMnode, SStbObj *pStb);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_MND_SMA_H_*/
|
|
@ -18,15 +18,15 @@
|
|||
#include "mndAuth.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndInfoSchema.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "mndInfoSchema.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define TSDB_STB_VER_NUMBER 1
|
||||
#define TSDB_STB_VER_NUMBER 1
|
||||
#define TSDB_STB_RESERVE_SIZE 64
|
||||
|
||||
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
|
||||
|
@ -1162,7 +1162,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
|
||||
static int32_t mndDropStb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_STB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||
|
|
|
@ -406,6 +406,10 @@ static const char *mndTransType(ETrnType type) {
|
|||
return "alter-stb";
|
||||
case TRN_TYPE_DROP_STB:
|
||||
return "drop-stb";
|
||||
case TRN_TYPE_CREATE_SMA:
|
||||
return "create-sma";
|
||||
case TRN_TYPE_DROP_SMA:
|
||||
return "drop-sma";
|
||||
default:
|
||||
return "invalid";
|
||||
}
|
||||
|
|
|
@ -270,6 +270,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill
|
|||
// mnode-topic
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported")
|
||||
|
||||
// mnode-sma
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "SMA does not exist")
|
||||
|
||||
// dnode
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline")
|
||||
|
|
Loading…
Reference in New Issue