add test case to stream backend
This commit is contained in:
parent
6b6f5aa5d1
commit
b3c18c31da
|
@ -258,6 +258,7 @@ void bkdMgtDestroy(SBkdMgt* bm);
|
|||
|
||||
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
|
||||
|
||||
uint32_t nextPow2(uint32_t x);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -2788,7 +2788,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
|
|||
STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
|
||||
return pCur;
|
||||
}
|
||||
#ifdef BUILD_NO_CALL
|
||||
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||
stDebug("streamStateGetCur_rocksdb");
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
|
@ -2838,7 +2837,6 @@ int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
|
|||
STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
// session cf
|
||||
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
|
||||
|
@ -3432,7 +3430,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
|
|||
SSessionKey tmpKey = *key;
|
||||
int32_t valSize = *pVLen;
|
||||
void* tmp = taosMemoryMalloc(valSize);
|
||||
// tdbRealloc(NULL, valSize);
|
||||
if (!tmp) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -3506,13 +3503,11 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
|
|||
return code;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
|
||||
int code = 0;
|
||||
STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
|
||||
int code = 0;
|
||||
STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
|
||||
|
@ -3535,10 +3530,10 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
|
|||
if (pIter == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t klen = 0;
|
||||
rocksdb_iter_seek(pIter, start, strlen(start));
|
||||
while (rocksdb_iter_valid(pIter)) {
|
||||
const char* key = rocksdb_iter_key(pIter, NULL);
|
||||
const char* key = rocksdb_iter_key(pIter, &klen);
|
||||
int32_t vlen = 0;
|
||||
const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen);
|
||||
char* val = NULL;
|
||||
|
@ -3700,6 +3695,8 @@ uint32_t nextPow2(uint32_t x) {
|
|||
x = x | (x >> 16);
|
||||
return x + 1;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
int32_t copyFiles(const char* src, const char* dst) {
|
||||
int32_t code = 0;
|
||||
// opt later, just hard link
|
||||
|
@ -3739,6 +3736,7 @@ _err:
|
|||
taosCloseDir(&pDir);
|
||||
return code >= 0 ? 0 : -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t isBkdDataMeta(char* name, int32_t len) {
|
||||
const char* pCurrent = "CURRENT";
|
||||
|
|
|
@ -40,7 +40,8 @@ SStreamState *stateCreate(const char *path) {
|
|||
ASSERT(p != NULL);
|
||||
return p;
|
||||
}
|
||||
void backendOpen() {
|
||||
void *backendOpen() {
|
||||
streamMetaInit();
|
||||
const char *path = "/tmp/backend";
|
||||
SStreamState *p = stateCreate(path);
|
||||
ASSERT(p != NULL);
|
||||
|
@ -85,18 +86,306 @@ void backendOpen() {
|
|||
}
|
||||
tsArray.clear();
|
||||
|
||||
// read
|
||||
// iterator
|
||||
// rebuild chkp, reload from chkp
|
||||
// sync
|
||||
//
|
||||
for (int i = 0; i < size; i++) {
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
tsArray.push_back(ts);
|
||||
|
||||
SWinKey key = {.groupId = (uint64_t)(i), .ts = ts};
|
||||
const char *val = "value data";
|
||||
int32_t vlen = strlen(val);
|
||||
streamStatePut_rocksdb(p, &key, (char *)val, vlen);
|
||||
}
|
||||
|
||||
SWinKey winkey;
|
||||
int32_t code = streamStateGetFirst_rocksdb(p, &key);
|
||||
ASSERT(code == 0);
|
||||
ASSERT(key.ts == tsArray[0]);
|
||||
|
||||
SStreamStateCur *pCurr = streamStateSeekToLast_rocksdb(p);
|
||||
ASSERT(pCurr != NULL);
|
||||
streamStateFreeCur(pCurr);
|
||||
|
||||
winkey.groupId = 0;
|
||||
winkey.ts = tsArray[0];
|
||||
char *val = NULL;
|
||||
int32_t len = 0;
|
||||
|
||||
pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey);
|
||||
ASSERT(pCurr != NULL);
|
||||
|
||||
streamStateFreeCur(pCurr);
|
||||
|
||||
tsArray.clear();
|
||||
for (int i = 0; i < size; i++) {
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
tsArray.push_back(ts);
|
||||
STupleKey key = {.groupId = (uint64_t)(0), .ts = ts, .exprIdx = i};
|
||||
|
||||
const char *val = "Value";
|
||||
int32_t len = strlen(val);
|
||||
streamStateFuncPut_rocksdb(p, &key, val, len);
|
||||
}
|
||||
for (int i = 0; i < size; i++) {
|
||||
STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i};
|
||||
char *val = NULL;
|
||||
int32_t len = 0;
|
||||
streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len);
|
||||
ASSERT(len == strlen("Value"));
|
||||
}
|
||||
for (int i = 0; i < size; i++) {
|
||||
STupleKey key = {.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i};
|
||||
char *val = NULL;
|
||||
int32_t len = 0;
|
||||
streamStateFuncDel_rocksdb(p, &key);
|
||||
}
|
||||
|
||||
// session put
|
||||
tsArray.clear();
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
SSessionKey key = {.win = {.skey = i, .ekey = i}, .groupId = (uint64_t)(0)};
|
||||
tsArray.push_back(i);
|
||||
|
||||
const char *val = "Value";
|
||||
int32_t len = strlen(val);
|
||||
streamStateSessionPut_rocksdb(p, &key, val, len);
|
||||
|
||||
char *pval = NULL;
|
||||
ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len));
|
||||
ASSERT(strncmp(pval, val, len) == 0);
|
||||
}
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
SSessionKey key = {.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)};
|
||||
|
||||
const char *val = "Value";
|
||||
int32_t len = strlen(val);
|
||||
|
||||
char *pval = NULL;
|
||||
ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len));
|
||||
ASSERT(strncmp(pval, val, len) == 0);
|
||||
taosMemoryFreeClear(pval);
|
||||
}
|
||||
|
||||
pCurr = streamStateSessionSeekToLast_rocksdb(p, 0);
|
||||
ASSERT(pCurr != NULL);
|
||||
|
||||
{
|
||||
SSessionKey key;
|
||||
char *val = NULL;
|
||||
int32_t vlen = 0;
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
||||
ASSERT(code == 0);
|
||||
pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key);
|
||||
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
||||
ASSERT(code == 0);
|
||||
|
||||
ASSERT(key.groupId == 0 && key.win.ekey == tsArray[tsArray.size() - 2]);
|
||||
|
||||
pCurr = streamStateSessionSeekKeyNext_rocksdb(p, &key);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
||||
ASSERT(code == 0);
|
||||
ASSERT(vlen == strlen("Value"));
|
||||
ASSERT(key.groupId == 0 && key.win.skey == tsArray[tsArray.size() - 1]);
|
||||
|
||||
ASSERT(0 == streamStateSessionAddIfNotExist_rocksdb(p, &key, 10, (void **)&val, &len));
|
||||
|
||||
ASSERT(0 ==
|
||||
streamStateStateAddIfNotExist_rocksdb(p, &key, (char *)"key", strlen("key"), NULL, (void **)&val, &len));
|
||||
}
|
||||
for (int i = 0; i < size; i++) {
|
||||
SSessionKey key = {.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)};
|
||||
|
||||
const char *val = "Value";
|
||||
int32_t len = strlen(val);
|
||||
|
||||
char *pval = NULL;
|
||||
ASSERT(0 == streamStateSessionDel_rocksdb(p, &key));
|
||||
}
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
||||
const char *val = "Value";
|
||||
int32_t vlen = strlen(val);
|
||||
ASSERT(streamStateFillPut_rocksdb(p, &key, val, vlen) == 0);
|
||||
}
|
||||
for (int i = 0; i < size; i++) {
|
||||
SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
||||
char *val = NULL;
|
||||
int32_t vlen = 0;
|
||||
ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0);
|
||||
taosMemoryFreeClear(val);
|
||||
}
|
||||
{
|
||||
SWinKey key = {.groupId = (uint64_t)(0), .ts = tsArray[0]};
|
||||
SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key);
|
||||
ASSERT(pCurr != NULL);
|
||||
|
||||
char *val = NULL;
|
||||
int32_t vlen = 0;
|
||||
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
|
||||
ASSERT(vlen == strlen("Value"));
|
||||
streamStateFreeCur(pCurr);
|
||||
|
||||
pCurr = streamStateFillSeekKeyNext_rocksdb(p, &key);
|
||||
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
|
||||
ASSERT(vlen == strlen("Value") && key.groupId == 1 && key.ts == tsArray[1]);
|
||||
|
||||
key.groupId = 1;
|
||||
key.ts = tsArray[1];
|
||||
|
||||
pCurr = streamStateFillSeekKeyPrev_rocksdb(p, &key);
|
||||
ASSERT(pCurr != NULL);
|
||||
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
|
||||
|
||||
ASSERT(vlen == strlen("Value") && key.groupId == 0 && key.ts == tsArray[0]);
|
||||
}
|
||||
|
||||
for (int i = 0; i < size - 1; i++) {
|
||||
SWinKey key = {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
||||
char *val = NULL;
|
||||
int32_t vlen = 0;
|
||||
ASSERT(streamStateFillDel_rocksdb(p, &key) == 0);
|
||||
taosMemoryFreeClear(val);
|
||||
}
|
||||
streamStateSessionClear_rocksdb(p);
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
char tbname[TSDB_TABLE_NAME_LEN] = {0};
|
||||
sprintf(tbname, "%s_%d", "tbname", i);
|
||||
ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname));
|
||||
}
|
||||
for (int i = 0; i < size; i++) {
|
||||
char *val = NULL;
|
||||
ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val));
|
||||
ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0);
|
||||
taosMemoryFree(val);
|
||||
}
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
char tbname[TSDB_TABLE_NAME_LEN] = {0};
|
||||
sprintf(tbname, "%s_%d", "tbname", i);
|
||||
ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname));
|
||||
}
|
||||
for (int i = 0; i < size; i++) {
|
||||
char *val = NULL;
|
||||
ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val));
|
||||
ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0);
|
||||
taosMemoryFree(val);
|
||||
}
|
||||
for (int i = 0; i < size; i++) {
|
||||
char key[128] = {0};
|
||||
sprintf(key, "tbname_%d", i);
|
||||
char val[128] = {0};
|
||||
sprintf(val, "val_%d", i);
|
||||
code = streamDefaultPut_rocksdb(p, key, val, strlen(val));
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
for (int i = 0; i < size; i++) {
|
||||
char key[128] = {0};
|
||||
sprintf(key, "tbname_%d", i);
|
||||
|
||||
char *val = NULL;
|
||||
int32_t len = 0;
|
||||
code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
SArray *result = taosArrayInit(8, sizeof(void *));
|
||||
streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result);
|
||||
ASSERT(taosArrayGetSize(result) >= 0);
|
||||
|
||||
return p;
|
||||
// streamStateClose((SStreamState *)p, true);
|
||||
}
|
||||
TEST_F(BackendEnv, checkOpen) {
|
||||
SStreamState *p = (SStreamState *)backendOpen();
|
||||
int64_t tsStart = taosGetTimestampMs();
|
||||
{
|
||||
void *pBatch = streamStateCreateBatch();
|
||||
int32_t size = 0;
|
||||
for (int i = 0; i < size; i++) {
|
||||
char key[128] = {0};
|
||||
sprintf(key, "key_%d", i);
|
||||
char val[128] = {0};
|
||||
sprintf(val, "val_%d", i);
|
||||
streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
|
||||
(int32_t)(strlen(val)), tsStart + 100000);
|
||||
}
|
||||
streamStatePutBatch_rocksdb(p, pBatch);
|
||||
streamStateDestroyBatch(pBatch);
|
||||
}
|
||||
{
|
||||
void *pBatch = streamStateCreateBatch();
|
||||
int32_t size = 0;
|
||||
char valBuf[256] = {0};
|
||||
for (int i = 0; i < size; i++) {
|
||||
char key[128] = {0};
|
||||
sprintf(key, "key_%d", i);
|
||||
char val[128] = {0};
|
||||
sprintf(val, "val_%d", i);
|
||||
streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
|
||||
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
|
||||
}
|
||||
streamStatePutBatch_rocksdb(p, pBatch);
|
||||
streamStateDestroyBatch(pBatch);
|
||||
}
|
||||
// do checkpoint 2
|
||||
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2);
|
||||
{
|
||||
void *pBatch = streamStateCreateBatch();
|
||||
int32_t size = 0;
|
||||
char valBuf[256] = {0};
|
||||
for (int i = 0; i < size; i++) {
|
||||
char key[128] = {0};
|
||||
sprintf(key, "key_%d", i);
|
||||
char val[128] = {0};
|
||||
sprintf(val, "val_%d", i);
|
||||
streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
|
||||
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
|
||||
}
|
||||
streamStatePutBatch_rocksdb(p, pBatch);
|
||||
streamStateDestroyBatch(pBatch);
|
||||
}
|
||||
|
||||
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3);
|
||||
|
||||
const char *path = "/tmp/backend/stream";
|
||||
const char *dump = "/tmp/backend/stream/dump";
|
||||
// taosMkDir(dump);
|
||||
taosMulMkDir(dump);
|
||||
SBkdMgt *mgt = bkdMgtCreate((char *)path);
|
||||
SArray *result = taosArrayInit(4, sizeof(void *));
|
||||
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
|
||||
|
||||
bkdMgtDestroy(mgt);
|
||||
streamStateClose((SStreamState *)p, true);
|
||||
// taskDbDestroy(p, true);
|
||||
}
|
||||
|
||||
TEST_F(BackendEnv, checkOpen) { backendOpen(); }
|
||||
TEST_F(BackendEnv, backendOpt) {}
|
||||
TEST_F(BackendEnv, backendDestroy) {}
|
||||
TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; }
|
||||
|
||||
typedef struct BdKV {
|
||||
uint32_t k;
|
||||
uint32_t v;
|
||||
} BdKV;
|
||||
|
||||
BdKV kvDict[] = {{0, 2},
|
||||
{1, 2},
|
||||
{.k = 15, .v = 16},
|
||||
{.k = 31, .v = 32},
|
||||
{.k = 56, .v = 64},
|
||||
{.k = 100, .v = 128},
|
||||
{.k = 200, .v = 256},
|
||||
{.k = 500, .v = 512},
|
||||
{.k = 1000, .v = 1024},
|
||||
{.k = 2000, .v = 2048},
|
||||
{.k = 3000, .v = 4096}};
|
||||
|
||||
TEST_F(BackendEnv, backendUtil) {
|
||||
for (int i = 0; i < sizeof(kvDict) / sizeof(kvDict[0]); i++) {
|
||||
ASSERT_EQ(nextPow2((uint32_t)(kvDict[i].k)), kvDict[i].v);
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
|
|
Loading…
Reference in New Issue