1133 lines
38 KiB
C
1133 lines
38 KiB
C
/*
|
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
*
|
|
* This program is free software: you can use, redistribute, and/or modify
|
|
* it under the terms of the GNU Affero General Public License, version 3
|
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#define _DEFAULT_SOURCE
|
|
#include "mndSubscribe.h"
|
|
#include "mndConsumer.h"
|
|
#include "mndScheduler.h"
|
|
#include "mndShow.h"
|
|
#include "mndTopic.h"
|
|
#include "mndTrans.h"
|
|
#include "mndVgroup.h"
|
|
#include "tcompare.h"
|
|
#include "tname.h"
|
|
|
|
#define MND_SUBSCRIBE_VER_NUMBER 1
|
|
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
|
|
|
#define MND_SUBSCRIBE_REBALANCE_CNT 3
|
|
|
|
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
|
|
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
|
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
|
|
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
|
|
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
|
|
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
|
|
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
|
|
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
|
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
|
|
|
|
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
|
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
|
|
if (pRedoRaw == NULL) return -1;
|
|
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
|
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
|
|
return 0;
|
|
}
|
|
|
|
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
|
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
|
|
if (pCommitRaw == NULL) return -1;
|
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
|
return 0;
|
|
}
|
|
|
|
int32_t mndInitSubscribe(SMnode *pMnode) {
|
|
SSdbTable table = {
|
|
.sdbType = SDB_SUBSCRIBE,
|
|
.keyType = SDB_KEY_BINARY,
|
|
.encodeFp = (SdbEncodeFp)mndSubActionEncode,
|
|
.decodeFp = (SdbDecodeFp)mndSubActionDecode,
|
|
.insertFp = (SdbInsertFp)mndSubActionInsert,
|
|
.updateFp = (SdbUpdateFp)mndSubActionUpdate,
|
|
.deleteFp = (SdbDeleteFp)mndSubActionDelete,
|
|
};
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
|
|
mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
|
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq);
|
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
|
|
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);
|
|
|
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
|
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
|
|
|
|
return sdbSetTable(pMnode->pSdb, table);
|
|
}
|
|
|
|
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
|
|
SMqSubscribeObj *pSub = tNewSubscribeObj(subKey);
|
|
if (pSub == NULL) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
return NULL;
|
|
}
|
|
|
|
pSub->dbUid = pTopic->dbUid;
|
|
pSub->stbUid = pTopic->stbUid;
|
|
pSub->subType = pTopic->subType;
|
|
pSub->withMeta = pTopic->withMeta;
|
|
|
|
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
|
|
tDeleteSubscribeObj(pSub);
|
|
taosMemoryFree(pSub);
|
|
return NULL;
|
|
}
|
|
|
|
return pSub;
|
|
}
|
|
|
|
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub,
|
|
const SMqRebOutputVg *pRebVg) {
|
|
SMqRebVgReq req = {0};
|
|
req.oldConsumerId = pRebVg->oldConsumerId;
|
|
req.newConsumerId = pRebVg->newConsumerId;
|
|
req.vgId = pRebVg->pVgEp->vgId;
|
|
req.qmsg = pRebVg->pVgEp->qmsg;
|
|
req.subType = pSub->subType;
|
|
req.withMeta = pSub->withMeta;
|
|
req.suid = pSub->stbUid;
|
|
tstrncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
|
|
|
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
|
|
void *buf = taosMemoryMalloc(tlen);
|
|
if (buf == NULL) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
return -1;
|
|
}
|
|
|
|
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
|
|
|
pMsgHead->contLen = htonl(tlen);
|
|
pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
|
|
|
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
|
tEncodeSMqRebVgReq(&abuf, &req);
|
|
*pBuf = buf;
|
|
*pLen = tlen;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
|
|
const SMqRebOutputVg *pRebVg) {
|
|
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
|
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
|
return -1;
|
|
}
|
|
|
|
void *buf;
|
|
int32_t tlen;
|
|
if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) {
|
|
return -1;
|
|
}
|
|
|
|
int32_t vgId = pRebVg->pVgEp->vgId;
|
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
|
if (pVgObj == NULL) {
|
|
taosMemoryFree(buf);
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
return -1;
|
|
}
|
|
|
|
STransAction action = {0};
|
|
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
|
action.pCont = buf;
|
|
action.contLen = tlen;
|
|
action.msgType = TDMT_VND_TMQ_SUBSCRIBE;
|
|
|
|
mndReleaseVgroup(pMnode, pVgObj);
|
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
taosMemoryFree(buf);
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) {
|
|
int32_t i = 0;
|
|
while (key[i] != TMQ_SEPARATOR) {
|
|
i++;
|
|
}
|
|
memcpy(cgroup, key, i);
|
|
cgroup[i] = 0;
|
|
if (fullName) {
|
|
strcpy(topic, &key[i + 1]);
|
|
} else {
|
|
while (key[i] != '.') {
|
|
i++;
|
|
}
|
|
strcpy(topic, &key[i + 1]);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
|
SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
|
|
if (pRebSub == NULL) {
|
|
pRebSub = tNewSMqRebSubscribe(key);
|
|
if (pRebSub == NULL) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
return NULL;
|
|
}
|
|
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
|
|
}
|
|
return pRebSub;
|
|
}
|
|
|
|
static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
|
|
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
|
const char *pSubKey = pOutput->pSub->key;
|
|
|
|
int32_t actualRemoved = 0;
|
|
for (int32_t i = 0; i < numOfRemoved; i++) {
|
|
uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
|
|
|
|
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
|
|
|
// consumer exists till now
|
|
if (pConsumerEp) {
|
|
actualRemoved++;
|
|
|
|
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
|
for (int32_t j = 0; j < consumerVgNum; j++) {
|
|
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
|
SMqRebOutputVg outputVg = {
|
|
.oldConsumerId = consumerId,
|
|
.newConsumerId = -1,
|
|
.pVgEp = pVgEp,
|
|
};
|
|
|
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
|
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
|
|
}
|
|
|
|
taosArrayDestroy(pConsumerEp->vgs);
|
|
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
|
|
|
// put into removed
|
|
taosArrayPush(pOutput->removedConsumers, &consumerId);
|
|
}
|
|
}
|
|
|
|
if (numOfRemoved != actualRemoved) {
|
|
mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved);
|
|
} else {
|
|
mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved);
|
|
}
|
|
}
|
|
|
|
static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
|
|
int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
|
|
const char *pSubKey = pOutput->pSub->key;
|
|
|
|
for (int32_t i = 0; i < numOfNewConsumers; i++) {
|
|
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
|
|
|
|
SMqConsumerEp newConsumerEp;
|
|
newConsumerEp.consumerId = consumerId;
|
|
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
|
|
|
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
|
taosArrayPush(pOutput->newConsumers, &consumerId);
|
|
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);
|
|
}
|
|
}
|
|
|
|
static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
|
|
const char *pSubKey = pOutput->pSub->key;
|
|
int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
|
|
|
|
for (int32_t i = 0; i < numOfVgroups; i++) {
|
|
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
|
|
SMqRebOutputVg rebOutput = {
|
|
.oldConsumerId = -1,
|
|
.newConsumerId = -1,
|
|
.pVgEp = pVgEp,
|
|
};
|
|
|
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
|
mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
|
|
}
|
|
}
|
|
|
|
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
|
|
int32_t imbConsumerNum) {
|
|
const char *pSubKey = pOutput->pSub->key;
|
|
|
|
int32_t imbCnt = 0;
|
|
void *pIter = NULL;
|
|
|
|
while (1) {
|
|
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
|
if (pIter == NULL) {
|
|
break;
|
|
}
|
|
|
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
|
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
|
|
|
// all old consumers still existing need to be modified
|
|
// TODO optimize: modify only consumer whose vgs changed
|
|
taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
|
|
if (consumerVgNum > minVgCnt) {
|
|
if (imbCnt < imbConsumerNum) {
|
|
if (consumerVgNum == minVgCnt + 1) {
|
|
imbCnt++;
|
|
continue;
|
|
} else {
|
|
// pop until equal minVg + 1
|
|
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
|
|
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
|
|
SMqRebOutputVg outputVg = {
|
|
.oldConsumerId = pConsumerEp->consumerId,
|
|
.newConsumerId = -1,
|
|
.pVgEp = pVgEp,
|
|
};
|
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
|
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
|
|
pConsumerEp->consumerId);
|
|
}
|
|
imbCnt++;
|
|
}
|
|
} else {
|
|
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
|
|
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
|
|
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
|
|
SMqRebOutputVg outputVg = {
|
|
.oldConsumerId = pConsumerEp->consumerId,
|
|
.newConsumerId = -1,
|
|
.pVgEp = pVgEp,
|
|
};
|
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
|
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
|
|
pConsumerEp->consumerId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
|
|
int32_t totalVgNum = pOutput->pSub->vgNum;
|
|
const char *pSubKey = pOutput->pSub->key;
|
|
|
|
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
|
int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
|
|
mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
|
|
pInput->oldConsumerNum, numOfAdded, numOfRemoved);
|
|
|
|
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
|
|
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
|
|
|
// 2. check and get actual removed consumers, put their vg into pHash
|
|
doRemoveExistedConsumers(pOutput, pHash, pInput);
|
|
|
|
// 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
|
|
addUnassignedVgroups(pOutput, pHash);
|
|
|
|
// 4. calc vg number of each consumer
|
|
int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
|
|
|
|
int32_t minVgCnt = 0;
|
|
int32_t imbConsumerNum = 0;
|
|
|
|
// calc num
|
|
if (numOfFinal) {
|
|
minVgCnt = totalVgNum / numOfFinal;
|
|
imbConsumerNum = totalVgNum % numOfFinal;
|
|
mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value",
|
|
pSubKey, numOfFinal, minVgCnt, imbConsumerNum);
|
|
} else {
|
|
mInfo("sub:%s no consumer subscribe this topic", pSubKey);
|
|
}
|
|
|
|
// 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash
|
|
transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);
|
|
|
|
// 6. add new consumer into sub
|
|
doAddNewConsumers(pOutput, pInput);
|
|
|
|
SMqRebOutputVg *pRebVg = NULL;
|
|
void *pRemovedIter = NULL;
|
|
void *pIter = NULL;
|
|
|
|
// 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups
|
|
while (1) {
|
|
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
|
if (pIter == NULL) {
|
|
break;
|
|
}
|
|
|
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
|
|
|
// push until equal minVg
|
|
while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
|
|
// iter hash and find one vg
|
|
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
|
if (pRemovedIter == NULL) {
|
|
mError("sub:%s removed iter is null, never can reach hear", pSubKey);
|
|
break;
|
|
}
|
|
|
|
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
|
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
|
}
|
|
}
|
|
|
|
while (1) {
|
|
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
|
if (pIter == NULL) {
|
|
break;
|
|
}
|
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
|
|
|
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
|
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
|
if (pRemovedIter == NULL) {
|
|
mInfo("sub:%s removed iter is null", pSubKey);
|
|
break;
|
|
}
|
|
|
|
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
|
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
|
}
|
|
}
|
|
|
|
// All assigned vg should be put into pOutput->rebVgs
|
|
if(pRemovedIter != NULL){
|
|
mError("sub:%s error pRemovedIter should be NULL", pSubKey);
|
|
}
|
|
while (1) {
|
|
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
|
if (pRemovedIter == NULL) {
|
|
break;
|
|
}
|
|
|
|
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
|
|
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
|
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned
|
|
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
|
}
|
|
}
|
|
|
|
// 8. generate logs
|
|
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey);
|
|
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
|
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
|
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
|
|
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
|
}
|
|
|
|
pIter = NULL;
|
|
while (1) {
|
|
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
|
if (pIter == NULL) break;
|
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
|
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
|
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
|
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
|
|
pConsumerEp->consumerId);
|
|
}
|
|
}
|
|
|
|
// 9. clear
|
|
taosHashCleanup(pHash);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
|
|
if (pTrans == NULL) {
|
|
return -1;
|
|
}
|
|
|
|
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
|
|
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
|
|
mndTransDrop(pTrans);
|
|
return -1;
|
|
}
|
|
|
|
// make txn:
|
|
// 1. redo action: action to all vg
|
|
const SArray *rebVgs = pOutput->rebVgs;
|
|
int32_t vgNum = taosArrayGetSize(rebVgs);
|
|
for (int32_t i = 0; i < vgNum; i++) {
|
|
SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
|
|
if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
|
|
goto REB_FAIL;
|
|
}
|
|
}
|
|
|
|
// 2. redo log: subscribe and vg assignment
|
|
// subscribe
|
|
if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
|
|
goto REB_FAIL;
|
|
}
|
|
|
|
// 3. commit log: consumer to update status and epoch
|
|
// 3.1 set touched consumer
|
|
int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers);
|
|
for (int32_t i = 0; i < consumerNum; i++) {
|
|
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
|
|
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
|
|
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
|
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
|
tDeleteSMqConsumerObj(pConsumerNew);
|
|
taosMemoryFree(pConsumerNew);
|
|
goto REB_FAIL;
|
|
}
|
|
tDeleteSMqConsumerObj(pConsumerNew);
|
|
taosMemoryFree(pConsumerNew);
|
|
}
|
|
// 3.2 set new consumer
|
|
consumerNum = taosArrayGetSize(pOutput->newConsumers);
|
|
for (int32_t i = 0; i < consumerNum; i++) {
|
|
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
|
|
|
|
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
|
|
pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
|
|
char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
|
|
char cgroup[TSDB_CGROUP_LEN];
|
|
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
|
|
taosArrayPush(pConsumerNew->rebNewTopics, &topic);
|
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
|
tDeleteSMqConsumerObj(pConsumerNew);
|
|
taosMemoryFree(pConsumerNew);
|
|
goto REB_FAIL;
|
|
}
|
|
tDeleteSMqConsumerObj(pConsumerNew);
|
|
taosMemoryFree(pConsumerNew);
|
|
}
|
|
|
|
// 3.3 set removed consumer
|
|
consumerNum = taosArrayGetSize(pOutput->removedConsumers);
|
|
for (int32_t i = 0; i < consumerNum; i++) {
|
|
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
|
|
|
|
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
|
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
|
|
pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
|
|
char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
|
|
char cgroup[TSDB_CGROUP_LEN];
|
|
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
|
|
taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
|
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
|
tDeleteSMqConsumerObj(pConsumerNew);
|
|
taosMemoryFree(pConsumerNew);
|
|
goto REB_FAIL;
|
|
}
|
|
tDeleteSMqConsumerObj(pConsumerNew);
|
|
taosMemoryFree(pConsumerNew);
|
|
}
|
|
|
|
// 4. TODO commit log: modification log
|
|
|
|
// 5. set cb
|
|
mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
|
|
|
|
// 6. execution
|
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
|
mError("failed to prepare trans rebalance since %s", terrstr());
|
|
goto REB_FAIL;
|
|
}
|
|
|
|
mndTransDrop(pTrans);
|
|
return 0;
|
|
|
|
REB_FAIL:
|
|
mndTransDrop(pTrans);
|
|
return -1;
|
|
}
|
|
|
|
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|
SMnode *pMnode = pMsg->info.node;
|
|
SMqDoRebalanceMsg *pReq = pMsg->pCont;
|
|
void *pIter = NULL;
|
|
// bool rebalanceOnce = false; // to ensure only once.
|
|
|
|
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
|
|
|
|
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
|
|
while (1) {
|
|
// if (rebalanceOnce) {
|
|
// break;
|
|
// }
|
|
|
|
pIter = taosHashIterate(pReq->rebSubHash, pIter);
|
|
if (pIter == NULL) {
|
|
break;
|
|
}
|
|
|
|
// todo handle the malloc failure
|
|
SMqRebInputObj rebInput = {0};
|
|
SMqRebOutputObj rebOutput = {0};
|
|
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
|
|
rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
|
|
rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
|
|
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
|
|
|
|
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
|
|
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);
|
|
|
|
rebInput.pRebInfo = pRebInfo;
|
|
|
|
if (pSub == NULL) {
|
|
// split sub key and extract topic
|
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
|
char cgroup[TSDB_CGROUP_LEN];
|
|
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
|
|
|
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
|
if (pTopic == NULL) {
|
|
mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic);
|
|
continue;
|
|
}
|
|
|
|
taosRLockLatch(&pTopic->lock);
|
|
|
|
rebOutput.pSub = mndCreateSubscription(pMnode, pTopic, pRebInfo->key);
|
|
|
|
if (rebOutput.pSub == NULL) {
|
|
mError("mq rebalance %s failed create sub since %s, ignore", pRebInfo->key, terrstr());
|
|
taosRUnLockLatch(&pTopic->lock);
|
|
mndReleaseTopic(pMnode, pTopic);
|
|
continue;
|
|
}
|
|
|
|
memcpy(rebOutput.pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN);
|
|
taosRUnLockLatch(&pTopic->lock);
|
|
mndReleaseTopic(pMnode, pTopic);
|
|
|
|
rebInput.oldConsumerNum = 0;
|
|
mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
|
|
} else {
|
|
taosRLockLatch(&pSub->lock);
|
|
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
|
|
rebOutput.pSub = tCloneSubscribeObj(pSub);
|
|
taosRUnLockLatch(&pSub->lock);
|
|
mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
|
|
mndReleaseSubscribe(pMnode, pSub);
|
|
}
|
|
|
|
if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
|
|
mError("mq re-balance internal error");
|
|
}
|
|
|
|
// if add more consumer to balanced subscribe,
|
|
// possibly no vg is changed
|
|
// when each topic is re-balanced, issue an trans to save the results in sdb.
|
|
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
|
mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr());
|
|
}
|
|
|
|
taosArrayDestroy(rebOutput.newConsumers);
|
|
taosArrayDestroy(rebOutput.modifyConsumers);
|
|
taosArrayDestroy(rebOutput.removedConsumers);
|
|
taosArrayDestroy(rebOutput.rebVgs);
|
|
tDeleteSubscribeObj(rebOutput.pSub);
|
|
taosMemoryFree(rebOutput.pSub);
|
|
|
|
// taosSsleep(100);
|
|
// rebalanceOnce = true;
|
|
}
|
|
|
|
// reset flag
|
|
mInfo("mq re-balance completed successfully");
|
|
taosHashCleanup(pReq->rebSubHash);
|
|
mndRebEnd();
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
|
SMnode *pMnode = pMsg->info.node;
|
|
SMDropCgroupReq dropReq = {0};
|
|
|
|
if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
|
terrno = TSDB_CODE_INVALID_MSG;
|
|
return -1;
|
|
}
|
|
|
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
|
|
if (pSub == NULL) {
|
|
if (dropReq.igNotExists) {
|
|
mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
|
|
return 0;
|
|
} else {
|
|
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
|
|
mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
if (taosHashGetSize(pSub->consumerHash) != 0) {
|
|
terrno = TSDB_CODE_MND_CGROUP_USED;
|
|
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
|
mndReleaseSubscribe(pMnode, pSub);
|
|
return -1;
|
|
}
|
|
|
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
|
|
if (pTrans == NULL) {
|
|
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
|
mndReleaseSubscribe(pMnode, pSub);
|
|
mndTransDrop(pTrans);
|
|
return -1;
|
|
}
|
|
|
|
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
|
|
|
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
|
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
|
mndReleaseSubscribe(pMnode, pSub);
|
|
mndTransDrop(pTrans);
|
|
return -1;
|
|
}
|
|
|
|
if (mndTransPrepare(pMnode, pTrans) < 0) {
|
|
mndReleaseSubscribe(pMnode, pSub);
|
|
mndTransDrop(pTrans);
|
|
return -1;
|
|
}
|
|
mndReleaseSubscribe(pMnode, pSub);
|
|
|
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
|
}
|
|
|
|
void mndCleanupSubscribe(SMnode *pMnode) {}
|
|
|
|
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
void *buf = NULL;
|
|
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
|
|
if (tlen <= 0) goto SUB_ENCODE_OVER;
|
|
int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
|
|
|
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
|
|
if (pRaw == NULL) goto SUB_ENCODE_OVER;
|
|
|
|
buf = taosMemoryMalloc(tlen);
|
|
if (buf == NULL) goto SUB_ENCODE_OVER;
|
|
|
|
void *abuf = buf;
|
|
tEncodeSubscribeObj(&abuf, pSub);
|
|
|
|
int32_t dataPos = 0;
|
|
SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
|
|
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER);
|
|
SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
|
|
SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
|
|
|
|
terrno = TSDB_CODE_SUCCESS;
|
|
|
|
SUB_ENCODE_OVER:
|
|
taosMemoryFreeClear(buf);
|
|
if (terrno != TSDB_CODE_SUCCESS) {
|
|
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
|
|
sdbFreeRaw(pRaw);
|
|
return NULL;
|
|
}
|
|
|
|
mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub);
|
|
return pRaw;
|
|
}
|
|
|
|
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
SSdbRow *pRow = NULL;
|
|
SMqSubscribeObj *pSub = NULL;
|
|
void *buf = NULL;
|
|
|
|
int8_t sver = 0;
|
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
|
|
|
|
if (sver != MND_SUBSCRIBE_VER_NUMBER) {
|
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
|
goto SUB_DECODE_OVER;
|
|
}
|
|
|
|
pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
|
|
if (pRow == NULL) goto SUB_DECODE_OVER;
|
|
|
|
pSub = sdbGetRowObj(pRow);
|
|
if (pSub == NULL) goto SUB_DECODE_OVER;
|
|
|
|
int32_t dataPos = 0;
|
|
int32_t tlen;
|
|
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
|
|
buf = taosMemoryMalloc(tlen);
|
|
if (buf == NULL) goto SUB_DECODE_OVER;
|
|
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
|
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
|
|
|
if (tDecodeSubscribeObj(buf, pSub) == NULL) {
|
|
goto SUB_DECODE_OVER;
|
|
}
|
|
|
|
// update epset saved in mnode
|
|
if (pSub->unassignedVgs != NULL) {
|
|
int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs);
|
|
for (int32_t i = 0; i < size; ++i) {
|
|
SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pSub->unassignedVgs, i);
|
|
tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
|
|
}
|
|
}
|
|
if (pSub->consumerHash != NULL) {
|
|
void *pIter = taosHashIterate(pSub->consumerHash, NULL);
|
|
while (pIter) {
|
|
SMqConsumerEp *pConsumerEp = pIter;
|
|
int32_t size = (int32_t)taosArrayGetSize(pConsumerEp->vgs);
|
|
for (int32_t i = 0; i < size; ++i) {
|
|
SMqVgEp *pMqVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
|
|
tmsgUpdateDnodeEpSet(&pMqVgEp->epSet);
|
|
}
|
|
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
|
}
|
|
}
|
|
|
|
terrno = TSDB_CODE_SUCCESS;
|
|
|
|
SUB_DECODE_OVER:
|
|
taosMemoryFreeClear(buf);
|
|
if (terrno != TSDB_CODE_SUCCESS) {
|
|
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
|
|
taosMemoryFreeClear(pRow);
|
|
return NULL;
|
|
}
|
|
|
|
mTrace("subscribe:%s, decode from raw:%p, row:%p", pSub->key, pRaw, pSub);
|
|
return pRow;
|
|
}
|
|
|
|
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
|
|
mTrace("subscribe:%s, perform insert action", pSub->key);
|
|
return 0;
|
|
}
|
|
|
|
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
|
|
mTrace("subscribe:%s, perform delete action", pSub->key);
|
|
tDeleteSubscribeObj(pSub);
|
|
return 0;
|
|
}
|
|
|
|
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
|
|
mTrace("subscribe:%s, perform update action", pOldSub->key);
|
|
taosWLockLatch(&pOldSub->lock);
|
|
|
|
SHashObj *tmp = pOldSub->consumerHash;
|
|
pOldSub->consumerHash = pNewSub->consumerHash;
|
|
pNewSub->consumerHash = tmp;
|
|
|
|
SArray *tmp1 = pOldSub->unassignedVgs;
|
|
pOldSub->unassignedVgs = pNewSub->unassignedVgs;
|
|
pNewSub->unassignedVgs = tmp1;
|
|
|
|
taosWUnLockLatch(&pOldSub->lock);
|
|
return 0;
|
|
}
|
|
|
|
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
|
|
int32_t tlen = strlen(cgroup);
|
|
memcpy(key, cgroup, tlen);
|
|
key[tlen] = TMQ_SEPARATOR;
|
|
strcpy(key + tlen + 1, topicName);
|
|
return 0;
|
|
}
|
|
|
|
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
|
mndMakeSubscribeKey(key, cgroup, topicName);
|
|
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
|
if (pSub == NULL) {
|
|
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
|
|
}
|
|
return pSub;
|
|
}
|
|
|
|
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
|
if (pSub == NULL) {
|
|
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
|
|
}
|
|
return pSub;
|
|
}
|
|
|
|
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
sdbRelease(pSdb, pSub);
|
|
}
|
|
|
|
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
|
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
|
|
if (pRedoRaw == NULL) return -1;
|
|
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
|
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
|
return 0;
|
|
}
|
|
|
|
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
|
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
|
|
if (pCommitRaw == NULL) return -1;
|
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
|
return 0;
|
|
}
|
|
|
|
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|
int32_t code = 0;
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
|
|
void *pIter = NULL;
|
|
SMqSubscribeObj *pSub = NULL;
|
|
while (1) {
|
|
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
|
|
if (pIter == NULL) break;
|
|
|
|
if (pSub->dbUid != pDb->uid) {
|
|
sdbRelease(pSdb, pSub);
|
|
continue;
|
|
}
|
|
|
|
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
|
sdbRelease(pSdb, pSub);
|
|
sdbCancelFetch(pSdb, pIter);
|
|
code = -1;
|
|
break;
|
|
}
|
|
|
|
sdbRelease(pSdb, pSub);
|
|
}
|
|
|
|
return code;
|
|
}
|
|
|
|
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
|
|
int32_t code = -1;
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
|
|
void *pIter = NULL;
|
|
SMqSubscribeObj *pSub = NULL;
|
|
while (1) {
|
|
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
|
|
if (pIter == NULL) break;
|
|
|
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
|
char cgroup[TSDB_CGROUP_LEN];
|
|
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
|
|
if (strcmp(topic, topicName) != 0) {
|
|
sdbRelease(pSdb, pSub);
|
|
continue;
|
|
}
|
|
|
|
// iter all vnode to delete handle
|
|
if (taosHashGetSize(pSub->consumerHash) != 0) {
|
|
sdbRelease(pSdb, pSub);
|
|
terrno = TSDB_CODE_MND_IN_REBALANCE;
|
|
return -1;
|
|
}
|
|
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
|
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
|
|
pReq->head.vgId = htonl(pVgEp->vgId);
|
|
pReq->vgId = pVgEp->vgId;
|
|
pReq->consumerId = -1;
|
|
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
|
STransAction action = {0};
|
|
action.epSet = pVgEp->epSet;
|
|
action.pCont = pReq;
|
|
action.contLen = sizeof(SMqVDeleteReq);
|
|
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
|
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
taosMemoryFree(pReq);
|
|
sdbRelease(pSdb, pSub);
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
|
sdbRelease(pSdb, pSub);
|
|
goto END;
|
|
}
|
|
|
|
sdbRelease(pSdb, pSub);
|
|
}
|
|
|
|
code = 0;
|
|
END:
|
|
return code;
|
|
}
|
|
|
|
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
|
SMnode *pMnode = pReq->info.node;
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
int32_t numOfRows = 0;
|
|
SMqSubscribeObj *pSub = NULL;
|
|
|
|
mDebug("mnd show subscriptions begin");
|
|
|
|
while (numOfRows < rowsCapacity) {
|
|
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
|
|
if (pShow->pIter == NULL) {
|
|
break;
|
|
}
|
|
|
|
taosRLockLatch(&pSub->lock);
|
|
|
|
if (numOfRows + pSub->vgNum > rowsCapacity) {
|
|
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
|
|
}
|
|
|
|
SMqConsumerEp *pConsumerEp = NULL;
|
|
void *pIter = NULL;
|
|
while (1) {
|
|
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
|
if (pIter == NULL) break;
|
|
pConsumerEp = (SMqConsumerEp *)pIter;
|
|
|
|
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
|
for (int32_t j = 0; j < sz; j++) {
|
|
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
|
|
|
SColumnInfoData *pColInfo;
|
|
int32_t cols = 0;
|
|
|
|
// topic and cgroup
|
|
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
|
|
varDataSetLen(topic, strlen(varDataVal(topic)));
|
|
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
|
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
|
|
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
|
|
|
// vg id
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
|
|
|
// consumer id
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
|
|
|
mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
|
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
|
|
|
|
// offset
|
|
#if 0
|
|
// subscribe time
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
|
|
|
// rebalance time
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
|
#endif
|
|
|
|
numOfRows++;
|
|
}
|
|
}
|
|
|
|
// do not show for cleared subscription
|
|
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
|
|
|
SColumnInfoData *pColInfo;
|
|
int32_t cols = 0;
|
|
|
|
// topic and cgroup
|
|
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
|
|
varDataSetLen(topic, strlen(varDataVal(topic)));
|
|
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
|
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
|
|
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
|
|
|
// vg id
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
|
|
|
// consumer id
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, NULL, true);
|
|
|
|
mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup),
|
|
pVgEp->vgId);
|
|
|
|
// offset
|
|
#if 0
|
|
// subscribe time
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
|
|
|
// rebalance time
|
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
|
#endif
|
|
|
|
numOfRows++;
|
|
}
|
|
|
|
pBlock->info.rows = numOfRows;
|
|
|
|
taosRUnLockLatch(&pSub->lock);
|
|
sdbRelease(pSdb, pSub);
|
|
}
|
|
|
|
mDebug("mnd end show subscriptions");
|
|
|
|
pShow->numOfRows += numOfRows;
|
|
return numOfRows;
|
|
}
|
|
|
|
void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
|
|
SSdb *pSdb = pMnode->pSdb;
|
|
sdbCancelFetch(pSdb, pIter);
|
|
}
|