Merge pull request #9628 from taosdata/feature/tq

Feature/tq
This commit is contained in:
Shengliang Guan 2022-01-06 11:21:21 +08:00 committed by GitHub
commit 45f9eaa01c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 537 additions and 259 deletions

View File

@ -312,14 +312,23 @@ typedef struct SEpSet {
} SEpSet; } SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
if (buf == NULL) return sizeof(SEpSet); int tlen = 0;
memcpy(buf, pEp, sizeof(SEpSet)); tlen += taosEncodeFixedI8(buf, pEp->inUse);
// TODO: endian conversion tlen += taosEncodeFixedI8(buf, pEp->numOfEps);
return sizeof(SEpSet); for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
tlen += taosEncodeFixedU16(buf, pEp->port[i]);
tlen += taosEncodeString(buf, pEp->fqdn[i]);
}
return tlen;
} }
static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEpSet) { static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
memcpy(pEpSet, buf, sizeof(SEpSet)); buf = taosDecodeFixedI8(buf, &pEp->inUse);
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
buf = taosDecodeFixedU16(buf, &pEp->port[i]);
buf = taosDecodeStringTo(buf, pEp->fqdn[i]);
}
return buf; return buf;
} }
@ -1083,8 +1092,8 @@ typedef struct {
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->igExists); tlen += taosEncodeFixedI8(buf, pReq->igExists);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
return tlen; return tlen;
@ -1114,41 +1123,62 @@ static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopi
} }
typedef struct { typedef struct {
char* topicName; int32_t topicNum;
char* consumerGroup;
int64_t consumerId; int64_t consumerId;
char* consumerGroup;
char* topicName[];
} SCMSubscribeReq; } SCMSubscribeReq;
static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeFixedI32(buf, pReq->topicNum);
tlen += taosEncodeString(buf, pReq->consumerGroup);
tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->consumerGroup);
for(int i = 0; i < pReq->topicNum; i++) {
tlen += taosEncodeString(buf, pReq->topicName[i]);
}
return tlen; return tlen;
} }
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
buf = taosDecodeString(buf, &pReq->topicName); buf = taosDecodeFixedI32(buf, &pReq->topicNum);
buf = taosDecodeString(buf, &pReq->consumerGroup);
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeString(buf, &pReq->consumerGroup);
for(int i = 0; i < pReq->topicNum; i++) {
buf = taosDecodeString(buf, &pReq->topicName[i]);
}
return buf; return buf;
} }
typedef struct { typedef struct SMqSubTopic {
int32_t vgId; int32_t vgId;
SEpSet pEpSet; int64_t topicId;
SEpSet epSet;
} SMqSubTopic;
typedef struct {
int32_t topicNum;
SMqSubTopic topics[];
} SCMSubscribeRsp; } SCMSubscribeRsp;
static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI32(buf, pRsp->vgId); tlen += taosEncodeFixedI32(buf, pRsp->topicNum);
tlen += taosEncodeSEpSet(buf, &pRsp->pEpSet); for(int i = 0; i < pRsp->topicNum; i++) {
tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId);
tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId);
tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet);
}
return tlen; return tlen;
} }
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
buf = taosDecodeFixedI32(buf, &pRsp->vgId); buf = taosDecodeFixedI32(buf, &pRsp->topicNum);
buf = taosDecodeSEpSet(buf, &pRsp->pEpSet); for(int i = 0; i < pRsp->topicNum; i++) {
buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId);
buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId);
buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet);
}
return buf; return buf;
} }
@ -1157,10 +1187,36 @@ typedef struct {
int64_t consumerId; int64_t consumerId;
int64_t consumerGroupId; int64_t consumerGroupId;
int64_t offset; int64_t offset;
char* sql;
char* logicalPlan;
char* physicalPlan;
} SMVSubscribeReq; } SMVSubscribeReq;
static FORCE_INLINE int tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) {
int tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->topicId);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId);
tlen += taosEncodeFixedI64(buf, pReq->offset);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan);
return tlen;
}
static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->topicId);
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId);
buf = taosDecodeFixedI64(buf, &pReq->offset);
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan);
return buf;
}
typedef struct { typedef struct {
int64_t newOffset; int64_t status;
} SMVSubscribeRsp; } SMVSubscribeRsp;
typedef struct { typedef struct {

View File

@ -159,6 +159,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)

View File

@ -401,6 +401,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal") #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal")
#define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted") #define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted")
#define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) //"WAL size exceeds limit") #define TSDB_CODE_WAL_SIZE_LIMIT TAOS_DEF_ERROR_CODE(0, 0x1002) //"WAL size exceeds limit")
#define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003) //"WAL invalid version")
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory")
// tfs // tfs
#define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory") #define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory")

