From 52ca94ea6d3a43956089d9089cf4c1dd0fd74c70 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 14 Jun 2023 16:20:14 +0800 Subject: [PATCH] trigger checkpoint --- cmake/rocksdb_CMakeLists.txt.in | 8 +- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 88 +++++++++---------- source/libs/stream/src/streamBackendRocksdb.c | 20 +++-- source/libs/stream/src/streamMeta.c | 2 +- 5 files changed, 64 insertions(+), 56 deletions(-) diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index ba4a404af6..16017923a7 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -1,9 +1,11 @@ # rocksdb ExternalProject_Add(rocksdb - GIT_REPOSITORY https://github.com/facebook/rocksdb.git - GIT_TAG v8.1.1 - SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" + URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz + URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b + DOWNLOAD_NO_PROGRESS 1 + DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download" + SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" CONFIGURE_COMMAND "" BUILD_COMMAND "" INSTALL_COMMAND "" diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index f78675f103..da4095d7ff 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -265,7 +265,7 @@ static void *mndThreadFp(void *param) { } if (sec % tsStreamCheckpointTickInterval == 0) { - // mndStreamCheckpointTick(pMnode, sec); + mndStreamCheckpointTick(pMnode, sec); } if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5e3d4916a4..bcd31c2906 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -850,61 +850,61 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { // listEleSize(); // iterate all stream obj - SHashObj *vgIds = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); + // SHashObj *vgIds = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - taosRLockLatch(&pStream->lock); - for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); - SStreamTask *pTask = taosArrayGetP(pLevel, 0); - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - int32_t sz = taosArrayGetSize(pLevel); - SList *list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId)); - if (list == NULL) { - SList tlist; - tdListInit(&tlist, TSDB_STREAM_FNAME_LEN); - taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &tlist, sizeof(tlist)); - list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId)); - } - tdListAppend(list, (void *)pStream->name); - } - } - taosRUnLockLatch(&pStream->lock); + // taosRLockLatch(&pStream->lock); + // for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { + // SArray *pLevel = taosArrayGetP(pStream->tasks, i); + // SStreamTask *pTask = taosArrayGetP(pLevel, 0); + // if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + // int32_t sz = taosArrayGetSize(pLevel); + // SList *list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId)); + // if (list == NULL) { + // SList tlist; + // tdListInit(&tlist, TSDB_STREAM_FNAME_LEN); + // taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &tlist, sizeof(tlist)); + // list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId)); + // } + // tdListAppend(list, (void *)pStream->name); + // } + // } + // taosRUnLockLatch(&pStream->lock); - // if (pIter == NULL) break; - // // incr tick - // int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1); - // // if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q - // // if (currentTick >= pStream->checkpointFreq) { - // atomic_store_64(&pStream->currentTick, 0); - // SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); + if (pIter == NULL) break; + // incr tick + int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1); + // if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q + // if (currentTick >= pStream->checkpointFreq) { + atomic_store_64(&pStream->currentTick, 0); + SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - // pMsg->streamId = pStream->uid; - // pMsg->checkpointId = tGenIdPI64(); - // memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN); + pMsg->streamId = pStream->uid; + pMsg->checkpointId = tGenIdPI64(); + memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN); - // SRpcMsg rpcMsg = { - // .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, - // .pCont = pMsg, - // .contLen = sizeof(SMStreamDoCheckpointMsg), - // }; + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, + .pCont = pMsg, + .contLen = sizeof(SMStreamDoCheckpointMsg), + }; - // tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } - void *vgIter = taosHashIterate(vgIds, NULL); - size_t klen = 0; - int64_t checkpointId = tGenIdPI64(); - while (vgIter) { - int32_t *key = (int32_t *)taosHashGetKey(vgIter, &klen); - SList *val = (SList *)vgIter; + // void *vgIter = taosHashIterate(vgIds, NULL); + // size_t klen = 0; + // int64_t checkpointId = tGenIdPI64(); + // while (vgIter) { + // int32_t *key = (int32_t *)taosHashGetKey(vgIter, &klen); + // SList *val = (SList *)vgIter; - mndCreateCheckpoint(pMnode, *key, val); - vgIter = taosHashIterate(vgIds, vgIter); - } - taosHashCleanup(vgIds); + // mndCreateCheckpoint(pMnode, *key, val); + // vgIter = taosHashIterate(vgIds, vgIter); + // } + // taosHashCleanup(vgIds); return 0; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 82a2aff3f0..cc8634a692 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -190,17 +190,21 @@ void streamBackendCleanup(void* arg) { qDebug("destroy stream backend backend:%p", pHandle); return; } - int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { int64_t st = taosGetTimestampMs(); int32_t code = -1; SBackendHandle* pHandle = taosAcquireRef(streamBackendId, backendRid); - static int checkpointSuffix = 1; + static int checkpointSuffix = 0; + + char newDir[256] = {0}; + char oldDir[256] = {0}; + sprintf(oldDir, "%s/checkpoint_%d", path, checkpointSuffix); + sprintf(newDir, "%s/checkpoint_%d", path, 1 - checkpointSuffix); + if (pHandle == NULL) { return -1; } qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, path); - if (pHandle->db != NULL) { char* err = NULL; rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err); @@ -210,10 +214,8 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { code = -1; goto _ERROR; } - char buf[256] = {0}; - sprintf(buf, "%s/checkpoint_%d", path, checkpointSuffix); - rocksdb_checkpoint_create(cp, buf, 64 << 20, &err); + rocksdb_checkpoint_create(cp, newDir, 64 << 20, &err); if (err != NULL) { qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); taosMemoryFreeClear(err); @@ -224,7 +226,11 @@ int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) { } rocksdb_checkpoint_object_destroy(cp); } - checkpointSuffix += 1; + if (taosIsDir(oldDir)) { + taosRemoveDir(oldDir); + } + taosRenameFile(newDir, oldDir); + _ERROR: taosReleaseRef(streamBackendId, backendRid); return code; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a572159af1..c840876884 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -405,7 +405,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { int32_t streamDoCheckpoint(SStreamMeta* pMeta) { int code = -1; - char buf[256]; + char buf[256] = {0}; sprintf(buf, "%s/%s", pMeta->path, "checkpoints"); code = taosMulModeMkDir(buf, 0755);