add test case

This commit is contained in:
yihaoDeng 2024-02-23 09:30:53 +00:00
parent c52f064670
commit c54e4aeaad
6 changed files with 137 additions and 54 deletions

View File

@ -76,8 +76,8 @@ char* idxInt2str(int64_t val, char* dst, int radix) {
return dst - 1; return dst - 1;
} }
__compar_fn_t idxGetCompar(int8_t type) { __compar_fn_t idxGetCompar(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_NCHAR ||
type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_GEOMETRY) { type == TSDB_DATA_TYPE_GEOMETRY) {
return (__compar_fn_t)strcmp; return (__compar_fn_t)strcmp;
} }
return getComparFunc(type, 0); return getComparFunc(type, 0);
@ -108,8 +108,8 @@ static FORCE_INLINE TExeCond tCompareEqual(void* a, void* b, int8_t type) {
return tCompare(func, QUERY_TERM, a, b, type); return tCompare(func, QUERY_TERM, a, b, type);
} }
TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) { TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) {
if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY ||
dtype == TSDB_DATA_TYPE_VARBINARY || dtype == TSDB_DATA_TYPE_GEOMETRY) { dtype == TSDB_DATA_TYPE_GEOMETRY) {
return tDoCompare(func, cmptype, a, b); return tDoCompare(func, cmptype, a, b);
} }
#if 1 #if 1
@ -290,6 +290,7 @@ int idxUidCompare(const void* a, const void* b) {
uint64_t r = *(uint64_t*)b; uint64_t r = *(uint64_t*)b;
return l - r; return l - r;
} }
#ifdef BUILD_NO_CALL
int32_t idxConvertData(void* src, int8_t type, void** dst) { int32_t idxConvertData(void* src, int8_t type, void** dst) {
int tlen = -1; int tlen = -1;
switch (type) { switch (type) {
@ -372,6 +373,8 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) {
// indexMayFillNumbericData(*dst, tlen); // indexMayFillNumbericData(*dst, tlen);
return tlen; return tlen;
} }
#endif
int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) { int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
if (src == NULL) { if (src == NULL) {
*dst = strndup(INDEX_DATA_NULL_STR, (int)strlen(INDEX_DATA_NULL_STR)); *dst = strndup(INDEX_DATA_NULL_STR, (int)strlen(INDEX_DATA_NULL_STR));

View File

@ -17,7 +17,7 @@
#define _STREAM_BACKEDN_ROCKSDB_H_ #define _STREAM_BACKEDN_ROCKSDB_H_
#include "rocksdb/c.h" #include "rocksdb/c.h"
//#include "streamInt.h" // #include "streamInt.h"
#include "streamState.h" #include "streamState.h"
#include "tcommon.h" #include "tcommon.h"
@ -244,11 +244,6 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, SArray* pSnap); int32_t taskDbBuildSnap(void* arg, SArray* pSnap);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
// STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
// void taskDbDestroy(void* pDb, bool flush);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
SBkdMgt* bkdMgtCreate(char* path); SBkdMgt* bkdMgtCreate(char* path);

View File

@ -70,7 +70,7 @@ static void streamMetaEnvInit() {
streamTimerInit(); streamTimerInit();
} }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);} void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { void streamMetaCleanup() {
taosCloseRef(streamBackendId); taosCloseRef(streamBackendId);
@ -1104,14 +1104,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
}; };
entry.inputRate = entry.inputQUsed * 100.0 / (2*STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
} }
if ((*pTask)->chkInfo.checkpointingId != 0) { if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId)? 1:0; entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
entry.checkpointId = (*pTask)->chkInfo.checkpointingId; entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId; entry.chkpointTransId = (*pTask)->chkInfo.transId;
@ -1172,7 +1172,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
} }
_end: _end:
streamMetaClearHbMsg(&hbMsg); streamMetaClearHbMsg(&hbMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1304,28 +1304,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
} }
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId); // stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosThreadRwlockRdlock(&pMeta->lock); taosThreadRwlockRdlock(&pMeta->lock);
} }
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId); // stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock); int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else { } else {
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId); // stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
} }
} }
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId); // stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosThreadRwlockWrlock(&pMeta->lock); taosThreadRwlockWrlock(&pMeta->lock);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); // stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
} }
void streamMetaWUnLock(SStreamMeta* pMeta) { void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId); // stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosThreadRwlockUnlock(&pMeta->lock); taosThreadRwlockUnlock(&pMeta->lock);
} }
@ -1395,7 +1395,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
pMeta->sendMsgBeforeClosing = true; pMeta->sendMsgBeforeClosing = true;
} }
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER; pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
if (isLeader) { if (isLeader) {
@ -1531,8 +1531,8 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId)); SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId));
if (ppTask == NULL) { if (ppTask == NULL) {
continue; continue;
@ -1633,7 +1633,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs", ", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0); pStartInfo->elapsedTime / 1000.0);

