commit
94bc3a6e52
|
@ -1108,18 +1108,34 @@ typedef struct {
|
|||
char* sql;
|
||||
char* physicalPlan;
|
||||
char* logicalPlan;
|
||||
} SMCreateTopicReq;
|
||||
} SCMCreateStreamReq;
|
||||
|
||||
int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq);
|
||||
int32_t tDeserializeSMCreateTopicReq(void* buf, int32_t bufLen, SMCreateTopicReq* pReq);
|
||||
void tFreeSMCreateTopicReq(SMCreateTopicReq* pReq);
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
} SCMCreateStreamRsp;
|
||||
|
||||
int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateStreamReq* pReq);
|
||||
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
|
||||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
char* sql;
|
||||
char* physicalPlan;
|
||||
char* logicalPlan;
|
||||
} SCMCreateTopicReq;
|
||||
|
||||
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
||||
int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq);
|
||||
void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int64_t topicId;
|
||||
} SMCreateTopicRsp;
|
||||
} SCMCreateTopicRsp;
|
||||
|
||||
int32_t tSerializeSMCreateTopicRsp(void* buf, int32_t bufLen, const SMCreateTopicRsp* pRsp);
|
||||
int32_t tDeserializeSMCreateTopicRsp(void* buf, int32_t bufLen, SMCreateTopicRsp* pRsp);
|
||||
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
|
||||
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
int32_t topicNum;
|
||||
|
|
|
@ -148,6 +148,9 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
|
||||
|
||||
// Requests handled by VNODE
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||
|
|
|
@ -113,15 +113,16 @@ typedef enum {
|
|||
SDB_USER = 7,
|
||||
SDB_AUTH = 8,
|
||||
SDB_ACCT = 9,
|
||||
SDB_OFFSET = 10,
|
||||
SDB_SUBSCRIBE = 11,
|
||||
SDB_CONSUMER = 12,
|
||||
SDB_TOPIC = 13,
|
||||
SDB_VGROUP = 14,
|
||||
SDB_STB = 15,
|
||||
SDB_DB = 16,
|
||||
SDB_FUNC = 17,
|
||||
SDB_MAX = 18
|
||||
SDB_STREAM = 10,
|
||||
SDB_OFFSET = 11,
|
||||
SDB_SUBSCRIBE = 12,
|
||||
SDB_CONSUMER = 13,
|
||||
SDB_TOPIC = 14,
|
||||
SDB_VGROUP = 15,
|
||||
SDB_STB = 16,
|
||||
SDB_DB = 17,
|
||||
SDB_FUNC = 18,
|
||||
SDB_MAX = 19
|
||||
} ESdbType;
|
||||
|
||||
typedef struct SSdb SSdb;
|
||||
|
|
|
@ -267,9 +267,11 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
|
||||
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
|
||||
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
|
||||
#define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0)
|
||||
|
||||
|
||||
// mnode-stream
|
||||
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
|
||||
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
|
||||
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
|
||||
|
||||
// dnode
|
||||
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
|
||||
|
|
|
@ -211,6 +211,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
||||
#define TSDB_COL_NAME_LEN 65
|
||||
|
|
|
@ -511,7 +511,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameFromString(&name, topicName, T_NAME_TABLE);
|
||||
|
||||
SMCreateTopicReq req = {
|
||||
SCMCreateTopicReq req = {
|
||||
.igExists = 1,
|
||||
.physicalPlan = (char*)pStr,
|
||||
.sql = (char*)sql,
|
||||
|
@ -519,13 +519,13 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
};
|
||||
tNameExtractFullName(&name, req.name);
|
||||
|
||||
int tlen = tSerializeMCreateTopicReq(NULL, 0, &req);
|
||||
int tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req);
|
||||
void* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
tSerializeMCreateTopicReq(buf, tlen, &req);
|
||||
tSerializeSCMCreateTopicReq(buf, tlen, &req);
|
||||
/*printf("formatted: %s\n", dagStr);*/
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
|
||||
|
|
|
@ -1899,7 +1899,7 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopicReq *pReq) {
|
||||
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
|
||||
int32_t sqlLen = 0;
|
||||
int32_t physicalPlanLen = 0;
|
||||
int32_t logicalPlanLen = 0;
|
||||
|
@ -1927,7 +1927,7 @@ int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopic
|
|||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq *pReq) {
|
||||
int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicReq *pReq) {
|
||||
int32_t sqlLen = 0;
|
||||
int32_t physicalPlanLen = 0;
|
||||
int32_t logicalPlanLen = 0;
|
||||
|
@ -1956,13 +1956,13 @@ int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSMCreateTopicReq(SMCreateTopicReq *pReq) {
|
||||
void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
|
||||
tfree(pReq->sql);
|
||||
tfree(pReq->physicalPlan);
|
||||
tfree(pReq->logicalPlan);
|
||||
}
|
||||
|
||||
int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) {
|
||||
int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
|
@ -1975,7 +1975,7 @@ int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopi
|
|||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp *pRsp) {
|
||||
int32_t tDeserializeSCMCreateTopicRsp(void *buf, int32_t bufLen, SCMCreateTopicRsp *pRsp) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
|
@ -2423,3 +2423,43 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
|
|||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||
if (tDecodeCStr(&decoder, (const char **)&pReq->sql) < 0) return -1;
|
||||
if (tDecodeCStr(&decoder, (const char **)&pReq->physicalPlan) < 0) return -1;
|
||||
if (tDecodeCStr(&decoder, (const char **)&pReq->logicalPlan) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
|
||||
tfree(pReq->sql);
|
||||
tfree(pReq->physicalPlan);
|
||||
tfree(pReq->logicalPlan);
|
||||
}
|
||||
|
|
|
@ -84,6 +84,7 @@ typedef enum {
|
|||
TRN_TYPE_SUBSCRIBE = 1016,
|
||||
TRN_TYPE_REBALANCE = 1017,
|
||||
TRN_TYPE_COMMIT_OFFSET = 1018,
|
||||
TRN_TYPE_CREATE_STREAM = 1019,
|
||||
TRN_TYPE_BASIC_SCOPE_END,
|
||||
TRN_TYPE_GLOBAL_SCOPE = 2000,
|
||||
TRN_TYPE_CREATE_DNODE = 2001,
|
||||
|
@ -662,7 +663,23 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
|
|||
}
|
||||
|
||||
typedef struct {
|
||||
} SStreamScheduler;
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t createTime;
|
||||
int64_t updateTime;
|
||||
int64_t uid;
|
||||
int64_t dbUid;
|
||||
int32_t version;
|
||||
SRWLatch lock;
|
||||
int8_t status;
|
||||
// int32_t sqlLen;
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
} SStreamObj;
|
||||
|
||||
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
|
||||
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj);
|
||||
|
||||
typedef struct SMnodeMsg {
|
||||
char user[TSDB_USER_LEN];
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_MND_STREAM_H_
|
||||
#define _TD_MND_STREAM_H_
|
||||
|
||||
#include "mndInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t mndInitStream(SMnode *pMnode);
|
||||
void mndCleanupStream(SMnode *pMnode);
|
||||
|
||||
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
|
||||
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
||||
|
||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_MND_STREAM_H_*/
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "mndDef.h"
|
||||
|
||||
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->db) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||
if (tDecodeCStr(pDecoder, (const char **)&pObj->sql) < 0) return -1;
|
||||
if (tDecodeCStr(pDecoder, (const char **)&pObj->logicalPlan) < 0) return -1;
|
||||
if (tDecodeCStr(pDecoder, (const char **)&pObj->physicalPlan) < 0) return -1;
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,444 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "mndStream.h"
|
||||
#include "mndAuth.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MND_STREAM_VER_NUMBER 1
|
||||
#define MND_STREAM_RESERVE_SIZE 64
|
||||
|
||||
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
|
||||
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
|
||||
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
|
||||
static int32_t mndProcessCreateStreamReq(SMnodeMsg *pReq);
|
||||
/*static int32_t mndProcessDropStreamReq(SMnodeMsg *pReq);*/
|
||||
/*static int32_t mndProcessDropStreamInRsp(SMnodeMsg *pRsp);*/
|
||||
static int32_t mndProcessStreamMetaReq(SMnodeMsg *pReq);
|
||||
static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
static int32_t mndRetrieveStream(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitStream(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_STREAM,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.encodeFp = (SdbEncodeFp)mndStreamActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndStreamActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndStreamActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndStreamActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndStreamActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
|
||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
||||
|
||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetStreamMeta);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveStream);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextStream);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
void mndCleanupStream(SMnode *pMnode) {}
|
||||
|
||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void *buf = NULL;
|
||||
|
||||
SCoder encoder;
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||
if (tEncodeSStreamObj(NULL, pStream) < 0) {
|
||||
tCoderClear(&encoder);
|
||||
goto STREAM_ENCODE_OVER;
|
||||
}
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto STREAM_ENCODE_OVER;
|
||||
|
||||
buf = malloc(tlen);
|
||||
if (buf == NULL) goto STREAM_ENCODE_OVER;
|
||||
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
|
||||
if (tEncodeSStreamObj(NULL, pStream) < 0) {
|
||||
tCoderClear(&encoder);
|
||||
goto STREAM_ENCODE_OVER;
|
||||
}
|
||||
tCoderClear(&encoder);
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
STREAM_ENCODE_OVER:
|
||||
tfree(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mTrace("stream:%s, encode to raw:%p, row:%p", pStream->name, pRaw, pStream);
|
||||
return pRaw;
|
||||
}
|
||||
|
||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void *buf = NULL;
|
||||
|
||||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER;
|
||||
|
||||
if (sver != MND_STREAM_VER_NUMBER) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SStreamObj);
|
||||
SSdbRow *pRow = sdbAllocRow(size);
|
||||
if (pRow == NULL) goto STREAM_DECODE_OVER;
|
||||
|
||||
SStreamObj *pStream = sdbGetRowObj(pRow);
|
||||
if (pStream == NULL) goto STREAM_DECODE_OVER;
|
||||
|
||||
int32_t tlen;
|
||||
int32_t dataPos = 0;
|
||||
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
|
||||
buf = malloc(tlen + 1);
|
||||
if (buf == NULL) goto STREAM_DECODE_OVER;
|
||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
|
||||
|
||||
SCoder decoder;
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, NULL, 0, TD_DECODER);
|
||||
if (tDecodeSStreamObj(&decoder, pStream) < 0) {
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
STREAM_DECODE_OVER:
|
||||
tfree(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("stream:%s, failed to decode from raw:%p since %s", pStream->name, pRaw, terrstr());
|
||||
tfree(pRow);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mTrace("stream:%s, decode from raw:%p, row:%p", pStream->name, pRaw, pStream);
|
||||
return pRow;
|
||||
}
|
||||
|
||||
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
|
||||
mTrace("stream:%s, perform insert action", pStream->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
|
||||
mTrace("stream:%s, perform delete action", pStream->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
|
||||
mTrace("stream:%s, perform update action", pOldStream->name);
|
||||
atomic_exchange_32(&pOldStream->updateTime, pNewStream->updateTime);
|
||||
atomic_exchange_32(&pOldStream->version, pNewStream->version);
|
||||
|
||||
taosWLockLatch(&pOldStream->lock);
|
||||
|
||||
// TODO handle update
|
||||
|
||||
taosWUnLockLatch(&pOldStream->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName);
|
||||
if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
}
|
||||
return pStream;
|
||||
}
|
||||
|
||||
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) {
|
||||
SName name = {0};
|
||||
tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
char db[TSDB_STREAM_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, db);
|
||||
|
||||
return mndAcquireDb(pMnode, db);
|
||||
}
|
||||
|
||||
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
|
||||
mDebug("stream:%s to create", pCreate->name);
|
||||
SStreamObj streamObj = {0};
|
||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
streamObj.createTime = taosGetTimestampMs();
|
||||
streamObj.updateTime = streamObj.createTime;
|
||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||
streamObj.dbUid = pDb->uid;
|
||||
streamObj.version = 1;
|
||||
streamObj.sql = pCreate->sql;
|
||||
streamObj.physicalPlan = pCreate->physicalPlan;
|
||||
streamObj.logicalPlan = pCreate->logicalPlan;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateStreamReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
int32_t code = -1;
|
||||
SStreamObj *pStream = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
SUserObj *pUser = NULL;
|
||||
SCMCreateStreamReq createStreamReq = {0};
|
||||
|
||||
if (tDeserializeSCMCreateStreamReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createStreamReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto CREATE_STREAM_OVER;
|
||||
}
|
||||
|
||||
mDebug("stream:%s, start to create, sql:%s", createStreamReq.name, createStreamReq.sql);
|
||||
|
||||
if (mndCheckCreateStreamReq(&createStreamReq) != 0) {
|
||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||
goto CREATE_STREAM_OVER;
|
||||
}
|
||||
|
||||
pStream = mndAcquireStream(pMnode, createStreamReq.name);
|
||||
if (pStream != NULL) {
|
||||
if (createStreamReq.igExists) {
|
||||
mDebug("stream:%s, already exist, ignore exist is set", createStreamReq.name);
|
||||
code = 0;
|
||||
goto CREATE_STREAM_OVER;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
|
||||
goto CREATE_STREAM_OVER;
|
||||
}
|
||||
} else if (terrno != TSDB_CODE_MND_STREAM_NOT_EXIST) {
|
||||
goto CREATE_STREAM_OVER;
|
||||
}
|
||||
|
||||
pDb = mndAcquireDbByStream(pMnode, createStreamReq.name);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto CREATE_STREAM_OVER;
|
||||
}
|
||||
|
||||
pUser = mndAcquireUser(pMnode, pReq->user);
|
||||
if (pUser == NULL) {
|
||||
goto CREATE_STREAM_OVER;
|
||||
}
|
||||
|
||||
if (mndCheckWriteAuth(pUser, pDb) != 0) {
|
||||
goto CREATE_STREAM_OVER;
|
||||
}
|
||||
|
||||
code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);
|
||||
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
|
||||
CREATE_STREAM_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||
}
|
||||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
||||
tFreeSCMCreateStreamReq(&createStreamReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfStreams = 0;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SStreamObj *pStream = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pStream->dbUid == pDb->uid) {
|
||||
numOfStreams++;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
*pNumOfStreams = numOfStreams;
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
if (mndGetNumOfStreams(pMnode, pShow->db, &pShow->numOfRows) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t cols = 0;
|
||||
SSchema *pSchema = pMeta->pSchemas;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "name");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "sql");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = cols;
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int32_t i = 1; i < cols; ++i) {
|
||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
}
|
||||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_STREAM);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveStream(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SStreamObj *pStream = NULL;
|
||||
int32_t cols = 0;
|
||||
char *pWrite;
|
||||
char prefix[TSDB_DB_FNAME_LEN] = {0};
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
|
||||
if (pDb == NULL) return 0;
|
||||
|
||||
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
|
||||
strcat(prefix, TS_PATH_DELIMITER);
|
||||
int32_t prefixLen = (int32_t)strlen(prefix);
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
if (pStream->dbUid != pDb->uid) {
|
||||
if (strncmp(pStream->name, prefix, prefixLen) != 0) {
|
||||
mError("Inconsistent stream data, name:%s, db:%s, dbUid:%" PRIu64, pStream->name, pDb->name, pDb->uid);
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pStream);
|
||||
continue;
|
||||
}
|
||||
|
||||
cols = 0;
|
||||
|
||||
char streamName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
tstrncpy(streamName, pStream->name + prefixLen, TSDB_TABLE_NAME_LEN);
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_TO_VARSTR(pWrite, streamName);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int64_t *)pWrite = pStream->createTime;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pStream->sql, pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
pShow->numOfReads += numOfRows;
|
||||
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
|
@ -13,7 +13,6 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndTopic.h"
|
||||
#include "mndAuth.h"
|
||||
#include "mndDb.h"
|
||||
|
@ -229,7 +228,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
|
|||
return pDrop;
|
||||
}
|
||||
|
||||
static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) {
|
||||
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||
return -1;
|
||||
|
@ -237,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq *pCreate, SDbObj *pDb) {
|
||||
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
||||
mDebug("topic:%s to create", pCreate->name);
|
||||
SMqTopicObj topicObj = {0};
|
||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||
|
@ -278,14 +277,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq
|
|||
}
|
||||
|
||||
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
int32_t code = -1;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
SUserObj *pUser = NULL;
|
||||
SMCreateTopicReq createTopicReq = {0};
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
int32_t code = -1;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
SUserObj *pUser = NULL;
|
||||
SCMCreateTopicReq createTopicReq = {0};
|
||||
|
||||
if (tDeserializeSMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
|
||||
if (tDeserializeSCMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto CREATE_TOPIC_OVER;
|
||||
}
|
||||
|
@ -338,7 +337,7 @@ CREATE_TOPIC_OVER:
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
||||
tFreeSMCreateTopicReq(&createTopicReq);
|
||||
tFreeSCMCreateTopicReq(&createTopicReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -409,35 +408,6 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfTopics = 0;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pTopic->dbUid == pDb->uid) {
|
||||
numOfTopics++;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pTopic);
|
||||
}
|
||||
|
||||
*pNumOfTopics = numOfTopics;
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
|
|
|
@ -61,16 +61,16 @@ void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
|||
}
|
||||
|
||||
void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) {
|
||||
SMCreateTopicReq createReq = {0};
|
||||
SCMCreateTopicReq createReq = {0};
|
||||
strcpy(createReq.name, topicName);
|
||||
createReq.igExists = 0;
|
||||
createReq.sql = (char*)sql;
|
||||
createReq.physicalPlan = (char*)"physicalPlan";
|
||||
createReq.logicalPlan = (char*)"logicalPlan";
|
||||
|
||||
int32_t contLen = tSerializeMCreateTopicReq(NULL, 0, &createReq);
|
||||
int32_t contLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeMCreateTopicReq(pReq, contLen, &createReq);
|
||||
tSerializeSCMCreateTopicReq(pReq, contLen, &createReq);
|
||||
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
|
@ -100,9 +100,7 @@ TEST_F(MndTestTopic, 01_Create_Topic) {
|
|||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, "");
|
||||
}
|
||||
{ test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, ""); }
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
#include "tlog.h"
|
||||
#include "tmsg.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#include "snode.h"
|
||||
|
@ -28,12 +29,51 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
enum {
|
||||
STREAM_STATUS__READY = 1,
|
||||
STREAM_STATUS__STOPPED,
|
||||
STREAM_STATUS__CREATING,
|
||||
STREAM_STATUS__STOPING,
|
||||
STREAM_STATUS__RESUMING,
|
||||
STREAM_STATUS__DELETING,
|
||||
};
|
||||
|
||||
enum {
|
||||
STREAM_RUNNER__RUNNING = 1,
|
||||
STREAM_RUNNER__STOP,
|
||||
};
|
||||
|
||||
typedef struct SSnode {
|
||||
SSnodeOpt cfg;
|
||||
} SSnode;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t IdxInLevel;
|
||||
int32_t level;
|
||||
} SStreamInfo;
|
||||
|
||||
typedef struct {
|
||||
SStreamInfo meta;
|
||||
int8_t status;
|
||||
void* executor;
|
||||
STaosQueue* queue;
|
||||
void* stateStore;
|
||||
// storage handle
|
||||
} SStreamRunner;
|
||||
|
||||
typedef struct {
|
||||
SHashObj* pHash;
|
||||
} SStreamMeta;
|
||||
|
||||
int32_t sndCreateStream();
|
||||
int32_t sndDropStream();
|
||||
|
||||
int32_t sndStopStream();
|
||||
int32_t sndResumeStream();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_SNODE_INT_H_*/
|
||||
#endif /*_TD_SNODE_INT_H_*/
|
||||
|
|
|
@ -124,8 +124,13 @@ class WalRetentionEnv : public ::testing::Test {
|
|||
|
||||
void SetUp() override {
|
||||
SWalCfg cfg;
|
||||
cfg.rollPeriod = -1, cfg.segSize = -1, cfg.retentionPeriod = -1, cfg.retentionSize = 0, cfg.rollPeriod = 0,
|
||||
cfg.vgId = 0, cfg.level = TAOS_WAL_FSYNC;
|
||||
cfg.rollPeriod = -1;
|
||||
cfg.segSize = -1;
|
||||
cfg.retentionPeriod = -1;
|
||||
cfg.retentionSize = 0;
|
||||
cfg.rollPeriod = 0;
|
||||
cfg.vgId = 0;
|
||||
cfg.level = TAOS_WAL_FSYNC;
|
||||
pWal = walOpen(pathName, &cfg);
|
||||
ASSERT(pWal != NULL);
|
||||
}
|
||||
|
|
|
@ -174,8 +174,8 @@ TEST(td_encode_test, encode_decode_variant_len_integer) {
|
|||
}
|
||||
|
||||
TEST(td_encode_test, encode_decode_cstr) {
|
||||
uint8_t * buf = new uint8_t[1024 * 1024];
|
||||
char * cstr = new char[1024 * 1024];
|
||||
uint8_t *buf = new uint8_t[1024 * 1024];
|
||||
char *cstr = new char[1024 * 1024];
|
||||
const char *dcstr;
|
||||
SCoder encoder;
|
||||
SCoder decoder;
|
||||
|
@ -208,7 +208,7 @@ TEST(td_encode_test, encode_decode_cstr) {
|
|||
typedef struct {
|
||||
int32_t A_a;
|
||||
int64_t A_b;
|
||||
char * A_c;
|
||||
char *A_c;
|
||||
} SStructA_v1;
|
||||
|
||||
static int32_t tSStructA_v1_encode(SCoder *pCoder, const SStructA_v1 *pSAV1) {
|
||||
|
@ -240,7 +240,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) {
|
|||
typedef struct {
|
||||
int32_t A_a;
|
||||
int64_t A_b;
|
||||
char * A_c;
|
||||
char *A_c;
|
||||
// -------------------BELOW FEILDS ARE ADDED IN A NEW VERSION--------------
|
||||
int16_t A_d;
|
||||
int16_t A_e;
|
||||
|
@ -437,4 +437,4 @@ TEST(td_encode_test, compound_struct_encode_test) {
|
|||
tCoderClear(&decoder);
|
||||
}
|
||||
#endif
|
||||
#pragma GCC diagnostic pop
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
Loading…
Reference in New Issue