View File

@ -177,6 +177,7 @@ do { \
#define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE

View File

@ -25,11 +25,8 @@ extern "C" {
int32_t mndInitConsumer(SMnode *pMnode); int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode);
SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
SCGroupObj *mndAcquireCGroup(SMnode *pMnode, char *consumerGroup);
void mndReleaseCGroup(SMnode *pMnode, SCGroupObj *pCGroup);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,6 +22,7 @@
#include "sync.h" #include "sync.h"
#include "tmsg.h" #include "tmsg.h"
#include "thash.h" #include "thash.h"
#include "tlist.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
@ -307,46 +308,117 @@ typedef struct {
char payload[]; char payload[];
} SShowObj; } SShowObj;
typedef struct { #if 0
char name[TSDB_TOPIC_FNAME_LEN]; typedef struct SConsumerObj {
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
uint64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t execLen;
void* executor;
int32_t sqlLen;
char* sql;
char* logicalPlan;
char* physicalPlan;
} STopicObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid; uint64_t uid;
int64_t createTime;
int64_t updateTime;
//uint64_t dbUid; //uint64_t dbUid;
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
SArray* topics;
} SConsumerObj; } SConsumerObj;
typedef struct { typedef struct SMqTopicConsumer {
char name[TSDB_TOPIC_FNAME_LEN]; int64_t consumerId;
char db[TSDB_DB_FNAME_LEN]; SList* topicList;
int64_t createTime; } SMqTopicConsumer;
int64_t updateTime; #endif
typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
SList *consumerIds; // SList<int64_t>
SList *idleVGroups; // SList<int32_t>
} SMqCGroup;
typedef struct SMqTopicObj {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
uint64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t sqlLen;
char *sql;
char *logicalPlan;
char *physicalPlan;
SHashObj *cgroups; // SHashObj<SMqCGroup>
} SMqTopicObj;
// TODO: add cache and change name to id
typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_FNAME_LEN];
SList *vgroups; // SList<int32_t>
} SMqConsumerTopic;
typedef struct SMqConsumerObj {
SRWLatch lock;
int64_t consumerId;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray<SMqConsumerTopic>
} SMqConsumerObj;
typedef struct SMqSubConsumerObj {
int64_t consumerUid; // if -1, unassigned
SList *vgId; // SList<int32_t>
} SMqSubConsumerObj;
typedef struct SMqSubCGroupObj {
char name[TSDB_CONSUMER_GROUP_LEN];
SList *consumers; // SList<SMqConsumerObj>
} SMqSubCGroupObj;
typedef struct SMqSubTopicObj {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t sqlLen;
char *sql;
char *logicalPlan;
char *physicalPlan;
SList *cgroups; // SList<SMqSubCGroupObj>
} SMqSubTopicObj;
typedef struct SMqConsumerSubObj {
int64_t topicUid;
SList *vgIds; // SList<int64_t>
} SMqConsumerSubObj;
typedef struct SMqConsumerHbObj {
int64_t consumerId;
SList *consumerSubs; // SList<SMqConsumerSubObj>
} SMqConsumerHbObj;
typedef struct SMqVGroupSubObj {
int64_t topicUid;
SList *consumerIds; // SList<int64_t>
} SMqVGroupSubObj;
typedef struct SMqVGroupHbObj {
int64_t vgId;
SList *vgSubs; // SList<SMqVGroupSubObj>
} SMqVGroupHbObj;
#if 0
typedef struct SCGroupObj {
char name[TSDB_TOPIC_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid; uint64_t uid;
//uint64_t dbUid; //uint64_t dbUid;
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
SList* consumerIds;
} SCGroupObj; } SCGroupObj;
#endif
typedef struct SMnodeMsg { typedef struct SMnodeMsg {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];

View File

@ -25,8 +25,11 @@ extern "C" {
int32_t mndInitTopic(SMnode *pMnode); int32_t mndInitTopic(SMnode *pMnode);
void mndCleanupTopic(SMnode *pMnode); void mndCleanupTopic(SMnode *pMnode);
STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName); SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName);
void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic); void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -24,16 +24,17 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h" #include "tname.h"
#define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64 #define MND_CONSUMER_RESERVE_SIZE 64
static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer); static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pConsumer, SConsumerObj *pNewConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg); static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg);
@ -57,8 +58,8 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; .deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp); /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq); /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
@ -66,33 +67,52 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {} void mndCleanupConsumer(SMnode *pMnode) {}
static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) { static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return 0;
}
int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
int32_t size = sizeof(SMqConsumerObj) + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) goto CM_ENCODE_OVER; if (pRaw == NULL) goto CM_ENCODE_OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN, CM_ENCODE_OVER) int32_t topicNum = taosArrayGetSize(pConsumer->topics);
SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN, CM_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime, CM_ENCODE_OVER) int32_t len = strlen(pConsumer->cgroup);
SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime, CM_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pConsumer->uid, CM_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CM_ENCODE_OVER);
/*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ SDB_SET_INT32(pRaw, dataPos, topicNum, CM_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pConsumer->version, CM_ENCODE_OVER) for (int i = 0; i < topicNum; i++) {
int32_t len;
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i);
len = strlen(pConsumerTopic->name);
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len, CM_ENCODE_OVER);
int vgSize;
if (pConsumerTopic->vgroups == NULL) {
vgSize = 0;
} else {
vgSize = listNEles(pConsumerTopic->vgroups);
}
SDB_SET_INT32(pRaw, dataPos, vgSize, CM_ENCODE_OVER);
for (int j = 0; j < vgSize; j++) {
// SList* head;
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
}
}
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
CM_ENCODE_OVER: CM_ENCODE_OVER:
if (terrno != 0) { if (terrno != 0) {
mError("consumer:%s, failed to encode to raw:%p since %s", pConsumer->name, pRaw, terrstr()); mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
return NULL; return NULL;
} }
mTrace("consumer:%s, encode to raw:%p, row:%p", pConsumer->name, pRaw, pConsumer); mTrace("consumer:%ld, encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
return pRaw; return pRaw;
} }
@ -107,68 +127,77 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
goto CONSUME_DECODE_OVER; goto CONSUME_DECODE_OVER;
} }
int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); int32_t size = sizeof(SMqConsumerObj);
SSdbRow *pRow = sdbAllocRow(size); SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto CONSUME_DECODE_OVER; if (pRow == NULL) goto CONSUME_DECODE_OVER;
SConsumerObj *pConsumer = sdbGetRowObj(pRow); SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
if (pConsumer == NULL) goto CONSUME_DECODE_OVER; if (pConsumer == NULL) goto CONSUME_DECODE_OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN, CONSUME_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pConsumer->consumerId, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN, CONSUME_DECODE_OVER) int32_t len, topicNum;
SDB_GET_INT64(pRaw, dataPos, &pConsumer->createTime, CONSUME_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pConsumer->updateTime, CONSUME_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CONSUME_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pConsumer->uid, CONSUME_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER);
/*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ for (int i = 0; i < topicNum; i++) {
SDB_GET_INT32(pRaw, dataPos, &pConsumer->version, CONSUME_DECODE_OVER) int32_t topicLen;
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER) SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
terrno = 0; if (pConsumerTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
// TODO
return NULL;
}
/*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/
SDB_GET_INT32(pRaw, dataPos, &topicLen, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pConsumerTopic->name, topicLen, CONSUME_DECODE_OVER);
int32_t vgSize;
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
}
CONSUME_DECODE_OVER: CONSUME_DECODE_OVER:
if (terrno != 0) { if (terrno != 0) {
mError("consumer:%s, failed to decode from raw:%p since %s", pConsumer->name, pRaw, terrstr()); mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
tfree(pRow); tfree(pRow);
return NULL; return NULL;
} }
mTrace("consumer:%s, decode from raw:%p, row:%p", pConsumer->name, pRaw, pConsumer); /*SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);*/
return pRow; return pRow;
} }
static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) { static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mTrace("consumer:%s, perform insert action, row:%p", pConsumer->name, pConsumer); mTrace("consumer:%ld, perform insert action", pConsumer->consumerId);
return 0; return 0;
} }
static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) { static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mTrace("consumer:%s, perform delete action, row:%p", pConsumer->name, pConsumer); mTrace("consumer:%ld, perform delete action", pConsumer->consumerId);
return 0; return 0;
} }
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mTrace("consumer:%s, perform update action, old_row:%p new_row:%p", pOldConsumer->name, pOldConsumer, pNewConsumer); mTrace("consumer:%ld, perform update action", pOldConsumer->consumerId);
atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime);
atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version);
taosWLockLatch(&pOldConsumer->lock);
// TODO handle update // TODO handle update
/*taosWLockLatch(&pOldConsumer->lock);*/
/*taosWUnLockLatch(&pOldConsumer->lock);*/
taosWUnLockLatch(&pOldConsumer->lock);
return 0; return 0;
} }
SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
} }
return pConsumer; return pConsumer;
} }
void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer) { void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
} }
@ -178,23 +207,185 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
char *msgStr = pMsg->rpcMsg.pCont; char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq *pSubscribe; SCMSubscribeReq *pSubscribe;
tDeserializeSCMSubscribeReq(msgStr, pSubscribe); tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
// add consumerGroupId -> list<consumerId> to sdb int64_t consumerId = pSubscribe->consumerId;
// add consumerId -> list<consumer> to sdb char *consumerGroup = pSubscribe->consumerGroup;
// add consumer -> list<consumerId> to sdb
SArray *newSub = NULL;
int newTopicNum = pSubscribe->topicNum;
if (newTopicNum) {
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
}
for (int i = 0; i < newTopicNum; i++) {
char *topic = pSubscribe->topicName[i];
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
if (pConsumerTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
// TODO: free
return -1;
}
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
taosArrayPush(newSub, pConsumerTopic);
free(pConsumerTopic);
}
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int oldTopicNum = 0;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
pConsumer = malloc(sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
strcpy(pConsumer->cgroup, pSubscribe->consumerGroup);
} else {
oldSub = pConsumer->topics;
oldTopicNum = taosArrayGetSize(oldSub);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
return -1;
}
int i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) {
SMqConsumerTopic *pOldTopic = NULL;
SMqConsumerTopic *pNewTopic = NULL;
if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic
pOldTopic = taosArrayGet(oldSub, j);
j++;
} else if (j >= oldTopicNum) {
pNewTopic = taosArrayGet(newSub, i);
} else {
pNewTopic = taosArrayGet(newSub, i);
pOldTopic = taosArrayGet(oldSub, j);
char *newName = pNewTopic->name;
char *oldName = pOldTopic->name;
int comp = compareLenPrefixedStr(newName, oldName);
if (comp == 0) {
// do nothing
pOldTopic = pNewTopic = NULL;
i++;
j++;
continue;
} else if (comp < 0) {
pOldTopic = NULL;
i++;
} else {
pNewTopic = NULL;
j++;
}
}
if (pOldTopic != NULL) {
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
SListIter iter;
tdListInitIter(vgroups, &iter, TD_LIST_FORWARD);
SListNode *pn;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName);
ASSERT(pTopic != NULL);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// TODO release
if (pVgObj == NULL) {
// TODO handle error
continue;
}
// acquire and get epset
void *pMqVgSetReq =
mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, pSubscribe->consumerId, pSubscribe->consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = pMqVgSetReq;
action.contLen = 0; // TODO
action.msgType = TDMT_VND_MQ_SET_CONN;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMqVgSetReq);
mndTransDrop(pTrans);
// TODO free
return -1;
}
}
taosHashRemove(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
} else if (pNewTopic != NULL) {
ASSERT(pOldTopic == NULL);
char *newTopicName = pNewTopic->name;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
ASSERT(pTopic != NULL);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup));
if (pGroup == NULL) {
// add new group
pGroup = malloc(sizeof(SMqCGroup));
if (pGroup == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->consumerIds = tdListNew(sizeof(int64_t));
if (pGroup->consumerIds == NULL) {
free(pGroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pGroup->status = 0;
// add into cgroups
taosHashPut(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup), pGroup,
sizeof(SMqCGroup));
}
// put the consumer into list
// rebalance will be triggered by timer
tdListAppend(pGroup->consumerIds, &pSubscribe->consumerId);
SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic);
sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pTopicRaw);
} else {
ASSERT(0);
}
}
// destroy old sub
taosArrayDestroy(oldSub);
// put new sub into consumerobj
pConsumer->topics = newSub;
// persist consumerObj
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
// TODO: error handling
mndTransAppendRedolog(pTrans, pConsumerRaw);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
// TODO: free memory
mndTransDrop(pTrans);
return 0; return 0;
} }
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
return 0;
}
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
@ -272,13 +463,11 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO
int32_t numOfConsumers = 0; int32_t numOfConsumers = 0;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SConsumerObj *pConsumer = NULL; SMqConsumerObj *pConsumer = NULL;
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break; if (pIter == NULL) break;
if (strcmp(pConsumer->db, dbName) == 0) { numOfConsumers++;
numOfConsumers++;
}
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
} }
@ -337,57 +526,6 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMs
return 0; return 0;
} }
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SConsumerObj *pConsumer = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[64] = {0};
tstrncpy(prefix, pShow->db, 64);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
if (pShow->pIter == NULL) break;
if (strncmp(pConsumer->name, prefix, prefixLen) != 0) {
sdbRelease(pSdb, pConsumer);
continue;
}
cols = 0;
char consumerName[TSDB_TABLE_NAME_LEN] = {0};
tstrncpy(consumerName, pConsumer->name + prefixLen, TSDB_TABLE_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, consumerName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pConsumer->createTime;
cols++;
/*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/
/**(int32_t *)pWrite = pConsumer->numOfColumns;*/
/*cols++;*/
/*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/
/**(int32_t *)pWrite = pConsumer->numOfTags;*/
/*cols++;*/
numOfRows++;
sdbRelease(pSdb, pConsumer);
}
pShow->numOfReads += numOfRows;
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);

