Merge pull request #25068 from taosdata/mark/tmq-opti

opti:subscribe logic
This commit is contained in:
dapan1121 2024-03-19 10:28:08 +08:00 committed by GitHub
commit adcad780e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1055 additions and 1166 deletions

View File

@ -3921,11 +3921,11 @@ int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeatroySMqHbReq(SMqHbReq* pReq); void tDestroySMqHbReq(SMqHbReq* pReq);
int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp); int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp); int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp); void tDestroySMqHbRsp(SMqHbRsp* pRsp);
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);

View File

@ -779,7 +779,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
} }
tDeatroySMqHbRsp(&rsp); tDestroySMqHbRsp(&rsp);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
} }
@ -861,7 +861,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
OVER: OVER:
tDeatroySMqHbReq(&req); tDestroySMqHbReq(&req);
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
} }

View File

@ -6201,9 +6201,8 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
return 0; return 0;
} }
int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) { void tDestroySMqHbRsp(SMqHbRsp *pRsp) {
taosArrayDestroy(pRsp->topicPrivileges); taosArrayDestroy(pRsp->topicPrivileges);
return 0;
} }
int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
@ -6250,13 +6249,12 @@ int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
return 0; return 0;
} }
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) { void tDestroySMqHbReq(SMqHbReq *pReq) {
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) { for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i); TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
if (vgs) taosArrayDestroy(vgs->offsetRows); if (vgs) taosArrayDestroy(vgs->offsetRows);
} }
taosArrayDestroy(pReq->topics); taosArrayDestroy(pReq->topics);
return 0;
} }
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {

View File

@ -24,27 +24,22 @@ extern "C" {
enum { enum {
MQ_CONSUMER_STATUS_REBALANCE = 1, MQ_CONSUMER_STATUS_REBALANCE = 1,
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS_READY, MQ_CONSUMER_STATUS_READY,
MQ_CONSUMER_STATUS_LOST, MQ_CONSUMER_STATUS_LOST,
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore };
// MQ_CONSUMER_STATUS__LOST_REBD,
};\
int32_t mndInitConsumer(SMnode *pMnode); int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode);
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info); void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo* info);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup);
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer);
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer);
const char *mndConsumerStatusName(int status); const char *mndConsumerStatusName(int status);

View File

@ -149,6 +149,7 @@ typedef enum {
CONSUMER_REMOVE_REB, // remove after rebalance CONSUMER_REMOVE_REB, // remove after rebalance
CONSUMER_UPDATE_REC, // update after recover CONSUMER_UPDATE_REC, // update after recover
CONSUMER_UPDATE_SUB, // update after subscribe req CONSUMER_UPDATE_SUB, // update after subscribe req
CONSUMER_INSERT_SUB,
} ECsmUpdateType; } ECsmUpdateType;
typedef struct { typedef struct {
@ -556,8 +557,9 @@ typedef struct {
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
} SMqConsumerObj; } SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted); void tClearSMqConsumerObj(SMqConsumerObj* pConsumer);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);

View File

@ -30,7 +30,7 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const c
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key); SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key);
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); void mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub); int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);

View File

