From c54e4aeaadc69e0104ce31d091826653a70dc081 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 23 Feb 2024 09:30:53 +0000 Subject: [PATCH 01/12] add test case --- source/libs/index/src/indexComm.c | 11 ++- source/libs/stream/inc/streamBackendRocksdb.h | 7 +- source/libs/stream/src/streamMeta.c | 28 +++--- source/libs/stream/test/CMakeLists.txt | 94 ++++++++++++++----- source/libs/stream/test/backendTest.cpp | 38 ++++++++ source/libs/stream/test/tstreamUpdateTest.cpp | 13 ++- 6 files changed, 137 insertions(+), 54 deletions(-) create mode 100644 source/libs/stream/test/backendTest.cpp diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index 1313221952..b7b9f1cc9f 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -76,8 +76,8 @@ char* idxInt2str(int64_t val, char* dst, int radix) { return dst - 1; } __compar_fn_t idxGetCompar(int8_t type) { - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || - type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_GEOMETRY) { + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_NCHAR || + type == TSDB_DATA_TYPE_GEOMETRY) { return (__compar_fn_t)strcmp; } return getComparFunc(type, 0); @@ -108,8 +108,8 @@ static FORCE_INLINE TExeCond tCompareEqual(void* a, void* b, int8_t type) { return tCompare(func, QUERY_TERM, a, b, type); } TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) { - if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || - dtype == TSDB_DATA_TYPE_VARBINARY || dtype == TSDB_DATA_TYPE_GEOMETRY) { + if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY || + dtype == TSDB_DATA_TYPE_GEOMETRY) { return tDoCompare(func, cmptype, a, b); } #if 1 @@ -290,6 +290,7 @@ int idxUidCompare(const void* a, const void* b) { uint64_t r = *(uint64_t*)b; return l - r; } +#ifdef BUILD_NO_CALL int32_t idxConvertData(void* src, int8_t type, void** dst) { int tlen = -1; switch (type) { @@ -372,6 +373,8 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) { // indexMayFillNumbericData(*dst, tlen); return tlen; } +#endif + int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) { if (src == NULL) { *dst = strndup(INDEX_DATA_NULL_STR, (int)strlen(INDEX_DATA_NULL_STR)); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 03f70604b7..1f8e99cd27 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -17,7 +17,7 @@ #define _STREAM_BACKEDN_ROCKSDB_H_ #include "rocksdb/c.h" -//#include "streamInt.h" +// #include "streamInt.h" #include "streamState.h" #include "tcommon.h" @@ -244,11 +244,6 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t taskDbBuildSnap(void* arg, SArray* pSnap); -// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); - -// STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); -// void taskDbDestroy(void* pDb, bool flush); - int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); SBkdMgt* bkdMgtCreate(char* path); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b35f401cb9..a09b940a19 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -70,7 +70,7 @@ static void streamMetaEnvInit() { streamTimerInit(); } -void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);} +void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaCleanup() { taosCloseRef(streamBackendId); @@ -1104,14 +1104,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), }; - entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); + entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } if ((*pTask)->chkInfo.checkpointingId != 0) { - entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId)? 1:0; + entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; entry.checkpointId = (*pTask)->chkInfo.checkpointingId; entry.chkpointTransId = (*pTask)->chkInfo.transId; @@ -1172,7 +1172,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); } - _end: +_end: streamMetaClearHbMsg(&hbMsg); return TSDB_CODE_SUCCESS; } @@ -1304,28 +1304,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaRLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-rlock", pMeta->vgId); + // stTrace("vgId:%d meta-rlock", pMeta->vgId); taosThreadRwlockRdlock(&pMeta->lock); } void streamMetaRUnLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-runlock", pMeta->vgId); + // stTrace("vgId:%d meta-runlock", pMeta->vgId); int32_t code = taosThreadRwlockUnlock(&pMeta->lock); if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); } else { -// stTrace("vgId:%d meta-runlock completed", pMeta->vgId); + // stTrace("vgId:%d meta-runlock completed", pMeta->vgId); } } void streamMetaWLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-wlock", pMeta->vgId); + // stTrace("vgId:%d meta-wlock", pMeta->vgId); taosThreadRwlockWrlock(&pMeta->lock); -// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); + // stTrace("vgId:%d meta-wlock completed", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-wunlock", pMeta->vgId); + // stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosThreadRwlockUnlock(&pMeta->lock); } @@ -1395,7 +1395,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) pMeta->sendMsgBeforeClosing = true; } - pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER; + pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER; streamMetaWUnLock(pMeta); if (isLeader) { @@ -1531,8 +1531,8 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { int32_t num = taosArrayGetSize(pMeta->pTaskList); - for(int32_t i = 0; i < num; ++i) { - STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + for (int32_t i = 0; i < num; ++i) { + STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId)); if (ppTask == NULL) { continue; @@ -1633,7 +1633,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 - ", readyTs:%" PRId64 " total elapsed time:%.2fs", + ", readyTs:%" PRId64 " total elapsed time:%.2fs", pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index c90e05bcf6..c18372a493 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -1,40 +1,82 @@ -MESSAGE(STATUS "build stream unit test") - -# GoogleTest requires at least C++11 -SET(CMAKE_CXX_STANDARD 11) -AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) # bloomFilterTest -ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") -TARGET_LINK_LIBRARIES(streamUpdateTest - PUBLIC os util common gtest gtest_main stream executor index - ) +#TARGET_LINK_LIBRARIES(streamUpdateTest + #PUBLIC os util common gtest gtest_main stream executor index + #) -TARGET_INCLUDE_DIRECTORIES( - streamUpdateTest +#TARGET_INCLUDE_DIRECTORIES( + #streamUpdateTest + #PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + #PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +#) + +#ADD_EXECUTABLE(checkpointTest checkpointTest.cpp) +#TARGET_LINK_LIBRARIES( + #checkpointTest + #PUBLIC os common gtest stream executor qcom index transport util +#) + +#TARGET_INCLUDE_DIRECTORIES( + #checkpointTest + #PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +#) + +#add_executable(backendTest "") + +#target_sources(backendTest + #PRIVATE + #"backendTest.cpp" +#) + +#TARGET_LINK_LIBRARIES( + #backendTest + #PUBLIC rocksdb + #PUBLIC os common gtest stream executor qcom index transport util +#) + +#TARGET_INCLUDE_DIRECTORIES( + #backendTest + #PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + #PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +#) + +#add_test( + #NAME streamUpdateTest + #COMMAND streamUpdateTest +#) + +#add_test( + #NAME checkpointTest + #COMMAND checkpointTest +#) +#add_test( + #NAME backendTest + #COMMAND backendTest +#) + + +add_executable(backendTest "") + +target_sources(backendTest + PRIVATE + "backendTest.cpp" +) + +target_include_directories( + backendTest PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" ) -ADD_EXECUTABLE(checkpointTest checkpointTest.cpp) -TARGET_LINK_LIBRARIES( - checkpointTest +target_link_libraries( + backendTest + PUBLIC rocksdb PUBLIC os common gtest stream executor qcom index transport util ) -TARGET_INCLUDE_DIRECTORIES( - checkpointTest - PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" -) - add_test( - NAME streamUpdateTest - COMMAND streamUpdateTest -) - -add_test( - NAME checkpointTest - COMMAND checkpointTest + NAME backendTest + COMMAND backendTest ) \ No newline at end of file diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp new file mode 100644 index 0000000000..5051337aee --- /dev/null +++ b/source/libs/stream/test/backendTest.cpp @@ -0,0 +1,38 @@ +#include + +#include +#include +#include +#include "streamBackendRocksdb.h" + +class BackendEnv : public ::testing::Test { + protected: + virtual void SetUp() {} + virtual void TearDown() {} +}; + +void *backendCreate() { + const char *streamPath = "/tmp"; + + char *absPath = NULL; + void *p = NULL; + // SBackendWrapper *p = streamBackendInit(streamPath, -1, 2); + // p = taskDbOpen((char *)streamPath, (char *)"test", -1); + // p = bkdMgtCreate((char *)streamPath); + + ASSERT(p != NULL); + return p; +} +void backendOpen() { + void *p = backendCreate(); + ASSERT(p != NULL); +} + +TEST_F(BackendEnv, checkOpen) { backendOpen(); } +TEST_F(BackendEnv, backendOpt) {} +TEST_F(BackendEnv, backendDestroy) {} + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 1b999e5fb0..59171876ff 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -14,10 +14,7 @@ class StreamStateEnv : public ::testing::Test { streamMetaInit(); backend = streamBackendInit(path, 0, 0); } - virtual void TearDown() { - streamMetaCleanup(); - // indexClose(index); - } + virtual void TearDown() { streamMetaCleanup(); } const char *path = TD_TMP_DIR_PATH "stream"; void *backend; @@ -50,6 +47,14 @@ bool equalSBF(SScalableBf *left, SScalableBf *right) { } TEST(TD_STREAM_UPDATE_TEST, update) { + const char *streamPath = "/tmp"; + + char *absPath = NULL; + void *p = NULL; + // SBackendWrapper *p = streamBackendInit(streamPath, -1, 2); + // p = taskDbOpen((char *)streamPath, (char *)"test", -1); + p = bkdMgtCreate((char *)streamPath); + // const int64_t interval = 20 * 1000; // const int64_t watermark = 10 * 60 * 1000; // SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); From e41e10bf35534fb6b6804f8f79ca356f31fc6c3e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 26 Feb 2024 11:51:46 +0000 Subject: [PATCH 02/12] add stream backend test --- source/libs/stream/inc/streamBackendRocksdb.h | 11 +++- source/libs/stream/test/CMakeLists.txt | 60 +++++++++++++------ source/libs/stream/test/backendTest.cpp | 30 ++++++++-- source/libs/stream/test/checkpointTest.cpp | 28 ++++----- 4 files changed, 89 insertions(+), 40 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 1f8e99cd27..6e6f85d8f9 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -21,6 +21,10 @@ #include "streamState.h" #include "tcommon.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct SCfComparator { rocksdb_comparator_t** comp; int32_t numOfComp; @@ -253,4 +257,9 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); -#endif \ No newline at end of file + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index c18372a493..c472207b27 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -57,26 +57,48 @@ #) -add_executable(backendTest "") +#add_executable(backendTest "") -target_sources(backendTest - PRIVATE - "backendTest.cpp" -) +#target_sources(backendTest + #PUBLIC + #"backendTest.cpp" +#) -target_include_directories( - backendTest - PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" - PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" -) +#target_include_directories( + #backendTest + #PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + #PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +#) -target_link_libraries( - backendTest - PUBLIC rocksdb - PUBLIC os common gtest stream executor qcom index transport util -) +#target_link_libraries( + #backendTest + #PUBLIC rocksdb + #PUBLIC os common gtest stream executor qcom index transport util +#) -add_test( - NAME backendTest - COMMAND backendTest -) \ No newline at end of file + +MESSAGE(STATUS "build parser unit test") + +IF(NOT TD_DARWIN) + # GoogleTest requires at least C++11 + SET(CMAKE_CXX_STANDARD 11) + AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + + ADD_EXECUTABLE(backendTest ${SOURCE_LIST}) + TARGET_LINK_LIBRARIES( + backendTest + PUBLIC rocksdb + PUBLIC os common gtest stream executor qcom index transport util vnode + ) + + TARGET_INCLUDE_DIRECTORIES( + backendTest + PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" + ) + + ADD_TEST( + NAME backendTest + COMMAND backendTest + ) +ENDIF () \ No newline at end of file diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 5051337aee..d70f16b461 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -4,6 +4,11 @@ #include #include #include "streamBackendRocksdb.h" +#include "streamSnapshot.h" +#include "streamState.h" +#include "tstream.h" +#include "tstreamFileState.h" +#include "tstreamUpdate.h" class BackendEnv : public ::testing::Test { protected: @@ -13,22 +18,35 @@ class BackendEnv : public ::testing::Test { void *backendCreate() { const char *streamPath = "/tmp"; + void *p = NULL; - char *absPath = NULL; - void *p = NULL; - // SBackendWrapper *p = streamBackendInit(streamPath, -1, 2); - // p = taskDbOpen((char *)streamPath, (char *)"test", -1); - // p = bkdMgtCreate((char *)streamPath); + // char *absPath = NULL; + // // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2); + // STaskDbWrapper *p = taskDbOpen((char *)streamPath, (char *)"stream-backend", -1); + // ASSERT(p != NULL); + return p; +} +SStreamState *stateCreate(void *pBackend, char *keyidr) { + const char *streamPath = "/tmp"; + SStreamTask *pTask = (SStreamTask *)taosMemoryCalloc(1, sizeof(SStreamTask)); + pTask->ver = 1024; + pTask->id.streamId = 1023; + pTask->id.taskId = 1111111; + + SStreamState *p = streamStateOpen((char *)streamPath, pTask, true, 32, 32 * 1024); ASSERT(p != NULL); return p; } void backendOpen() { void *p = backendCreate(); ASSERT(p != NULL); + taskDbDestroy(p, true); } -TEST_F(BackendEnv, checkOpen) { backendOpen(); } +TEST_F(BackendEnv, checkOpen) { + backendOpen(); +} TEST_F(BackendEnv, backendOpt) {} TEST_F(BackendEnv, backendDestroy) {} diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 0dc2cc13f5..eb7f38744f 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -25,24 +25,24 @@ #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" +#include "cos.h" #include "rsync.h" #include "streamInt.h" -#include "cos.h" -int main(int argc, char **argv) { - testing::InitGoogleTest(&argc, argv); +// int main(int argc, char **argv) { +// testing::InitGoogleTest(&argc, argv); - if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) { - printf("error"); - } - if (s3Init() < 0) { - return -1; - } - strcpy(tsSnodeAddress, "127.0.0.1"); - int ret = RUN_ALL_TESTS(); - s3CleanUp(); - return ret; -} +// if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) { +// printf("error"); +// } +// if (s3Init() < 0) { +// return -1; +// } +// strcpy(tsSnodeAddress, "127.0.0.1"); +// int ret = RUN_ALL_TESTS(); +// s3CleanUp(); +// return ret; +// } TEST(testCase, checkpointUpload_Test) { stopRsync(); From 8d36445bb58b89e8c219fe8a844be7c9997d7c96 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 27 Feb 2024 05:29:58 +0000 Subject: [PATCH 03/12] add stream backend test --- source/libs/stream/test/backendTest.cpp | 53 +++++++++++++++++++++---- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index d70f16b461..df0ab9fe7e 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "streamBackendRocksdb.h" #include "streamSnapshot.h" #include "streamState.h" @@ -27,26 +28,62 @@ void *backendCreate() { return p; } -SStreamState *stateCreate(void *pBackend, char *keyidr) { - const char *streamPath = "/tmp"; +SStreamState *stateCreate(const char *path) { SStreamTask *pTask = (SStreamTask *)taosMemoryCalloc(1, sizeof(SStreamTask)); pTask->ver = 1024; pTask->id.streamId = 1023; pTask->id.taskId = 1111111; + SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL); + pTask->pMeta = pMeta; - SStreamState *p = streamStateOpen((char *)streamPath, pTask, true, 32, 32 * 1024); + SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024); ASSERT(p != NULL); return p; } void backendOpen() { - void *p = backendCreate(); + const char *path = "/tmp/backend"; + SStreamState *p = stateCreate(path); ASSERT(p != NULL); - taskDbDestroy(p, true); + + // write bacth + // default/state/fill/sess/func/parname/partag + int32_t size = 100; + std::vector tsArray; + for (int i = 0; i < size; i++) { + int64_t ts = taosGetTimestampMs(); + SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + const char *val = "value data"; + int32_t vlen = strlen(val); + streamStatePut_rocksdb(p, &key, (char *)val, vlen); + + tsArray.push_back(ts); + } + for (int i = 0; i < size; i++) { + int64_t ts = tsArray[i]; + SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + + const char *val = "value data"; + int32_t len = 0; + char *newVal = NULL; + streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); + ASSERT(len == strlen(val)); + } + int64_t ts = tsArray[0]; + SWinKey key = {.groupId = (uint64_t)(0), .ts = ts}; + streamStateDel_rocksdb(p, &key); + + + + // read + // iterator + // rebuild chkp, reload from chkp + // sync + // + streamStateClose((SStreamState *)p, true); + // taskDbDestroy(p, true); } -TEST_F(BackendEnv, checkOpen) { - backendOpen(); -} +TEST_F(BackendEnv, checkOpen) { backendOpen(); } TEST_F(BackendEnv, backendOpt) {} TEST_F(BackendEnv, backendDestroy) {} From 6b6f5aa5d1d8d132ff7635b091926ff395b0f0d2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 27 Feb 2024 05:51:32 +0000 Subject: [PATCH 04/12] add test case --- source/libs/stream/test/backendTest.cpp | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index df0ab9fe7e..1ccc0b4285 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -72,7 +72,18 @@ void backendOpen() { SWinKey key = {.groupId = (uint64_t)(0), .ts = ts}; streamStateDel_rocksdb(p, &key); - + streamStateClear_rocksdb(p); + + for (int i = 0; i < size; i++) { + int64_t ts = tsArray[i]; + SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + const char *val = "value data"; + int32_t len = 0; + char *newVal = NULL; + int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); + ASSERT(code != 0); + } + tsArray.clear(); // read // iterator From b3c18c31da19f3cb5b72001351eae62273067f57 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 27 Feb 2024 12:19:37 +0000 Subject: [PATCH 05/12] add test case to stream backend --- source/libs/stream/inc/streamBackendRocksdb.h | 1 + source/libs/stream/src/streamBackendRocksdb.c | 12 +- source/libs/stream/test/backendTest.cpp | 309 +++++++++++++++++- 3 files changed, 305 insertions(+), 17 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 6e6f85d8f9..1dc1db8e9c 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -258,6 +258,7 @@ void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); +uint32_t nextPow2(uint32_t x); #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f173157da6..910fd93989 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2788,7 +2788,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); return pCur; } -#ifdef BUILD_NO_CALL SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { stDebug("streamStateGetCur_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; @@ -2838,7 +2837,6 @@ int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { STREAM_STATE_DEL_ROCKSDB(pState, "func", key); return 0; } -#endif // session cf int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { @@ -3432,7 +3430,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* SSessionKey tmpKey = *key; int32_t valSize = *pVLen; void* tmp = taosMemoryMalloc(valSize); - // tdbRealloc(NULL, valSize); if (!tmp) { return -1; } @@ -3506,13 +3503,11 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi return code; } -#ifdef BUILD_NO_CALL int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); return code; } -#endif int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { int code = 0; STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); @@ -3535,10 +3530,10 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co if (pIter == NULL) { return -1; } - + size_t klen = 0; rocksdb_iter_seek(pIter, start, strlen(start)); while (rocksdb_iter_valid(pIter)) { - const char* key = rocksdb_iter_key(pIter, NULL); + const char* key = rocksdb_iter_key(pIter, &klen); int32_t vlen = 0; const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen); char* val = NULL; @@ -3700,6 +3695,8 @@ uint32_t nextPow2(uint32_t x) { x = x | (x >> 16); return x + 1; } + +#ifdef BUILD_NO_CALL int32_t copyFiles(const char* src, const char* dst) { int32_t code = 0; // opt later, just hard link @@ -3739,6 +3736,7 @@ _err: taosCloseDir(&pDir); return code >= 0 ? 0 : -1; } +#endif int32_t isBkdDataMeta(char* name, int32_t len) { const char* pCurrent = "CURRENT"; diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 1ccc0b4285..83adaeb21c 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -40,7 +40,8 @@ SStreamState *stateCreate(const char *path) { ASSERT(p != NULL); return p; } -void backendOpen() { +void *backendOpen() { + streamMetaInit(); const char *path = "/tmp/backend"; SStreamState *p = stateCreate(path); ASSERT(p != NULL); @@ -85,18 +86,306 @@ void backendOpen() { } tsArray.clear(); - // read - // iterator - // rebuild chkp, reload from chkp - // sync - // + for (int i = 0; i < size; i++) { + int64_t ts = taosGetTimestampMs(); + tsArray.push_back(ts); + + SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + const char *val = "value data"; + int32_t vlen = strlen(val); + streamStatePut_rocksdb(p, &key, (char *)val, vlen); + } + + SWinKey winkey; + int32_t code = streamStateGetFirst_rocksdb(p, &key); + ASSERT(code == 0); + ASSERT(key.ts == tsArray[0]); + + SStreamStateCur *pCurr = streamStateSeekToLast_rocksdb(p); + ASSERT(pCurr != NULL); + streamStateFreeCur(pCurr); + + winkey.groupId = 0; + winkey.ts = tsArray[0]; + char *val = NULL; + int32_t len = 0; + + pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey); + ASSERT(pCurr != NULL); + + streamStateFreeCur(pCurr); + + tsArray.clear(); + for (int i = 0; i < size; i++) { + int64_t ts = taosGetTimestampMs(); + tsArray.push_back(ts); + STupleKey key = {.groupId = (uint64_t)(0), .ts = ts, .exprIdx = i}; + + const char *val = "Value"; + int32_t len = strlen(val); + streamStateFuncPut_rocksdb(p, &key, val, len); + } + for (int i = 0; i < size; i++) { + STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; + char *val = NULL; + int32_t len = 0; + streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); + ASSERT(len == strlen("Value")); + } + for (int i = 0; i < size; i++) { + STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; + char *val = NULL; + int32_t len = 0; + streamStateFuncDel_rocksdb(p, &key); + } + + // session put + tsArray.clear(); + + for (int i = 0; i < size; i++) { + SSessionKey key = {.win = {.skey = i, .ekey = i}, .groupId = (uint64_t)(0)}; + tsArray.push_back(i); + + const char *val = "Value"; + int32_t len = strlen(val); + streamStateSessionPut_rocksdb(p, &key, val, len); + + char *pval = NULL; + ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len)); + ASSERT(strncmp(pval, val, len) == 0); + } + + for (int i = 0; i < size; i++) { + SSessionKey key = {.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)}; + + const char *val = "Value"; + int32_t len = strlen(val); + + char *pval = NULL; + ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len)); + ASSERT(strncmp(pval, val, len) == 0); + taosMemoryFreeClear(pval); + } + + pCurr = streamStateSessionSeekToLast_rocksdb(p, 0); + ASSERT(pCurr != NULL); + + { + SSessionKey key; + char *val = NULL; + int32_t vlen = 0; + code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + ASSERT(code == 0); + pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key); + + code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + ASSERT(code == 0); + + ASSERT(key.groupId == 0 && key.win.ekey == tsArray[tsArray.size() - 2]); + + pCurr = streamStateSessionSeekKeyNext_rocksdb(p, &key); + code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); + ASSERT(code == 0); + ASSERT(vlen == strlen("Value")); + ASSERT(key.groupId == 0 && key.win.skey == tsArray[tsArray.size() - 1]); + + ASSERT(0 == streamStateSessionAddIfNotExist_rocksdb(p, &key, 10, (void **)&val, &len)); + + ASSERT(0 == + streamStateStateAddIfNotExist_rocksdb(p, &key, (char *)"key", strlen("key"), NULL, (void **)&val, &len)); + } + for (int i = 0; i < size; i++) { + SSessionKey key = {.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)}; + + const char *val = "Value"; + int32_t len = strlen(val); + + char *pval = NULL; + ASSERT(0 == streamStateSessionDel_rocksdb(p, &key)); + } + + for (int i = 0; i < size; i++) { + SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + const char *val = "Value"; + int32_t vlen = strlen(val); + ASSERT(streamStateFillPut_rocksdb(p, &key, val, vlen) == 0); + } + for (int i = 0; i < size; i++) { + SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + char *val = NULL; + int32_t vlen = 0; + ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0); + taosMemoryFreeClear(val); + } + { + SWinKey key = {.groupId = (uint64_t)(0), .ts = tsArray[0]}; + SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key); + ASSERT(pCurr != NULL); + + char *val = NULL; + int32_t vlen = 0; + ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); + ASSERT(vlen == strlen("Value")); + streamStateFreeCur(pCurr); + + pCurr = streamStateFillSeekKeyNext_rocksdb(p, &key); + ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); + ASSERT(vlen == strlen("Value") && key.groupId == 1 && key.ts == tsArray[1]); + + key.groupId = 1; + key.ts = tsArray[1]; + + pCurr = streamStateFillSeekKeyPrev_rocksdb(p, &key); + ASSERT(pCurr != NULL); + ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); + + ASSERT(vlen == strlen("Value") && key.groupId == 0 && key.ts == tsArray[0]); + } + + for (int i = 0; i < size - 1; i++) { + SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + char *val = NULL; + int32_t vlen = 0; + ASSERT(streamStateFillDel_rocksdb(p, &key) == 0); + taosMemoryFreeClear(val); + } + streamStateSessionClear_rocksdb(p); + + for (int i = 0; i < size; i++) { + char tbname[TSDB_TABLE_NAME_LEN] = {0}; + sprintf(tbname, "%s_%d", "tbname", i); + ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname)); + } + for (int i = 0; i < size; i++) { + char *val = NULL; + ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val)); + ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0); + taosMemoryFree(val); + } + + for (int i = 0; i < size; i++) { + char tbname[TSDB_TABLE_NAME_LEN] = {0}; + sprintf(tbname, "%s_%d", "tbname", i); + ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname)); + } + for (int i = 0; i < size; i++) { + char *val = NULL; + ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val)); + ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0); + taosMemoryFree(val); + } + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "tbname_%d", i); + char val[128] = {0}; + sprintf(val, "val_%d", i); + code = streamDefaultPut_rocksdb(p, key, val, strlen(val)); + ASSERT(code == 0); + } + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "tbname_%d", i); + + char *val = NULL; + int32_t len = 0; + code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len); + ASSERT(code == 0); + } + SArray *result = taosArrayInit(8, sizeof(void *)); + streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result); + ASSERT(taosArrayGetSize(result) >= 0); + + return p; + // streamStateClose((SStreamState *)p, true); +} +TEST_F(BackendEnv, checkOpen) { + SStreamState *p = (SStreamState *)backendOpen(); + int64_t tsStart = taosGetTimestampMs(); + { + void *pBatch = streamStateCreateBatch(); + int32_t size = 0; + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "key_%d", i); + char val[128] = {0}; + sprintf(val, "val_%d", i); + streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + (int32_t)(strlen(val)), tsStart + 100000); + } + streamStatePutBatch_rocksdb(p, pBatch); + streamStateDestroyBatch(pBatch); + } + { + void *pBatch = streamStateCreateBatch(); + int32_t size = 0; + char valBuf[256] = {0}; + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "key_%d", i); + char val[128] = {0}; + sprintf(val, "val_%d", i); + streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); + } + streamStatePutBatch_rocksdb(p, pBatch); + streamStateDestroyBatch(pBatch); + } + // do checkpoint 2 + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2); + { + void *pBatch = streamStateCreateBatch(); + int32_t size = 0; + char valBuf[256] = {0}; + for (int i = 0; i < size; i++) { + char key[128] = {0}; + sprintf(key, "key_%d", i); + char val[128] = {0}; + sprintf(val, "val_%d", i); + streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, + (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); + } + streamStatePutBatch_rocksdb(p, pBatch); + streamStateDestroyBatch(pBatch); + } + + taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3); + + const char *path = "/tmp/backend/stream"; + const char *dump = "/tmp/backend/stream/dump"; + // taosMkDir(dump); + taosMulMkDir(dump); + SBkdMgt *mgt = bkdMgtCreate((char *)path); + SArray *result = taosArrayInit(4, sizeof(void *)); + bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); + + bkdMgtDestroy(mgt); streamStateClose((SStreamState *)p, true); - // taskDbDestroy(p, true); } -TEST_F(BackendEnv, checkOpen) { backendOpen(); } -TEST_F(BackendEnv, backendOpt) {} -TEST_F(BackendEnv, backendDestroy) {} +TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; } + +typedef struct BdKV { + uint32_t k; + uint32_t v; +} BdKV; + +BdKV kvDict[] = {{0, 2}, + {1, 2}, + {.k = 15, .v = 16}, + {.k = 31, .v = 32}, + {.k = 56, .v = 64}, + {.k = 100, .v = 128}, + {.k = 200, .v = 256}, + {.k = 500, .v = 512}, + {.k = 1000, .v = 1024}, + {.k = 2000, .v = 2048}, + {.k = 3000, .v = 4096}}; + +TEST_F(BackendEnv, backendUtil) { + for (int i = 0; i < sizeof(kvDict) / sizeof(kvDict[0]); i++) { + ASSERT_EQ(nextPow2((uint32_t)(kvDict[i].k)), kvDict[i].v); + } +} int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); From e1f11d1639e1d3214706cf1272e9c0ea1d2f9dd2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Feb 2024 00:48:20 +0000 Subject: [PATCH 06/12] add test case to stream backend --- source/libs/stream/test/checkpointTest.cpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index eb7f38744f..0caad479e5 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -25,6 +25,9 @@ #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" +// tsSnodeAddress = ""; +// tsS3StreamEnabled = 0; + #include "cos.h" #include "rsync.h" #include "streamInt.h" @@ -45,26 +48,26 @@ // } TEST(testCase, checkpointUpload_Test) { - stopRsync(); - startRsync(); + // stopRsync(); + // startRsync(); taosSsleep(5); char* id = "2013892036"; - uploadCheckpoint(id, "/root/offset/"); + // uploadCheckpoint(id, "/root/offset/"); } TEST(testCase, checkpointDownload_Test) { char* id = "2013892036"; - downloadCheckpoint(id, "/root/offset/download/"); + // downloadCheckpoint(id, "/root/offset/download/"); } TEST(testCase, checkpointDelete_Test) { char* id = "2013892036"; - deleteCheckpoint(id); + // deleteCheckpoint(id); } TEST(testCase, checkpointDeleteFile_Test) { char* id = "2013892036"; - deleteCheckpointFile(id, "offset-ver0"); + // deleteCheckpointFile(id, "offset-ver0"); } From 9f983a07d78f5548f623aa92319be5afe75704be Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Feb 2024 10:48:24 +0800 Subject: [PATCH 07/12] add test case --- source/libs/stream/test/backendTest.cpp | 33 +++++++++++++------------ 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 83adaeb21c..8143c6dbaf 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -19,7 +19,7 @@ class BackendEnv : public ::testing::Test { void *backendCreate() { const char *streamPath = "/tmp"; - void *p = NULL; + void * p = NULL; // char *absPath = NULL; // // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2); @@ -42,7 +42,7 @@ SStreamState *stateCreate(const char *path) { } void *backendOpen() { streamMetaInit(); - const char *path = "/tmp/backend"; + const char * path = "/tmp/backend"; SStreamState *p = stateCreate(path); ASSERT(p != NULL); @@ -65,7 +65,7 @@ void *backendOpen() { const char *val = "value data"; int32_t len = 0; - char *newVal = NULL; + char * newVal = NULL; streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(len == strlen(val)); } @@ -80,7 +80,7 @@ void *backendOpen() { SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; const char *val = "value data"; int32_t len = 0; - char *newVal = NULL; + char * newVal = NULL; int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(code != 0); } @@ -107,7 +107,7 @@ void *backendOpen() { winkey.groupId = 0; winkey.ts = tsArray[0]; - char *val = NULL; + char * val = NULL; int32_t len = 0; pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey); @@ -127,14 +127,14 @@ void *backendOpen() { } for (int i = 0; i < size; i++) { STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; - char *val = NULL; + char * val = NULL; int32_t len = 0; streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); ASSERT(len == strlen("Value")); } for (int i = 0; i < size; i++) { STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; - char *val = NULL; + char * val = NULL; int32_t len = 0; streamStateFuncDel_rocksdb(p, &key); } @@ -172,7 +172,7 @@ void *backendOpen() { { SSessionKey key; - char *val = NULL; + char * val = NULL; int32_t vlen = 0; code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); ASSERT(code == 0); @@ -212,7 +212,7 @@ void *backendOpen() { } for (int i = 0; i < size; i++) { SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; - char *val = NULL; + char * val = NULL; int32_t vlen = 0; ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0); taosMemoryFreeClear(val); @@ -222,7 +222,7 @@ void *backendOpen() { SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key); ASSERT(pCurr != NULL); - char *val = NULL; + char * val = NULL; int32_t vlen = 0; ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); ASSERT(vlen == strlen("Value")); @@ -244,7 +244,7 @@ void *backendOpen() { for (int i = 0; i < size - 1; i++) { SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; - char *val = NULL; + char * val = NULL; int32_t vlen = 0; ASSERT(streamStateFillDel_rocksdb(p, &key) == 0); taosMemoryFreeClear(val); @@ -286,7 +286,7 @@ void *backendOpen() { char key[128] = {0}; sprintf(key, "tbname_%d", i); - char *val = NULL; + char * val = NULL; int32_t len = 0; code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len); ASSERT(code == 0); @@ -302,7 +302,7 @@ TEST_F(BackendEnv, checkOpen) { SStreamState *p = (SStreamState *)backendOpen(); int64_t tsStart = taosGetTimestampMs(); { - void *pBatch = streamStateCreateBatch(); + void * pBatch = streamStateCreateBatch(); int32_t size = 0; for (int i = 0; i < size; i++) { char key[128] = {0}; @@ -316,7 +316,7 @@ TEST_F(BackendEnv, checkOpen) { streamStateDestroyBatch(pBatch); } { - void *pBatch = streamStateCreateBatch(); + void * pBatch = streamStateCreateBatch(); int32_t size = 0; char valBuf[256] = {0}; for (int i = 0; i < size; i++) { @@ -333,7 +333,7 @@ TEST_F(BackendEnv, checkOpen) { // do checkpoint 2 taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2); { - void *pBatch = streamStateCreateBatch(); + void * pBatch = streamStateCreateBatch(); int32_t size = 0; char valBuf[256] = {0}; for (int i = 0; i < size; i++) { @@ -355,11 +355,12 @@ TEST_F(BackendEnv, checkOpen) { // taosMkDir(dump); taosMulMkDir(dump); SBkdMgt *mgt = bkdMgtCreate((char *)path); - SArray *result = taosArrayInit(4, sizeof(void *)); + SArray * result = taosArrayInit(4, sizeof(void *)); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); bkdMgtDestroy(mgt); streamStateClose((SStreamState *)p, true); + taosRemoveDir(path); } TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; } From d7f14e916d65754b79e46f0d38ba8a0c74c8dfbf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Feb 2024 06:01:50 +0000 Subject: [PATCH 08/12] add test case --- source/libs/stream/test/backendTest.cpp | 38 ++++++++++++++----------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 8143c6dbaf..3245979633 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -11,6 +11,12 @@ #include "tstreamFileState.h" #include "tstreamUpdate.h" +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + class BackendEnv : public ::testing::Test { protected: virtual void SetUp() {} @@ -19,7 +25,7 @@ class BackendEnv : public ::testing::Test { void *backendCreate() { const char *streamPath = "/tmp"; - void * p = NULL; + void *p = NULL; // char *absPath = NULL; // // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2); @@ -42,7 +48,7 @@ SStreamState *stateCreate(const char *path) { } void *backendOpen() { streamMetaInit(); - const char * path = "/tmp/backend"; + const char *path = "/tmp/backend"; SStreamState *p = stateCreate(path); ASSERT(p != NULL); @@ -65,7 +71,7 @@ void *backendOpen() { const char *val = "value data"; int32_t len = 0; - char * newVal = NULL; + char *newVal = NULL; streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(len == strlen(val)); } @@ -80,7 +86,7 @@ void *backendOpen() { SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; const char *val = "value data"; int32_t len = 0; - char * newVal = NULL; + char *newVal = NULL; int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); ASSERT(code != 0); } @@ -107,7 +113,7 @@ void *backendOpen() { winkey.groupId = 0; winkey.ts = tsArray[0]; - char * val = NULL; + char *val = NULL; int32_t len = 0; pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey); @@ -127,14 +133,14 @@ void *backendOpen() { } for (int i = 0; i < size; i++) { STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; - char * val = NULL; + char *val = NULL; int32_t len = 0; streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); ASSERT(len == strlen("Value")); } for (int i = 0; i < size; i++) { STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; - char * val = NULL; + char *val = NULL; int32_t len = 0; streamStateFuncDel_rocksdb(p, &key); } @@ -172,7 +178,7 @@ void *backendOpen() { { SSessionKey key; - char * val = NULL; + char *val = NULL; int32_t vlen = 0; code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); ASSERT(code == 0); @@ -212,7 +218,7 @@ void *backendOpen() { } for (int i = 0; i < size; i++) { SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0); taosMemoryFreeClear(val); @@ -222,7 +228,7 @@ void *backendOpen() { SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key); ASSERT(pCurr != NULL); - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen)); ASSERT(vlen == strlen("Value")); @@ -244,7 +250,7 @@ void *backendOpen() { for (int i = 0; i < size - 1; i++) { SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; - char * val = NULL; + char *val = NULL; int32_t vlen = 0; ASSERT(streamStateFillDel_rocksdb(p, &key) == 0); taosMemoryFreeClear(val); @@ -286,7 +292,7 @@ void *backendOpen() { char key[128] = {0}; sprintf(key, "tbname_%d", i); - char * val = NULL; + char *val = NULL; int32_t len = 0; code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len); ASSERT(code == 0); @@ -302,7 +308,7 @@ TEST_F(BackendEnv, checkOpen) { SStreamState *p = (SStreamState *)backendOpen(); int64_t tsStart = taosGetTimestampMs(); { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; for (int i = 0; i < size; i++) { char key[128] = {0}; @@ -316,7 +322,7 @@ TEST_F(BackendEnv, checkOpen) { streamStateDestroyBatch(pBatch); } { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; char valBuf[256] = {0}; for (int i = 0; i < size; i++) { @@ -333,7 +339,7 @@ TEST_F(BackendEnv, checkOpen) { // do checkpoint 2 taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2); { - void * pBatch = streamStateCreateBatch(); + void *pBatch = streamStateCreateBatch(); int32_t size = 0; char valBuf[256] = {0}; for (int i = 0; i < size; i++) { @@ -355,7 +361,7 @@ TEST_F(BackendEnv, checkOpen) { // taosMkDir(dump); taosMulMkDir(dump); SBkdMgt *mgt = bkdMgtCreate((char *)path); - SArray * result = taosArrayInit(4, sizeof(void *)); + SArray *result = taosArrayInit(4, sizeof(void *)); bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); bkdMgtDestroy(mgt); From d5c5dc70269cb8fa498d799d6797361d73b09962 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Feb 2024 06:45:35 +0000 Subject: [PATCH 09/12] add test case --- source/libs/stream/test/backendTest.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 3245979633..9165beaaec 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -178,8 +178,9 @@ void *backendOpen() { { SSessionKey key; - char *val = NULL; - int32_t vlen = 0; + memset(&key, 0, sizeof(key)); + char *val = NULL; + int32_t vlen = 0; code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen); ASSERT(code == 0); pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key); From 249141a9a25ba5c04cef6a8ab4a6b1b8c9590983 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Feb 2024 08:16:02 +0000 Subject: [PATCH 10/12] add compress dict --- source/libs/stream/test/backendTest.cpp | 35 ++++++++++++++++++------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 9165beaaec..b07c860da8 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -16,6 +16,10 @@ #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wformat" +#pragma GCC diagnostic ignored "-Wint-to-pointer-cast" +#pragma GCC diagnostic ignored "-Wpointer-arith" class BackendEnv : public ::testing::Test { protected: @@ -56,18 +60,22 @@ void *backendOpen() { // default/state/fill/sess/func/parname/partag int32_t size = 100; std::vector tsArray; - for (int i = 0; i < size; i++) { - int64_t ts = taosGetTimestampMs(); - SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + for (int32_t i = 0; i < size; i++) { + int64_t ts = taosGetTimestampMs(); + SWinKey key; // = {.groupId = (uint64_t)(i), .ts = ts}; + key.groupId = (uint64_t)(i); + key.ts = ts; const char *val = "value data"; int32_t vlen = strlen(val); streamStatePut_rocksdb(p, &key, (char *)val, vlen); tsArray.push_back(ts); } - for (int i = 0; i < size; i++) { + for (int32_t i = 0; i < size; i++) { int64_t ts = tsArray[i]; - SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts}; + key.groupId = (uint64_t)(i); + key.ts = ts; const char *val = "value data"; int32_t len = 0; @@ -76,14 +84,20 @@ void *backendOpen() { ASSERT(len == strlen(val)); } int64_t ts = tsArray[0]; - SWinKey key = {.groupId = (uint64_t)(0), .ts = ts}; + SWinKey key = {0}; // {.groupId = (uint64_t)(0), .ts = ts}; + key.groupId = (uint64_t)(0); + key.ts = ts; + streamStateDel_rocksdb(p, &key); streamStateClear_rocksdb(p); for (int i = 0; i < size; i++) { - int64_t ts = tsArray[i]; - SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + int64_t ts = tsArray[i]; + SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts}; + key.groupId = (uint64_t)(i); + key.ts = ts; + const char *val = "value data"; int32_t len = 0; char *newVal = NULL; @@ -96,7 +110,10 @@ void *backendOpen() { int64_t ts = taosGetTimestampMs(); tsArray.push_back(ts); - SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts}; + key.groupId = (uint64_t)(i); + key.ts = ts; + const char *val = "value data"; int32_t vlen = strlen(val); streamStatePut_rocksdb(p, &key, (char *)val, vlen); From 8c46c71c98f0eb190be98e0bd556f0a6d848ff11 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Feb 2024 09:02:55 +0000 Subject: [PATCH 11/12] add test case --- source/libs/stream/test/backendTest.cpp | 56 ++++++++++++++++++------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index b07c860da8..701ea0a75b 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -142,23 +142,34 @@ void *backendOpen() { for (int i = 0; i < size; i++) { int64_t ts = taosGetTimestampMs(); tsArray.push_back(ts); - STupleKey key = {.groupId = (uint64_t)(0), .ts = ts, .exprIdx = i}; + STupleKey key = {0}; + key.groupId = (uint64_t)(0); //= {.groupId = (uint64_t)(0), .ts = ts, .exprIdx = i}; + key.ts = ts; + key.exprIdx = i; const char *val = "Value"; int32_t len = strlen(val); streamStateFuncPut_rocksdb(p, &key, val, len); } for (int i = 0; i < size; i++) { - STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; - char *val = NULL; - int32_t len = 0; + STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; + key.groupId = (uint64_t)(0); + key.ts = tsArray[i]; + key.exprIdx = i; + + char *val = NULL; + int32_t len = 0; streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); ASSERT(len == strlen("Value")); } for (int i = 0; i < size; i++) { - STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; - char *val = NULL; - int32_t len = 0; + STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; + key.groupId = (uint64_t)(0); + key.ts = tsArray[i]; + key.exprIdx = i; + + char *val = NULL; + int32_t len = 0; streamStateFuncDel_rocksdb(p, &key); } @@ -166,7 +177,10 @@ void *backendOpen() { tsArray.clear(); for (int i = 0; i < size; i++) { - SSessionKey key = {.win = {.skey = i, .ekey = i}, .groupId = (uint64_t)(0)}; + SSessionKey key = {0}; //{.win = {.skey = i, .ekey = i}, .groupId = (uint64_t)(0)}; + key.win.skey = i; + key.win.ekey = i; + key.groupId = (uint64_t)(0); tsArray.push_back(i); const char *val = "Value"; @@ -179,7 +193,10 @@ void *backendOpen() { } for (int i = 0; i < size; i++) { - SSessionKey key = {.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)}; + SSessionKey key = {0}; //{.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)}; + key.win.skey = tsArray[i]; + key.win.ekey = tsArray[i]; + key.groupId = (uint64_t)(0); const char *val = "Value"; int32_t len = strlen(val); @@ -219,7 +236,10 @@ void *backendOpen() { streamStateStateAddIfNotExist_rocksdb(p, &key, (char *)"key", strlen("key"), NULL, (void **)&val, &len)); } for (int i = 0; i < size; i++) { - SSessionKey key = {.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)}; + SSessionKey key = {0}; //{.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)}; + key.win.skey = tsArray[i]; + key.win.ekey = tsArray[i]; + key.groupId = (uint64_t)(0); const char *val = "Value"; int32_t len = strlen(val); @@ -229,20 +249,26 @@ void *backendOpen() { } for (int i = 0; i < size; i++) { - SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + key.groupId = (uint64_t)(i); + key.ts = tsArray[i]; const char *val = "Value"; int32_t vlen = strlen(val); ASSERT(streamStateFillPut_rocksdb(p, &key, val, vlen) == 0); } for (int i = 0; i < size; i++) { - SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + key.groupId = (uint64_t)(i); + key.ts = tsArray[i]; char *val = NULL; int32_t vlen = 0; ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0); taosMemoryFreeClear(val); } { - SWinKey key = {.groupId = (uint64_t)(0), .ts = tsArray[0]}; + SWinKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[0]}; + key.groupId = (uint64_t)(0); + key.ts = tsArray[0]; SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key); ASSERT(pCurr != NULL); @@ -267,7 +293,9 @@ void *backendOpen() { } for (int i = 0; i < size - 1; i++) { - SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]}; + key.groupId = (uint64_t)(i); + key.ts = tsArray[i]; char *val = NULL; int32_t vlen = 0; ASSERT(streamStateFillDel_rocksdb(p, &key) == 0); From 17e9f1d747c4e5371f2af813dabe70c120eafbea Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Feb 2024 09:33:48 +0000 Subject: [PATCH 12/12] add test case --- source/libs/stream/test/backendTest.cpp | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 701ea0a75b..a949748eb5 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -422,17 +422,8 @@ typedef struct BdKV { uint32_t v; } BdKV; -BdKV kvDict[] = {{0, 2}, - {1, 2}, - {.k = 15, .v = 16}, - {.k = 31, .v = 32}, - {.k = 56, .v = 64}, - {.k = 100, .v = 128}, - {.k = 200, .v = 256}, - {.k = 500, .v = 512}, - {.k = 1000, .v = 1024}, - {.k = 2000, .v = 2048}, - {.k = 3000, .v = 4096}}; +BdKV kvDict[] = {{0, 2}, {1, 2}, {15, 16}, {31, 32}, {56, 64}, {100, 128}, + {200, 256}, {500, 512}, {1000, 1024}, {2000, 2048}, {3000, 4096}}; TEST_F(BackendEnv, backendUtil) { for (int i = 0; i < sizeof(kvDict) / sizeof(kvDict[0]); i++) {