View File

@ -14,6 +14,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndTopic.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
@ -27,18 +28,16 @@
#define MND_TOPIC_VER_NUMBER 1 #define MND_TOPIC_VER_NUMBER 1
#define MND_TOPIC_RESERVE_SIZE 64 #define MND_TOPIC_RESERVE_SIZE 64
static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic); static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic); static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg);
static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pTopic, STopicObj *pNewTopic); static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg);
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
int32_t mndInitTopic(SMnode *pMnode) { int32_t mndInitTopic(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TOPIC, SSdbTable table = {.sdbType = SDB_TOPIC,
@ -58,45 +57,31 @@ int32_t mndInitTopic(SMnode *pMnode) {
void mndCleanupTopic(SMnode *pMnode) {} void mndCleanupTopic(SMnode *pMnode) {}
static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE;
int32_t size = sizeof(STopicObj) + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) goto TOPIC_ENCODE_OVER; if (pRaw == NULL) goto WTF;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, WTF);
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, WTF);
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, WTF);
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, WTF);
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pTopic->uid, WTF);
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, WTF);
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pTopic->version, WTF);
SDB_SET_INT32(pRaw, dataPos, pTopic->execLen, TOPIC_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, WTF);
SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen, TOPIC_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, WTF);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, WTF);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, WTF);
terrno = 0; WTF:
TOPIC_ENCODE_OVER:
if (terrno != 0) {
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
return pRaw; return pRaw;
} }
static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
@ -105,25 +90,28 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); int32_t size = sizeof(SMqTopicObj);
SSdbRow *pRow = sdbAllocRow(size); SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto TOPIC_DECODE_OVER; if (pRow == NULL) goto TOPIC_DECODE_OVER;
STopicObj *pTopic = sdbGetRowObj(pRow); SMqTopicObj *pTopic = sdbGetRowObj(pRow);
if (pTopic == NULL) goto TOPIC_DECODE_OVER; if (pTopic == NULL) goto TOPIC_DECODE_OVER;
int32_t len;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->execLen, TOPIC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen, TOPIC_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
@ -140,18 +128,18 @@ TOPIC_DECODE_OVER:
return pRow; return pRow;
} }
static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic) { static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
mTrace("topic:%s, perform insert action, row:%p", pTopic->name, pTopic); mTrace("topic:%s, perform insert action", pTopic->name);
return 0; return 0;
} }
static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic) { static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
mTrace("topic:%s, perform delete action, row:%p", pTopic->name, pTopic); mTrace("topic:%s, perform delete action", pTopic->name);
return 0; return 0;
} }
static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj *pNewTopic) { static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
mTrace("topic:%s, perform update action, old_row:%p new_row:%p", pOldTopic->name, pOldTopic, pNewTopic); mTrace("topic:%s, perform update action", pOldTopic->name);
atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version); atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
@ -163,16 +151,16 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj
return 0; return 0;
} }
STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) { SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
STopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName); SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
if (pTopic == NULL) { if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
} }
return pTopic; return pTopic;
} }
void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic) { void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
@ -187,7 +175,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
return mndAcquireDb(pMnode, db); return mndAcquireDb(pMnode, db);
} }
static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicMsg); int32_t contLen = sizeof(SDDropTopicMsg);
SDDropTopicMsg *pDrop = calloc(1, contLen); SDDropTopicMsg *pDrop = calloc(1, contLen);
@ -210,7 +198,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) {
} }
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
STopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
topicObj.createTime = taosGetTimestampMs(); topicObj.createTime = taosGetTimestampMs();
@ -222,6 +210,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
if (pTopicRaw == NULL) return -1; if (pTopicRaw == NULL) return -1;
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
// TODO: replace with trans to support recovery
return sdbWrite(pMnode->pSdb, pTopicRaw); return sdbWrite(pMnode->pSdb, pTopicRaw);
} }
@ -238,7 +227,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return -1; return -1;
} }
STopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name);
if (pTopic != NULL) { if (pTopic != NULL) {
sdbRelease(pMnode->pSdb, pTopic); sdbRelease(pMnode->pSdb, pTopic);
if (pCreate->igExists) { if (pCreate->igExists) {
@ -270,7 +259,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { return 0; } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; }
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
@ -278,7 +267,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
mDebug("topic:%s, start to drop", pDrop->name); mDebug("topic:%s, start to drop", pDrop->name);
STopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name);
if (pTopic == NULL) { if (pTopic == NULL) {
if (pDrop->igNotExists) { if (pDrop->igNotExists) {
mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name); mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name);
@ -384,13 +373,11 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
int32_t numOfTopics = 0; int32_t numOfTopics = 0;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
STopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break; if (pIter == NULL) break;
if (strcmp(pTopic->db, dbName) == 0) { numOfTopics++;
numOfTopics++;
}
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
@ -463,13 +450,13 @@ static void mndExtractTableName(char *tableId, char *name) {
} }
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
STopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
int32_t cols = 0; int32_t cols = 0;
char *pWrite; char *pWrite;
char prefix[64] = {0}; char prefix[64] = {0};
tstrncpy(prefix, pShow->db, 64); tstrncpy(prefix, pShow->db, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);

View File

@ -261,15 +261,18 @@ int walLoadMeta(SWal* pWal) {
memset(buf, 0, size + 5); memset(buf, 0, size + 5);
int tfd = tfOpenRead(fnameStr); int tfd = tfOpenRead(fnameStr);
if (tfRead(tfd, buf, size) != size) { if (tfRead(tfd, buf, size) != size) {
tfClose(tfd);
free(buf); free(buf);
return -1; return -1;
} }
// load into fileInfoSet // load into fileInfoSet
int code = walMetaDeserialize(pWal, buf); int code = walMetaDeserialize(pWal, buf);
if (code != 0) { if (code != 0) {
tfClose(tfd);
free(buf); free(buf);
return -1; return -1;
} }
tfClose(tfd);
free(buf); free(buf);
return 0; return 0;
} }

View File

@ -15,6 +15,7 @@
#include "tfile.h" #include "tfile.h"
#include "walInt.h" #include "walInt.h"
#include "taoserror.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) { SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle)); SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
@ -32,6 +33,7 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
pRead->status = 0; pRead->status = 0;
pRead->pHead = malloc(sizeof(SWalHead)); pRead->pHead = malloc(sizeof(SWalHead));
if (pRead->pHead == NULL) { if (pRead->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
free(pRead); free(pRead);
return NULL; return NULL;
} }
@ -57,16 +59,19 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
code = tfLseek(idxTfd, offset, SEEK_SET); code = tfLseek(idxTfd, offset, SEEK_SET);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
SWalIdxEntry entry; SWalIdxEntry entry;
if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { if (tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
// TODO:deserialize // TODO:deserialize
ASSERT(entry.ver == ver); ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_SET); code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
return code; return code;
@ -81,6 +86,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
int64_t logTfd = tfOpenRead(fnameStr); int64_t logTfd = tfOpenRead(fnameStr);
if (logTfd < 0) { if (logTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
@ -102,6 +108,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0; return 0;
} }
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
@ -115,7 +122,6 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
if (pRead->curFileFirstVer != pRet->firstVer) { if (pRead->curFileFirstVer != pRet->firstVer) {
code = walReadChangeFile(pRead, pRet->firstVer); code = walReadChangeFile(pRead, pRet->firstVer);
if (code < 0) { if (code < 0) {
// TODO: set error flag
return -1; return -1;
} }
} }
@ -134,7 +140,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
// TODO: check wal life // TODO: check wal life
if (pRead->curVersion != ver) { if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver); code = walReadSeekVer(pRead, ver);
if (code != 0) { if (code < 0) {
return -1; return -1;
} }
} }
@ -147,11 +153,13 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
} }
code = walValidHeadCksum(pRead->pHead); code = walValidHeadCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (pRead->capacity < pRead->pHead->head.len) { if (pRead->capacity < pRead->pHead->head.len) {
void *ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len); void *ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
pRead->pHead = ptr; pRead->pHead = ptr;
@ -165,6 +173,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
code = walValidBodyCksum(pRead->pHead); code = walValidBodyCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
pRead->curVersion++; pRead->curVersion++;

View File

@ -30,17 +30,20 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t idxOff = walGetVerIdxOffset(pWal, ver); int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = tfLseek(idxTfd, idxOff, SEEK_SET); code = tfLseek(idxTfd, idxOff, SEEK_SET);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
SWalIdxEntry entry; SWalIdxEntry entry;
// TODO:deserialize // TODO:deserialize
code = tfRead(idxTfd, &entry, sizeof(SWalIdxEntry)); code = tfRead(idxTfd, &entry, sizeof(SWalIdxEntry));
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ASSERT(entry.ver == ver); ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_CUR); code = tfLseek(logTfd, entry.offset, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
return code; return code;
@ -56,11 +59,13 @@ int walChangeFileToLast(SWal* pWal) {
walBuildIdxName(pWal, fileFirstVer, fnameStr); walBuildIdxName(pWal, fileFirstVer, fnameStr);
idxTfd = tfOpenReadWrite(fnameStr); idxTfd = tfOpenReadWrite(fnameStr);
if (idxTfd < 0) { if (idxTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
walBuildLogName(pWal, fileFirstVer, fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr);
logTfd = tfOpenReadWrite(fnameStr); logTfd = tfOpenReadWrite(fnameStr);
if (logTfd < 0) { if (logTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
// switch file // switch file
@ -76,11 +81,12 @@ int walChangeFile(SWal* pWal, int64_t ver) {
code = tfClose(pWal->writeLogTfd); code = tfClose(pWal->writeLogTfd);
if (code != 0) { if (code != 0) {
// TODO // TODO
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
code = tfClose(pWal->writeIdxTfd); code = tfClose(pWal->writeIdxTfd);
if (code != 0) { if (code != 0) {
// TODO terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
SWalFileInfo tmpInfo; SWalFileInfo tmpInfo;
@ -113,6 +119,7 @@ int walSeekVer(SWal* pWal, int64_t ver) {
return 0; return 0;
} }
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {

View File

@ -25,6 +25,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
if (ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) { if (ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
pWal->vers.commitVer = ver; pWal->vers.commitVer = ver;
@ -38,6 +39,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return 0; return 0;
} }
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1; return -1;
} }
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);