add stream backend test
This commit is contained in:
parent
e41e10bf35
commit
8d36445bb5
|
@ -3,6 +3,7 @@
|
||||||
#include <taoserror.h>
|
#include <taoserror.h>
|
||||||
#include <tglobal.h>
|
#include <tglobal.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <vector>
|
||||||
#include "streamBackendRocksdb.h"
|
#include "streamBackendRocksdb.h"
|
||||||
#include "streamSnapshot.h"
|
#include "streamSnapshot.h"
|
||||||
#include "streamState.h"
|
#include "streamState.h"
|
||||||
|
@ -27,26 +28,62 @@ void *backendCreate() {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamState *stateCreate(void *pBackend, char *keyidr) {
|
SStreamState *stateCreate(const char *path) {
|
||||||
const char *streamPath = "/tmp";
|
|
||||||
SStreamTask *pTask = (SStreamTask *)taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask *pTask = (SStreamTask *)taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
pTask->ver = 1024;
|
pTask->ver = 1024;
|
||||||
pTask->id.streamId = 1023;
|
pTask->id.streamId = 1023;
|
||||||
pTask->id.taskId = 1111111;
|
pTask->id.taskId = 1111111;
|
||||||
|
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL);
|
||||||
|
pTask->pMeta = pMeta;
|
||||||
|
|
||||||
SStreamState *p = streamStateOpen((char *)streamPath, pTask, true, 32, 32 * 1024);
|
SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024);
|
||||||
ASSERT(p != NULL);
|
ASSERT(p != NULL);
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
void backendOpen() {
|
void backendOpen() {
|
||||||
void *p = backendCreate();
|
const char *path = "/tmp/backend";
|
||||||
|
SStreamState *p = stateCreate(path);
|
||||||
ASSERT(p != NULL);
|
ASSERT(p != NULL);
|
||||||
taskDbDestroy(p, true);
|
|
||||||
|
// write bacth
|
||||||
|
// default/state/fill/sess/func/parname/partag
|
||||||
|
int32_t size = 100;
|
||||||
|
std::vector<int64_t> tsArray;
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
int64_t ts = taosGetTimestampMs();
|
||||||
|
SWinKey key = {.groupId = (uint64_t)(i), .ts = ts};
|
||||||
|
const char *val = "value data";
|
||||||
|
int32_t vlen = strlen(val);
|
||||||
|
streamStatePut_rocksdb(p, &key, (char *)val, vlen);
|
||||||
|
|
||||||
|
tsArray.push_back(ts);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
int64_t ts = tsArray[i];
|
||||||
|
SWinKey key = {.groupId = (uint64_t)(i), .ts = ts};
|
||||||
|
|
||||||
|
const char *val = "value data";
|
||||||
|
int32_t len = 0;
|
||||||
|
char *newVal = NULL;
|
||||||
|
streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
||||||
|
ASSERT(len == strlen(val));
|
||||||
|
}
|
||||||
|
int64_t ts = tsArray[0];
|
||||||
|
SWinKey key = {.groupId = (uint64_t)(0), .ts = ts};
|
||||||
|
streamStateDel_rocksdb(p, &key);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// read
|
||||||
|
// iterator
|
||||||
|
// rebuild chkp, reload from chkp
|
||||||
|
// sync
|
||||||
|
//
|
||||||
|
streamStateClose((SStreamState *)p, true);
|
||||||
|
// taskDbDestroy(p, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(BackendEnv, checkOpen) {
|
TEST_F(BackendEnv, checkOpen) { backendOpen(); }
|
||||||
backendOpen();
|
|
||||||
}
|
|
||||||
TEST_F(BackendEnv, backendOpt) {}
|
TEST_F(BackendEnv, backendOpt) {}
|
||||||
TEST_F(BackendEnv, backendDestroy) {}
|
TEST_F(BackendEnv, backendDestroy) {}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue