trigger checkpoint

This commit is contained in:
yihaoDeng 2023-06-14 16:20:14 +08:00
parent 83dde6a212
commit 52ca94ea6d
5 changed files with 64 additions and 56 deletions

View File

@ -1,9 +1,11 @@
# rocksdb
ExternalProject_Add(rocksdb
GIT_REPOSITORY https://github.com/facebook/rocksdb.git
GIT_TAG v8.1.1
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""

View File

@ -265,7 +265,7 @@ static void *mndThreadFp(void *param) {
}
if (sec % tsStreamCheckpointTickInterval == 0) {
// mndStreamCheckpointTick(pMnode, sec);
mndStreamCheckpointTick(pMnode, sec);
}
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {

View File

@ -850,61 +850,61 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
// listEleSize();
// iterate all stream obj
SHashObj *vgIds = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
// SHashObj *vgIds = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
taosRLockLatch(&pStream->lock);
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
SList *list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId));
if (list == NULL) {
SList tlist;
tdListInit(&tlist, TSDB_STREAM_FNAME_LEN);
taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &tlist, sizeof(tlist));
list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId));
}
tdListAppend(list, (void *)pStream->name);
}
}
taosRUnLockLatch(&pStream->lock);
// taosRLockLatch(&pStream->lock);
// for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
// SArray *pLevel = taosArrayGetP(pStream->tasks, i);
// SStreamTask *pTask = taosArrayGetP(pLevel, 0);
// if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
// int32_t sz = taosArrayGetSize(pLevel);
// SList *list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId));
// if (list == NULL) {
// SList tlist;
// tdListInit(&tlist, TSDB_STREAM_FNAME_LEN);
// taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &tlist, sizeof(tlist));
// list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId));
// }
// tdListAppend(list, (void *)pStream->name);
// }
// }
// taosRUnLockLatch(&pStream->lock);
// if (pIter == NULL) break;
// // incr tick
// int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1);
// // if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q
// // if (currentTick >= pStream->checkpointFreq) {
// atomic_store_64(&pStream->currentTick, 0);
// SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
if (pIter == NULL) break;
// incr tick
int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1);
// if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q
// if (currentTick >= pStream->checkpointFreq) {
atomic_store_64(&pStream->currentTick, 0);
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
// pMsg->streamId = pStream->uid;
// pMsg->checkpointId = tGenIdPI64();
// memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN);
pMsg->streamId = pStream->uid;
pMsg->checkpointId = tGenIdPI64();
memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN);
// SRpcMsg rpcMsg = {
// .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT,
// .pCont = pMsg,
// .contLen = sizeof(SMStreamDoCheckpointMsg),
// };
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT,
.pCont = pMsg,
.contLen = sizeof(SMStreamDoCheckpointMsg),
};
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
void *vgIter = taosHashIterate(vgIds, NULL);
size_t klen = 0;
int64_t checkpointId = tGenIdPI64();
while (vgIter) {
int32_t *key = (int32_t *)taosHashGetKey(vgIter, &klen);
SList *val = (SList *)vgIter;
// void *vgIter = taosHashIterate(vgIds, NULL);
// size_t klen = 0;
// int64_t checkpointId = tGenIdPI64();
// while (vgIter) {
// int32_t *key = (int32_t *)taosHashGetKey(vgIter, &klen);
// SList *val = (SList *)vgIter;
mndCreateCheckpoint(pMnode, *key, val);
vgIter = taosHashIterate(vgIds, vgIter);
}
taosHashCleanup(vgIds);
// mndCreateCheckpoint(pMnode, *key, val);
// vgIter = taosHashIterate(vgIds, vgIter);
// }
// taosHashCleanup(vgIds);
return 0;
}

View File

@ -190,17 +190,21 @@ void streamBackendCleanup(void* arg) {
qDebug("destroy stream backend backend:%p", pHandle);
return;
}
int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) {
int64_t st = taosGetTimestampMs();
int32_t code = -1;
SBackendHandle* pHandle = taosAcquireRef(streamBackendId, backendRid);
static int checkpointSuffix = 1;
static int checkpointSuffix = 0;
char newDir[256] = {0};
char oldDir[256] = {0};
sprintf(oldDir, "%s/checkpoint_%d", path, checkpointSuffix);
sprintf(newDir, "%s/checkpoint_%d", path, 1 - checkpointSuffix);
if (pHandle == NULL) {
return -1;
}
qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, path);
if (pHandle->db != NULL) {
char* err = NULL;
rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err);
@ -210,10 +214,8 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) {
code = -1;
goto _ERROR;
}
char buf[256] = {0};
sprintf(buf, "%s/checkpoint_%d", path, checkpointSuffix);
rocksdb_checkpoint_create(cp, buf, 64 << 20, &err);
rocksdb_checkpoint_create(cp, newDir, 64 << 20, &err);
if (err != NULL) {
qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err);
taosMemoryFreeClear(err);
@ -224,7 +226,11 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) {
}
rocksdb_checkpoint_object_destroy(cp);
}
checkpointSuffix += 1;
if (taosIsDir(oldDir)) {
taosRemoveDir(oldDir);
}
taosRenameFile(newDir, oldDir);
_ERROR:
taosReleaseRef(streamBackendId, backendRid);
return code;

View File

@ -405,7 +405,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
int32_t streamDoCheckpoint(SStreamMeta* pMeta) {
int code = -1;
char buf[256];
char buf[256] = {0};
sprintf(buf, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(buf, 0755);