Merge pull request #17681 from taosdata/feature/stream

test(wal): rollback multiple file
This commit is contained in:
Liu Jicong 2022-10-27 09:05:05 +08:00 committed by GitHub
commit 465982fd8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 56 additions and 13 deletions

View File

@ -77,7 +77,6 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndTransProcessRsp);

View File

@ -516,17 +516,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
tDeleteSMqDataRsp(&dataRsp);
return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pReq);
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
tDeleteSMqDataRsp(&dataRsp);
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
" in vg %d, subkey %s, reset none failed",

View File

@ -27,7 +27,7 @@ bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; }
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
int64_t FORCE_INLINE walGetSnapshotVer(SWal* pWal) { return pWal->vers.snapshotVer; }
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
@ -124,7 +124,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
SWalCkHead* logContent = (SWalCkHead*)candidate;
if (walValidHeadCksum(logContent) != 0) {
wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
offset + ((char*)(logContent)-buf), fnameStr);
offset + ((char*)(logContent)-buf), fnameStr);
haystack = candidate + 1;
if (firstTrial) {
break;
@ -163,7 +163,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
if (walValidBodyCksum(logContent) != 0) {
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
wWarn("vgId:%d, failed to validate checksum of wal entry body. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
offset + ((char*)(logContent)-buf), fnameStr);
offset + ((char*)(logContent)-buf), fnameStr);
haystack = candidate + 1;
if (firstTrial) {
break;

View File

@ -257,6 +257,39 @@ TEST_F(WalCleanEnv, rollback) {
ASSERT_EQ(code, 0);
}
TEST_F(WalCleanEnv, rollbackMultiFile) {
int code;
for (int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i + 1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
if (i == 5) {
walBeginSnapshot(pWal, i);
walEndSnapshot(pWal);
}
}
code = walRollback(pWal, 12);
ASSERT_NE(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 9);
code = walRollback(pWal, 9);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 8);
code = walRollback(pWal, 5);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 4);
code = walRollback(pWal, 3);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 2);
code = walWrite(pWal, 3, 3, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, 3);
code = walSaveMeta(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalCleanDeleteEnv, roll) {
int code;
int i;