@ -68,25 +68,27 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {} void mndCleanupConsumer(SMnode *pMnode) {}
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo *info) { void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); void *msg = rpcMallocCont(sizeof(int64_t));
if (pClearMsg == NULL) { if (msg == NULL) {
mError("consumer:0x%" PRIx64 " failed to clear consumer due to out of memory. alloc size:%d", consumerId, mError("consumer:0x%" PRIx64 " failed to clear consumer due to out of memory. alloc size:%d", consumerId,
(int32_t)sizeof(SMqConsumerClearMsg)); (int32_t)sizeof(int64_t));
return; return;
} }
pClearMsg->consumerId = consumerId; *(int64_t*)msg = consumerId;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .msgType = msgType,
.pCont = pClearMsg, .pCont = msg,
.contLen = sizeof(SMqConsumerClearMsg), .contLen = sizeof(int64_t),
.info = *info, .info = *info,
}; };
mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId); mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return; if (code != 0){
mError("consumer:%"PRId64" send consumer msg:%d error:%d", consumerId, msgType, code);
}
} }
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser,
@ -148,54 +150,62 @@ FAILED:
} }
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
int32_t code = 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
return -1; code = -1;
goto END;
} }
mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
pConsumer->status, mndConsumerStatusName(pConsumer->status)); pConsumer->status, mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1; code = -1;
goto END;
} }
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL);
pConsumerNew->updateType = CONSUMER_UPDATE_REC; if (pConsumerNew == NULL){
code = -1;
goto END;
}
mndReleaseConsumer(pMnode, pConsumer); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
if (pTrans == NULL) { if (pTrans == NULL) {
goto FAIL; code = -1;
goto END;
} }
if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0) { code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false);
goto FAIL; if (code != 0) {
goto END;
} }
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; code = mndSetConsumerCommitLogs(pTrans, pConsumerNew);
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; if (code != 0) {
goto END;
tDeleteSMqConsumerObj(pConsumerNew, true); }
code = mndTransPrepare(pMnode, pTrans);
END:
mndReleaseConsumer(pMnode, pConsumer);
tDeleteSMqConsumerObj(pConsumerNew);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
FAIL:
tDeleteSMqConsumerObj(pConsumerNew, true);
mndTransDrop(pTrans);
return -1;
} }
// todo check the clear process
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
int32_t code = 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqConsumerClearMsg *pClearMsg = pMsg->pCont; SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
@ -206,33 +216,37 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); mndConsumerStatusName(pConsumer->status));
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL);
if (pConsumerNew == NULL){
code = -1;
goto END;
}
mndReleaseConsumer(pMnode, pConsumer); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
if (pTrans == NULL) {
code = -1;
goto END;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
if (pTrans == NULL) goto FAIL;
// this is the drop action, not the update action // this is the drop action, not the update action
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; code = mndSetConsumerDropLogs(pTrans, pConsumerNew);
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; if (code != 0) {
goto END;
}
tDeleteSMqConsumerObj(pConsumerNew, true); code = mndTransPrepare(pMnode, pTrans);
END:
mndReleaseConsumer(pMnode, pConsumer);
tDeleteSMqConsumerObj(pConsumerNew);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
FAIL:
tDeleteSMqConsumerObj(pConsumerNew, true);
mndTransDrop(pTrans);
return -1;
} }
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) { static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege)); rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
if (rsp->topicPrivileges == NULL) { if (rsp->topicPrivileges == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
} }
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) { for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i); char *topic = taosArrayGetP(pConsumer->currentTopics, i);
@ -253,6 +267,47 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRs
return 0; return 0;
} }
/*
 * Hand the per-topic offset rows reported in a heartbeat request over to the
 * consumer's entry in each matching subscription.
 *
 * For every topic in req->topics the subscription for (pConsumer->cgroup,
 * topicName) is acquired; topics with no subscription are ignored. Under the
 * subscription's write latch, the entry keyed by pConsumer->consumerId (if
 * any) has its previous offsetRows destroyed and replaced by the array from
 * the request. The request's pointer is then cleared, so the array is
 * referenced from one place only.
 */
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
  for (int idx = 0; idx < taosArrayGetSize(req->topics); ++idx) {
    TopicOffsetRows *pTopicRows = taosArrayGet(req->topics, idx);
    mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, pTopicRows->topicName);

    SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pTopicRows->topicName);
    if (pSub != NULL) {
      taosWLockLatch(&pSub->lock);
      SMqConsumerEp *pEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
      if (pEp != NULL) {
        // replace the stored rows with the reported ones and take over the array
        taosArrayDestroy(pEp->offsetRows);
        pEp->offsetRows = pTopicRows->offsetRows;
        pTopicRows->offsetRows = NULL;
      }
      taosWUnLockLatch(&pSub->lock);
      mndReleaseSubscribe(pMnode, pSub);
    }
  }
}
/*
 * Serialize a heartbeat response and attach it to the RPC message.
 *
 * The response is encoded twice: first with a NULL buffer to obtain the
 * required length, then into a freshly allocated RPC buffer of that length.
 *
 * pMsg  message whose info.rsp / info.rspLen receive the encoded response
 * rsp   heartbeat response to encode
 *
 * Returns 0 on success. Returns TSDB_CODE_OUT_OF_MEMORY if the length cannot
 * be determined, the buffer cannot be allocated, or encoding fails; in those
 * cases pMsg is left untouched and no buffer is leaked.
 */
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
  int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
  if (tlen <= 0){
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  void *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // tSerializeSMqHbRsp yields the encoded length (positive) on success, as the
  // sizing call above relies on; only a non-positive result is a failure.
  // Testing "!= 0" here would reject every successful encode.
  if (tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
    rpcFreeCont(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMsg->info.rsp = buf;
  pMsg->info.rspLen = tlen;
  return 0;
}
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
@ -261,7 +316,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = NULL; SMqConsumerObj *pConsumer = NULL;
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
@ -270,7 +324,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
pConsumer = mndAcquireConsumer(pMnode, consumerId); pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("consumer:0x%" PRIx64 " not exist", consumerId); mError("consumer:0x%" PRIx64 " not exist", consumerId);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
goto end; goto end;
} }
@ -284,130 +337,29 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS_LOST) { if (status == MQ_CONSUMER_STATUS_LOST) {
mInfo("try to recover consumer:0x%" PRIx64 "", consumerId); mInfo("try to recover consumer:0x%" PRIx64, consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
pRecoverMsg->consumerId = consumerId;
SRpcMsg pRpcMsg = {
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
.pCont = pRecoverMsg,
.contLen = sizeof(SMqConsumerRecoverMsg),
.info = pMsg->info,
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
} }
for (int i = 0; i < taosArrayGetSize(req.topics); i++) { storeOffsetRows(pMnode, &req, pConsumer);
TopicOffsetRows *data = taosArrayGet(req.topics, i); code = buildMqHbRsp(pMsg, &rsp);
mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if (pSub == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
continue;
}
taosWLockLatch(&pSub->lock);
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
if (pConsumerEp) {
taosArrayDestroy(pConsumerEp->offsetRows);
pConsumerEp->offsetRows = data->offsetRows;
data->offsetRows = NULL;
}
taosWUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
// encode rsp
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp);
void *buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
tSerializeSMqHbRsp(buf, tlen, &rsp);
pMsg->info.rsp = buf;
pMsg->info.rspLen = tlen;
end: end:
tDeatroySMqHbRsp(&rsp); tDestroySMqHbRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
tDeatroySMqHbReq(&req); tDestroySMqHbReq(&req);
return code; return code;
} }
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
SMnode *pMnode = pMsg->info.node;
SMqAskEpReq req = {0};
SMqAskEpRsp rsp = {0};
if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int64_t consumerId = req.consumerId;
int32_t epoch = req.epoch;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
if (ret != 0) {
mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
pConsumer->cgroup);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
goto FAIL;
}
atomic_store_32(&pConsumer->hbStatus, 0);
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS_LOST) {
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
pRecoverMsg->consumerId = consumerId;
SRpcMsg pRpcMsg = {
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
.pCont = pRecoverMsg,
.contLen = sizeof(SMqConsumerRecoverMsg),
.info = pMsg->info,
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
}
if (status != MQ_CONSUMER_STATUS_READY) {
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto FAIL;
}
int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
// 2. check epoch, only send ep info when epochs do not match
if (epoch != serverEpoch) {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
serverEpoch);
int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics); int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp)); rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
if (rsp.topics == NULL) { if (rsp->topics == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
goto FAIL; return TSDB_CODE_OUT_OF_MEMORY;
} }
// handle all topics subscribed by this consumer // handle all topics subscribed by this consumer
@ -416,9 +368,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
// txn guarantees pSub is created // txn guarantees pSub is created
if (pSub == NULL) { if (pSub == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
continue; continue;
} }
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
@ -429,9 +378,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2.1 fetch topic schema // 2.1 fetch topic schema
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) { if (pTopic == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
continue; continue;
@ -447,17 +393,16 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
// 2.2 iterate all vg assigned to the consumer of that topic // 2.2 iterate all vg assigned to the consumer of that topic
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
// this customer assigned vgroups // this customer assigned vgroups
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
if (topicEp.vgs == NULL) { if (topicEp.vgs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
goto FAIL; return TSDB_CODE_OUT_OF_MEMORY;
} }
for (int32_t j = 0; j < vgNum; j++) { for (int32_t j = 0; j < vgNum; j++) {
@ -481,49 +426,102 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
taosArrayPush(topicEp.vgs, &vgEp); taosArrayPush(topicEp.vgs, &vgEp);
} }
taosArrayPush(rsp.topics, &topicEp); taosArrayPush(rsp->topics, &topicEp);
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} return 0;
}
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
// encode rsp // encode rsp
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp); int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
void *buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
} }
SMqRspHead *pHead = buf; SMqRspHead *pHead = buf;
pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP; pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
pHead->epoch = serverEpoch; pHead->epoch = serverEpoch;
pHead->consumerId = pConsumer->consumerId; pHead->consumerId = consumerId;
pHead->walsver = 0; pHead->walsver = 0;
pHead->walever = 0; pHead->walever = 0;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqAskEpRsp(&abuf, &rsp); tEncodeSMqAskEpRsp(&abuf, rsp);
// release consumer and free memory
tDeleteSMqAskEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
// send rsp // send rsp
pMsg->info.rsp = buf; pMsg->info.rsp = buf;
pMsg->info.rspLen = tlen; pMsg->info.rspLen = tlen;
return 0; return 0;
FAIL:
tDeleteSMqAskEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
return -1;
} }
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) { static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqAskEpReq req = {0};
SMqAskEpRsp rsp = {0};
int32_t code = 0;
if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int64_t consumerId = req.consumerId;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
}
if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
pConsumer->cgroup);
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
goto END;
}
atomic_store_32(&pConsumer->hbStatus, 0);
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS_LOST) {
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
}
if (status != MQ_CONSUMER_STATUS_READY) {
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto END;
}
int32_t epoch = req.epoch;
int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
// 2. check epoch, only send ep info when epochs do not match
if (epoch != serverEpoch) {
mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
consumerId, epoch, serverEpoch);
code = addEpSetInfo(pMnode, pConsumer, epoch, &rsp);
if(code != 0){
goto END;
}
}
code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
END:
tDeleteSMqAskEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
return code;
}
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
@ -531,7 +529,7 @@ int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *p
return 0; return 0;
} }
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) { int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
@ -539,8 +537,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
return 0; return 0;
} }
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
static void freeItem(void *param) { static void freeItem(void *param) {
void *pItem = *(void **)param; void *pItem = *(void **)param;
if (pItem != NULL) { if (pItem != NULL) {
@ -548,97 +544,19 @@ static void freeItem(void *param) {
} }
} }
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
SMnode *pMnode = pMsg->info.node; pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
char *msgStr = pMsg->pCont; pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
int32_t code = -1; if(pConsumerNew->rebNewTopics == NULL || pConsumerNew->rebRemovedTopics == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.cgroup;
SMqConsumerObj *pExistedConsumer = NULL;
SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;
SArray *pTopicList = subscribe.topicNames;
taosArraySort(pTopicList, taosArrayCompareString);
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
int32_t newTopicNum = taosArrayGetSize(pTopicList);
for (int i = 0; i < newTopicNum; i++) {
int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
code = terrno;
goto _over;
} }
} int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
// check topic existence
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe");
if (pTrans == NULL) {
goto _over;
}
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pExistedConsumer == NULL) {
mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
pConsumerNew->withTbName = subscribe.withTbName;
pConsumerNew->autoCommit = subscribe.autoCommit;
pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;
// pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
// all subscribed topics should re-balance.
taosArrayDestroy(pConsumerNew->rebNewTopics);
pConsumerNew->rebNewTopics = pTopicList;
subscribe.topicNames = NULL;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
} else {
int32_t status = atomic_load_32(&pExistedConsumer->status);
mInfo("receive subscribe request from existed consumer:0x%" PRIx64
" cgroup:%s, current status:%d(%s), subscribe topic num: %d",
consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);
if (status != MQ_CONSUMER_STATUS_READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto _over;
}
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
if (pConsumerNew == NULL) {
goto _over;
}
// set the update type
pConsumerNew->updateType = CONSUMER_UPDATE_SUB;
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
while (i < oldTopicNum || j < newTopicNum) { while (i < oldTopicNum || j < newTopicNum) {
if (i >= oldTopicNum) { if (i >= oldTopicNum) {
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j)); char *newTopicCopy = taosStrdup(taosArrayGetP(pConsumerNew->assignedTopics, j));
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy); taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
j++; j++;
continue; continue;
@ -649,7 +567,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
continue; continue;
} else { } else {
char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i); char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
char *newTopic = taosArrayGetP(pTopicList, j); char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j);
int comp = strcmp(oldTopic, newTopic); int comp = strcmp(oldTopic, newTopic);
if (comp == 0) { if (comp == 0) {
i++; i++;
@ -668,28 +586,114 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} }
} }
} }
return 0;
}
// no topics need to be rebalanced static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { taosArraySort(pTopicList, taosArrayCompareString);
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
int32_t newTopicNum = taosArrayGetSize(pTopicList);
for (int i = 0; i < newTopicNum; i++) {
int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
return -1;
}
}
return TSDB_CODE_SUCCESS;
}
static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe){
int64_t consumerId = subscribe->consumerId;
char *cgroup = subscribe->cgroup;
SMqConsumerObj *pConsumerNew = NULL;
SMqConsumerObj *pExistedConsumer = NULL;
pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pExistedConsumer == NULL) {
mInfo("receive subscribe request from new consumer:0x%" PRIx64
",cgroup:%s, numOfTopics:%d", consumerId,
subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe);
if (pConsumerNew == NULL) {
goto _over;
}
} else {
int32_t status = atomic_load_32(&pExistedConsumer->status);
mInfo("receive subscribe request from existed consumer:0x%" PRIx64
",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
(int32_t)taosArrayGetSize(subscribe->topicNames));
if (status != MQ_CONSUMER_STATUS_READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto _over;
}
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe);
if (pConsumerNew == NULL) {
goto _over; goto _over;
} }
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; int32_t code = getTopicAddDelete(pExistedConsumer, pConsumerNew);
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; if (code != 0){
terrno = code;
goto _over;
}
}
mndReleaseConsumer(pMnode, pExistedConsumer);
return pConsumerNew;
_over:
mndReleaseConsumer(pMnode, pExistedConsumer);
tDeleteSMqConsumerObj(pConsumerNew);
taosArrayDestroyP(subscribe->topicNames, (FDelete)taosMemoryFree);
return NULL;
}
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
char *msgStr = pMsg->pCont;
int32_t code = 0;
SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;
code = checkAndSortTopic(pMnode, subscribe.topicNames);
if(code != TSDB_CODE_SUCCESS){
goto _over;
} }
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe");
if (pTrans == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _over;
}
code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
pConsumerNew = buildSubConsumer(pMnode, &subscribe);
if(pConsumerNew == NULL){
code = -1;
goto _over;
}
code = mndSetConsumerCommitLogs(pTrans, pConsumerNew);
if (code != 0) goto _over;
code = mndTransPrepare(pMnode, pTrans);
if (code != 0) goto _over;
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
_over: _over:
mndTransDrop(pTrans); mndTransDrop(pTrans);
tDeleteSMqConsumerObj(pConsumerNew);
if (pExistedConsumer) {
/*taosRUnLockLatch(&pExistedConsumer->lock);*/
mndReleaseConsumer(pMnode, pExistedConsumer);
}
tDeleteSMqConsumerObj(pConsumerNew, true);
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
return code; return code;
} }
@ -796,7 +800,7 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status, mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
mndConsumerStatusName(pConsumer->status)); mndConsumerStatusName(pConsumer->status));
tDeleteSMqConsumerObj(pConsumer, false); tClearSMqConsumerObj(pConsumer);
return 0; return 0;
} }
@ -812,48 +816,17 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
} }
} }
// remove from new topic // remove from topic list
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); int32_t size = taosArrayGetSize(topicList);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
char *p = taosArrayGetP(pConsumer->rebNewTopics, i); char *p = taosArrayGetP(topicList, i);
if (strcmp(pTopic, p) == 0) { if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebNewTopics, i); taosArrayRemove(topicList, i);
taosMemoryFree(p); taosMemoryFree(p);
mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId, mInfo("[rebalance] consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics)); consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
break;
}
}
}
// remove from removed topic
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < size; i++) {
char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebRemovedTopics, i);
taosMemoryFree(p);
mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
break;
}
}
}
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
if (strcmp(pTopic, topic) == 0) {
taosArrayRemove(pConsumer->currentTopics, i);
taosMemoryFree(topic);
mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
break; break;
} }
} }
@ -887,60 +860,38 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->subscribeTime = taosGetTimestampMs();
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer", pOldConsumer->consumerId); mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
// } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) {
// int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
// for (int32_t i = 0; i < sz; i++) {
// char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
// taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
// }
//
// int32_t prevStatus = pOldConsumer->status;
// pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ",
// reb-removed-topics:%d",
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus),
// mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime,
// (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
taosArrayPush(pOldConsumer->rebNewTopics, &topic); taosArrayPush(pOldConsumer->rebNewTopics, &topic);
} }
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " timer update, timer recover", pOldConsumer->consumerId); mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId); mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_ADD_REB) { } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
// check if exist in current topic
removeFromNewTopicList(pOldConsumer, pNewTopic);
// add to current topic
bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
if (existing) { if (existing) {
mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic); mError("[rebalance] consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
taosMemoryFree(pNewTopic); taosMemoryFree(pNewTopic);
} else { // added into current topic list } else {
taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
} }
// set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
updateConsumerStatus(pOldConsumer); updateConsumerStatus(pOldConsumer);
// the re-balance is triggered when the new consumer is launched.
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d", ", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
@ -948,22 +899,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) { } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
// remove from removed topic
removeFromRemoveTopicList(pOldConsumer, removedTopic);
// remove from current topic
removeFromCurrentTopicList(pOldConsumer, removedTopic);
// set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
updateConsumerStatus(pOldConsumer); updateConsumerStatus(pOldConsumer);
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 mInfo("[rebalance]consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d", ", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,

View File

@ -249,7 +249,9 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
return (void *)buf; return (void *)buf;
} }
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) { static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) { if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -264,36 +266,64 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) {
pConsumer->hbStatus = 0; pConsumer->hbStatus = 0;
taosInitRWLatch(&pConsumer->lock); taosInitRWLatch(&pConsumer->lock);
pConsumer->createTime = taosGetTimestampMs();
pConsumer->updateType = updateType;
pConsumer->currentTopics = taosArrayInit(0, sizeof(void *)); if (updateType == CONSUMER_ADD_REB){
pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *)); pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); if(pConsumer->rebNewTopics == NULL){
pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *)); terrno = TSDB_CODE_OUT_OF_MEMORY;
goto END;
if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL ||
pConsumer->assignedTopics == NULL) {
taosArrayDestroy(pConsumer->currentTopics);
taosArrayDestroy(pConsumer->rebNewTopics);
taosArrayDestroy(pConsumer->rebRemovedTopics);
taosArrayDestroy(pConsumer->assignedTopics);
taosMemoryFree(pConsumer);
return NULL;
} }
pConsumer->createTime = taosGetTimestampMs(); char* topicTmp = taosStrdup(topic);
taosArrayPush(pConsumer->rebNewTopics, &topicTmp);
}else if (updateType == CONSUMER_REMOVE_REB) {
pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
if(pConsumer->rebRemovedTopics == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
char* topicTmp = taosStrdup(topic);
taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp);
}else if (updateType == CONSUMER_INSERT_SUB){
tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
pConsumer->withTbName = subscribe->withTbName;
pConsumer->autoCommit = subscribe->autoCommit;
pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
if (pConsumer->rebNewTopics == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
pConsumer->assignedTopics = subscribe->topicNames;
subscribe->topicNames = NULL;
}else if (updateType == CONSUMER_UPDATE_SUB){
pConsumer->assignedTopics = subscribe->topicNames;
subscribe->topicNames = NULL;
}
return pConsumer; return pConsumer;
END:
tDeleteSMqConsumerObj(pConsumer);
return NULL;
} }
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) { void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
if (pConsumer == NULL) return; if (pConsumer == NULL) return;
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
if (delete) { }
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
tClearSMqConsumerObj(pConsumer);
taosMemoryFree(pConsumer); taosMemoryFree(pConsumer);
}
} }
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
@ -548,6 +578,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
} }
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
if (pSub == NULL) return;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter); pIter = taosHashIterate(pSub->consumerHash, pIter);

