diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 59986a3b3c..fdb9f102f0 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -274,7 +274,7 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_TOOLS "" OFF) option(WITH_LIBURING "" OFF) IF (TD_LINUX) - option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON) + option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) ELSE() option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) ENDIF() diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index e14c4e60d9..07fc2fd71b 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -42,27 +42,27 @@ IF (TD_LINUX) ) target_link_libraries(tmq - taos_static + taos ) target_link_libraries(stream_demo - taos_static + taos ) target_link_libraries(schemaless - taos_static + taos ) target_link_libraries(prepare - taos_static + taos ) target_link_libraries(demo - taos_static + taos ) target_link_libraries(asyncdemo - taos_static + taos ) SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e015f4182e..c92ce254a8 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -163,6 +163,7 @@ typedef struct { int64_t checkPointId; int32_t taskId; int64_t streamId; + int64_t streamBackendRid; } SStreamState; typedef struct SFunctionStateStore { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 51f2de481d..73c88fae8d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -345,7 +345,6 @@ typedef struct SStreamMeta { SRWLatch lock; int32_t walScanCounter; void* streamBackend; - int32_t streamBackendId; int64_t streamBackendRid; SHashObj* pTaskBackendUnique; } SStreamMeta; diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 18cd0d9cc6..683d10e926 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -22,21 +22,20 @@ extern "C" { // If the error is in a third-party library, place this header file under the third-party library header file. // When you want to use this feature, you should find or add the same function in the following sectio -// #if !defined(WINDOWS) +#if !defined(WINDOWS) -// #ifndef ALLOW_FORBID_FUNC -// #define malloc MALLOC_FUNC_TAOS_FORBID -// #define calloc CALLOC_FUNC_TAOS_FORBID -// #define realloc REALLOC_FUNC_TAOS_FORBID -// #define free FREE_FUNC_TAOS_FORBID -// #ifdef strdup -// #undef strdup -// #define strdup STRDUP_FUNC_TAOS_FORBID -// #endif -// #endif // ifndef ALLOW_FORBID_FUNC -// #endif // if !defined(WINDOWS) +#ifndef ALLOW_FORBID_FUNC +#define malloc MALLOC_FUNC_TAOS_FORBID +#define calloc CALLOC_FUNC_TAOS_FORBID +#define realloc REALLOC_FUNC_TAOS_FORBID +#define free FREE_FUNC_TAOS_FORBID +#ifdef strdup +#undef strdup +#define strdup STRDUP_FUNC_TAOS_FORBID +#endif +#endif // ifndef ALLOW_FORBID_FUNC +#endif // if !defined(WINDOWS) -// // #define taosMemoryFree malloc // #define taosMemoryMalloc malloc // #define taosMemoryCalloc calloc // #define taosMemoryRealloc realloc diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index b18cb8e282..b7bfc57cd5 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -103,7 +103,7 @@ target_link_libraries( # PUBLIC bdb # PUBLIC scalar - PUBLIC rocksdb-shared + PUBLIC rocksdb PUBLIC transport PUBLIC stream PUBLIC index diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index fa6c709c8f..d1ef7fe3c1 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -11,7 +11,7 @@ if(${BUILD_WITH_ROCKSDB}) IF (TD_LINUX) target_link_libraries( stream - PUBLIC rocksdb-shared tdb + PUBLIC rocksdb tdb PRIVATE os util transport qcom executor wal index ) ELSE() diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 2c1956998a..c7ee308b61 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -36,8 +36,9 @@ static SStreamGlobalEnv streamEnv; int32_t streamDispatchStreamBlock(SStreamTask* pTask); SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); -SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); -void destroyStreamDataBlock(SStreamDataBlock* pBlock); +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, + SArray* pRes); +void destroyStreamDataBlock(SStreamDataBlock* pBlock); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); @@ -53,6 +54,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); +extern int32_t streamBackendId; + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b3995f020b..df045eef20 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -16,7 +16,9 @@ #include "streamBackendRocksdb.h" #include "executor.h" #include "query.h" +#include "streamInc.h" #include "tcommon.h" +#include "tref.h" typedef struct SCompactFilteFactory { void* status; @@ -79,8 +81,8 @@ const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); void* streamBackendInit(const char* path) { - qDebug("init stream backend"); - SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); + qDebug("start to init stream backend at %s", path); + SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); taosThreadMutexInit(&pHandle->cfMutex, NULL); @@ -119,6 +121,7 @@ void* streamBackendInit(const char* path) { if (err != NULL) { qError("failed to open rocksdb, path:%s, reason:%s", path, err); taosMemoryFreeClear(err); + goto _EXIT; } } else { /* @@ -129,6 +132,7 @@ void* streamBackendInit(const char* path) { if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } + qDebug("succ to init stream backend at %s, backend:%p", path, pHandle); return (void*)pHandle; _EXIT: @@ -140,7 +144,8 @@ _EXIT: taosHashCleanup(pHandle->cfInst); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); - free(pHandle); + taosMemoryFree(pHandle); + qDebug("failed to init stream backend at %s", path); return NULL; } void streamBackendCleanup(void* arg) { @@ -168,19 +173,20 @@ void streamBackendCleanup(void* arg) { rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); - taosThreadMutexDestroy(&pHandle->mutex); SListNode* head = tdListPopHead(pHandle->list); while (head != NULL) { streamStateDestroyCompar(head->data); taosMemoryFree(head); head = tdListPopHead(pHandle->list); } - // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); + tdListFree(pHandle->list); + taosThreadMutexDestroy(&pHandle->mutex); + taosThreadMutexDestroy(&pHandle->cfMutex); taosMemoryFree(pHandle); - + qDebug("destroy stream backend backend:%p", pHandle); return; } SListNode* streamBackendAddCompare(void* backend, void* arg) { @@ -803,7 +809,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { - qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); + qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); + taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendHandle* handle = backend; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); @@ -866,7 +873,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); - qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); + qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId); return 0; } @@ -882,8 +889,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { taosThreadMutexUnlock(&pHandle->cfMutex); char* status[] = {"close", "drop"}; - qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId, - pState->taskId); + qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle, + pState->streamId, pState->taskId); if (pState->pTdbState->rocksdb == NULL) { return; } @@ -938,6 +945,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { taosThreadRwlockDestroy(&pState->pTdbState->rwLock); pState->pTdbState->rocksdb = NULL; + taosReleaseRef(streamBackendId, pState->streamBackendRid); } void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8c26052fdb..ed9f99cf78 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -20,7 +20,7 @@ #include "ttimer.h" static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; -static int32_t streamBackendId = 0; +int32_t streamBackendId = 0; static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } @@ -79,7 +79,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - pMeta->streamBackendId = streamBackendId; memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "state"); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 71a21ac150..967c7733c9 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -106,7 +106,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz } SStreamTask* pStreamTask = pTask; - char statePath[1024]; + char statePath[1024]; if (!specPath) { sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); } else { @@ -119,10 +119,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz #ifdef USE_ROCKSDB SStreamMeta* pMeta = pStreamTask->pMeta; - taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid); + pState->streamBackendRid = pMeta->streamBackendRid; int code = streamStateOpenBackend(pMeta->streamBackend, pState); if (code == -1) { - taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid); + taosReleaseRef(streamBackendId, pMeta->streamBackendRid); taosMemoryFree(pState); pState = NULL; } @@ -222,9 +222,7 @@ _err: void streamStateClose(SStreamState* pState, bool remove) { SStreamTask* pTask = pState->pTdbState->pOwner; #ifdef USE_ROCKSDB - // streamStateCloseBackend(pState); streamStateDestroy(pState, remove); - taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -278,10 +276,10 @@ int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB - void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); return code; @@ -291,10 +289,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* } int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; return code; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index bc84509728..bfaeca89f6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -419,7 +419,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (code != 0 || len == 0 || val == NULL) { return TSDB_CODE_FAILED; } - memcpy(val, buf, len); + memcpy(buf, val, len); buf[len] = 0; maxCheckPointId = atol((char*)buf); taosMemoryFree(val); @@ -433,7 +433,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (code != 0) { return TSDB_CODE_FAILED; } - memcpy(val, buf, len); + memcpy(buf, val, len); buf[len] = 0; taosMemoryFree(val); diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 87b0d11d1c..71dfd710a5 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -9,35 +9,35 @@ add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) target_link_libraries( tmq_offset - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os ) target_link_libraries( create_table - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os ) target_link_libraries( tmq_demo - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os ) target_link_libraries( tmq_sim - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os ) target_link_libraries( tmq_taosx_ci - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os @@ -45,7 +45,7 @@ target_link_libraries( target_link_libraries( write_raw_block_test - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os @@ -53,7 +53,7 @@ target_link_libraries( target_link_libraries( sml_test - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os @@ -61,7 +61,7 @@ target_link_libraries( target_link_libraries( get_db_name_test - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os