diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e09ffb416c..553e8f8217 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -383,6 +383,7 @@ struct SStreamTask { struct SStreamMeta* pMeta; SSHashObj* pNameMap; void* pBackend; + int64_t backendRefId; char reserve[256]; }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e136b9b265..12b89dbd9c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -736,12 +736,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); + if (code != TSDB_CODE_SUCCESS) return code; pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr); - - if (code != TSDB_CODE_SUCCESS) { - return code; - } + if (pTask->pBackend == NULL) return -1; streamTaskOpenAllUpstreamInput(pTask); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 3e07f2532e..a5533ef2f2 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -65,6 +65,8 @@ typedef struct { rocksdb_compactionfilterfactory_t* filterFactory; TdThreadMutex mutex; + int64_t refId; + } STaskBackendWrapper; void* streamBackendInit(const char* path, int64_t chkpId); @@ -75,7 +77,9 @@ int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointI SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); -STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key); +STaskBackendWrapper* taskBackendOpen(char* path, char* key); + +void taskBackendAddRef(void* pTaskBackend); int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index fcdb945aae..eed65d3f3a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -629,7 +629,7 @@ void streamBackendHandleCleanup(void* arg) { taosThreadRwlockDestroy(&wrapper->rwLock); wrapper->rocksdb = NULL; - taosReleaseRef(streamBackendId, wrapper->backendId); + // taosReleaseRef(streamBackendId, wrapper->backendId); qDebug("end to do-close backendwrapper %p, %s", wrapper, wrapper->idstr); taosMemoryFree(wrapper); @@ -776,7 +776,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* int64_t id = *(int64_t*)pIter; SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id); - if (wrapper == NULL) continue; + if (wrapper == NULL) { + pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); + continue; + } taosThreadRwlockRdlock(&wrapper->rwLock); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { @@ -788,7 +791,6 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* taosThreadRwlockUnlock(&wrapper->rwLock); taosArrayPush(refs, &id); - pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); } int32_t nCf = taosArrayGetSize(pHandle); @@ -1452,35 +1454,72 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) { taosMemoryFree(inst); } -STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) { - int32_t code = 0; - - char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); - sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key); - if (!taosDirExist(taskPath)) { - code = taosMkDir(taskPath); - if (code != 0) { - qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code)); - taosMemoryFree(taskPath); - return NULL; +int32_t getCfIdx(const char* cfName) { + int idx = -1; + size_t len = strlen(cfName); + for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { + if (len == ginitDict[i].len && strncmp(cfName, ginitDict[i].key, strlen(cfName)) == 0) { + idx = i; + break; } } - char* err = NULL; + return idx; +} +int32_t taskBackendOpenCfs(STaskBackendWrapper* pTask, char* path, char** pCfNames, int32_t nCf) { + int32_t code = -1; + char* err = NULL; - STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(SBackendWrapper)); - rocksdb_env_t* env = rocksdb_create_default_env(); + rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); + rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + + for (int i = 0; i < nCf; i++) { + int32_t idx = getCfIdx(pCfNames[i]); + cfOpts[i] = pTask->pCfOpts[idx]; + } + + rocksdb_t* db = rocksdb_open_column_families(pTask->dbOpt, path, nCf, (const char* const*)pCfNames, + (const rocksdb_options_t* const*)cfOpts, cfHandle, &err); + + if (err != NULL) { + qError("failed to open cf path: %s", err); + taosMemoryFree(err); + goto _EXIT; + } + + for (int i = 0; i < nCf; i++) { + int32_t idx = getCfIdx(pCfNames[i]); + pTask->pCf[idx] = cfHandle[i]; + } + + pTask->db = db; + code = 0; + +_EXIT: + taosMemoryFree(cfOpts); + taosMemoryFree(cfHandle); + return code; +} +void taskBackendAddRef(void* pTaskBackend) { + STaskBackendWrapper* pBackend = pTaskBackend; + taosAcquireRef(streamBackendCfWrapperId, pBackend->refId); + return; +} +void taskBackendDestroy(STaskBackendWrapper* wrapper); + +void taskBackendInitOpt(STaskBackendWrapper* pTaskBackend) { + rocksdb_env_t* env = rocksdb_create_default_env(); rocksdb_cache_t* cache = rocksdb_cache_create_lru(256); rocksdb_options_t* opts = rocksdb_options_create(); - // rocksdb_options_set_env(opts, env); + rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); // rocksdb_options_set_max_total_wal_size(opts, dbMemLimit); rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 3); rocksdb_options_set_info_log_level(opts, 1); - // rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit); - // rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2); + rocksdb_options_set_db_write_buffer_size(opts, 64 << 20); + rocksdb_options_set_write_buffer_size(opts, 32 << 20); rocksdb_options_set_atomic_flush(opts, 1); pTaskBackend->dbOpt = opts; @@ -1519,46 +1558,86 @@ STaskBackendWrapper* streamStateOpenTaskBackend(char* path, char* key) { pTaskBackend->pCfOpts[i] = opt; pTaskBackend->pCfParams[i].tableOpt = tableOpt; } + return; +} +int32_t taskBackendBuildFullPath(char* path, char* key, char** fullPath) { + int32_t code = 0; + char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); + sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key); + if (!taosDirExist(taskPath)) { + code = taosMkDir(taskPath); + if (code != 0) { + qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code)); + taosMemoryFree(taskPath); + return code; + } + } + *fullPath = taskPath; + return 0; +} +STaskBackendWrapper* taskBackendOpen(char* path, char* key) { + char* taskPath = NULL; + char* err = NULL; - char** cfs = rocksdb_list_column_families(opts, taskPath, &nCf, &err); + int32_t code = taskBackendBuildFullPath(path, key, &taskPath); + if (code != 0) return NULL; + + size_t nCf = 0; + + STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper)); + taskBackendInitOpt(pTaskBackend); + + char** cfs = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err); if (nCf == 0 || nCf == 1 || err != NULL) { taosMemoryFreeClear(err); - pTaskBackend->db = rocksdb_open(opts, taskPath, &err); + pTaskBackend->db = rocksdb_open(pTaskBackend->dbOpt, taskPath, &err); if (err != NULL) { qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err); taosMemoryFreeClear(err); + code = -1; + goto _EXIT; } } else { + code = taskBackendOpenCfs(pTaskBackend, taskPath, cfs, nCf); + if (code != 0) goto _EXIT; } - if (cfs != NULL) { - rocksdb_list_column_families_destroy(cfs, nCf); - } + if (cfs != NULL) rocksdb_list_column_families_destroy(cfs, nCf); taosThreadMutexInit(&pTaskBackend->mutex, NULL); taosMemoryFree(taskPath); qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend); + + pTaskBackend->refId = taosAddRef(streamBackendCfWrapperId, pTaskBackend); + return pTaskBackend; +_EXIT: + + taskBackendDestroy(pTaskBackend); + return NULL; } -void streamStateCloseTaskBackend(STaskBackendWrapper* wrapper) { +void taskBackendDestroy(STaskBackendWrapper* wrapper) { if (wrapper == NULL) return; - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - rocksdb_flushoptions_set_wait(flushOpt, 1); + if (wrapper->db && wrapper->pCf) { + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flushoptions_set_wait(flushOpt, 1); - int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); - char* err = NULL; - for (int i = 0; i < nCf; i++) { - if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err); - if (err != NULL) { - qError("failed to flush cf:%s, reason:%s", ginitDict[i].key, err); - taosMemoryFreeClear(err); + int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); + char* err = NULL; + for (int i = 0; i < nCf; i++) { + if (wrapper->pCf[i] != NULL) rocksdb_flush_cf(wrapper->db, flushOpt, wrapper->pCf[i], &err); + if (err != NULL) { + qError("failed to flush cf:%s, reason:%s", ginitDict[i].key, err); + taosMemoryFreeClear(err); + } } rocksdb_flushoptions_destroy(flushOpt); - } - for (int i = 0; i < nCf; i++) { - if (wrapper->pCf[i] != NULL) { - rocksdb_column_family_handle_destroy(wrapper->pCf[i]); + + for (int i = 0; i < nCf; i++) { + if (wrapper->pCf[i] != NULL) { + rocksdb_column_family_handle_destroy(wrapper->pCf[i]); + } } } rocksdb_options_destroy(wrapper->dbOpt); @@ -1574,24 +1653,13 @@ void streamStateCloseTaskBackend(STaskBackendWrapper* wrapper) { taosThreadMutexDestroy(&wrapper->mutex); - rocksdb_close(wrapper->db); + if (wrapper->db) rocksdb_close(wrapper->db); taosMemoryFree(wrapper); return; } -int8_t getCfIdx(const char* key) { - int idx = -1; - size_t len = strlen(key); - for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { - if (len == ginitDict[i].len && strncmp(key, ginitDict[i].key, strlen(key)) == 0) { - idx = i; - break; - } - } - return idx; -} -int32_t taskBackendOpenCf(STaskBackendWrapper* pBackend, const char* key) { +int32_t taskBackendOpenCfByKey(STaskBackendWrapper* pBackend, const char* key) { int32_t code = 0; char* err = NULL; int8_t idx = getCfIdx(key); @@ -1658,14 +1726,14 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) { int32_t code = 0; - STaskBackendWrapper* pTaskBackend = streamStateOpenTaskBackend(path, key); + STaskBackendWrapper* pTaskBackend = taskBackendOpen(path, key); RocksdbCfInst* pSrcBackend = pCfInst; for (int i = 0; i < nCf; i++) { rocksdb_column_family_handle_t* pSrcCf = pSrcBackend->pHandle[i]; if (pSrcCf == NULL) continue; - code = taskBackendOpenCf(pTaskBackend, ginitDict[i].key); + code = taskBackendOpenCfByKey(pTaskBackend, ginitDict[i].key); if (code != 0) goto _EXIT; code = copyDataAt(pSrcBackend, pTaskBackend, i); @@ -1673,7 +1741,7 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) { } _EXIT: - streamStateCloseTaskBackend(pTaskBackend); + taskBackendDestroy(pTaskBackend); return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 99b3b9cca1..2da4b2f370 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -224,11 +224,12 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) { taosThreadMutexLock(&pMeta->backendMutex); void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key)); if (ppBackend != NULL && *ppBackend != NULL) { - // add ref later + taskBackendAddRef(*ppBackend); taosThreadMutexUnlock(&pMeta->backendMutex); return *ppBackend; } - void* pBackend = streamStateOpenTaskBackend(pMeta->path, key); + void* pBackend = taskBackendOpen(pMeta->path, key); + taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexLock(&pMeta->backendMutex); return pBackend; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index f51181b0b4..d4ac1838d0 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -106,20 +106,12 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz } SStreamTask* pStreamTask = pTask; - // char statePath[1024]; - // if (!specPath) { - // sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId); - // } else { - // memset(statePath, 0, 1024); - // tstrncpy(statePath, path, 1024); - // } - pState->taskId = pStreamTask->id.taskId; pState->streamId = pStreamTask->id.streamId; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); #ifdef USE_ROCKSDB - // SStreamMeta* pMeta = pStreamTask->pMeta; + SStreamMeta* pMeta = pStreamTask->pMeta; // pState->streamBackendRid = pMeta->streamBackendRid; // taosWLockLatch(&pMeta->lock); // taosThreadMutexLock(&pMeta->backendMutex); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9a76f661f8..e01f87788f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -388,7 +388,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); - //pTask->pBackend = streamStateOpenTaskBackend(pMeta->path, (char*)pTask->id.idStr); + // pTask->pBackend = taskBackendOpen(pMeta->path, (char*)pTask->id.idStr); return TSDB_CODE_SUCCESS; }