File diff suppressed because it is too large Load Diff

View File

@ -353,6 +353,66 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
return 0; return 0;
} }
// Append one TDMT_VND_TMQ_ADD_CHECKINFO redo action to pTrans for every vgroup
// that mndVgroupInDb() reports as belonging to the topic's database.
//
// pTrans   - transaction the redo actions are appended to
// pMnode   - mnode whose sdb is scanned for vgroups
// topicObj - topic supplying name, ntbUid, ntbColIds and dbUid
//
// Returns 0 when every matching vgroup got its action; otherwise the first
// non-zero code from encoding, allocation or mndTransAppendRedoAction.
static int32_t sendCheckInfoToVnode(STrans *pTrans, SMnode *pMnode, SMqTopicObj *topicObj){
  STqCheckInfo info;
  memcpy(info.topic, topicObj->name, TSDB_TOPIC_FNAME_LEN);
  info.ntbUid = topicObj->ntbUid;
  info.colIdList = topicObj->ntbColIds;

  // broadcast forbid alter info
  void   *pIter = NULL;
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  int32_t code = 0;
  void   *buf = NULL;

  while (1) {
    // iterate vg
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;
    if (!mndVgroupInDb(pVgroup, topicObj->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    // encoder check alter info
    int32_t len;
    tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
    if (code != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }

    buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
    if (buf == NULL) {
      // the header and payload are written through buf right below, so a
      // failed allocation must stop here instead of dereferencing NULL
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }

    void    *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, len);
    code = tEncodeSTqCheckInfo(&encoder, &info);
    // clear the encoder on both outcomes so the failure path does not skip it
    tEncoderClear(&encoder);
    if (code < 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);

    // add redo action
    STransAction action = {0};
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    action.pCont = buf;
    action.contLen = sizeof(SMsgHead) + len;
    action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO;
    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      goto END;
    }
    sdbRelease(pSdb, pVgroup);
    // once appended, the buffer must not be freed at END (presumably the
    // transaction owns it from here on - confirm against mndTransAppendRedoAction)
    buf = NULL;
  }

END:
  // shared exit: buf is non-NULL only when an error interrupted the current vgroup
  taosMemoryFree(buf);
  sdbRelease(pSdb, pVgroup);
  sdbCancelFetch(pSdb, pIter);
  return code;
}
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb, static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
const char *userName) { const char *userName) {
mInfo("start to create topic:%s", pCreate->name); mInfo("start to create topic:%s", pCreate->name);
@ -396,13 +456,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.withMeta = pCreate->withMeta; topicObj.withMeta = pCreate->withMeta;
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) { if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
if (pCreate->withMeta) {
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
code = terrno;
goto _OUT;
}
topicObj.ast = taosStrdup(pCreate->ast); topicObj.ast = taosStrdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.astLen = strlen(pCreate->ast) + 1;
@ -474,60 +527,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (topicObj.ntbUid != 0) { if (topicObj.ntbUid != 0) {
STqCheckInfo info; code = sendCheckInfoToVnode(pTrans, pMnode, &topicObj);
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN); if (code != 0){
info.ntbUid = topicObj.ntbUid;
info.colIdList = topicObj.ntbColIds;
// broadcast forbid alter info
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
while (1) {
// iterate vg
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
// encoder check alter info
int32_t len;
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) {
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
goto _OUT; goto _OUT;
} }
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
code = -1;
goto _OUT;
}
tEncoderClear(&encoder);
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
// add redo action
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + len;
action.msgType = TDMT_VND_TMQ_ADD_CHECKINFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
code = -1;
goto _OUT;
}
buf = NULL;
sdbRelease(pSdb, pVgroup);
}
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
@ -653,9 +656,104 @@ _OVER:
return code; return code;
} }
// Linear search: true when some entry of topics compares equal (strcmp) to topicName.
static bool checkTopic(SArray *topics, char *topicName){
  bool    matched = false;
  int32_t total = taosArrayGetSize(topics);
  int32_t idx = 0;

  while (idx < total && !matched) {
    char *candidate = taosArrayGetP(topics, idx);
    matched = (strcmp(candidate, topicName) == 0);
    idx++;
  }
  return matched;
}
// Scan every consumer in the sdb and decide whether topicName can be dropped.
//  - A consumer that lists the topic in assignedTopics and is in
//    MQ_CONSUMER_STATUS_LOST gets its drop logs appended to pTrans
//    (mndSetConsumerDropLogs) and the scan moves on.
//  - Any other consumer that references the topic in assignedTopics,
//    rebNewTopics or rebRemovedTopics stops the scan with
//    TSDB_CODE_MND_TOPIC_SUBSCRIBED.
//
// Returns 0 when no consumer blocks the drop, otherwise the first non-zero
// code encountered.
static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
  int32_t         code = 0;
  SSdb           *pSdb = pMnode->pSdb;
  void           *pIter = NULL;
  SMqConsumerObj *pConsumer = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
    if (pIter == NULL) {
      break;
    }

    bool found = checkTopic(pConsumer->assignedTopics, topicName);
    if (found){
      if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
        code = mndSetConsumerDropLogs(pTrans, pConsumer);
        if (code != 0) {
          goto end;
        }
        sdbRelease(pSdb, pConsumer);
        continue;
      }
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
             topicName, pConsumer->consumerId, pConsumer->cgroup);
      code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
      goto end;
    }

    // name the pending list that actually holds the topic, so the log does not
    // claim "reb new" for a topic that is waiting in the removal list
    const char *rebList = NULL;
    if (checkTopic(pConsumer->rebNewTopics, topicName)) {
      rebList = "reb new";
    } else if (checkTopic(pConsumer->rebRemovedTopics, topicName)) {
      rebList = "reb remove";
    }
    if (rebList != NULL) {
      code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s (%s)",
             topicName, pConsumer->consumerId, pConsumer->cgroup, rebList);
      goto end;
    }

    sdbRelease(pSdb, pConsumer);
  }

