From 387b4d365e0d6d06f7506940d3b99b00348bbcbb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 21 Aug 2023 19:07:15 +0800 Subject: [PATCH] refactor checkpoint --- source/dnode/vnode/src/tq/tq.c | 10 ++-- source/libs/stream/src/streamBackendRocksdb.c | 54 +++++++++++-------- source/libs/stream/src/streamMeta.c | 26 ++++++--- 3 files changed, 57 insertions(+), 33 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0ce4aa0d1e..897a53ac81 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1691,7 +1691,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { // todo refactor. int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { - STQ* pTq = pVnode->pTq; + STQ* pTq = pVnode->pTq; int32_t vgId = pVnode->config.vgId; SMsgHead* msgStr = pMsg->pCont; @@ -1710,7 +1710,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { tDecoderClear(&decoder); int32_t taskId = req.taskId; - tqDebug("vgId:%d receive dispatch msg to s-task:0x%"PRIx64"-0x%x", vgId, req.streamId, taskId); + tqDebug("vgId:%d receive dispatch msg to s-task:0x%" PRIx64 "-0x%x", vgId, req.streamId, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); if (pTask != NULL) { @@ -1883,7 +1883,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); - streamMetaSaveTask(pMeta, pTask); + { + taosWLockLatch(&pMeta->lock); + streamMetaSaveTask(pMeta, pTask); + taosWUnLockLatch(&pMeta->lock); + } streamTaskStop(pTask); taosWLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 37b4f45df6..dca58b9bdc 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -466,16 +466,16 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { taosThreadMutexInit(&pHandle->cfMutex, NULL); pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); + // rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); - int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2; - rocksdb_env_set_low_priority_background_threads(env, nBGThread); - rocksdb_env_set_high_priority_background_threads(env, nBGThread); + // int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2; + // rocksdb_env_set_low_priority_background_threads(env, nBGThread); + // rocksdb_env_set_high_priority_background_threads(env, nBGThread); rocksdb_cache_t* cache = rocksdb_cache_create_lru(dbMemLimit / 2); rocksdb_options_t* opts = rocksdb_options_create(); - rocksdb_options_set_env(opts, env); + // rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); @@ -485,7 +485,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit); rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2); - pHandle->env = env; + // pHandle->env = env; pHandle->dbOpt = opts; pHandle->cache = cache; pHandle->filterFactory = rocksdb_compactionfilterfactory_create( @@ -520,7 +520,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { _EXIT: rocksdb_options_destroy(opts); rocksdb_cache_destroy(cache); - rocksdb_env_destroy(env); + // rocksdb_env_destroy(env); taosThreadMutexDestroy(&pHandle->mutex); taosThreadMutexDestroy(&pHandle->cfMutex); taosHashCleanup(pHandle->cfInst); @@ -556,7 +556,7 @@ void streamBackendCleanup(void* arg) { rocksdb_close(pHandle->db); } rocksdb_options_destroy(pHandle->dbOpt); - rocksdb_env_destroy(pHandle->env); + // rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); SListNode* head = tdListPopHead(pHandle->list); @@ -570,15 +570,18 @@ void streamBackendCleanup(void* arg) { taosThreadMutexDestroy(&pHandle->mutex); taosThreadMutexDestroy(&pHandle->cfMutex); - qDebug("destroy stream backend backend:%p", pHandle); + qDebug("destroy stream backend :%p", pHandle); taosMemoryFree(pHandle); return; } void streamBackendHandleCleanup(void* arg) { SBackendCfWrapper* wrapper = arg; bool remove = wrapper->remove; + taosThreadRwlockWrlock(&wrapper->rwLock); + qDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr); if (wrapper->rocksdb == NULL) { + taosThreadRwlockUnlock(&wrapper->rwLock); return; } @@ -589,7 +592,7 @@ void streamBackendHandleCleanup(void* arg) { for (int i = 0; i < cfLen; i++) { if (wrapper->pHandle[i] != NULL) rocksdb_drop_column_family(wrapper->rocksdb, wrapper->pHandle[i], &err); if (err != NULL) { - // qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err); + qError("failed to drop cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); } } @@ -600,7 +603,7 @@ void streamBackendHandleCleanup(void* arg) { for (int i = 0; i < cfLen; i++) { if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err); if (err != NULL) { - qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err); + qError("failed to flush cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); } } @@ -628,6 +631,7 @@ void streamBackendHandleCleanup(void* arg) { wrapper->readOpts = NULL; taosMemoryFreeClear(wrapper->cfOpts); taosMemoryFreeClear(wrapper->param); + taosThreadRwlockUnlock(&wrapper->rwLock); taosThreadRwlockDestroy(&wrapper->rwLock); wrapper->rocksdb = NULL; @@ -783,6 +787,11 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (wrapper->pHandle[i]) { rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; + size_t len = 0; + char* name = rocksdb_column_family_handle_get_name(p, &len); + qError("column name: name: %d", (int)len); + taosMemoryFree(name); + taosArrayPush(pHandle, &p); } } @@ -887,10 +896,11 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } // Get all cf and acquire cfWrappter - int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); - qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); + int32_t nCf = 0; // chkpGetAllDbCfHandle(pMeta, &ppCf, refs); + qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, 0); - code = chkpPreFlushDb(pHandle->db, ppCf, nCf); + // code = chkpPreFlushDb(pHandle->db, ppCf, nCf); + code = 0; if (code == 0) { code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir); if (code != 0) { @@ -903,10 +913,10 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } // release all ref to cfWrapper; - for (int i = 0; i < taosArrayGetSize(refs); i++) { - int64_t id = *(int64_t*)taosArrayGet(refs, i); - taosReleaseRef(streamBackendCfWrapperId, id); - } + // for (int i = 0; i < taosArrayGetSize(refs); i++) { + // int64_t id = *(int64_t*)taosArrayGet(refs, i); + // taosReleaseRef(streamBackendCfWrapperId, id); + // } if (code == 0) { taosWLockLatch(&pMeta->chkpDirLock); taosArrayPush(pMeta->chkpSaved, &checkpointId); @@ -1431,8 +1441,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t } // close default cf if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) { - rocksdb_column_family_handle_destroy(cfHandle[0]); - cfHandle[0] = NULL; + // rocksdb_column_family_handle_destroy(cfHandle[0]); + // cfHandle[0] = NULL; } rocksdb_options_destroy(cfOpts[0]); @@ -1464,7 +1474,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + // rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1585,7 +1595,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); + // rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 899ffdcd4d..979b9be384 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -25,8 +25,8 @@ int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); -static void metaHbToMnode(void* param, void* tmrId); -static void streamMetaClear(SStreamMeta* pMeta); +static void metaHbToMnode(void* param, void* tmrId); +static void streamMetaClear(SStreamMeta* pMeta); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); @@ -100,9 +100,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->chkpId = chkpId; pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); - if (pMeta->streamBackend == NULL) { - goto _err; + while (pMeta->streamBackend == NULL) { + taosMsleep(2 * 1000); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + if (pMeta->streamBackend == NULL) { + qError("vgId:%d failed to init stream backend", pMeta->vgId); + } } + // if (pMeta->streamBackend == NULL) { + // goto _err; + // } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); code = streamBackendLoadCheckpointInfo(pMeta); @@ -158,9 +165,13 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { } pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); - if (pMeta->streamBackend == NULL) { - qError("vgId:%d failed to init stream backend", pMeta->vgId); - return -1; + while (pMeta->streamBackend == NULL) { + taosMsleep(2 * 1000); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + if (pMeta->streamBackend == NULL) { + qError("vgId:%d failed to init stream backend", pMeta->vgId); + // return -1; + } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); @@ -523,7 +534,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { } else { tdbFree(pKey); tdbFree(pVal); - tdbTbcClose(pCur); taosMemoryFree(pTask); continue; }