Merge pull request #24916 from taosdata/addBackendTest

Add backend test
This commit is contained in:
Hongze Cheng 2024-02-28 19:16:14 +08:00 committed by GitHub
commit d9095d80de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 604 additions and 89 deletions

View File

@ -76,8 +76,8 @@ char* idxInt2str(int64_t val, char* dst, int radix) {
return dst - 1;
}
__compar_fn_t idxGetCompar(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY ||
type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_NCHAR ||
type == TSDB_DATA_TYPE_GEOMETRY) {
return (__compar_fn_t)strcmp;
}
return getComparFunc(type, 0);
@ -108,8 +108,8 @@ static FORCE_INLINE TExeCond tCompareEqual(void* a, void* b, int8_t type) {
return tCompare(func, QUERY_TERM, a, b, type);
}
TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) {
if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR ||
dtype == TSDB_DATA_TYPE_VARBINARY || dtype == TSDB_DATA_TYPE_GEOMETRY) {
if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY ||
dtype == TSDB_DATA_TYPE_GEOMETRY) {
return tDoCompare(func, cmptype, a, b);
}
#if 1
@ -290,6 +290,7 @@ int idxUidCompare(const void* a, const void* b) {
uint64_t r = *(uint64_t*)b;
return l - r;
}
#ifdef BUILD_NO_CALL
int32_t idxConvertData(void* src, int8_t type, void** dst) {
int tlen = -1;
switch (type) {
@ -372,6 +373,8 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) {
// indexMayFillNumbericData(*dst, tlen);
return tlen;
}
#endif
int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
if (src == NULL) {
*dst = strndup(INDEX_DATA_NULL_STR, (int)strlen(INDEX_DATA_NULL_STR));

View File

@ -17,10 +17,14 @@
#define _STREAM_BACKEDN_ROCKSDB_H_
#include "rocksdb/c.h"
//#include "streamInt.h"
// #include "streamInt.h"
#include "streamState.h"
#include "tcommon.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SCfComparator {
rocksdb_comparator_t** comp;
int32_t numOfComp;
@ -244,11 +248,6 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
// STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
// void taskDbDestroy(void* pDb, bool flush);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
SBkdMgt* bkdMgtCreate(char* path);
@ -258,4 +257,10 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
uint32_t nextPow2(uint32_t x);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -2788,7 +2788,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
return pCur;
}
#ifdef BUILD_NO_CALL
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
stDebug("streamStateGetCur_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
@ -2838,7 +2837,6 @@ int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) {
STREAM_STATE_DEL_ROCKSDB(pState, "func", key);
return 0;
}
#endif
// session cf
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
@ -3432,7 +3430,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
SSessionKey tmpKey = *key;
int32_t valSize = *pVLen;
void* tmp = taosMemoryMalloc(valSize);
// tdbRealloc(NULL, valSize);
if (!tmp) {
return -1;
}
@ -3506,13 +3503,11 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
return code;
}
#ifdef BUILD_NO_CALL
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen);
return code;
}
#endif
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen);
@ -3535,10 +3530,10 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
if (pIter == NULL) {
return -1;
}
size_t klen = 0;
rocksdb_iter_seek(pIter, start, strlen(start));
while (rocksdb_iter_valid(pIter)) {
const char* key = rocksdb_iter_key(pIter, NULL);
const char* key = rocksdb_iter_key(pIter, &klen);
int32_t vlen = 0;
const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen);
char* val = NULL;
@ -3700,6 +3695,8 @@ uint32_t nextPow2(uint32_t x) {
x = x | (x >> 16);
return x + 1;
}
#ifdef BUILD_NO_CALL
int32_t copyFiles(const char* src, const char* dst) {
int32_t code = 0;
// opt later, just hard link
@ -3739,6 +3736,7 @@ _err:
taosCloseDir(&pDir);
return code >= 0 ? 0 : -1;
}
#endif
int32_t isBkdDataMeta(char* name, int32_t len) {
const char* pCurrent = "CURRENT";

View File

@ -70,7 +70,7 @@ static void streamMetaEnvInit() {
streamTimerInit();
}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() {
taosCloseRef(streamBackendId);
@ -1104,14 +1104,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
};
entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId)? 1:0;
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId;
@ -1172,7 +1172,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
}
_end:
_end:
streamMetaClearHbMsg(&hbMsg);
return TSDB_CODE_SUCCESS;
}
@ -1304,28 +1304,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
}
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosThreadRwlockRdlock(&pMeta->lock);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else {
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosThreadRwlockWrlock(&pMeta->lock);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosThreadRwlockUnlock(&pMeta->lock);
}
@ -1395,7 +1395,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
pMeta->sendMsgBeforeClosing = true;
}
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
streamMetaWUnLock(pMeta);
if (isLeader) {
@ -1531,7 +1531,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList);
for(int32_t i = 0; i < num; ++i) {
for (int32_t i = 0; i < num; ++i) {
STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId));
if (ppTask == NULL) {

View File

@ -1,40 +1,104 @@
MESSAGE(STATUS "build stream unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# bloomFilterTest
ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES(streamUpdateTest
PUBLIC os util common gtest gtest_main stream executor index
#TARGET_LINK_LIBRARIES(streamUpdateTest
#PUBLIC os util common gtest gtest_main stream executor index
#)
#TARGET_INCLUDE_DIRECTORIES(
#streamUpdateTest
#PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#ADD_EXECUTABLE(checkpointTest checkpointTest.cpp)
#TARGET_LINK_LIBRARIES(
#checkpointTest
#PUBLIC os common gtest stream executor qcom index transport util
#)
#TARGET_INCLUDE_DIRECTORIES(
#checkpointTest
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#add_executable(backendTest "")
#target_sources(backendTest
#PRIVATE
#"backendTest.cpp"
#)
#TARGET_LINK_LIBRARIES(
#backendTest
#PUBLIC rocksdb
#PUBLIC os common gtest stream executor qcom index transport util
#)
#TARGET_INCLUDE_DIRECTORIES(
#backendTest
#PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#add_test(
#NAME streamUpdateTest
#COMMAND streamUpdateTest
#)
#add_test(
#NAME checkpointTest
#COMMAND checkpointTest
#)
#add_test(
#NAME backendTest
#COMMAND backendTest
#)
#add_executable(backendTest "")
#target_sources(backendTest
#PUBLIC
#"backendTest.cpp"
#)
#target_include_directories(
#backendTest
#PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#target_link_libraries(
#backendTest
#PUBLIC rocksdb
#PUBLIC os common gtest stream executor qcom index transport util
#)
MESSAGE(STATUS "build parser unit test")
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(backendTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
backendTest
PUBLIC rocksdb
PUBLIC os common gtest stream executor qcom index transport util vnode
)
TARGET_INCLUDE_DIRECTORIES(
streamUpdateTest
TARGET_INCLUDE_DIRECTORIES(
backendTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
)
ADD_EXECUTABLE(checkpointTest checkpointTest.cpp)
TARGET_LINK_LIBRARIES(
checkpointTest
PUBLIC os common gtest stream executor qcom index transport util
)
TARGET_INCLUDE_DIRECTORIES(
checkpointTest
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
add_test(
NAME streamUpdateTest
COMMAND streamUpdateTest
)
add_test(
NAME checkpointTest
COMMAND checkpointTest
)
ADD_TEST(
NAME backendTest
COMMAND backendTest
)
ENDIF ()

View File

@ -0,0 +1,437 @@
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include <vector>
#include "streamBackendRocksdb.h"
#include "streamSnapshot.h"
#include "streamState.h"
#include "tstream.h"
#include "tstreamFileState.h"
#include "tstreamUpdate.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wformat"
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#pragma GCC diagnostic ignored "-Wpointer-arith"
class BackendEnv : public ::testing::Test {
protected:
virtual void SetUp() {}
virtual void TearDown() {}
};
void *backendCreate() {
const char *streamPath = "/tmp";
void *p = NULL;
// char *absPath = NULL;
// // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2);
// STaskDbWrapper *p = taskDbOpen((char *)streamPath, (char *)"stream-backend", -1);
// ASSERT(p != NULL);
return p;
}
SStreamState *stateCreate(const char *path) {
SStreamTask *pTask = (SStreamTask *)taosMemoryCalloc(1, sizeof(SStreamTask));
pTask->ver = 1024;
pTask->id.streamId = 1023;
pTask->id.taskId = 1111111;
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, 0, 0, NULL);
pTask->pMeta = pMeta;
SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024);
ASSERT(p != NULL);
return p;
}
void *backendOpen() {
streamMetaInit();
const char *path = "/tmp/backend";
SStreamState *p = stateCreate(path);
ASSERT(p != NULL);
// write bacth
// default/state/fill/sess/func/parname/partag
int32_t size = 100;
std::vector<int64_t> tsArray;
for (int32_t i = 0; i < size; i++) {
int64_t ts = taosGetTimestampMs();
SWinKey key; // = {.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";
int32_t vlen = strlen(val);
streamStatePut_rocksdb(p, &key, (char *)val, vlen);
tsArray.push_back(ts);
}
for (int32_t i = 0; i < size; i++) {
int64_t ts = tsArray[i];
SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";
int32_t len = 0;
char *newVal = NULL;
streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(len == strlen(val));
}
int64_t ts = tsArray[0];
SWinKey key = {0}; // {.groupId = (uint64_t)(0), .ts = ts};
key.groupId = (uint64_t)(0);
key.ts = ts;
streamStateDel_rocksdb(p, &key);
streamStateClear_rocksdb(p);
for (int i = 0; i < size; i++) {
int64_t ts = tsArray[i];
SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";
int32_t len = 0;
char *newVal = NULL;
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(code != 0);
}
tsArray.clear();
for (int i = 0; i < size; i++) {
int64_t ts = taosGetTimestampMs();
tsArray.push_back(ts);
SWinKey key = {0}; //{.groupId = (uint64_t)(i), .ts = ts};
key.groupId = (uint64_t)(i);
key.ts = ts;
const char *val = "value data";
int32_t vlen = strlen(val);
streamStatePut_rocksdb(p, &key, (char *)val, vlen);
}
SWinKey winkey;
int32_t code = streamStateGetFirst_rocksdb(p, &key);
ASSERT(code == 0);
ASSERT(key.ts == tsArray[0]);
SStreamStateCur *pCurr = streamStateSeekToLast_rocksdb(p);
ASSERT(pCurr != NULL);
streamStateFreeCur(pCurr);
winkey.groupId = 0;
winkey.ts = tsArray[0];
char *val = NULL;
int32_t len = 0;
pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey);
ASSERT(pCurr != NULL);
streamStateFreeCur(pCurr);
tsArray.clear();
for (int i = 0; i < size; i++) {
int64_t ts = taosGetTimestampMs();
tsArray.push_back(ts);
STupleKey key = {0};
key.groupId = (uint64_t)(0); //= {.groupId = (uint64_t)(0), .ts = ts, .exprIdx = i};
key.ts = ts;
key.exprIdx = i;
const char *val = "Value";
int32_t len = strlen(val);
streamStateFuncPut_rocksdb(p, &key, val, len);
}
for (int i = 0; i < size; i++) {
STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i};
key.groupId = (uint64_t)(0);
key.ts = tsArray[i];
key.exprIdx = i;
char *val = NULL;
int32_t len = 0;
streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len);
ASSERT(len == strlen("Value"));
}
for (int i = 0; i < size; i++) {
STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i};
key.groupId = (uint64_t)(0);
key.ts = tsArray[i];
key.exprIdx = i;
char *val = NULL;
int32_t len = 0;
streamStateFuncDel_rocksdb(p, &key);
}
// session put
tsArray.clear();
for (int i = 0; i < size; i++) {
SSessionKey key = {0}; //{.win = {.skey = i, .ekey = i}, .groupId = (uint64_t)(0)};
key.win.skey = i;
key.win.ekey = i;
key.groupId = (uint64_t)(0);
tsArray.push_back(i);
const char *val = "Value";
int32_t len = strlen(val);
streamStateSessionPut_rocksdb(p, &key, val, len);
char *pval = NULL;
ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len));
ASSERT(strncmp(pval, val, len) == 0);
}
for (int i = 0; i < size; i++) {
SSessionKey key = {0}; //{.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)};
key.win.skey = tsArray[i];
key.win.ekey = tsArray[i];
key.groupId = (uint64_t)(0);
const char *val = "Value";
int32_t len = strlen(val);
char *pval = NULL;
ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len));
ASSERT(strncmp(pval, val, len) == 0);
taosMemoryFreeClear(pval);
}
pCurr = streamStateSessionSeekToLast_rocksdb(p, 0);
ASSERT(pCurr != NULL);
{
SSessionKey key;
memset(&key, 0, sizeof(key));
char *val = NULL;
int32_t vlen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
pCurr = streamStateSessionSeekKeyPrev_rocksdb(p, &key);
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
ASSERT(key.groupId == 0 && key.win.ekey == tsArray[tsArray.size() - 2]);
pCurr = streamStateSessionSeekKeyNext_rocksdb(p, &key);
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
ASSERT(code == 0);
ASSERT(vlen == strlen("Value"));
ASSERT(key.groupId == 0 && key.win.skey == tsArray[tsArray.size() - 1]);
ASSERT(0 == streamStateSessionAddIfNotExist_rocksdb(p, &key, 10, (void **)&val, &len));
ASSERT(0 ==
streamStateStateAddIfNotExist_rocksdb(p, &key, (char *)"key", strlen("key"), NULL, (void **)&val, &len));
}
for (int i = 0; i < size; i++) {
SSessionKey key = {0}; //{.win = {.skey = tsArray[i], .ekey = tsArray[i]}, .groupId = (uint64_t)(0)};
key.win.skey = tsArray[i];
key.win.ekey = tsArray[i];
key.groupId = (uint64_t)(0);
const char *val = "Value";
int32_t len = strlen(val);
char *pval = NULL;
ASSERT(0 == streamStateSessionDel_rocksdb(p, &key));
}
for (int i = 0; i < size; i++) {
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
key.groupId = (uint64_t)(i);
key.ts = tsArray[i];
const char *val = "Value";
int32_t vlen = strlen(val);
ASSERT(streamStateFillPut_rocksdb(p, &key, val, vlen) == 0);
}
for (int i = 0; i < size; i++) {
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
key.groupId = (uint64_t)(i);
key.ts = tsArray[i];
char *val = NULL;
int32_t vlen = 0;
ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0);
taosMemoryFreeClear(val);
}
{
SWinKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[0]};
key.groupId = (uint64_t)(0);
key.ts = tsArray[0];
SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key);
ASSERT(pCurr != NULL);
char *val = NULL;
int32_t vlen = 0;
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
ASSERT(vlen == strlen("Value"));
streamStateFreeCur(pCurr);
pCurr = streamStateFillSeekKeyNext_rocksdb(p, &key);
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
ASSERT(vlen == strlen("Value") && key.groupId == 1 && key.ts == tsArray[1]);
key.groupId = 1;
key.ts = tsArray[1];
pCurr = streamStateFillSeekKeyPrev_rocksdb(p, &key);
ASSERT(pCurr != NULL);
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
ASSERT(vlen == strlen("Value") && key.groupId == 0 && key.ts == tsArray[0]);
}
for (int i = 0; i < size - 1; i++) {
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
key.groupId = (uint64_t)(i);
key.ts = tsArray[i];
char *val = NULL;
int32_t vlen = 0;
ASSERT(streamStateFillDel_rocksdb(p, &key) == 0);
taosMemoryFreeClear(val);
}
streamStateSessionClear_rocksdb(p);
for (int i = 0; i < size; i++) {
char tbname[TSDB_TABLE_NAME_LEN] = {0};
sprintf(tbname, "%s_%d", "tbname", i);
ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname));
}
for (int i = 0; i < size; i++) {
char *val = NULL;
ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val));
ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0);
taosMemoryFree(val);
}
for (int i = 0; i < size; i++) {
char tbname[TSDB_TABLE_NAME_LEN] = {0};
sprintf(tbname, "%s_%d", "tbname", i);
ASSERT(0 == streamStatePutParName_rocksdb(p, i, tbname));
}
for (int i = 0; i < size; i++) {
char *val = NULL;
ASSERT(0 == streamStateGetParName_rocksdb(p, i, (void **)&val));
ASSERT(strncmp(val, "tbname", strlen("tbname")) == 0);
taosMemoryFree(val);
}
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "tbname_%d", i);
char val[128] = {0};
sprintf(val, "val_%d", i);
code = streamDefaultPut_rocksdb(p, key, val, strlen(val));
ASSERT(code == 0);
}
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "tbname_%d", i);
char *val = NULL;
int32_t len = 0;
code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len);
ASSERT(code == 0);
}
SArray *result = taosArrayInit(8, sizeof(void *));
streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result);
ASSERT(taosArrayGetSize(result) >= 0);
return p;
// streamStateClose((SStreamState *)p, true);
}
TEST_F(BackendEnv, checkOpen) {
SStreamState *p = (SStreamState *)backendOpen();
int64_t tsStart = taosGetTimestampMs();
{
void *pBatch = streamStateCreateBatch();
int32_t size = 0;
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "key_%d", i);
char val[128] = {0};
sprintf(val, "val_%d", i);
streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000);
}
streamStatePutBatch_rocksdb(p, pBatch);
streamStateDestroyBatch(pBatch);
}
{
void *pBatch = streamStateCreateBatch();
int32_t size = 0;
char valBuf[256] = {0};
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "key_%d", i);
char val[128] = {0};
sprintf(val, "val_%d", i);
streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
}
streamStatePutBatch_rocksdb(p, pBatch);
streamStateDestroyBatch(pBatch);
}
// do checkpoint 2
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2);
{
void *pBatch = streamStateCreateBatch();
int32_t size = 0;
char valBuf[256] = {0};
for (int i = 0; i < size; i++) {
char key[128] = {0};
sprintf(key, "key_%d", i);
char val[128] = {0};
sprintf(val, "val_%d", i);
streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
}
streamStatePutBatch_rocksdb(p, pBatch);
streamStateDestroyBatch(pBatch);
}
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3);
const char *path = "/tmp/backend/stream";
const char *dump = "/tmp/backend/stream/dump";
// taosMkDir(dump);
taosMulMkDir(dump);
SBkdMgt *mgt = bkdMgtCreate((char *)path);
SArray *result = taosArrayInit(4, sizeof(void *));
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
bkdMgtDestroy(mgt);
streamStateClose((SStreamState *)p, true);
taosRemoveDir(path);
}
TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; }
typedef struct BdKV {
uint32_t k;
uint32_t v;
} BdKV;
BdKV kvDict[] = {{0, 2}, {1, 2}, {15, 16}, {31, 32}, {56, 64}, {100, 128},
{200, 256}, {500, 512}, {1000, 1024}, {2000, 2048}, {3000, 4096}};
TEST_F(BackendEnv, backendUtil) {
for (int i = 0; i < sizeof(kvDict) / sizeof(kvDict[0]); i++) {
ASSERT_EQ(nextPow2((uint32_t)(kvDict[i].k)), kvDict[i].v);
}
}
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -25,46 +25,49 @@
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
// tsSnodeAddress = "";
// tsS3StreamEnabled = 0;
#include "cos.h"
#include "rsync.h"
#include "streamInt.h"
#include "cos.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
// int main(int argc, char **argv) {
// testing::InitGoogleTest(&argc, argv);
if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) {
printf("error");
}
if (s3Init() < 0) {
return -1;
}
strcpy(tsSnodeAddress, "127.0.0.1");
int ret = RUN_ALL_TESTS();
s3CleanUp();
return ret;
}
// if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) {
// printf("error");
// }
// if (s3Init() < 0) {
// return -1;
// }
// strcpy(tsSnodeAddress, "127.0.0.1");
// int ret = RUN_ALL_TESTS();
// s3CleanUp();
// return ret;
// }
TEST(testCase, checkpointUpload_Test) {
stopRsync();
startRsync();
// stopRsync();
// startRsync();
taosSsleep(5);
char* id = "2013892036";
uploadCheckpoint(id, "/root/offset/");
// uploadCheckpoint(id, "/root/offset/");
}
TEST(testCase, checkpointDownload_Test) {
char* id = "2013892036";
downloadCheckpoint(id, "/root/offset/download/");
// downloadCheckpoint(id, "/root/offset/download/");
}
TEST(testCase, checkpointDelete_Test) {
char* id = "2013892036";
deleteCheckpoint(id);
// deleteCheckpoint(id);
}
TEST(testCase, checkpointDeleteFile_Test) {
char* id = "2013892036";
deleteCheckpointFile(id, "offset-ver0");
// deleteCheckpointFile(id, "offset-ver0");
}

View File

@ -14,10 +14,7 @@ class StreamStateEnv : public ::testing::Test {
streamMetaInit();
backend = streamBackendInit(path, 0, 0);
}
virtual void TearDown() {
streamMetaCleanup();
// indexClose(index);
}
virtual void TearDown() { streamMetaCleanup(); }
const char *path = TD_TMP_DIR_PATH "stream";
void *backend;
@ -50,6 +47,14 @@ bool equalSBF(SScalableBf *left, SScalableBf *right) {
}
TEST(TD_STREAM_UPDATE_TEST, update) {
const char *streamPath = "/tmp";
char *absPath = NULL;
void *p = NULL;
// SBackendWrapper *p = streamBackendInit(streamPath, -1, 2);
// p = taskDbOpen((char *)streamPath, (char *)"test", -1);
p = bkdMgtCreate((char *)streamPath);
// const int64_t interval = 20 * 1000;
// const int64_t watermark = 10 * 60 * 1000;
// SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);