View File

@ -1,40 +1,82 @@
MESSAGE(STATUS "build stream unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# bloomFilterTest # bloomFilterTest
ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES(streamUpdateTest #TARGET_LINK_LIBRARIES(streamUpdateTest
PUBLIC os util common gtest gtest_main stream executor index #PUBLIC os util common gtest gtest_main stream executor index
) #)
TARGET_INCLUDE_DIRECTORIES( #TARGET_INCLUDE_DIRECTORIES(
streamUpdateTest #streamUpdateTest
#PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#ADD_EXECUTABLE(checkpointTest checkpointTest.cpp)
#TARGET_LINK_LIBRARIES(
#checkpointTest
#PUBLIC os common gtest stream executor qcom index transport util
#)
#TARGET_INCLUDE_DIRECTORIES(
#checkpointTest
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#add_executable(backendTest "")
#target_sources(backendTest
#PRIVATE
#"backendTest.cpp"
#)
#TARGET_LINK_LIBRARIES(
#backendTest
#PUBLIC rocksdb
#PUBLIC os common gtest stream executor qcom index transport util
#)
#TARGET_INCLUDE_DIRECTORIES(
#backendTest
#PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
#PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
#)
#add_test(
#NAME streamUpdateTest
#COMMAND streamUpdateTest
#)
#add_test(
#NAME checkpointTest
#COMMAND checkpointTest
#)
#add_test(
#NAME backendTest
#COMMAND backendTest
#)
add_executable(backendTest "")
target_sources(backendTest
PRIVATE
"backendTest.cpp"
)
target_include_directories(
backendTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
) )
ADD_EXECUTABLE(checkpointTest checkpointTest.cpp) target_link_libraries(
TARGET_LINK_LIBRARIES( backendTest
checkpointTest PUBLIC rocksdb
PUBLIC os common gtest stream executor qcom index transport util PUBLIC os common gtest stream executor qcom index transport util
) )
TARGET_INCLUDE_DIRECTORIES(
checkpointTest
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
add_test( add_test(
NAME streamUpdateTest NAME backendTest
COMMAND streamUpdateTest COMMAND backendTest
)
add_test(
NAME checkpointTest
COMMAND checkpointTest
) )

View File

@ -0,0 +1,38 @@
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include "streamBackendRocksdb.h"
class BackendEnv : public ::testing::Test {
protected:
virtual void SetUp() {}
virtual void TearDown() {}
};
void *backendCreate() {
const char *streamPath = "/tmp";
char *absPath = NULL;
void *p = NULL;
// SBackendWrapper *p = streamBackendInit(streamPath, -1, 2);
// p = taskDbOpen((char *)streamPath, (char *)"test", -1);
// p = bkdMgtCreate((char *)streamPath);
ASSERT(p != NULL);
return p;
}
void backendOpen() {
void *p = backendCreate();
ASSERT(p != NULL);
}
TEST_F(BackendEnv, checkOpen) { backendOpen(); }
TEST_F(BackendEnv, backendOpt) {}
TEST_F(BackendEnv, backendDestroy) {}
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -14,10 +14,7 @@ class StreamStateEnv : public ::testing::Test {
streamMetaInit(); streamMetaInit();
backend = streamBackendInit(path, 0, 0); backend = streamBackendInit(path, 0, 0);
} }
virtual void TearDown() { virtual void TearDown() { streamMetaCleanup(); }
streamMetaCleanup();
// indexClose(index);
}
const char *path = TD_TMP_DIR_PATH "stream"; const char *path = TD_TMP_DIR_PATH "stream";
void *backend; void *backend;
@ -50,6 +47,14 @@ bool equalSBF(SScalableBf *left, SScalableBf *right) {
} }
TEST(TD_STREAM_UPDATE_TEST, update) { TEST(TD_STREAM_UPDATE_TEST, update) {
const char *streamPath = "/tmp";
char *absPath = NULL;
void *p = NULL;
// SBackendWrapper *p = streamBackendInit(streamPath, -1, 2);
// p = taskDbOpen((char *)streamPath, (char *)"test", -1);
p = bkdMgtCreate((char *)streamPath);
// const int64_t interval = 20 * 1000; // const int64_t interval = 20 * 1000;
// const int64_t watermark = 10 * 60 * 1000; // const int64_t watermark = 10 * 60 * 1000;
// SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); // SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);