add mnode stream
This commit is contained in:
parent
913e8d70d8
commit
e766f3fad4
|
@ -1108,18 +1108,34 @@ typedef struct {
|
||||||
char* sql;
|
char* sql;
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
char* logicalPlan;
|
char* logicalPlan;
|
||||||
} SMCreateTopicReq;
|
} SCMCreateStreamReq;
|
||||||
|
|
||||||
int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq);
|
typedef struct {
|
||||||
int32_t tDeserializeSMCreateTopicReq(void* buf, int32_t bufLen, SMCreateTopicReq* pReq);
|
int64_t streamId;
|
||||||
void tFreeSMCreateTopicReq(SMCreateTopicReq* pReq);
|
} SCMCreateStreamRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateStreamReq* pReq);
|
||||||
|
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
|
||||||
|
void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
int8_t igExists;
|
||||||
|
char* sql;
|
||||||
|
char* physicalPlan;
|
||||||
|
char* logicalPlan;
|
||||||
|
} SCMCreateTopicReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
||||||
|
int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq);
|
||||||
|
void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t topicId;
|
int64_t topicId;
|
||||||
} SMCreateTopicRsp;
|
} SCMCreateTopicRsp;
|
||||||
|
|
||||||
int32_t tSerializeSMCreateTopicRsp(void* buf, int32_t bufLen, const SMCreateTopicRsp* pRsp);
|
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
|
||||||
int32_t tDeserializeSMCreateTopicRsp(void* buf, int32_t bufLen, SMCreateTopicRsp* pRsp);
|
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t topicNum;
|
int32_t topicNum;
|
||||||
|
|
|
@ -148,6 +148,9 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
|
|
|
@ -113,15 +113,16 @@ typedef enum {
|
||||||
SDB_USER = 7,
|
SDB_USER = 7,
|
||||||
SDB_AUTH = 8,
|
SDB_AUTH = 8,
|
||||||
SDB_ACCT = 9,
|
SDB_ACCT = 9,
|
||||||
SDB_OFFSET = 10,
|
SDB_STREAM = 10,
|
||||||
SDB_SUBSCRIBE = 11,
|
SDB_OFFSET = 11,
|
||||||
SDB_CONSUMER = 12,
|
SDB_SUBSCRIBE = 12,
|
||||||
SDB_TOPIC = 13,
|
SDB_CONSUMER = 13,
|
||||||
SDB_VGROUP = 14,
|
SDB_TOPIC = 14,
|
||||||
SDB_STB = 15,
|
SDB_VGROUP = 15,
|
||||||
SDB_DB = 16,
|
SDB_STB = 16,
|
||||||
SDB_FUNC = 17,
|
SDB_DB = 17,
|
||||||
SDB_MAX = 18
|
SDB_FUNC = 18,
|
||||||
|
SDB_MAX = 19
|
||||||
} ESdbType;
|
} ESdbType;
|
||||||
|
|
||||||
typedef struct SSdb SSdb;
|
typedef struct SSdb SSdb;
|
||||||
|
|
|
@ -267,9 +267,11 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
|
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
|
||||||
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
|
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
|
||||||
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
|
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
|
||||||
#define TSDB_CODE_MND_MQ_PLACEHOLDER TAOS_DEF_ERROR_CODE(0, 0x03F0)
|
|
||||||
|
|
||||||
|
|
||||||
|
// mnode-stream
|
||||||
|
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
|
||||||
|
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
|
||||||
|
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
|
||||||
|
|
||||||
// dnode
|
// dnode
|
||||||
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
|
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
|
||||||
|
|
|
@ -211,6 +211,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_TYPE_STR_MAX_LEN 32
|
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||||
|
#define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||||
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
||||||
#define TSDB_COL_NAME_LEN 65
|
#define TSDB_COL_NAME_LEN 65
|
||||||
|
|
|
@ -511,7 +511,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
||||||
tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB);
|
tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB);
|
||||||
tNameFromString(&name, topicName, T_NAME_TABLE);
|
tNameFromString(&name, topicName, T_NAME_TABLE);
|
||||||
|
|
||||||
SMCreateTopicReq req = {
|
SCMCreateTopicReq req = {
|
||||||
.igExists = 1,
|
.igExists = 1,
|
||||||
.physicalPlan = (char*)pStr,
|
.physicalPlan = (char*)pStr,
|
||||||
.sql = (char*)sql,
|
.sql = (char*)sql,
|
||||||
|
@ -519,13 +519,13 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
||||||
};
|
};
|
||||||
tNameExtractFullName(&name, req.name);
|
tNameExtractFullName(&name, req.name);
|
||||||
|
|
||||||
int tlen = tSerializeMCreateTopicReq(NULL, 0, &req);
|
int tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req);
|
||||||
void* buf = malloc(tlen);
|
void* buf = malloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeMCreateTopicReq(buf, tlen, &req);
|
tSerializeSCMCreateTopicReq(buf, tlen, &req);
|
||||||
/*printf("formatted: %s\n", dagStr);*/
|
/*printf("formatted: %s\n", dagStr);*/
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
|
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
|
||||||
|
|
|
@ -1899,7 +1899,7 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopicReq *pReq) {
|
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
|
||||||
int32_t sqlLen = 0;
|
int32_t sqlLen = 0;
|
||||||
int32_t physicalPlanLen = 0;
|
int32_t physicalPlanLen = 0;
|
||||||
int32_t logicalPlanLen = 0;
|
int32_t logicalPlanLen = 0;
|
||||||
|
@ -1927,7 +1927,7 @@ int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopic
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq *pReq) {
|
int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicReq *pReq) {
|
||||||
int32_t sqlLen = 0;
|
int32_t sqlLen = 0;
|
||||||
int32_t physicalPlanLen = 0;
|
int32_t physicalPlanLen = 0;
|
||||||
int32_t logicalPlanLen = 0;
|
int32_t logicalPlanLen = 0;
|
||||||
|
@ -1956,13 +1956,13 @@ int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tFreeSMCreateTopicReq(SMCreateTopicReq *pReq) {
|
void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
|
||||||
tfree(pReq->sql);
|
tfree(pReq->sql);
|
||||||
tfree(pReq->physicalPlan);
|
tfree(pReq->physicalPlan);
|
||||||
tfree(pReq->logicalPlan);
|
tfree(pReq->logicalPlan);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) {
|
int32_t tSerializeSCMCreateTopicRsp(void *buf, int32_t bufLen, const SCMCreateTopicRsp *pRsp) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
@ -1975,7 +1975,7 @@ int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopi
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp *pRsp) {
|
int32_t tDeserializeSCMCreateTopicRsp(void *buf, int32_t bufLen, SCMCreateTopicRsp *pRsp) {
|
||||||
SCoder decoder = {0};
|
SCoder decoder = {0};
|
||||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
@ -2423,3 +2423,43 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
|
||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1;
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||||
|
if (tDecodeCStr(&decoder, (const char **)&pReq->sql) < 0) return -1;
|
||||||
|
if (tDecodeCStr(&decoder, (const char **)&pReq->physicalPlan) < 0) return -1;
|
||||||
|
if (tDecodeCStr(&decoder, (const char **)&pReq->logicalPlan) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
|
||||||
|
tfree(pReq->sql);
|
||||||
|
tfree(pReq->physicalPlan);
|
||||||
|
tfree(pReq->logicalPlan);
|
||||||
|
}
|
||||||
|
|
|
@ -84,6 +84,7 @@ typedef enum {
|
||||||
TRN_TYPE_SUBSCRIBE = 1016,
|
TRN_TYPE_SUBSCRIBE = 1016,
|
||||||
TRN_TYPE_REBALANCE = 1017,
|
TRN_TYPE_REBALANCE = 1017,
|
||||||
TRN_TYPE_COMMIT_OFFSET = 1018,
|
TRN_TYPE_COMMIT_OFFSET = 1018,
|
||||||
|
TRN_TYPE_CREATE_STREAM = 1019,
|
||||||
TRN_TYPE_BASIC_SCOPE_END,
|
TRN_TYPE_BASIC_SCOPE_END,
|
||||||
TRN_TYPE_GLOBAL_SCOPE = 2000,
|
TRN_TYPE_GLOBAL_SCOPE = 2000,
|
||||||
TRN_TYPE_CREATE_DNODE = 2001,
|
TRN_TYPE_CREATE_DNODE = 2001,
|
||||||
|
@ -662,7 +663,23 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
} SStreamScheduler;
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
int64_t createTime;
|
||||||
|
int64_t updateTime;
|
||||||
|
int64_t uid;
|
||||||
|
int64_t dbUid;
|
||||||
|
int32_t version;
|
||||||
|
SRWLatch lock;
|
||||||
|
int8_t status;
|
||||||
|
// int32_t sqlLen;
|
||||||
|
char* sql;
|
||||||
|
char* logicalPlan;
|
||||||
|
char* physicalPlan;
|
||||||
|
} SStreamObj;
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
|
||||||
|
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj);
|
||||||
|
|
||||||
typedef struct SMnodeMsg {
|
typedef struct SMnodeMsg {
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_MND_STREAM_H_
|
||||||
|
#define _TD_MND_STREAM_H_
|
||||||
|
|
||||||
|
#include "mndInt.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t mndInitStream(SMnode *pMnode);
|
||||||
|
void mndCleanupStream(SMnode *pMnode);
|
||||||
|
|
||||||
|
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
|
||||||
|
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
||||||
|
|
||||||
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||||
|
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_MND_STREAM_H_*/
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "mndDef.h"
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
|
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
|
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pObj->db) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||||
|
if (tDecodeCStr(pDecoder, (const char **)&pObj->sql) < 0) return -1;
|
||||||
|
if (tDecodeCStr(pDecoder, (const char **)&pObj->logicalPlan) < 0) return -1;
|
||||||
|
if (tDecodeCStr(pDecoder, (const char **)&pObj->physicalPlan) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,444 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "mndStream.h"
|
||||||
|
#include "mndAuth.h"
|
||||||
|
#include "mndDb.h"
|
||||||
|
#include "mndDnode.h"
|
||||||
|
#include "mndMnode.h"
|
||||||
|
#include "mndShow.h"
|
||||||
|
#include "mndStb.h"
|
||||||
|
#include "mndTrans.h"
|
||||||
|
#include "mndUser.h"
|
||||||
|
#include "mndVgroup.h"
|
||||||
|
#include "tname.h"
|
||||||
|
|
||||||
|
#define MND_STREAM_VER_NUMBER 1
|
||||||
|
#define MND_STREAM_RESERVE_SIZE 64
|
||||||
|
|
||||||
|
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
|
||||||
|
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
|
||||||
|
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
|
||||||
|
static int32_t mndProcessCreateStreamReq(SMnodeMsg *pReq);
|
||||||
|
/*static int32_t mndProcessDropStreamReq(SMnodeMsg *pReq);*/
|
||||||
|
/*static int32_t mndProcessDropStreamInRsp(SMnodeMsg *pRsp);*/
|
||||||
|
static int32_t mndProcessStreamMetaReq(SMnodeMsg *pReq);
|
||||||
|
static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
|
static int32_t mndRetrieveStream(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
|
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
int32_t mndInitStream(SMnode *pMnode) {
|
||||||
|
SSdbTable table = {.sdbType = SDB_STREAM,
|
||||||
|
.keyType = SDB_KEY_BINARY,
|
||||||
|
.encodeFp = (SdbEncodeFp)mndStreamActionEncode,
|
||||||
|
.decodeFp = (SdbDecodeFp)mndStreamActionDecode,
|
||||||
|
.insertFp = (SdbInsertFp)mndStreamActionInsert,
|
||||||
|
.updateFp = (SdbUpdateFp)mndStreamActionUpdate,
|
||||||
|
.deleteFp = (SdbDeleteFp)mndStreamActionDelete};
|
||||||
|
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
|
||||||
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
||||||
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
||||||
|
|
||||||
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetStreamMeta);
|
||||||
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveStream);
|
||||||
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextStream);
|
||||||
|
|
||||||
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndCleanupStream(SMnode *pMnode) {}
|
||||||
|
|
||||||
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
void *buf = NULL;
|
||||||
|
|
||||||
|
SCoder encoder;
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||||
|
if (tEncodeSStreamObj(NULL, pStream) < 0) {
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
goto STREAM_ENCODE_OVER;
|
||||||
|
}
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
|
||||||
|
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
|
||||||
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
|
||||||
|
if (pRaw == NULL) goto STREAM_ENCODE_OVER;
|
||||||
|
|
||||||
|
buf = malloc(tlen);
|
||||||
|
if (buf == NULL) goto STREAM_ENCODE_OVER;
|
||||||
|
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
|
||||||
|
if (tEncodeSStreamObj(NULL, pStream) < 0) {
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
goto STREAM_ENCODE_OVER;
|
||||||
|
}
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
|
||||||
|
int32_t dataPos = 0;
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
|
||||||
|
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER);
|
||||||
|
SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER);
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
STREAM_ENCODE_OVER:
|
||||||
|
tfree(buf);
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr());
|
||||||
|
sdbFreeRaw(pRaw);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
mTrace("stream:%s, encode to raw:%p, row:%p", pStream->name, pRaw, pStream);
|
||||||
|
return pRaw;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
void *buf = NULL;
|
||||||
|
|
||||||
|
int8_t sver = 0;
|
||||||
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER;
|
||||||
|
|
||||||
|
if (sver != MND_STREAM_VER_NUMBER) {
|
||||||
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
|
goto STREAM_DECODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t size = sizeof(SStreamObj);
|
||||||
|
SSdbRow *pRow = sdbAllocRow(size);
|
||||||
|
if (pRow == NULL) goto STREAM_DECODE_OVER;
|
||||||
|
|
||||||
|
SStreamObj *pStream = sdbGetRowObj(pRow);
|
||||||
|
if (pStream == NULL) goto STREAM_DECODE_OVER;
|
||||||
|
|
||||||
|
int32_t tlen;
|
||||||
|
int32_t dataPos = 0;
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &tlen, STREAM_DECODE_OVER);
|
||||||
|
buf = malloc(tlen + 1);
|
||||||
|
if (buf == NULL) goto STREAM_DECODE_OVER;
|
||||||
|
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
|
||||||
|
|
||||||
|
SCoder decoder;
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, NULL, 0, TD_DECODER);
|
||||||
|
if (tDecodeSStreamObj(&decoder, pStream) < 0) {
|
||||||
|
goto STREAM_DECODE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
STREAM_DECODE_OVER:
|
||||||
|
tfree(buf);
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
|
mError("stream:%s, failed to decode from raw:%p since %s", pStream->name, pRaw, terrstr());
|
||||||
|
tfree(pRow);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
mTrace("stream:%s, decode from raw:%p, row:%p", pStream->name, pRaw, pStream);
|
||||||
|
return pRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
|
||||||
|
mTrace("stream:%s, perform insert action", pStream->name);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
|
||||||
|
mTrace("stream:%s, perform delete action", pStream->name);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) {
|
||||||
|
mTrace("stream:%s, perform update action", pOldStream->name);
|
||||||
|
atomic_exchange_32(&pOldStream->updateTime, pNewStream->updateTime);
|
||||||
|
atomic_exchange_32(&pOldStream->version, pNewStream->version);
|
||||||
|
|
||||||
|
taosWLockLatch(&pOldStream->lock);
|
||||||
|
|
||||||
|
// TODO handle update
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pOldStream->lock);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName);
|
||||||
|
if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||||
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
|
}
|
||||||
|
return pStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) {
|
||||||
|
SName name = {0};
|
||||||
|
tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
|
char db[TSDB_STREAM_FNAME_LEN] = {0};
|
||||||
|
tNameGetFullDbName(&name, db);
|
||||||
|
|
||||||
|
return mndAcquireDb(pMnode, db);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||||
|
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
|
||||||
|
mDebug("stream:%s to create", pCreate->name);
|
||||||
|
SStreamObj streamObj = {0};
|
||||||
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
|
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
|
streamObj.updateTime = streamObj.createTime;
|
||||||
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
streamObj.dbUid = pDb->uid;
|
||||||
|
streamObj.version = 1;
|
||||||
|
streamObj.sql = pCreate->sql;
|
||||||
|
streamObj.physicalPlan = pCreate->physicalPlan;
|
||||||
|
streamObj.logicalPlan = pCreate->logicalPlan;
|
||||||
|
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
|
SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj);
|
||||||
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessCreateStreamReq(SMnodeMsg *pReq) {
|
||||||
|
SMnode *pMnode = pReq->pMnode;
|
||||||
|
int32_t code = -1;
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
SDbObj *pDb = NULL;
|
||||||
|
SUserObj *pUser = NULL;
|
||||||
|
SCMCreateStreamReq createStreamReq = {0};
|
||||||
|
|
||||||
|
if (tDeserializeSCMCreateStreamReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createStreamReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto CREATE_STREAM_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("stream:%s, start to create, sql:%s", createStreamReq.name, createStreamReq.sql);
|
||||||
|
|
||||||
|
if (mndCheckCreateStreamReq(&createStreamReq) != 0) {
|
||||||
|
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||||
|
goto CREATE_STREAM_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStream = mndAcquireStream(pMnode, createStreamReq.name);
|
||||||
|
if (pStream != NULL) {
|
||||||
|
if (createStreamReq.igExists) {
|
||||||
|
mDebug("stream:%s, already exist, ignore exist is set", createStreamReq.name);
|
||||||
|
code = 0;
|
||||||
|
goto CREATE_STREAM_OVER;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
|
||||||
|
goto CREATE_STREAM_OVER;
|
||||||
|
}
|
||||||
|
} else if (terrno != TSDB_CODE_MND_STREAM_NOT_EXIST) {
|
||||||
|
goto CREATE_STREAM_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDb = mndAcquireDbByStream(pMnode, createStreamReq.name);
|
||||||
|
if (pDb == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
|
goto CREATE_STREAM_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
|
if (pUser == NULL) {
|
||||||
|
goto CREATE_STREAM_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndCheckWriteAuth(pUser, pDb) != 0) {
|
||||||
|
goto CREATE_STREAM_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);
|
||||||
|
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
CREATE_STREAM_OVER:
|
||||||
|
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
|
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
|
||||||
|
tFreeSCMCreateStreamReq(&createStreamReq);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||||
|
if (pDb == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfStreams = 0;
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pStream->dbUid == pDb->uid) {
|
||||||
|
numOfStreams++;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
*pNumOfStreams = numOfStreams;
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
||||||
|
SMnode *pMnode = pReq->pMnode;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
if (mndGetNumOfStreams(pMnode, pShow->db, &pShow->numOfRows) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t cols = 0;
|
||||||
|
SSchema *pSchema = pMeta->pSchemas;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
strcpy(pSchema[cols].name, "name");
|
||||||
|
pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = 8;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
strcpy(pSchema[cols].name, "create_time");
|
||||||
|
pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
strcpy(pSchema[cols].name, "sql");
|
||||||
|
pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pMeta->numOfColumns = cols;
|
||||||
|
pShow->numOfColumns = cols;
|
||||||
|
|
||||||
|
pShow->offset[0] = 0;
|
||||||
|
for (int32_t i = 1; i < cols; ++i) {
|
||||||
|
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
pShow->numOfRows = sdbGetSize(pSdb, SDB_STREAM);
|
||||||
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
|
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndRetrieveStream(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
|
SMnode *pMnode = pReq->pMnode;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
int32_t cols = 0;
|
||||||
|
char *pWrite;
|
||||||
|
char prefix[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
|
||||||
|
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
|
||||||
|
if (pDb == NULL) return 0;
|
||||||
|
|
||||||
|
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
|
||||||
|
strcat(prefix, TS_PATH_DELIMITER);
|
||||||
|
int32_t prefixLen = (int32_t)strlen(prefix);
|
||||||
|
|
||||||
|
while (numOfRows < rows) {
|
||||||
|
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
|
||||||
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pStream->dbUid != pDb->uid) {
|
||||||
|
if (strncmp(pStream->name, prefix, prefixLen) != 0) {
|
||||||
|
mError("Inconsistent stream data, name:%s, db:%s, dbUid:%" PRIu64, pStream->name, pDb->name, pDb->uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
cols = 0;
|
||||||
|
|
||||||
|
char streamName[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
|
tstrncpy(streamName, pStream->name + prefixLen, TSDB_TABLE_NAME_LEN);
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_TO_VARSTR(pWrite, streamName);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(int64_t *)pWrite = pStream->createTime;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pStream->sql, pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
numOfRows++;
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
pShow->numOfReads += numOfRows;
|
||||||
|
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
}
|
|
@ -13,7 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
|
||||||
#include "mndTopic.h"
|
#include "mndTopic.h"
|
||||||
#include "mndAuth.h"
|
#include "mndAuth.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
|
@ -229,7 +228,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
|
||||||
return pDrop;
|
return pDrop;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) {
|
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
||||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -237,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
||||||
mDebug("topic:%s to create", pCreate->name);
|
mDebug("topic:%s to create", pCreate->name);
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
@ -278,14 +277,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
|
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SUserObj *pUser = NULL;
|
SUserObj *pUser = NULL;
|
||||||
SMCreateTopicReq createTopicReq = {0};
|
SCMCreateTopicReq createTopicReq = {0};
|
||||||
|
|
||||||
if (tDeserializeSMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
|
if (tDeserializeSCMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto CREATE_TOPIC_OVER;
|
goto CREATE_TOPIC_OVER;
|
||||||
}
|
}
|
||||||
|
@ -338,7 +337,7 @@ CREATE_TOPIC_OVER:
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
|
||||||
tFreeSMCreateTopicReq(&createTopicReq);
|
tFreeSCMCreateTopicReq(&createTopicReq);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -409,35 +408,6 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
|
||||||
if (pDb == NULL) {
|
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfTopics = 0;
|
|
||||||
void *pIter = NULL;
|
|
||||||
while (1) {
|
|
||||||
SMqTopicObj *pTopic = NULL;
|
|
||||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
|
|
||||||
if (pTopic->dbUid == pDb->uid) {
|
|
||||||
numOfTopics++;
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pTopic);
|
|
||||||
}
|
|
||||||
|
|
||||||
*pNumOfTopics = numOfTopics;
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||||
|
|
|
@ -61,16 +61,16 @@ void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) {
|
void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) {
|
||||||
SMCreateTopicReq createReq = {0};
|
SCMCreateTopicReq createReq = {0};
|
||||||
strcpy(createReq.name, topicName);
|
strcpy(createReq.name, topicName);
|
||||||
createReq.igExists = 0;
|
createReq.igExists = 0;
|
||||||
createReq.sql = (char*)sql;
|
createReq.sql = (char*)sql;
|
||||||
createReq.physicalPlan = (char*)"physicalPlan";
|
createReq.physicalPlan = (char*)"physicalPlan";
|
||||||
createReq.logicalPlan = (char*)"logicalPlan";
|
createReq.logicalPlan = (char*)"logicalPlan";
|
||||||
|
|
||||||
int32_t contLen = tSerializeMCreateTopicReq(NULL, 0, &createReq);
|
int32_t contLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq);
|
||||||
void* pReq = rpcMallocCont(contLen);
|
void* pReq = rpcMallocCont(contLen);
|
||||||
tSerializeMCreateTopicReq(pReq, contLen, &createReq);
|
tSerializeSCMCreateTopicReq(pReq, contLen, &createReq);
|
||||||
|
|
||||||
*pContLen = contLen;
|
*pContLen = contLen;
|
||||||
return pReq;
|
return pReq;
|
||||||
|
@ -100,9 +100,7 @@ TEST_F(MndTestTopic, 01_Create_Topic) {
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{ test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, ""); }
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, "");
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
#include "tqueue.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
#include "snode.h"
|
#include "snode.h"
|
||||||
|
@ -28,12 +29,51 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
enum {
|
||||||
|
STREAM_STATUS__READY = 1,
|
||||||
|
STREAM_STATUS__STOPPED,
|
||||||
|
STREAM_STATUS__CREATING,
|
||||||
|
STREAM_STATUS__STOPING,
|
||||||
|
STREAM_STATUS__RESUMING,
|
||||||
|
STREAM_STATUS__DELETING,
|
||||||
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
STREAM_RUNNER__RUNNING = 1,
|
||||||
|
STREAM_RUNNER__STOP,
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct SSnode {
|
typedef struct SSnode {
|
||||||
SSnodeOpt cfg;
|
SSnodeOpt cfg;
|
||||||
} SSnode;
|
} SSnode;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t streamId;
|
||||||
|
int32_t IdxInLevel;
|
||||||
|
int32_t level;
|
||||||
|
} SStreamInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SStreamInfo meta;
|
||||||
|
int8_t status;
|
||||||
|
void* executor;
|
||||||
|
STaosQueue* queue;
|
||||||
|
void* stateStore;
|
||||||
|
// storage handle
|
||||||
|
} SStreamRunner;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SHashObj* pHash;
|
||||||
|
} SStreamMeta;
|
||||||
|
|
||||||
|
int32_t sndCreateStream();
|
||||||
|
int32_t sndDropStream();
|
||||||
|
|
||||||
|
int32_t sndStopStream();
|
||||||
|
int32_t sndResumeStream();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_SNODE_INT_H_*/
|
#endif /*_TD_SNODE_INT_H_*/
|
||||||
|
|
|
@ -124,8 +124,13 @@ class WalRetentionEnv : public ::testing::Test {
|
||||||
|
|
||||||
void SetUp() override {
|
void SetUp() override {
|
||||||
SWalCfg cfg;
|
SWalCfg cfg;
|
||||||
cfg.rollPeriod = -1, cfg.segSize = -1, cfg.retentionPeriod = -1, cfg.retentionSize = 0, cfg.rollPeriod = 0,
|
cfg.rollPeriod = -1;
|
||||||
cfg.vgId = 0, cfg.level = TAOS_WAL_FSYNC;
|
cfg.segSize = -1;
|
||||||
|
cfg.retentionPeriod = -1;
|
||||||
|
cfg.retentionSize = 0;
|
||||||
|
cfg.rollPeriod = 0;
|
||||||
|
cfg.vgId = 0;
|
||||||
|
cfg.level = TAOS_WAL_FSYNC;
|
||||||
pWal = walOpen(pathName, &cfg);
|
pWal = walOpen(pathName, &cfg);
|
||||||
ASSERT(pWal != NULL);
|
ASSERT(pWal != NULL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -174,8 +174,8 @@ TEST(td_encode_test, encode_decode_variant_len_integer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(td_encode_test, encode_decode_cstr) {
|
TEST(td_encode_test, encode_decode_cstr) {
|
||||||
uint8_t * buf = new uint8_t[1024 * 1024];
|
uint8_t *buf = new uint8_t[1024 * 1024];
|
||||||
char * cstr = new char[1024 * 1024];
|
char *cstr = new char[1024 * 1024];
|
||||||
const char *dcstr;
|
const char *dcstr;
|
||||||
SCoder encoder;
|
SCoder encoder;
|
||||||
SCoder decoder;
|
SCoder decoder;
|
||||||
|
@ -208,7 +208,7 @@ TEST(td_encode_test, encode_decode_cstr) {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t A_a;
|
int32_t A_a;
|
||||||
int64_t A_b;
|
int64_t A_b;
|
||||||
char * A_c;
|
char *A_c;
|
||||||
} SStructA_v1;
|
} SStructA_v1;
|
||||||
|
|
||||||
static int32_t tSStructA_v1_encode(SCoder *pCoder, const SStructA_v1 *pSAV1) {
|
static int32_t tSStructA_v1_encode(SCoder *pCoder, const SStructA_v1 *pSAV1) {
|
||||||
|
@ -240,7 +240,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t A_a;
|
int32_t A_a;
|
||||||
int64_t A_b;
|
int64_t A_b;
|
||||||
char * A_c;
|
char *A_c;
|
||||||
// -------------------BELOW FEILDS ARE ADDED IN A NEW VERSION--------------
|
// -------------------BELOW FEILDS ARE ADDED IN A NEW VERSION--------------
|
||||||
int16_t A_d;
|
int16_t A_d;
|
||||||
int16_t A_e;
|
int16_t A_e;
|
||||||
|
@ -437,4 +437,4 @@ TEST(td_encode_test, compound_struct_encode_test) {
|
||||||
tCoderClear(&decoder);
|
tCoderClear(&decoder);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
Loading…
Reference in New Issue