refactor stream checkpoint

This commit is contained in:
yihaoDeng 2023-10-19 15:19:52 +08:00
parent c6ba2ca205
commit 67dc52d2d6
1 changed files with 97 additions and 128 deletions

View File

@ -961,107 +961,102 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return 0; return 0;
} }
// static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) { static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
// int64_t timestampMs = taosGetTimestampMs(); int32_t code = -1;
// if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) { int64_t timestampMs = taosGetTimestampMs();
// return -1; if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) {
// } return -1;
}
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
// if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
// mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
// if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
// mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name,
// checkpointId,
// tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
// mndTransDrop(pTrans);
// return -1;
// }
// mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
// atomic_store_64(&pStream->currentTick, 1);
// taosWLockLatch(&pStream->lock);
// // 1. redo action: broadcast checkpoint source msg for all source vg
// int32_t totLevel = taosArrayGetSize(pStream->tasks);
// for (int32_t i = 0; i < totLevel; i++) {
// SArray *pLevel = taosArrayGetP(pStream->tasks, i);
// SStreamTask *pTask = taosArrayGetP(pLevel, 0);
// if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// int32_t sz = taosArrayGetSize(pLevel);
// for (int32_t j = 0; j < sz; j++) {
// SStreamTask *pTask = taosArrayGetP(pLevel, j);
// /*A(pTask->info.nodeId > 0);*/
// SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
// if (pVgObj == NULL) {
// taosWUnLockLatch(&pStream->lock);
// mndTransDrop(pTrans);
// return -1;
// }
// void *buf; mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
// int32_t tlen; if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
// pTask->id.taskId) < 0) { tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
// mndReleaseVgroup(pMnode, pVgObj); goto _ERR;
// taosWUnLockLatch(&pStream->lock); }
// mndTransDrop(pTrans);
// return -1;
// }
// STransAction action = {0}; mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
// action.epSet = mndGetVgroupEpset(pMnode, pVgObj); taosWLockLatch(&pStream->lock);
// action.pCont = buf; pStream->currentTick = 1;
// action.contLen = tlen; // 1. redo action: broadcast checkpoint source msg for all source vg
// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
// mndReleaseVgroup(pMnode, pVgObj); int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
// if (mndTransAppendRedoAction(pTrans, &action) != 0) { void *buf;
// taosMemoryFree(buf); int32_t tlen;
// taosWUnLockLatch(&pStream->lock); if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
// mndReleaseStream(pMnode, pStream); pTask->id.taskId) < 0) {
// mndTransDrop(pTrans); mndReleaseVgroup(pMnode, pVgObj);
// return -1; taosWUnLockLatch(&pStream->lock);
// } goto _ERR;
// } }
// }
// }
// // 2. reset tick
// pStream->checkpointFreq = checkpointId;
// pStream->checkpointId = checkpointId;
// pStream->checkpointFreq = taosGetTimestampMs();
// atomic_store_64(&pStream->currentTick, 0);
// // 3. commit log: stream checkpoint info
// pStream->version = pStream->version + 1;
// taosWUnLockLatch(&pStream->lock);
// // // code condtion STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
// SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); mndReleaseVgroup(pMnode, pVgObj);
// if (pCommitRaw == NULL) {
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
// sdbFreeRaw(pCommitRaw);
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
// sdbFreeRaw(pCommitRaw);
// mError("failed to prepare trans rebalance since %s", terrstr());
// goto _ERR;
// }
// if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
// mError("failed to prepare trans rebalance since %s", terrstr()); taosMemoryFree(buf);
// goto _ERR; taosWUnLockLatch(&pStream->lock);
// } goto _ERR;
// mndTransDrop(pTrans); }
// return 0; }
// _ERR: }
// mndTransDrop(pTrans); }
// return -1; // 2. reset tick
// } pStream->checkpointFreq = checkpointId;
pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs();
pStream->currentTick = 0;
// 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1;
taosWUnLockLatch(&pStream->lock);
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
sdbFreeRaw(pCommitRaw);
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto _ERR;
}
code = 0;
_ERR:
mndTransDrop(pTrans);
return code;
}
static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode,
int64_t checkpointId) { int64_t checkpointId) {
@ -1111,7 +1106,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
pStream->checkpointId = checkpointId; pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs(); pStream->checkpointFreq = taosGetTimestampMs();
atomic_store_64(&pStream->currentTick, 0); pStream->currentTick = 0;
// 3. commit log: stream checkpoint info // 3. commit log: stream checkpoint info
pStream->version = pStream->version + 1; pStream->version = pStream->version + 1;
@ -1212,43 +1207,17 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId; int64_t checkpointId = pMsg->checkpointId;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId);
const char *pDb = mndGetStreamDB(pMnode);
mndTransSetDbName(pTrans, pDb, "checkpoint");
taosMemoryFree((void *)pDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndTransDrop(pTrans);
return -1;
}
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break; if (pIter == NULL) break;
code = mndAddStreamCheckpointToTrans(pTrans, pStream, pMnode, checkpointId); code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
if (code == -1) { if (code == -1) {
break; break;
} }
} }
if (code == 0) {
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepre trans rebalance since %s", terrstr());
}
}
mndTransDrop(pTrans);
return code; return code;
} }