diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index d70f16b461..df0ab9fe7e 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "streamBackendRocksdb.h" #include "streamSnapshot.h" #include "streamState.h" @@ -27,26 +28,62 @@ void *backendCreate() { return p; } -SStreamState *stateCreate(void *pBackend, char *keyidr) { - const char *streamPath = "/tmp"; +SStreamState *stateCreate(const char *path) { SStreamTask *pTask = (SStreamTask *)taosMemoryCalloc(1, sizeof(SStreamTask)); pTask->ver = 1024; pTask->id.streamId = 1023; pTask->id.taskId = 1111111; + SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL); + pTask->pMeta = pMeta; - SStreamState *p = streamStateOpen((char *)streamPath, pTask, true, 32, 32 * 1024); + SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024); ASSERT(p != NULL); return p; } void backendOpen() { - void *p = backendCreate(); + const char *path = "/tmp/backend"; + SStreamState *p = stateCreate(path); ASSERT(p != NULL); - taskDbDestroy(p, true); + + // write bacth + // default/state/fill/sess/func/parname/partag + int32_t size = 100; + std::vector tsArray; + for (int i = 0; i < size; i++) { + int64_t ts = taosGetTimestampMs(); + SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + const char *val = "value data"; + int32_t vlen = strlen(val); + streamStatePut_rocksdb(p, &key, (char *)val, vlen); + + tsArray.push_back(ts); + } + for (int i = 0; i < size; i++) { + int64_t ts = tsArray[i]; + SWinKey key = {.groupId = (uint64_t)(i), .ts = ts}; + + const char *val = "value data"; + int32_t len = 0; + char *newVal = NULL; + streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); + ASSERT(len == strlen(val)); + } + int64_t ts = tsArray[0]; + SWinKey key = {.groupId = (uint64_t)(0), .ts = ts}; + streamStateDel_rocksdb(p, &key); + + + + // read + // iterator + // rebuild chkp, reload from chkp + // sync + // + streamStateClose((SStreamState *)p, true); + // taskDbDestroy(p, true); } -TEST_F(BackendEnv, checkOpen) { - backendOpen(); -} +TEST_F(BackendEnv, checkOpen) { backendOpen(); } TEST_F(BackendEnv, backendOpt) {} TEST_F(BackendEnv, backendDestroy) {}