feat:[TD-26056] add replay logic

This commit is contained in:
wangmm0220 2023-10-09 18:36:39 +08:00
parent 33045e63ae
commit 3e2e924e98
2 changed files with 23 additions and 14 deletions

View File

@ -126,31 +126,37 @@ void mndRebCntDec() {
}
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
int32_t numOfTopics = taosArrayGetSize(pTopicList);
SMqTopicObj *pTopic = NULL;
int32_t code = 0;
int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) {
char *pOneTopic = taosArrayGetP(pTopicList, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
pTopic = mndAcquireTopic(pMnode, pOneTopic);
if (pTopic == NULL) { // terrno has been set by callee function
return -1;
code = -1;
goto FAILED;
}
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
code = -1;
goto FAILED;
}
if(enableReplay){
if(pTopic->subType != TOPIC_SUB_TYPE__COLUMN){
return TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT;
goto FAILED;
}else if(pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
if (pDb == NULL) {
mndReleaseTopic(pMnode, pTopic);
return -1;
code = -1;
goto FAILED;
}
if (pDb->cfg.numOfVgroups != 1) {
return TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
mndReleaseDb(pMnode, pDb);
code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP;
goto FAILED;
}
mndReleaseDb(pMnode, pDb);
}
@ -158,13 +164,16 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
mndTransSetDbName(pTrans, pOneTopic, NULL);
if(mndTransCheckConflict(pMnode, pTrans) != 0){
mndReleaseTopic(pMnode, pTopic);
return -1;
code = -1;
goto FAILED;
}
mndReleaseTopic(pMnode, pTopic);
}
return 0;
FAILED:
mndReleaseTopic(pMnode, pTopic);
return code;
}
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {

View File

@ -40,11 +40,11 @@ void tqUpdateNodeStage(STQ* pTq) {
tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage);
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset, bool withTbName) {
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
pRsp->withTbName = withTbName;
pRsp->withTbName = 1;
pRsp->withSchema = 1;
pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
@ -177,7 +177,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
int32_t vgId = TD_VID(pTq->pVnode);
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, *offset, pRequest->withTbName);
tqInitTaosxRsp(&taosxRsp, *offset);
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {