fix recover error

This commit is contained in:
yihaoDeng 2023-07-13 20:46:58 +08:00
parent 9e128b92f8
commit e588640e02
1 changed files with 99 additions and 6 deletions

View File

@ -1009,7 +1009,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosRUnLockLatch(&pStream->lock);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return -1;
}
@ -1019,7 +1019,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
taosRUnLockLatch(&pStream->lock);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return -1;
}
@ -1034,7 +1034,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
taosRUnLockLatch(&pStream->lock);
taosWUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return -1;
@ -1079,6 +1079,78 @@ _ERR:
mndTransDrop(pTrans);
return -1;
}
static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode,
int64_t checkpointId) {
taosWLockLatch(&pStream->lock);
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosWUnLockLatch(&pStream->lock);
return -1;
}
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock);
return -1;
}
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
}
}
pStream->checkpointFreq = checkpointId;
pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs();
atomic_store_64(&pStream->currentTick, 0);
// 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1;
taosWUnLockLatch(&pStream->lock);
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
return -1;
}
return 0;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -1089,16 +1161,37 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
mndTransSetDbName(pTrans, "checkpoint", "checkpoint");
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndTransDrop(pTrans);
return -1;
}
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId);
code = mndAddStreamCheckpointToTrans(pTrans, pStream, pMnode, checkpointId);
if (code == -1) {
mInfo("stream:%s failed to do checkpoint, reason: last checkpoint not finished", pStream->name);
sdbRelease(pSdb, pStream);
break;
}
sdbRelease(pSdb, pStream);
}
return 0;
if (code == 0) {
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepre trans rebalance since %s", terrstr());
}
}
mndTransDrop(pTrans);
return code;
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {