refactor: adjust mnode def
This commit is contained in:
parent
285c396b0b
commit
f018ccc28e
|
@ -72,7 +72,7 @@ typedef enum {
|
||||||
TRN_TYPE_DROP_USER = 1003,
|
TRN_TYPE_DROP_USER = 1003,
|
||||||
TRN_TYPE_CREATE_FUNC = 1004,
|
TRN_TYPE_CREATE_FUNC = 1004,
|
||||||
TRN_TYPE_DROP_FUNC = 1005,
|
TRN_TYPE_DROP_FUNC = 1005,
|
||||||
|
|
||||||
TRN_TYPE_CREATE_SNODE = 1006,
|
TRN_TYPE_CREATE_SNODE = 1006,
|
||||||
TRN_TYPE_DROP_SNODE = 1007,
|
TRN_TYPE_DROP_SNODE = 1007,
|
||||||
TRN_TYPE_CREATE_QNODE = 1008,
|
TRN_TYPE_CREATE_QNODE = 1008,
|
||||||
|
@ -115,7 +115,10 @@ typedef enum {
|
||||||
TRN_TYPE_STB_SCOPE_END,
|
TRN_TYPE_STB_SCOPE_END,
|
||||||
} ETrnType;
|
} ETrnType;
|
||||||
|
|
||||||
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
|
typedef enum {
|
||||||
|
TRN_POLICY_ROLLBACK = 0,
|
||||||
|
TRN_POLICY_RETRY = 1,
|
||||||
|
} ETrnPolicy;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
DND_REASON_ONLINE = 0,
|
DND_REASON_ONLINE = 0,
|
||||||
|
@ -131,6 +134,15 @@ typedef enum {
|
||||||
DND_REASON_OTHERS
|
DND_REASON_OTHERS
|
||||||
} EDndReason;
|
} EDndReason;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
CONSUMER_UPDATE__TOUCH = 1,
|
||||||
|
CONSUMER_UPDATE__ADD,
|
||||||
|
CONSUMER_UPDATE__REMOVE,
|
||||||
|
CONSUMER_UPDATE__LOST,
|
||||||
|
CONSUMER_UPDATE__RECOVER,
|
||||||
|
CONSUMER_UPDATE__MODIFY,
|
||||||
|
} ECsmUpdateType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t id;
|
int32_t id;
|
||||||
ETrnStage stage;
|
ETrnStage stage;
|
||||||
|
@ -386,7 +398,6 @@ typedef struct {
|
||||||
int32_t codeSize;
|
int32_t codeSize;
|
||||||
char* pComment;
|
char* pComment;
|
||||||
char* pCode;
|
char* pCode;
|
||||||
char pData[];
|
|
||||||
} SFuncObj;
|
} SFuncObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -425,18 +436,8 @@ typedef struct {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
} SMqOffsetObj;
|
} SMqOffsetObj;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) {
|
int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
|
||||||
int32_t tlen = 0;
|
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
|
||||||
tlen += taosEncodeString(buf, pOffset->key);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) {
|
|
||||||
buf = taosDecodeStringTo(buf, pOffset->key);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
@ -459,26 +460,15 @@ typedef struct {
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
} SMqTopicObj;
|
} SMqTopicObj;
|
||||||
|
|
||||||
enum {
|
|
||||||
CONSUMER_UPDATE__TOUCH = 1,
|
|
||||||
CONSUMER_UPDATE__ADD,
|
|
||||||
CONSUMER_UPDATE__REMOVE,
|
|
||||||
CONSUMER_UPDATE__LOST,
|
|
||||||
CONSUMER_UPDATE__RECOVER,
|
|
||||||
CONSUMER_UPDATE__MODIFY,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
char appId[TSDB_CGROUP_LEN];
|
char appId[TSDB_CGROUP_LEN];
|
||||||
int8_t updateType; // used only for update
|
int8_t updateType; // used only for update
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
// hbStatus is not applicable to serialization
|
int32_t hbStatus; // hbStatus is not applicable to serialization
|
||||||
int32_t hbStatus;
|
SRWLatch lock; // lock is used for topics update
|
||||||
// lock is used for topics update
|
|
||||||
SRWLatch lock;
|
|
||||||
SArray* currentTopics; // SArray<char*>
|
SArray* currentTopics; // SArray<char*>
|
||||||
SArray* rebNewTopics; // SArray<char*>
|
SArray* rebNewTopics; // SArray<char*>
|
||||||
SArray* rebRemovedTopics; // SArray<char*>
|
SArray* rebRemovedTopics; // SArray<char*>
|
||||||
|
@ -492,7 +482,6 @@ typedef struct {
|
||||||
int64_t upTime;
|
int64_t upTime;
|
||||||
int64_t subscribeTime;
|
int64_t subscribeTime;
|
||||||
int64_t rebalanceTime;
|
int64_t rebalanceTime;
|
||||||
|
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||||
|
@ -581,19 +570,18 @@ typedef struct {
|
||||||
} SMqRebOutputObj;
|
} SMqRebOutputObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char sourceDb[TSDB_DB_FNAME_LEN];
|
char sourceDb[TSDB_DB_FNAME_LEN];
|
||||||
char targetDb[TSDB_DB_FNAME_LEN];
|
char targetDb[TSDB_DB_FNAME_LEN];
|
||||||
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
char targetSTbName[TSDB_TABLE_FNAME_LEN];
|
||||||
int64_t createTime;
|
int64_t createTime;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int32_t version;
|
int32_t version;
|
||||||
int32_t vgNum;
|
int32_t vgNum;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
// int32_t sqlLen;
|
|
||||||
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
|
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
|
||||||
int32_t fixedSinkVgId; // 0 for shuffle
|
int32_t fixedSinkVgId; // 0 for shuffle
|
||||||
int64_t smaId; // 0 for unused
|
int64_t smaId; // 0 for unused
|
||||||
|
|
|
@ -13,12 +13,14 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndDef.h"
|
#include "mndDef.h"
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
|
|
||||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
|
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
|
||||||
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
|
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -515,3 +517,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeString(buf, pOffset->key);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pOffset->offset);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
|
||||||
|
buf = taosDecodeStringTo(buf, pOffset->key);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pOffset->offset);
|
||||||
|
return buf;
|
||||||
|
}
|
Loading…
Reference in New Issue