refactor checkpoint

This commit is contained in:
yihaoDeng 2023-06-17 14:05:49 +08:00
parent 8937192799
commit b6fe5d917b
3 changed files with 11 additions and 7 deletions

View File

@ -346,6 +346,7 @@ typedef struct SStreamMeta {
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
int64_t checkpointTs;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);

View File

@ -944,8 +944,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return 0;
}
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds,
int64_t checkpointId) {
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
int64_t timestampMs = taosGetTimestampMs();
if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointTickInterval * 1000) {
return -1;
@ -967,7 +966,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->taskLevel == TASK_LEVEL__SOURCE && NULL == taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId))) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
@ -1003,7 +1002,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
mndTransDrop(pTrans);
return -1;
}
taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &pTask->nodeId, sizeof(pTask->nodeId));
}
}
}
@ -1054,17 +1052,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId;
SHashObj *vgIds = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
code = mndProcessStreamCheckpointTrans(pMnode, pStream, vgIds, checkpointId);
code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId);
if (code == -1) {
mInfo("stream:%s failed to do checkpoint, reason: last checkpoint not finished", pStream->name);
}
sdbRelease(pSdb, pStream);
}
taosHashCleanup(vgIds);
return 0;
}

View File

@ -407,6 +407,13 @@ int32_t streamDoCheckpoint(SStreamMeta* pMeta) {
int code = -1;
char buf[256] = {0};
int64_t ts = taosGetTimestampMs();
if (ts - pMeta->checkpointTs <= tsStreamCheckpointTickInterval * 1000) {
// avoid do checkpoint freq
return 0;
}
pMeta->checkpointTs = ts;
sprintf(buf, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(buf, 0755);
if (code != 0) {