end:
  // shared exit: release the consumer still held by an aborted scan and
  // cancel the fetch iterator
  sdbRelease(pSdb, pConsumer);
  sdbCancelFetch(pSdb, pIter);
  return code;
}
// Append one TDMT_VND_TMQ_DEL_CHECKINFO redo action to pTrans for each vgroup
// that mndVgroupInDb() reports as part of the topic's database. The payload is
// an SMsgHead carrying the vgroup id followed by the fixed-size topic name.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when a message buffer cannot
// be allocated, or the failing code from mndTransAppendRedoAction.
static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;
  int32_t code = 0;

  // broadcast to all vnode
  for (;;) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) {
      break;
    }
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    void *msg = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
    if (msg == NULL){
      code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }
    // header first, topic name right behind it
    ((SMsgHead *)msg)->vgId = htonl(pVgroup->vgId);
    void *payload = POINTER_SHIFT(msg, sizeof(SMsgHead));
    memcpy(payload, pTopic->name, TSDB_TOPIC_FNAME_LEN);

    STransAction action = {0};
    action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
    action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
    action.pCont = msg;
    action.epSet = mndGetVgroupEpset(pMnode, pVgroup);

    code = mndTransAppendRedoAction(pTrans, &action);
    if (code != 0) {
      // the append failed, so the message buffer is released here
      taosMemoryFree(msg);
      break;
    }
    sdbRelease(pSdb, pVgroup);
  }

  // common tail for both normal completion and early exit
  sdbRelease(pSdb, pVgroup);
  sdbCancelFetch(pSdb, pIter);
  return code;
}
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMDropTopicReq dropReq = {0}; SMDropTopicReq dropReq = {0};
int32_t code = 0; int32_t code = 0;
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
@ -705,70 +803,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
goto end; goto end;
} }
void *pIter = NULL; code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name);
SMqConsumerObj *pConsumer; if (code != 0) {
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
bool found = false;
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
if (strcmp(name, pTopic->name) == 0) {
found = true;
break;
}
}
if (found){
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info);
mndReleaseConsumer(pMnode, pConsumer);
continue;
}
mndReleaseConsumer(pMnode, pConsumer);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
code = -1;
goto end; goto end;
} }
sz = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
code = -1;
goto end;
}
}
sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
sdbCancelFetch(pSdb, pIter);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
code = -1;
goto end;
}
}
sdbRelease(pSdb, pConsumer);
}
code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); code = mndDropSubByTopic(pMnode, pTrans, dropReq.name);
if (code < 0) { if (code < 0) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
@ -776,37 +815,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
} }
if (pTopic->ntbUid != 0) { if (pTopic->ntbUid != 0) {
// broadcast to all vnode code = mndDropCheckInfoByTopic(pMnode, pTrans, pTopic);
pIter = NULL;
SVgObj *pVgroup = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
code = mndTransAppendRedoAction(pTrans, &action);
if (code != 0) { if (code != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter);
goto end; goto end;
} }
sdbRelease(pSdb, pVgroup);
}
} }
code = mndDropTopic(pMnode, pTrans, pReq, pTopic); code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
@ -822,7 +834,6 @@ end:
SName name = {0}; SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
//reuse this function for topic
auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);

View File

@ -88,10 +88,6 @@ int32_t tqMetaOpen(STQ* pTq) {
return -1; return -1;
} }
// if (tqMetaRestoreHandle(pTq) < 0) {
// return -1;
// }
if (tqMetaRestoreCheckInfo(pTq) < 0) { if (tqMetaRestoreCheckInfo(pTq) < 0) {
return -1; return -1;
} }
@ -167,32 +163,30 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
void* pVal = NULL; void* pVal = NULL;
int vLen = 0; int vLen = 0;
SDecoder decoder; SDecoder decoder;
int32_t code = 0;
tdbTbcMoveToFirst(pCur); tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqCheckInfo info; STqCheckInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { code = tDecodeSTqCheckInfo(&decoder, &info);
if (code != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdbFree(pKey); goto END;
tdbFree(pVal);
tdbTbcClose(pCur);
return -1;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
if (code != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
}
END:
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
return -1; return code;
}
}
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return 0;
} }
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {