From fa702212382d2a3edb0abd9f8b1597191b5f0826 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 7 Oct 2023 12:17:55 +0800 Subject: [PATCH] refact task backend --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/inc/streamBackendRocksdb.h | 23 +- source/libs/stream/inc/streamInt.h | 12 +- source/libs/stream/src/streamBackendRocksdb.c | 381 +++++++++--------- source/libs/stream/src/streamMeta.c | 7 +- source/libs/stream/src/streamTask.c | 6 +- 7 files changed, 230 insertions(+), 203 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 553e8f8217..9bc4a91f1c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -726,7 +726,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); -void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key); +void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e95114fcfb..7cfe776378 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -738,7 +738,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); if (code != TSDB_CODE_SUCCESS) return code; - pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr); + pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr, &pTask->backendRefId); if (pTask->pBackend == NULL) return -1; streamTaskOpenAllUpstreamInput(pTask); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 2c5eeb1fbe..f5c303b809 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -69,17 +69,20 @@ typedef struct { } STaskBackendWrapper; -void* streamBackendInit(const char* path, int64_t chkpId); -void streamBackendCleanup(void* arg); -void streamBackendHandleCleanup(void* arg); -int32_t streamBackendLoadCheckpointInfo(void* pMeta); -int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); -SListNode* streamBackendAddCompare(void* backend, void* arg); -void streamBackendDelCompare(void* backend, void* arg); -int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); -STaskBackendWrapper* taskBackendOpen(char* path, char* key); +void* streamBackendInit(const char* path, int64_t chkpId); +void streamBackendCleanup(void* arg); +void streamBackendHandleCleanup(void* arg); +int32_t streamBackendLoadCheckpointInfo(void* pMeta); +int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); +SListNode* streamBackendAddCompare(void* backend, void* arg); +void streamBackendDelCompare(void* backend, void* arg); +int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); -void taskBackendAddRef(void* pTaskBackend); +STaskBackendWrapper* taskBackendOpen(char* path, char* key); +void taskBackendDestroy(void* pBackend); + +void* taskBackendAddRef(void* pTaskBackend); +void taskBackendRemoveRef(void* pTaskBackend); int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index bbb7595e5a..54aefc8962 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -18,9 +18,9 @@ #include "executor.h" #include "query.h" -#include "tstream.h" #include "streamBackendRocksdb.h" #include "trpc.h" +#include "tstream.h" #ifdef __cplusplus extern "C" { @@ -41,8 +41,9 @@ typedef struct { } SStreamContinueExecInfo; extern SStreamGlobalEnv streamEnv; -extern int32_t streamBackendId; -extern int32_t streamBackendCfWrapperId; +extern int32_t streamBackendId; +extern int32_t streamBackendCfWrapperId; +extern int32_t taskBackendWrapperId; const char* streamGetBlockTypeStr(int32_t type); void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); @@ -68,10 +69,11 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); +int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, + int32_t* pLen); int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 0d34970cb3..5589932fb8 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -770,39 +770,40 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { } int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) { - SArray* pHandle = taosArrayInit(16, POINTER_BYTES); - void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); - while (pIter) { - int64_t id = *(int64_t*)pIter; + return 0; + // SArray* pHandle = taosArrayInit(16, POINTER_BYTES); + // void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); + // while (pIter) { + // int64_t id = *(int64_t*)pIter; - SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id); - if (wrapper == NULL) { - pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); - continue; - } + // SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id); + // if (wrapper == NULL) { + // pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); + // continue; + // } - taosThreadRwlockRdlock(&wrapper->rwLock); - for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { - if (wrapper->pHandle[i]) { - rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; - taosArrayPush(pHandle, &p); - } - } - taosThreadRwlockUnlock(&wrapper->rwLock); + // taosThreadRwlockRdlock(&wrapper->rwLock); + // for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { + // if (wrapper->pHandle[i]) { + // rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; + // taosArrayPush(pHandle, &p); + // } + // } + // taosThreadRwlockUnlock(&wrapper->rwLock); - taosArrayPush(refs, &id); - } + // taosArrayPush(refs, &id); + // } - int32_t nCf = taosArrayGetSize(pHandle); + // int32_t nCf = taosArrayGetSize(pHandle); - rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); - for (int i = 0; i < nCf; i++) { - ppCf[i] = taosArrayGetP(pHandle, i); - } - taosArrayDestroy(pHandle); + // rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + // for (int i = 0; i < nCf; i++) { + // ppCf[i] = taosArrayGetP(pHandle, i); + // } + // taosArrayDestroy(pHandle); - *ppHandle = ppCf; - return nCf; + // *ppHandle = ppCf; + // return nCf; } int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { int32_t code = -1; @@ -826,172 +827,176 @@ _ERROR: return code; } int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) { - int code = 0; - char* err = NULL; + return 0; + // int code = 0; + // char* err = NULL; - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - rocksdb_flushoptions_set_wait(flushOpt, 1); + // rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + // rocksdb_flushoptions_set_wait(flushOpt, 1); - rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err); - if (err != NULL) { - qError("failed to flush db before streamBackend clean up, reason:%s", err); - taosMemoryFree(err); - code = -1; - } - rocksdb_flushoptions_destroy(flushOpt); - return code; + // rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err); + // if (err != NULL) { + // qError("failed to flush db before streamBackend clean up, reason:%s", err); + // taosMemoryFree(err); + // code = -1; + // } + // rocksdb_flushoptions_destroy(flushOpt); + // return code; } int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) { - int32_t code = 0; - char* pChkpDir = taosMemoryCalloc(1, 256); - char* pChkpIdDir = taosMemoryCalloc(1, 256); - - sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints"); - code = taosMulModeMkDir(pChkpDir, 0755, true); - if (code != 0) { - qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); - taosMemoryFree(pChkpDir); - taosMemoryFree(pChkpIdDir); - code = -1; - return code; - } - - sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId); - if (taosIsDir(pChkpIdDir)) { - qInfo("stream rm exist checkpoint%s", pChkpIdDir); - taosRemoveFile(pChkpIdDir); - } - *chkpDir = pChkpDir; - *chkpIdDir = pChkpIdDir; - return 0; + // int32_t code = 0; + // char* pChkpDir = taosMemoryCalloc(1, 256); + // char* pChkpIdDir = taosMemoryCalloc(1, 256); + + // sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints"); + // code = taosMulModeMkDir(pChkpDir, 0755, true); + // if (code != 0) { + // qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); + // taosMemoryFree(pChkpDir); + // taosMemoryFree(pChkpIdDir); + // code = -1; + // return code; + // } + + // sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId); + // if (taosIsDir(pChkpIdDir)) { + // qInfo("stream rm exist checkpoint%s", pChkpIdDir); + // taosRemoveFile(pChkpIdDir); + // } + // *chkpDir = pChkpDir; + // *chkpIdDir = pChkpIdDir; + + // return 0; } int32_t streamBackendTriggerChkp(void* arg, char* dst) { - SStreamMeta* pMeta = arg; - int64_t backendRid = pMeta->streamBackendRid; - int32_t code = -1; + return 0; + // SStreamMeta* pMeta = arg; + // int64_t backendRid = pMeta->streamBackendRid; + // int32_t code = -1; - SArray* refs = taosArrayInit(16, sizeof(int64_t)); - rocksdb_column_family_handle_t** ppCf = NULL; + // SArray* refs = taosArrayInit(16, sizeof(int64_t)); + // rocksdb_column_family_handle_t** ppCf = NULL; - int64_t st = taosGetTimestampMs(); - SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); + // int64_t st = taosGetTimestampMs(); + // SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); - if (pHandle == NULL || pHandle->db == NULL) { - goto _ERROR; - } - int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); - qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, dst, nCf); + // if (pHandle == NULL || pHandle->db == NULL) { + // goto _ERROR; + // } + // int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); + // qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, dst, nCf); - code = chkpPreFlushDb(pHandle->db, ppCf, nCf); - if (code == 0) { - code = chkpDoDbCheckpoint(pHandle->db, dst); - if (code != 0) { - qError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst); - } else { - qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst, - taosGetTimestampMs() - st); - } - } else { - qError("stream backend:%p failed to flush db at:%s", pHandle, dst); - } + // code = chkpPreFlushDb(pHandle->db, ppCf, nCf); + // if (code == 0) { + // code = chkpDoDbCheckpoint(pHandle->db, dst); + // if (code != 0) { + // qError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst); + // } else { + // qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst, + // taosGetTimestampMs() - st); + // } + // } else { + // qError("stream backend:%p failed to flush db at:%s", pHandle, dst); + // } - // release all ref to cfWrapper; - for (int i = 0; i < taosArrayGetSize(refs); i++) { - int64_t id = *(int64_t*)taosArrayGet(refs, i); - taosReleaseRef(streamBackendCfWrapperId, id); - } + // // release all ref to cfWrapper; + // // for (int i = 0; i < taosArrayGetSize(refs); i++) { + // // int64_t id = *(int64_t*)taosArrayGet(refs, i); + // // taosReleaseRef(streamBackendCfWrapperId, id); + // // } -_ERROR: - taosReleaseRef(streamBackendId, backendRid); - taosArrayDestroy(refs); - return code; + // _ERROR: + // // taosReleaseRef(streamBackendId, backendRid); + // // taosArrayDestroy(refs); + // return code; } int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) { - if (arg == NULL) return 0; + // if (arg == NULL) return 0; - SStreamMeta* pMeta = arg; - taosWLockLatch(&pMeta->chkpDirLock); - taosArrayPush(pMeta->chkpInUse, &chkpId); - taosWUnLockLatch(&pMeta->chkpDirLock); + // SStreamMeta* pMeta = arg; + // taosWLockLatch(&pMeta->chkpDirLock); + // taosArrayPush(pMeta->chkpInUse, &chkpId); + // taosWUnLockLatch(&pMeta->chkpDirLock); return 0; } int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { - if (arg == NULL) return 0; - - SStreamMeta* pMeta = arg; - taosWLockLatch(&pMeta->chkpDirLock); - if (taosArrayGetSize(pMeta->chkpInUse) > 0) { - int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0); - if (id == chkpId) { - taosArrayPopFrontBatch(pMeta->chkpInUse, 1); - } - } - taosWUnLockLatch(&pMeta->chkpDirLock); return 0; + // if (arg == NULL) return 0; + + // SStreamMeta* pMeta = arg; + // taosWLockLatch(&pMeta->chkpDirLock); + // if (taosArrayGetSize(pMeta->chkpInUse) > 0) { + // int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0); + // if (id == chkpId) { + // taosArrayPopFrontBatch(pMeta->chkpInUse, 1); + // } + // } + // taosWUnLockLatch(&pMeta->chkpDirLock); } int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { - SStreamMeta* pMeta = arg; - int64_t backendRid = pMeta->streamBackendRid; - int64_t st = taosGetTimestampMs(); - int32_t code = -1; + return 0; + // SStreamMeta* pMeta = arg; + // int64_t backendRid = pMeta->streamBackendRid; + // int64_t st = taosGetTimestampMs(); + // int32_t code = -1; - SArray* refs = taosArrayInit(16, sizeof(int64_t)); + // SArray* refs = taosArrayInit(16, sizeof(int64_t)); - rocksdb_column_family_handle_t** ppCf = NULL; + // rocksdb_column_family_handle_t** ppCf = NULL; - char* pChkpDir = NULL; - char* pChkpIdDir = NULL; - if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) { - taosArrayDestroy(refs); - return code; - } + // char* pChkpDir = NULL; + // char* pChkpIdDir = NULL; + // if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) { + // taosArrayDestroy(refs); + // return code; + // } - SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); - if (pHandle == NULL || pHandle->db == NULL) { - goto _ERROR; - } + // SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); + // if (pHandle == NULL || pHandle->db == NULL) { + // goto _ERROR; + // } - // Get all cf and acquire cfWrappter - int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); - qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); + // // Get all cf and acquire cfWrappter + // int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); + // qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); - code = chkpPreFlushDb(pHandle->db, ppCf, nCf); - if (code == 0) { - code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir); - if (code != 0) { - qError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir); - } else { - qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir, - taosGetTimestampMs() - st); - } - } else { - qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); - } - // release all ref to cfWrapper; - for (int i = 0; i < taosArrayGetSize(refs); i++) { - int64_t id = *(int64_t*)taosArrayGet(refs, i); - taosReleaseRef(streamBackendCfWrapperId, id); - } - if (code == 0) { - taosWLockLatch(&pMeta->chkpDirLock); - taosArrayPush(pMeta->chkpSaved, &checkpointId); - taosWUnLockLatch(&pMeta->chkpDirLock); + // code = chkpPreFlushDb(pHandle->db, ppCf, nCf); + // if (code == 0) { + // code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir); + // if (code != 0) { + // qError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir); + // } else { + // qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir, + // taosGetTimestampMs() - st); + // } + // } else { + // qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); + // } + // // release all ref to cfWrapper; + // for (int i = 0; i < taosArrayGetSize(refs); i++) { + // int64_t id = *(int64_t*)taosArrayGet(refs, i); + // taosReleaseRef(streamBackendCfWrapperId, id); + // } + // if (code == 0) { + // taosWLockLatch(&pMeta->chkpDirLock); + // taosArrayPush(pMeta->chkpSaved, &checkpointId); + // taosWUnLockLatch(&pMeta->chkpDirLock); - // delete obsolte checkpoint - delObsoleteCheckpoint(arg, pChkpDir); - pMeta->chkpId = checkpointId; - } + // // delete obsolte checkpoint + // delObsoleteCheckpoint(arg, pChkpDir); + // pMeta->chkpId = checkpointId; + // } -_ERROR: - taosReleaseRef(streamBackendId, backendRid); - taosArrayDestroy(refs); - taosMemoryFree(ppCf); - taosMemoryFree(pChkpDir); - taosMemoryFree(pChkpIdDir); - return code; + // _ERROR: + // taosReleaseRef(streamBackendId, backendRid); + // taosArrayDestroy(refs); + // taosMemoryFree(ppCf); + // taosMemoryFree(pChkpDir); + // taosMemoryFree(pChkpIdDir); + // return code; } SListNode* streamBackendAddCompare(void* backend, void* arg) { @@ -1499,12 +1504,15 @@ _EXIT: taosMemoryFree(cfHandle); return code; } -void taskBackendAddRef(void* pTaskBackend) { +void* taskBackendAddRef(void* pTaskBackend) { STaskBackendWrapper* pBackend = pTaskBackend; - taosAcquireRef(streamBackendCfWrapperId, pBackend->refId); - return; + return taosAcquireRef(taskBackendWrapperId, pBackend->refId); } -void taskBackendDestroy(STaskBackendWrapper* wrapper); +void taskBackendRemoveRef(void* pTaskBackend) { + // STaskBackendWrapper* pBackend = pTaskBackend; + // taosReleaseRef(taskBackendWrapperId, pBackend->refId); +} +// void taskBackendDestroy(STaskBackendWrapper* wrapper); void taskBackendInitOpt(STaskBackendWrapper* pTaskBackend) { rocksdb_env_t* env = rocksdb_create_default_env(); @@ -1579,44 +1587,49 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) { char* taskPath = NULL; char* err = NULL; - int32_t code = taskBackendBuildFullPath(path, key, &taskPath); - if (code != 0) return NULL; - + char** cfNames = NULL; size_t nCf = 0; + if (0 != taskBackendBuildFullPath(path, key, &taskPath)) return NULL; + STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper)); taskBackendInitOpt(pTaskBackend); - char** cfs = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err); - if (nCf == 0 || nCf == 1 || err != NULL) { - taosMemoryFreeClear(err); - pTaskBackend->db = rocksdb_open(pTaskBackend->dbOpt, taskPath, &err); - if (err != NULL) { - qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err); - taosMemoryFreeClear(err); - code = -1; - goto _EXIT; - } - } else { - code = taskBackendOpenCfs(pTaskBackend, taskPath, cfs, nCf); - if (code != 0) goto _EXIT; + cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err); + if (nCf == 0) { + pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], taskPath, &err); + rocksdb_close(pTaskBackend->db); + + if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf); + taosMemoryFree(err); + } + + cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err); + ASSERT(err != NULL); + + if (0 != taskBackendOpenCfs(pTaskBackend, taskPath, cfNames, nCf)) { + goto _EXIT; } - if (cfs != NULL) rocksdb_list_column_families_destroy(cfs, nCf); taosThreadMutexInit(&pTaskBackend->mutex, NULL); qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend); taosMemoryFree(taskPath); - pTaskBackend->refId = taosAddRef(streamBackendCfWrapperId, pTaskBackend); + if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf); return pTaskBackend; _EXIT: taskBackendDestroy(pTaskBackend); + + if (err != NULL) taosMemoryFree(err); + if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf); return NULL; } -void taskBackendDestroy(STaskBackendWrapper* wrapper) { +void taskBackendDestroy(void* pBackend) { + STaskBackendWrapper* wrapper = pBackend; + if (wrapper == NULL) return; if (wrapper->db && wrapper->pCf) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ad3ff82ec3..1892f58b9e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -30,6 +30,7 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; int32_t streamMetaId = 0; +int32_t taskBackendWrapperId = 0; static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); static void metaHbToMnode(void* param, void* tmrId); @@ -52,6 +53,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); + taskBackendWrapperId = taosOpenRef(64, taskBackendDestroy); streamMetaId = taosOpenRef(64, streamMetaCloseImpl); @@ -220,19 +222,22 @@ int32_t streamMetaMayConvertBackendFormat(SStreamMeta* pMeta) { return 0; } -void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) { +void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) { taosThreadMutexLock(&pMeta->backendMutex); void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key)); if (ppBackend != NULL && *ppBackend != NULL) { taskBackendAddRef(*ppBackend); + *ref = ((STaskBackendWrapper*)*ppBackend)->refId; taosThreadMutexUnlock(&pMeta->backendMutex); return *ppBackend; } + void* pBackend = taskBackendOpen(pMeta->path, key); if (pBackend == NULL) { taosThreadMutexUnlock(&pMeta->backendMutex); return NULL; } + *ref = taosAddRef(taskBackendWrapperId, pBackend); taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); return pBackend; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e01f87788f..152b426628 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -354,7 +354,11 @@ void tFreeStreamTask(SStreamTask* pTask) { taosArrayDestroyEx(pTask->pUpstreamInfoList, freeUpstreamItem); pTask->pUpstreamInfoList = NULL; } - + if (pTask->pBackend) { + taskBackendRemoveRef(pTask->pBackend); + + pTask->pBackend = NULL; + } taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask);