opti:subscribe logic
This commit is contained in:
parent
26ea125f03
commit
4dfc531e6c
|
@ -3919,11 +3919,11 @@ int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
|||
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
||||
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||
int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
|
||||
void tDestroySMqHbReq(SMqHbReq* pReq);
|
||||
|
||||
int32_t tSerializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
|
||||
int32_t tDeserializeSMqHbRsp(void* buf, int32_t bufLen, SMqHbRsp* pRsp);
|
||||
int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp);
|
||||
void tDestroySMqHbRsp(SMqHbRsp* pRsp);
|
||||
|
||||
int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||
int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||
|
|
|
@ -779,7 +779,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
taosWUnLockLatch(&tmq->lock);
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
}
|
||||
tDeatroySMqHbRsp(&rsp);
|
||||
tDestroySMqHbRsp(&rsp);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
}
|
||||
|
@ -861,7 +861,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
OVER:
|
||||
tDeatroySMqHbReq(&req);
|
||||
tDestroySMqHbReq(&req);
|
||||
taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
}
|
||||
|
|
|
@ -6201,9 +6201,8 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDeatroySMqHbRsp(SMqHbRsp *pRsp) {
|
||||
void tDestroySMqHbRsp(SMqHbRsp *pRsp) {
|
||||
taosArrayDestroy(pRsp->topicPrivileges);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
|
||||
|
@ -6250,13 +6249,12 @@ int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) {
|
||||
void tDestroySMqHbReq(SMqHbReq *pReq) {
|
||||
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
|
||||
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
|
||||
if (vgs) taosArrayDestroy(vgs->offsetRows);
|
||||
}
|
||||
taosArrayDestroy(pReq->topics);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||
|
|
|
@ -24,27 +24,22 @@ extern "C" {
|
|||
|
||||
enum {
|
||||
MQ_CONSUMER_STATUS_REBALANCE = 1,
|
||||
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
|
||||
MQ_CONSUMER_STATUS_READY,
|
||||
MQ_CONSUMER_STATUS_LOST,
|
||||
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
|
||||
// MQ_CONSUMER_STATUS__LOST_REBD,
|
||||
};\
|
||||
};
|
||||
|
||||
int32_t mndInitConsumer(SMnode *pMnode);
|
||||
void mndCleanupConsumer(SMnode *pMnode);
|
||||
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info);
|
||||
void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo* info);
|
||||
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
|
||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
||||
|
||||
SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup);
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||
|
||||
const char *mndConsumerStatusName(int status);
|
||||
|
||||
|
|
|
@ -149,6 +149,7 @@ typedef enum {
|
|||
CONSUMER_REMOVE_REB, // remove after rebalance
|
||||
CONSUMER_UPDATE_REC, // update after recover
|
||||
CONSUMER_UPDATE_SUB, // update after subscribe req
|
||||
CONSUMER_INSERT_SUB,
|
||||
} ECsmUpdateType;
|
||||
|
||||
typedef struct {
|
||||
|
@ -556,8 +557,9 @@ typedef struct {
|
|||
int32_t resetOffsetCfg;
|
||||
} SMqConsumerObj;
|
||||
|
||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool isDeleted);
|
||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe);
|
||||
void tClearSMqConsumerObj(SMqConsumerObj* pConsumer);
|
||||
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
|
||||
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
|
||||
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const c
|
|||
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key);
|
||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||
|
||||
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||
void mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
||||
|
|
|
@ -68,25 +68,27 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
|||
|
||||
void mndCleanupConsumer(SMnode *pMnode) {}
|
||||
|
||||
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo *info) {
|
||||
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
|
||||
if (pClearMsg == NULL) {
|
||||
void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SRpcHandleInfo *info) {
|
||||
void *msg = rpcMallocCont(sizeof(int64_t));
|
||||
if (msg == NULL) {
|
||||
mError("consumer:0x%" PRIx64 " failed to clear consumer due to out of memory. alloc size:%d", consumerId,
|
||||
(int32_t)sizeof(SMqConsumerClearMsg));
|
||||
(int32_t)sizeof(int64_t));
|
||||
return;
|
||||
}
|
||||
|
||||
pClearMsg->consumerId = consumerId;
|
||||
*(int64_t*)msg = consumerId;
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR,
|
||||
.pCont = pClearMsg,
|
||||
.contLen = sizeof(SMqConsumerClearMsg),
|
||||
.msgType = msgType,
|
||||
.pCont = msg,
|
||||
.contLen = sizeof(int64_t),
|
||||
.info = *info,
|
||||
};
|
||||
|
||||
mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
return;
|
||||
mInfo("mndSendConsumerMsg type:%d consumer:0x%" PRIx64, msgType, consumerId);
|
||||
int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
if (code != 0){
|
||||
mError("consumer:%"PRId64" send consumer msg:%d error:%d", consumerId, msgType, code);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser,
|
||||
|
@ -148,54 +150,62 @@ FAILED:
|
|||
}
|
||||
|
||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId,
|
||||
pConsumer->status, mndConsumerStatusName(pConsumer->status));
|
||||
|
||||
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
return -1;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
||||
pConsumerNew->updateType = CONSUMER_UPDATE_REC;
|
||||
pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL);
|
||||
if (pConsumerNew == NULL){
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
|
||||
if (pTrans == NULL) {
|
||||
goto FAIL;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false) != 0) {
|
||||
goto FAIL;
|
||||
code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false);
|
||||
if (code != 0) {
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||
|
||||
tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||
code = mndSetConsumerCommitLogs(pTrans, pConsumerNew);
|
||||
if (code != 0) {
|
||||
goto END;
|
||||
}
|
||||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
END:
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
FAIL:
|
||||
tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo check the clear process
|
||||
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
|
@ -206,33 +216,37 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
|||
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
|
||||
mndConsumerStatusName(pConsumer->status));
|
||||
|
||||
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
|
||||
pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL);
|
||||
if (pConsumerNew == NULL){
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
|
||||
if (pTrans == NULL) {
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
|
||||
if (pTrans == NULL) goto FAIL;
|
||||
// this is the drop action, not the update action
|
||||
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||
code = mndSetConsumerDropLogs(pTrans, pConsumerNew);
|
||||
if (code != 0) {
|
||||
goto END;
|
||||
}
|
||||
|
||||
tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
|
||||
END:
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
|
||||
rsp->topicPrivileges = taosArrayInit(taosArrayGetSize(pConsumer->currentTopics), sizeof(STopicPrivilege));
|
||||
if (rsp->topicPrivileges == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->currentTopics); i++) {
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
|
@ -253,6 +267,47 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRs
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pConsumer){
|
||||
for (int i = 0; i < taosArrayGetSize(req->topics); i++) {
|
||||
TopicOffsetRows *data = taosArrayGet(req->topics, i);
|
||||
mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
|
||||
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
|
||||
if (pSub == NULL) {
|
||||
continue;
|
||||
}
|
||||
taosWLockLatch(&pSub->lock);
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
|
||||
if (pConsumerEp) {
|
||||
taosArrayDestroy(pConsumerEp->offsetRows);
|
||||
pConsumerEp->offsetRows = data->offsetRows;
|
||||
data->offsetRows = NULL;
|
||||
}
|
||||
taosWUnLockLatch(&pSub->lock);
|
||||
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
|
||||
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
|
||||
if (tlen <= 0){
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
void *buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){
|
||||
rpcFreeCont(buf);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pMsg->info.rsp = buf;
|
||||
pMsg->info.rspLen = tlen;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
|
@ -261,7 +316,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
SMqConsumerObj *pConsumer = NULL;
|
||||
|
||||
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
|
@ -270,7 +324,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("consumer:0x%" PRIx64 " not exist", consumerId);
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
goto end;
|
||||
}
|
||||
|
@ -284,88 +337,152 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
||||
if (status == MQ_CONSUMER_STATUS_LOST) {
|
||||
mInfo("try to recover consumer:0x%" PRIx64 "", consumerId);
|
||||
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
||||
|
||||
pRecoverMsg->consumerId = consumerId;
|
||||
SRpcMsg pRpcMsg = {
|
||||
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
|
||||
.pCont = pRecoverMsg,
|
||||
.contLen = sizeof(SMqConsumerRecoverMsg),
|
||||
.info = pMsg->info,
|
||||
};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
|
||||
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(req.topics); i++) {
|
||||
TopicOffsetRows *data = taosArrayGet(req.topics, i);
|
||||
mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
|
||||
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
|
||||
if (pSub == NULL) {
|
||||
#ifdef TMQ_DEBUG
|
||||
ASSERT(0);
|
||||
#endif
|
||||
continue;
|
||||
}
|
||||
taosWLockLatch(&pSub->lock);
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
if (pConsumerEp) {
|
||||
taosArrayDestroy(pConsumerEp->offsetRows);
|
||||
pConsumerEp->offsetRows = data->offsetRows;
|
||||
data->offsetRows = NULL;
|
||||
}
|
||||
taosWUnLockLatch(&pSub->lock);
|
||||
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
|
||||
// encode rsp
|
||||
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, &rsp);
|
||||
void *buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
|
||||
tSerializeSMqHbRsp(buf, tlen, &rsp);
|
||||
pMsg->info.rsp = buf;
|
||||
pMsg->info.rspLen = tlen;
|
||||
storeOffsetRows(pMnode, &req, pConsumer);
|
||||
code = buildMqHbRsp(pMsg, &rsp);
|
||||
|
||||
end:
|
||||
tDeatroySMqHbRsp(&rsp);
|
||||
tDestroySMqHbRsp(&rsp);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
tDeatroySMqHbReq(&req);
|
||||
tDestroySMqHbReq(&req);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
|
||||
int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
|
||||
|
||||
rsp->topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
|
||||
if (rsp->topics == NULL) {
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
// handle all topics subscribed by this consumer
|
||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
||||
// txn guarantees pSub is created
|
||||
if (pSub == NULL) {
|
||||
continue;
|
||||
}
|
||||
taosRLockLatch(&pSub->lock);
|
||||
|
||||
SMqSubTopicEp topicEp = {0};
|
||||
strcpy(topicEp.topic, topic);
|
||||
|
||||
// 2.1 fetch topic schema
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic == NULL) {
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
continue;
|
||||
}
|
||||
taosRLockLatch(&pTopic->lock);
|
||||
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||
topicEp.schema.nCols = pTopic->schema.nCols;
|
||||
if (topicEp.schema.nCols) {
|
||||
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
|
||||
memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
|
||||
}
|
||||
taosRUnLockLatch(&pTopic->lock);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
// 2.2 iterate all vg assigned to the consumer of that topic
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
|
||||
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
||||
// this customer assigned vgroups
|
||||
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
|
||||
if (topicEp.vgs == NULL) {
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < vgNum; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
// char offsetKey[TSDB_PARTITION_KEY_LEN];
|
||||
// mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
|
||||
|
||||
if (epoch == -1) {
|
||||
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
|
||||
if (pVgroup) {
|
||||
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
}
|
||||
}
|
||||
// 2.2.1 build vg ep
|
||||
SMqSubVgEp vgEp = {
|
||||
.epSet = pVgEp->epSet,
|
||||
.vgId = pVgEp->vgId,
|
||||
.offset = -1,
|
||||
};
|
||||
|
||||
taosArrayPush(topicEp.vgs, &vgEp);
|
||||
}
|
||||
taosArrayPush(rsp->topics, &topicEp);
|
||||
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
|
||||
// encode rsp
|
||||
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
|
||||
void *buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SMqRspHead *pHead = buf;
|
||||
|
||||
pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
|
||||
pHead->epoch = serverEpoch;
|
||||
pHead->consumerId = consumerId;
|
||||
pHead->walsver = 0;
|
||||
pHead->walever = 0;
|
||||
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqAskEpRsp(&abuf, rsp);
|
||||
|
||||
// send rsp
|
||||
pMsg->info.rsp = buf;
|
||||
pMsg->info.rspLen = tlen;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqAskEpReq req = {0};
|
||||
SMqAskEpRsp rsp = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int64_t consumerId = req.consumerId;
|
||||
int32_t epoch = req.epoch;
|
||||
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
mError("consumer:0x%" PRIx64 " group:%s not exists in sdb", consumerId, req.cgroup);
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
return -1;
|
||||
return TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
}
|
||||
|
||||
int32_t ret = strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup));
|
||||
if (ret != 0) {
|
||||
if (strncmp(req.cgroup, pConsumer->cgroup, tListLen(pConsumer->cgroup)) != 0) {
|
||||
mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup,
|
||||
pConsumer->cgroup);
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
goto FAIL;
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
goto END;
|
||||
}
|
||||
|
||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||
|
@ -374,156 +491,37 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
|||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
||||
if (status == MQ_CONSUMER_STATUS_LOST) {
|
||||
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
|
||||
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
|
||||
|
||||
pRecoverMsg->consumerId = consumerId;
|
||||
SRpcMsg pRpcMsg = {
|
||||
.msgType = TDMT_MND_TMQ_CONSUMER_RECOVER,
|
||||
.pCont = pRecoverMsg,
|
||||
.contLen = sizeof(SMqConsumerRecoverMsg),
|
||||
.info = pMsg->info,
|
||||
};
|
||||
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
|
||||
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_CONSUMER_RECOVER, &pMsg->info);
|
||||
}
|
||||
|
||||
if (status != MQ_CONSUMER_STATUS_READY) {
|
||||
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
goto FAIL;
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
goto END;
|
||||
}
|
||||
|
||||
int32_t epoch = req.epoch;
|
||||
int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
|
||||
|
||||
// 2. check epoch, only send ep info when epochs do not match
|
||||
if (epoch != serverEpoch) {
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d", consumerId, epoch,
|
||||
serverEpoch);
|
||||
int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
|
||||
|
||||
rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
|
||||
if (rsp.topics == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
goto FAIL;
|
||||
mInfo("process ask ep, consumer:0x%" PRIx64 "(epoch %d) update with server epoch %d",
|
||||
consumerId, epoch, serverEpoch);
|
||||
code = addEpSetInfo(pMnode, pConsumer, epoch, &rsp);
|
||||
if(code != 0){
|
||||
goto END;
|
||||
}
|
||||
|
||||
// handle all topics subscribed by this consumer
|
||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
||||
// txn guarantees pSub is created
|
||||
if (pSub == NULL) {
|
||||
#ifdef TMQ_DEBUG
|
||||
ASSERT(0);
|
||||
#endif
|
||||
continue;
|
||||
}
|
||||
taosRLockLatch(&pSub->lock);
|
||||
|
||||
SMqSubTopicEp topicEp = {0};
|
||||
strcpy(topicEp.topic, topic);
|
||||
|
||||
// 2.1 fetch topic schema
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic == NULL) {
|
||||
#ifdef TMQ_DEBUG
|
||||
ASSERT(0);
|
||||
#endif
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
continue;
|
||||
}
|
||||
taosRLockLatch(&pTopic->lock);
|
||||
tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN);
|
||||
topicEp.schema.nCols = pTopic->schema.nCols;
|
||||
if (topicEp.schema.nCols) {
|
||||
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
|
||||
memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
|
||||
}
|
||||
taosRUnLockLatch(&pTopic->lock);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
// 2.2 iterate all vg assigned to the consumer of that topic
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
||||
// this customer assigned vgroups
|
||||
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
|
||||
if (topicEp.vgs == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < vgNum; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
// char offsetKey[TSDB_PARTITION_KEY_LEN];
|
||||
// mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
|
||||
|
||||
if (epoch == -1) {
|
||||
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
|
||||
if (pVgroup) {
|
||||
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
}
|
||||
}
|
||||
// 2.2.1 build vg ep
|
||||
SMqSubVgEp vgEp = {
|
||||
.epSet = pVgEp->epSet,
|
||||
.vgId = pVgEp->vgId,
|
||||
.offset = -1,
|
||||
};
|
||||
|
||||
taosArrayPush(topicEp.vgs, &vgEp);
|
||||
}
|
||||
taosArrayPush(rsp.topics, &topicEp);
|
||||
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
}
|
||||
|
||||
// encode rsp
|
||||
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, &rsp);
|
||||
void *buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
}
|
||||
code = buildAskEpRsp(pMsg, &rsp, serverEpoch, consumerId);
|
||||
|
||||
SMqRspHead *pHead = buf;
|
||||
|
||||
pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
|
||||
pHead->epoch = serverEpoch;
|
||||
pHead->consumerId = pConsumer->consumerId;
|
||||
pHead->walsver = 0;
|
||||
pHead->walever = 0;
|
||||
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
tEncodeSMqAskEpRsp(&abuf, &rsp);
|
||||
|
||||
// release consumer and free memory
|
||||
END:
|
||||
tDeleteSMqAskEpRsp(&rsp);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
||||
// send rsp
|
||||
pMsg->info.rsp = buf;
|
||||
pMsg->info.rspLen = tlen;
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
tDeleteSMqAskEpRsp(&rsp);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
|
@ -531,7 +529,7 @@ int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
|
||||
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
|
@ -539,8 +537,6 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
|
||||
|
||||
static void freeItem(void *param) {
|
||||
void *pItem = *(void **)param;
|
||||
if (pItem != NULL) {
|
||||
|
@ -548,21 +544,52 @@ static void freeItem(void *param) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
char *msgStr = pMsg->pCont;
|
||||
int32_t code = -1;
|
||||
static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerObj *pConsumerNew){
|
||||
pConsumerNew->rebNewTopics = taosArrayInit(0, sizeof(void *));
|
||||
pConsumerNew->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
|
||||
if(pConsumerNew->rebNewTopics == NULL || pConsumerNew->rebRemovedTopics == NULL){
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t newTopicNum = taosArrayGetSize(pConsumerNew->assignedTopics);
|
||||
int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
|
||||
|
||||
SCMSubscribeReq subscribe = {0};
|
||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||
int32_t i = 0, j = 0;
|
||||
while (i < oldTopicNum || j < newTopicNum) {
|
||||
if (i >= oldTopicNum) {
|
||||
char *newTopicCopy = taosStrdup(taosArrayGetP(pConsumerNew->assignedTopics, j));
|
||||
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
|
||||
j++;
|
||||
continue;
|
||||
} else if (j >= newTopicNum) {
|
||||
char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
|
||||
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
|
||||
char *newTopic = taosArrayGetP(pConsumerNew->assignedTopics, j);
|
||||
int comp = strcmp(oldTopic, newTopic);
|
||||
if (comp == 0) {
|
||||
i++;
|
||||
j++;
|
||||
continue;
|
||||
} else if (comp < 0) {
|
||||
char *oldTopicCopy = taosStrdup(oldTopic);
|
||||
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
char *newTopicCopy = taosStrdup(newTopic);
|
||||
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
|
||||
j++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t consumerId = subscribe.consumerId;
|
||||
char *cgroup = subscribe.cgroup;
|
||||
SMqConsumerObj *pExistedConsumer = NULL;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
SArray *pTopicList = subscribe.topicNames;
|
||||
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
|
||||
taosArraySort(pTopicList, taosArrayCompareString);
|
||||
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
|
||||
|
||||
|
@ -571,125 +598,107 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
|
||||
if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
|
||||
terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
|
||||
code = terrno;
|
||||
goto _over;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// check topic existence
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe");
|
||||
if (pTrans == NULL) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe){
|
||||
int64_t consumerId = subscribe->consumerId;
|
||||
char *cgroup = subscribe->cgroup;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
SMqConsumerObj *pExistedConsumer = NULL;
|
||||
pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||
if (pExistedConsumer == NULL) {
|
||||
mInfo("receive subscribe request from new consumer:0x%" PRIx64 " cgroup:%s, numOfTopics:%d", consumerId,
|
||||
subscribe.cgroup, (int32_t)taosArrayGetSize(pTopicList));
|
||||
mInfo("receive subscribe request from new consumer:0x%" PRIx64
|
||||
",cgroup:%s, numOfTopics:%d", consumerId,
|
||||
subscribe->cgroup, (int32_t)taosArrayGetSize(subscribe->topicNames));
|
||||
|
||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
|
||||
|
||||
pConsumerNew->withTbName = subscribe.withTbName;
|
||||
pConsumerNew->autoCommit = subscribe.autoCommit;
|
||||
pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
|
||||
pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;
|
||||
|
||||
// pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic
|
||||
taosArrayDestroy(pConsumerNew->assignedTopics);
|
||||
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
|
||||
|
||||
// all subscribed topics should re-balance.
|
||||
taosArrayDestroy(pConsumerNew->rebNewTopics);
|
||||
pConsumerNew->rebNewTopics = pTopicList;
|
||||
subscribe.topicNames = NULL;
|
||||
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_INSERT_SUB, NULL, subscribe);
|
||||
if (pConsumerNew == NULL) {
|
||||
goto _over;
|
||||
}
|
||||
} else {
|
||||
int32_t status = atomic_load_32(&pExistedConsumer->status);
|
||||
|
||||
mInfo("receive subscribe request from existed consumer:0x%" PRIx64
|
||||
" cgroup:%s, current status:%d(%s), subscribe topic num: %d",
|
||||
consumerId, subscribe.cgroup, status, mndConsumerStatusName(status), newTopicNum);
|
||||
",cgroup:%s, current status:%d(%s), subscribe topic num: %d",
|
||||
consumerId, subscribe->cgroup, status, mndConsumerStatusName(status),
|
||||
(int32_t)taosArrayGetSize(subscribe->topicNames));
|
||||
|
||||
if (status != MQ_CONSUMER_STATUS_READY) {
|
||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup, CONSUMER_UPDATE_SUB, NULL, subscribe);
|
||||
if (pConsumerNew == NULL) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
// set the update type
|
||||
pConsumerNew->updateType = CONSUMER_UPDATE_SUB;
|
||||
taosArrayDestroy(pConsumerNew->assignedTopics);
|
||||
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
|
||||
|
||||
int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while (i < oldTopicNum || j < newTopicNum) {
|
||||
if (i >= oldTopicNum) {
|
||||
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
|
||||
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
|
||||
j++;
|
||||
continue;
|
||||
} else if (j >= newTopicNum) {
|
||||
char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
|
||||
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
|
||||
char *newTopic = taosArrayGetP(pTopicList, j);
|
||||
int comp = strcmp(oldTopic, newTopic);
|
||||
if (comp == 0) {
|
||||
i++;
|
||||
j++;
|
||||
continue;
|
||||
} else if (comp < 0) {
|
||||
char *oldTopicCopy = taosStrdup(oldTopic);
|
||||
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
|
||||
i++;
|
||||
continue;
|
||||
} else {
|
||||
char *newTopicCopy = taosStrdup(newTopic);
|
||||
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
|
||||
j++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
int32_t code = getTopicAddDelete(pExistedConsumer, pConsumerNew);
|
||||
if (code != 0){
|
||||
terrno = code;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
// no topics need to be rebalanced
|
||||
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
||||
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
|
||||
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
||||
}
|
||||
return pConsumerNew;
|
||||
|
||||
_over:
|
||||
mndReleaseConsumer(pMnode, pExistedConsumer);
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
taosArrayDestroyP(subscribe->topicNames, (FDelete)taosMemoryFree);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
char *msgStr = pMsg->pCont;
|
||||
int32_t code = 0;
|
||||
|
||||
SCMSubscribeReq subscribe = {0};
|
||||
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
code = checkAndSortTopic(pMnode, subscribe.topicNames);
|
||||
if(code != TSDB_CODE_SUCCESS){
|
||||
goto _over;
|
||||
}
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe");
|
||||
if (pTrans == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _over;
|
||||
}
|
||||
|
||||
code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
}
|
||||
|
||||
pConsumerNew = buildSubConsumer(pMnode, &subscribe);
|
||||
if(pConsumerNew == NULL){
|
||||
code = -1;
|
||||
goto _over;
|
||||
}
|
||||
code = mndSetConsumerCommitLogs(pTrans, pConsumerNew);
|
||||
if (code != 0) goto _over;
|
||||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != 0) goto _over;
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
_over:
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
if (pExistedConsumer) {
|
||||
/*taosRUnLockLatch(&pExistedConsumer->lock);*/
|
||||
mndReleaseConsumer(pMnode, pExistedConsumer);
|
||||
}
|
||||
|
||||
tDeleteSMqConsumerObj(pConsumerNew, true);
|
||||
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||
return code;
|
||||
}
|
||||
|
@ -796,7 +805,7 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
|||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||
mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status,
|
||||
mndConsumerStatusName(pConsumer->status));
|
||||
tDeleteSMqConsumerObj(pConsumer, false);
|
||||
tClearSMqConsumerObj(pConsumer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -888,20 +897,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
pOldConsumer->subscribeTime = taosGetTimestampMs();
|
||||
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
|
||||
mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer", pOldConsumer->consumerId);
|
||||
// } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) {
|
||||
// int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
|
||||
// for (int32_t i = 0; i < sz; i++) {
|
||||
// char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i));
|
||||
// taosArrayPush(pOldConsumer->rebRemovedTopics, &topic);
|
||||
// }
|
||||
//
|
||||
// int32_t prevStatus = pOldConsumer->status;
|
||||
// pOldConsumer->status = MQ_CONSUMER_STATUS_LOST;
|
||||
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ",
|
||||
// reb-removed-topics:%d",
|
||||
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus),
|
||||
// mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime,
|
||||
// (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
|
||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
|
||||
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
|
|
@ -249,7 +249,9 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
|
|||
return (void *)buf;
|
||||
}
|
||||
|
||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) {
|
||||
static void *topicNameDup(void *p) { return taosStrdup((char *)p); }
|
||||
|
||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe) {
|
||||
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
|
||||
if (pConsumer == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -264,36 +266,64 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) {
|
|||
pConsumer->hbStatus = 0;
|
||||
|
||||
taosInitRWLatch(&pConsumer->lock);
|
||||
pConsumer->createTime = taosGetTimestampMs();
|
||||
pConsumer->updateType = updateType;
|
||||
|
||||
pConsumer->currentTopics = taosArrayInit(0, sizeof(void *));
|
||||
pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
|
||||
pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
|
||||
pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *));
|
||||
if (updateType == CONSUMER_ADD_REB){
|
||||
pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
|
||||
if(pConsumer->rebNewTopics == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL ||
|
||||
pConsumer->assignedTopics == NULL) {
|
||||
taosArrayDestroy(pConsumer->currentTopics);
|
||||
taosArrayDestroy(pConsumer->rebNewTopics);
|
||||
taosArrayDestroy(pConsumer->rebRemovedTopics);
|
||||
taosArrayDestroy(pConsumer->assignedTopics);
|
||||
taosMemoryFree(pConsumer);
|
||||
return NULL;
|
||||
char* topicTmp = taosStrdup(topic);
|
||||
taosArrayPush(pConsumer->rebNewTopics, &topicTmp);
|
||||
}else if (updateType == CONSUMER_REMOVE_REB) {
|
||||
pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
|
||||
if(pConsumer->rebRemovedTopics == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
char* topicTmp = taosStrdup(topic);
|
||||
taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp);
|
||||
}else if (updateType == CONSUMER_INSERT_SUB){
|
||||
tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
|
||||
pConsumer->withTbName = subscribe->withTbName;
|
||||
pConsumer->autoCommit = subscribe->autoCommit;
|
||||
pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
|
||||
pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
|
||||
|
||||
|
||||
pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
|
||||
if (pConsumer->rebNewTopics == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
pConsumer->assignedTopics = subscribe->topicNames;
|
||||
subscribe->topicNames = NULL;
|
||||
}else if (updateType == CONSUMER_UPDATE_SUB){
|
||||
pConsumer->assignedTopics = subscribe->topicNames;;
|
||||
subscribe->topicNames = NULL;
|
||||
}
|
||||
|
||||
pConsumer->createTime = taosGetTimestampMs();
|
||||
|
||||
return pConsumer;
|
||||
|
||||
END:
|
||||
tDeleteSMqConsumerObj(pConsumer);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) {
|
||||
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
|
||||
if (pConsumer == NULL) return;
|
||||
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
|
||||
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
|
||||
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
|
||||
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
|
||||
if (delete) {
|
||||
taosMemoryFree(pConsumer);
|
||||
}
|
||||
}
|
||||
|
||||
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
|
||||
tClearSMqConsumerObj(pConsumer);
|
||||
taosMemoryFree(pConsumer);
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
||||
|
@ -548,6 +578,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
|||
}
|
||||
|
||||
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
||||
if (pSub == NULL) return;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -653,9 +653,103 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool checkTopic(SArray *topics, char *topicName){
|
||||
int32_t sz = taosArrayGetSize(topics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(topics, i);
|
||||
if (strcmp(name, topicName) == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
while (1) {
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topicName);
|
||||
if (found){
|
||||
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
|
||||
code = mndSetConsumerDropLogs(pTrans, pConsumer);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) {
|
||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||
goto end;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
end:
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){
|
||||
// broadcast to all vnode
|
||||
void *pIter = NULL;
|
||||
SVgObj *pVgroup = NULL;
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *buf = NULL;
|
||||
while (1) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
|
||||
if (buf == NULL){
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = buf;
|
||||
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
|
||||
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
|
||||
code = mndTransAppendRedoAction(pTrans, &action);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
end:
|
||||
taosMemoryFree(buf);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMDropTopicReq dropReq = {0};
|
||||
int32_t code = 0;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
|
@ -705,68 +799,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
SMqConsumerObj *pConsumer;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
bool found = false;
|
||||
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
||||
if (strcmp(name, pTopic->name) == 0) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (found){
|
||||
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
|
||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info);
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
continue;
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||
if (strcmp(name, pTopic->name) == 0) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||
if (strcmp(name, pTopic->name) == 0) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
code = mndDropConsumerByTopic(pMnode, pTrans, dropReq.name);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = mndDropSubByTopic(pMnode, pTrans, dropReq.name);
|
||||
|
@ -776,36 +811,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if (pTopic->ntbUid != 0) {
|
||||
// broadcast to all vnode
|
||||
pIter = NULL;
|
||||
SVgObj *pVgroup = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = buf;
|
||||
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
|
||||
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
|
||||
code = mndTransAppendRedoAction(pTrans, &action);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(buf);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
goto end;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
code = mndDropCheckInfoByTopic(pMnode, pTrans, pTopic);
|
||||
if (code != 0) {
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -822,7 +830,6 @@ end:
|
|||
|
||||
SName name = {0};
|
||||
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
//reuse this function for topic
|
||||
|
||||
auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
|
||||
|
||||
|
|
Loading…
Reference in New Issue