refactor(mnode): remove unused code
This commit is contained in:
parent
320202e60e
commit
dcf2c980c5
|
@ -1,50 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_MND_OFFSET_H_
|
|
||||||
#define _TD_MND_OFFSET_H_
|
|
||||||
|
|
||||||
#include "mndInt.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t mndInitOffset(SMnode *pMnode);
|
|
||||||
void mndCleanupOffset(SMnode *pMnode);
|
|
||||||
|
|
||||||
SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key);
|
|
||||||
void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset);
|
|
||||||
|
|
||||||
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset);
|
|
||||||
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw);
|
|
||||||
|
|
||||||
int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs);
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
|
|
||||||
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
|
||||||
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
|
||||||
int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey);
|
|
||||||
|
|
||||||
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_MND_OFFSET_H_*/
|
|
|
@ -31,6 +31,10 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||||
|
|
||||||
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
|
||||||
|
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||||
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndOffset.h"
|
|
||||||
#include "mndPrivilege.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
|
@ -408,12 +407,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
|
||||||
.offset = -1,
|
.offset = -1,
|
||||||
};
|
};
|
||||||
|
|
||||||
// 2.2.2 fetch vg offset
|
|
||||||
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
|
|
||||||
if (pOffsetObj != NULL) {
|
|
||||||
vgEp.offset = atomic_load_64(&pOffsetObj->offset);
|
|
||||||
mndReleaseOffset(pMnode, pOffsetObj);
|
|
||||||
}
|
|
||||||
taosArrayPush(topicEp.vgs, &vgEp);
|
taosArrayPush(topicEp.vgs, &vgEp);
|
||||||
}
|
}
|
||||||
taosArrayPush(rsp.topics, &topicEp);
|
taosArrayPush(rsp.topics, &topicEp);
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndCluster.h"
|
#include "mndCluster.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndOffset.h"
|
|
||||||
#include "mndPrivilege.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSma.h"
|
#include "mndSma.h"
|
||||||
|
|
|
@ -23,7 +23,6 @@
|
||||||
#include "mndGrant.h"
|
#include "mndGrant.h"
|
||||||
#include "mndInfoSchema.h"
|
#include "mndInfoSchema.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndOffset.h"
|
|
||||||
#include "mndPerfSchema.h"
|
#include "mndPerfSchema.h"
|
||||||
#include "mndPrivilege.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndProfile.h"
|
#include "mndProfile.h"
|
||||||
|
@ -305,7 +304,6 @@ static int32_t mndInitSteps(SMnode *pMnode) {
|
||||||
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
|
|
||||||
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
|
||||||
|
|
|
@ -1,370 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
|
||||||
#include "mndOffset.h"
|
|
||||||
#include "mndDb.h"
|
|
||||||
#include "mndDnode.h"
|
|
||||||
#include "mndMnode.h"
|
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndShow.h"
|
|
||||||
#include "mndStb.h"
|
|
||||||
#include "mndTopic.h"
|
|
||||||
#include "mndTrans.h"
|
|
||||||
#include "mndUser.h"
|
|
||||||
#include "mndVgroup.h"
|
|
||||||
#include "tname.h"
|
|
||||||
|
|
||||||
#define MND_OFFSET_VER_NUMBER 1
|
|
||||||
#define MND_OFFSET_RESERVE_SIZE 64
|
|
||||||
|
|
||||||
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset);
|
|
||||||
static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset);
|
|
||||||
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOffset, SMqOffsetObj *pNewOffset);
|
|
||||||
static int32_t mndProcessCommitOffsetReq(SRpcMsg *pReq);
|
|
||||||
|
|
||||||
int32_t mndInitOffset(SMnode *pMnode) {
|
|
||||||
SSdbTable table = {
|
|
||||||
.sdbType = SDB_OFFSET,
|
|
||||||
.keyType = SDB_KEY_BINARY,
|
|
||||||
.encodeFp = (SdbEncodeFp)mndOffsetActionEncode,
|
|
||||||
.decodeFp = (SdbDecodeFp)mndOffsetActionDecode,
|
|
||||||
.insertFp = (SdbInsertFp)mndOffsetActionInsert,
|
|
||||||
.updateFp = (SdbUpdateFp)mndOffsetActionUpdate,
|
|
||||||
.deleteFp = (SdbDeleteFp)mndOffsetActionDelete,
|
|
||||||
};
|
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_COMMIT_OFFSET, mndProcessCommitOffsetReq);
|
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
|
||||||
}
|
|
||||||
|
|
||||||
void mndCleanupOffset(SMnode *pMnode) {}
|
|
||||||
|
|
||||||
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) {
|
|
||||||
int32_t i = 0;
|
|
||||||
while (pOffset->key[i] != ':') i++;
|
|
||||||
while (pOffset->key[i] != ':') i++;
|
|
||||||
if (strcmp(&pOffset->key[i + 1], topic) == 0) return true;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool mndOffsetFromSubKey(SMqOffsetObj *pOffset, const char *subKey) {
|
|
||||||
int32_t i = 0;
|
|
||||||
while (pOffset->key[i] != ':') i++;
|
|
||||||
if (strcmp(&pOffset->key[i + 1], subKey) == 0) return true;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
void *buf = NULL;
|
|
||||||
int32_t tlen = tEncodeSMqOffsetObj(NULL, pOffset);
|
|
||||||
int32_t size = sizeof(int32_t) + tlen + MND_OFFSET_RESERVE_SIZE;
|
|
||||||
|
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_OFFSET, MND_OFFSET_VER_NUMBER, size);
|
|
||||||
if (pRaw == NULL) goto OFFSET_ENCODE_OVER;
|
|
||||||
|
|
||||||
buf = taosMemoryMalloc(tlen);
|
|
||||||
if (buf == NULL) goto OFFSET_ENCODE_OVER;
|
|
||||||
|
|
||||||
void *abuf = buf;
|
|
||||||
tEncodeSMqOffsetObj(&abuf, pOffset);
|
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, tlen, OFFSET_ENCODE_OVER);
|
|
||||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_ENCODE_OVER);
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_ENCODE_OVER);
|
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, OFFSET_ENCODE_OVER);
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
OFFSET_ENCODE_OVER:
|
|
||||||
taosMemoryFreeClear(buf);
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
|
||||||
mError("offset:%s, failed to encode to raw:%p since %s", pOffset->key, pRaw, terrstr());
|
|
||||||
sdbFreeRaw(pRaw);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
mTrace("offset:%s, encode to raw:%p, row:%p", pOffset->key, pRaw, pOffset);
|
|
||||||
return pRaw;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
void *buf = NULL;
|
|
||||||
|
|
||||||
int8_t sver = 0;
|
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto OFFSET_DECODE_OVER;
|
|
||||||
|
|
||||||
if (sver != MND_OFFSET_VER_NUMBER) {
|
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
|
||||||
goto OFFSET_DECODE_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t size = sizeof(SMqOffsetObj);
|
|
||||||
SSdbRow *pRow = sdbAllocRow(size);
|
|
||||||
if (pRow == NULL) goto OFFSET_DECODE_OVER;
|
|
||||||
|
|
||||||
SMqOffsetObj *pOffset = sdbGetRowObj(pRow);
|
|
||||||
if (pOffset == NULL) goto OFFSET_DECODE_OVER;
|
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
|
||||||
int32_t tlen;
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &tlen, OFFSET_DECODE_OVER);
|
|
||||||
buf = taosMemoryMalloc(tlen + 1);
|
|
||||||
if (buf == NULL) goto OFFSET_DECODE_OVER;
|
|
||||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_DECODE_OVER);
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_DECODE_OVER);
|
|
||||||
|
|
||||||
if (tDecodeSMqOffsetObj(buf, pOffset) == NULL) {
|
|
||||||
goto OFFSET_DECODE_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
OFFSET_DECODE_OVER:
|
|
||||||
taosMemoryFreeClear(buf);
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
|
||||||
mError("offset:%s, failed to decode from raw:%p since %s", pOffset->key, pRaw, terrstr());
|
|
||||||
taosMemoryFreeClear(pRow);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
mTrace("offset:%s, decode from raw:%p, row:%p", pOffset->key, pRaw, pOffset);
|
|
||||||
return pRow;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) {
|
|
||||||
int32_t sz = taosArrayGetSize(vgs);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
int32_t vgId = *(int32_t *)taosArrayGet(vgs, i);
|
|
||||||
SMqOffsetObj offsetObj = {0};
|
|
||||||
if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, vgId) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
// TODO assign db
|
|
||||||
offsetObj.offset = -1;
|
|
||||||
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj);
|
|
||||||
if (pOffsetRaw == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
(void)sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
|
|
||||||
// commit log or redo log?
|
|
||||||
if (mndTransAppendRedolog(pTrans, pOffsetRaw) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
|
|
||||||
char key[TSDB_PARTITION_KEY_LEN];
|
|
||||||
|
|
||||||
SMnode *pMnode = pMsg->info.node;
|
|
||||||
char *msgStr = pMsg->pCont;
|
|
||||||
SMqCMCommitOffsetReq commitOffsetReq;
|
|
||||||
SDecoder decoder;
|
|
||||||
tDecoderInit(&decoder, msgStr, pMsg->contLen);
|
|
||||||
|
|
||||||
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
|
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "commit-offset");
|
|
||||||
if (pTrans == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
|
|
||||||
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
|
|
||||||
mInfo("commit offset %" PRId64 " to vgId:%d of consumer group %s on topic %s", pOffset->offset, pOffset->vgId,
|
|
||||||
pOffset->cgroup, pOffset->topicName);
|
|
||||||
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
|
|
||||||
mError("submit offset to topic %s failed", pOffset->topicName);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
bool create = false;
|
|
||||||
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
|
|
||||||
if (pOffsetObj == NULL) {
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName);
|
|
||||||
if (pTopic == NULL) {
|
|
||||||
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
|
||||||
mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
|
|
||||||
pOffsetObj->dbUid = pTopic->dbUid;
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
|
|
||||||
create = true;
|
|
||||||
}
|
|
||||||
pOffsetObj->offset = pOffset->offset;
|
|
||||||
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj);
|
|
||||||
(void)sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
|
|
||||||
mndTransAppendCommitlog(pTrans, pOffsetRaw);
|
|
||||||
if (create) {
|
|
||||||
taosMemoryFree(pOffsetObj);
|
|
||||||
} else {
|
|
||||||
mndReleaseOffset(pMnode, pOffsetObj);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
|
||||||
mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) {
|
|
||||||
mTrace("offset:%s, perform insert action", pOffset->key);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) {
|
|
||||||
mTrace("offset:%s, perform delete action", pOffset->key);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) {
|
|
||||||
mTrace("offset:%s, perform update action", pOldOffset->key);
|
|
||||||
atomic_store_64(&pOldOffset->offset, pNewOffset->offset);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
SMqOffsetObj *pOffset = sdbAcquire(pSdb, SDB_OFFSET, key);
|
|
||||||
if (pOffset == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
|
||||||
terrno = TSDB_CODE_MND_OFFSET_NOT_EXIST;
|
|
||||||
}
|
|
||||||
return pOffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
|
|
||||||
SSdbRaw *pCommitRaw = mndOffsetActionEncode(pOffset);
|
|
||||||
if (pCommitRaw == NULL) return -1;
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
|
||||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
|
|
||||||
SSdbRaw *pRedoRaw = mndOffsetActionEncode(pOffset);
|
|
||||||
if (pRedoRaw == NULL) return -1;
|
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|
||||||
int32_t code = 0;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
void *pIter = NULL;
|
|
||||||
SMqOffsetObj *pOffset = NULL;
|
|
||||||
while (1) {
|
|
||||||
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
|
|
||||||
if (pOffset->dbUid != pDb->uid) {
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
code = -1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
|
|
||||||
int32_t code = -1;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
void *pIter = NULL;
|
|
||||||
SMqOffsetObj *pOffset = NULL;
|
|
||||||
while (1) {
|
|
||||||
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
|
|
||||||
if (!mndOffsetFromTopic(pOffset, topic)) {
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = 0;
|
|
||||||
END:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey) {
|
|
||||||
int32_t code = -1;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
void *pIter = NULL;
|
|
||||||
SMqOffsetObj *pOffset = NULL;
|
|
||||||
while (1) {
|
|
||||||
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
|
|
||||||
if (!mndOffsetFromSubKey(pOffset, subKey)) {
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbRelease(pSdb, pOffset);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = 0;
|
|
||||||
END:
|
|
||||||
return code;
|
|
||||||
}
|
|
|
@ -18,7 +18,6 @@
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndOffset.h"
|
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSnode.h"
|
#include "mndSnode.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndOffset.h"
|
|
||||||
#include "mndScheduler.h"
|
#include "mndScheduler.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
|
@ -683,12 +682,6 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
|
||||||
|
|
||||||
if (mndDropOffsetBySubKey(pMnode, pTrans, pSub->key) < 0) {
|
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndOffset.h"
|
|
||||||
#include "mndPrivilege.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
|
|
|
@ -671,8 +671,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
|
if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
|
||||||
/*ASSERT(0);*/
|
/*ASSERT(0);*/
|
||||||
}
|
}
|
||||||
// TODO batch optimization:
|
|
||||||
// TODO continue scan until meeting batch requirement
|
|
||||||
if (taosxRsp.blockNum > 0 /* threshold */) {
|
if (taosxRsp.blockNum > 0 /* threshold */) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
|
||||||
|
|
Loading…
Reference in New Issue