Merge pull request #23899 from taosdata/fix/liaohj
other: merge the patch of stream from 3.0 branch.
commit 4d9211dcf1
@@ -1,13 +0,0 @@

# bdb
ExternalProject_Add(bdb
GIT_REPOSITORY https://github.com/berkeleydb/libdb.git
GIT_TAG v5.3.28
SOURCE_DIR "${TD_CONTRIB_DIR}/bdb"
BINARY_DIR "${TD_CONTRIB_DIR}/bdb"
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND COMMAND ./dist/configure --enable-debug
BUILD_COMMAND "$(MAKE)"
INSTALL_COMMAND ""
TEST_COMMAND ""
)

@@ -151,6 +151,7 @@ ELSE ()
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)

IF (COMPILER_SUPPORT_SSE42)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2")

@@ -158,11 +159,11 @@ ELSE ()
ENDIF()

IF ("${SIMD_SUPPORT}" MATCHES "true")
IF (COMPILER_SUPPORT_FMA)
IF (COMPILER_SUPPORT_FMA)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma")
ENDIF()
IF (COMPILER_SUPPORT_AVX)
ENDIF()
IF (COMPILER_SUPPORT_AVX)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx")
ENDIF()

@@ -175,7 +176,13 @@ ELSE ()
IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi")
MESSAGE(STATUS "avx512 supported by gcc")
MESSAGE(STATUS "avx512f/avx512bmi supported by compiler")
ENDIF()

IF (COMPILER_SUPPORT_AVX512VL)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512vl")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512vl")
MESSAGE(STATUS "avx512vl supported by compiler")
ENDIF()
ENDIF()
@@ -1,13 +0,0 @@

# leveldb
ExternalProject_Add(leveldb
GIT_REPOSITORY https://github.com/taosdata-contrib/leveldb.git
GIT_TAG master
SOURCE_DIR "${TD_CONTRIB_DIR}/leveldb"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

@@ -1,12 +0,0 @@

# lucene
ExternalProject_Add(lucene
GIT_REPOSITORY https://github.com/yihaoDeng/LucenePlusPlus.git
SOURCE_DIR "${TD_CONTRIB_DIR}/lucene"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

@@ -1,12 +0,0 @@

# NuRaft
ExternalProject_Add(NuRaft
GIT_REPOSITORY https://github.com/eBay/NuRaft.git
GIT_TAG v1.3.0
SOURCE_DIR "${TD_CONTRIB_DIR}/nuraft"
BINARY_DIR "${TD_CONTRIB_DIR}/nuraft"
CONFIGURE_COMMAND "./prepare.sh"
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
@@ -109,11 +109,6 @@ cat("${TD_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cJson
cat("${TD_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})

# leveldb
if(${BUILD_WITH_LEVELDB})
cat("${TD_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_LEVELDB})

if (${BUILD_CONTRIB})
if(${BUILD_WITH_ROCKSDB})
cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})

@@ -132,28 +127,11 @@ else()
endif()
endif()

# canonical-raft
if(${BUILD_WITH_CRAFT})
cat("${TD_SUPPORT_DIR}/craft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE)
endif(${BUILD_WITH_CRAFT})

# traft
if(${BUILD_WITH_TRAFT})
cat("${TD_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE)
endif(${BUILD_WITH_TRAFT})

#libuv
if(${BUILD_WITH_UV})
cat("${TD_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_UV})

# bdb
if(${BUILD_WITH_BDB})
cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_BDB})

# sqlite
if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})

@@ -178,17 +156,6 @@ elseif(${BUILD_WITH_COS})

endif()

# lucene
if(${BUILD_WITH_LUCENE})
cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_LUCENE)
endif(${BUILD_WITH_LUCENE})

# NuRaft
if(${BUILD_WITH_NURAFT})
cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT})

# crashdump
if(${BUILD_CRASHDUMP})
cat("${TD_SUPPORT_DIR}/crashdump_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})

@@ -437,23 +404,6 @@ elseif(${BUILD_WITH_COS})

endif()

# lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev
if(${BUILD_WITH_LUCENE})
option(ENABLE_TEST "Enable the tests" OFF)
add_subdirectory(lucene EXCLUDE_FROM_ALL)
target_include_directories(
lucene++
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/lucene/include>
)

endif(${BUILD_WITH_LUCENE})

# NuRaft
if(${BUILD_WITH_NURAFT})
add_subdirectory(nuraft EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_NURAFT})

# pthread
if(${BUILD_PTHREAD})
set(CMAKE_BUILD_TYPE debug)

@@ -524,30 +474,6 @@ if(${BUILD_WCWIDTH})
SET_TARGET_PROPERTIES(wcwidth PROPERTIES OUTPUT_NAME wcwidth)
endif(${BUILD_WCWIDTH})

# CRAFT
if(${BUILD_WITH_CRAFT})
add_library(craft STATIC IMPORTED GLOBAL)
set_target_properties(craft PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/craft/.libs/libraft.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/craft/include"
)
# target_link_libraries(craft
# INTERFACE pthread
# )
endif(${BUILD_WITH_CRAFT})

# TRAFT
if(${BUILD_WITH_TRAFT})
add_library(traft STATIC IMPORTED GLOBAL)
set_target_properties(traft PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include"
)
# target_link_libraries(craft
# INTERFACE pthread
# )
endif(${BUILD_WITH_TRAFT})

# LIBUV
if(${BUILD_WITH_UV})
if (TD_WINDOWS)

@@ -559,18 +485,6 @@ if(${BUILD_WITH_UV})
add_subdirectory(libuv EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_UV})

# BDB
if(${BUILD_WITH_BDB})
add_library(bdb STATIC IMPORTED GLOBAL)
set_target_properties(bdb PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb"
)
target_link_libraries(bdb
INTERFACE pthread
)
endif(${BUILD_WITH_BDB})

# SQLite
# see https://stackoverflow.com/questions/8774593/cmake-link-to-external-library#comment58570736_10550334
if(${BUILD_WITH_SQLITE})

@@ -3271,7 +3271,7 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq;

typedef struct {
int8_t reserved;

@@ -271,6 +271,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)

TD_NEW_MSG_SEG(TDMT_MON_MSG)

@@ -305,6 +305,7 @@ typedef struct SCheckpointInfo {
int64_t processedVer; // already processed ver, that has generated results version.
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t failedId; // record the latest failed checkpoint id
bool dispatchCheckpointTrigger;
} SCheckpointInfo;

typedef struct SStreamStatus {

@@ -390,6 +391,7 @@ typedef struct SHistoryTaskInfo {
int32_t retryTimes;
int32_t waitInterval;
int64_t haltVer; // offset in wal when halt the stream task
bool operatorOpen; // false by default
} SHistoryTaskInfo;

typedef struct STaskOutputInfo {

@@ -654,17 +656,21 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct STaskStatusEntry {
STaskId id;
int32_t status;
int32_t stage;
int32_t statusLastDuration; // to record the last duration of current status
int64_t stage;
int32_t nodeId;
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB
double inputRate;
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dest data size
double sinkDataSize; // sink to dst data size
} STaskStatusEntry;

typedef struct SStreamHbMsg {

@@ -676,6 +682,7 @@ typedef struct SStreamHbMsg {

int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void streamMetaClearHbMsg(SStreamHbMsg* pMsg);

typedef struct {
int64_t streamId;

@@ -139,6 +139,8 @@ int32_t getWordLength(char type);
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian);
int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian);

/*************************************************************************
* STREAM COMPRESSION

@ -126,9 +126,9 @@ void queryCallback(void* param, void* res, int32_t code) {
|
|||
taos_fetch_raw_block_a(res, fetchCallback, param);
|
||||
}
|
||||
|
||||
void createNewTable(TAOS* pConn, int32_t index) {
|
||||
void createNewTable(TAOS* pConn, int32_t index, int32_t numOfRows, int64_t startTs, const char* pVarchar) {
|
||||
char str[1024] = {0};
|
||||
sprintf(str, "create table tu%d using st2 tags(%d)", index, index);
|
||||
sprintf(str, "create table if not exists tu%d using st2 tags(%d)", index, index);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, str);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
|
@ -136,22 +136,43 @@ void createNewTable(TAOS* pConn, int32_t index) {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
for (int32_t i = 0; i < 10000; i += 20) {
|
||||
char sql[1024] = {0};
|
||||
sprintf(sql,
|
||||
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
|
||||
index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7,
|
||||
i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14,
|
||||
i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
|
||||
TAOS_RES* p = taos_query(pConn, sql);
|
||||
if (taos_errno(p) != 0) {
|
||||
printf("failed to insert data, reason:%s\n", taos_errstr(p));
|
||||
}
|
||||
if (startTs == 0) {
|
||||
for (int32_t i = 0; i < numOfRows; i += 20) {
|
||||
char sql[1024] = {0};
|
||||
sprintf(sql,
|
||||
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
|
||||
index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7,
|
||||
i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14,
|
||||
i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
|
||||
TAOS_RES* p = taos_query(pConn, sql);
|
||||
if (taos_errno(p) != 0) {
|
||||
printf("failed to insert data, reason:%s\n", taos_errstr(p));
|
||||
}
|
||||
|
||||
taos_free_result(p);
|
||||
taos_free_result(p);
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < numOfRows; i += 20) {
|
||||
char sql[1024*50] = {0};
|
||||
sprintf(sql,
|
||||
"insert into tu%d values(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, "
|
||||
"%d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, "
|
||||
"'%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')",
|
||||
index, startTs, i, pVarchar, startTs + 1, i + 1, pVarchar, startTs + 2, i + 2, pVarchar, startTs + 3, i + 3, pVarchar, startTs + 4, i + 4,
|
||||
pVarchar, startTs + 5, i + 5, pVarchar, startTs + 6, i + 6, pVarchar, startTs + 7, i + 7, pVarchar, startTs + 8, i + 8, pVarchar, startTs + 9, i + 9,
|
||||
pVarchar, startTs + 10, i + 10, pVarchar, startTs + 11, i + 11, pVarchar, startTs + 12, i + 12, pVarchar, startTs + 13, i + 13, pVarchar, startTs + 14,
|
||||
i + 14, pVarchar, startTs + 15, i + 15, pVarchar, startTs + 16, i + 16, pVarchar, startTs + 17, i + 17, pVarchar, startTs + 18, i + 18,
|
||||
pVarchar, startTs + 19, i + 19, pVarchar);
|
||||
TAOS_RES* p = taos_query(pConn, sql);
|
||||
if (taos_errno(p) != 0) {
|
||||
printf("failed to insert data, reason:%s\n", taos_errstr(p));
|
||||
}
|
||||
|
||||
taos_free_result(p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -808,14 +829,7 @@ TEST(clientCase, projection_query_tables) {
|
|||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
|
||||
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int, f varchar(4096)) tags(a int)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
@ -828,28 +842,32 @@ TEST(clientCase, projection_query_tables) {
|
|||
taos_free_result(pRes);
|
||||
|
||||
int64_t start = 1685959190000;
|
||||
const char* pstr =
|
||||
"abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefgh"
|
||||
"ijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnop"
|
||||
"qrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwx"
|
||||
"yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdef"
|
||||
"ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz!@#$%^&&*&^^%$#@!qQWERTYUIOPASDFGHJKL:"
|
||||
"QWERTYUIOP{}";
|
||||
|
||||
int32_t code = -1;
|
||||
for(int32_t i = 0; i < 1000000; ++i) {
|
||||
char t[512] = {0};
|
||||
for(int32_t i = 0; i < 10000; ++i) {
|
||||
char str[1024] = {0};
|
||||
sprintf(str, "create table if not exists tu%d using st2 tags(%d)", i, i);
|
||||
|
||||
sprintf(t, "insert into t1 values(now, %d)", i);
|
||||
while(1) {
|
||||
void* p = taos_query(pConn, t);
|
||||
code = taos_errno(p);
|
||||
taos_free_result(p);
|
||||
if (code != 0) {
|
||||
printf("insert data error, retry\n");
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
TAOS_RES* px = taos_query(pConn, str);
|
||||
if (taos_errno(px) != 0) {
|
||||
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(px);
|
||||
}
|
||||
|
||||
for(int32_t j = 0; j < 5000; ++j) {
|
||||
start += 20;
|
||||
for (int32_t i = 0; i < 10000; ++i) {
|
||||
createNewTable(pConn, i, 20, start, pstr);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < 1; ++i) {
|
||||
printf("create table :%d\n", i);
|
||||
createNewTable(pConn, i);
|
||||
}
|
||||
//
|
||||
// pRes = taos_query(pConn, "select * from tu");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
|
|
|
@@ -83,6 +83,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

@@ -831,6 +831,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

@@ -52,7 +52,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);

int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb);
bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb);
bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb, bool lock);

// for sma
// TODO refactor

@@ -78,6 +78,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);

static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);

static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);

static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg);

@@ -146,6 +148,7 @@ void mndCleanupStream(SMnode *pMnode) {
taosArrayDestroy(execInfo.pTaskList);
taosHashCleanup(execInfo.pTaskMap);
taosHashCleanup(execInfo.transMgmt.pDBTrans);
// taosHashCleanup(execInfo.transMgmt.pWaitingList);
taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup");
}

@@ -739,13 +742,15 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_TOO_MANY_STREAMS;
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
return terrno;
}

if (pStream->targetStbUid == pStreamObj->targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_INVALID_TARGET_TABLE;
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
return terrno;
}
}

@@ -754,11 +759,11 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /

static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj *pStream = NULL;
SStreamObj streamObj = {0};
char *sql = NULL;
int32_t sqlLen = 0;
terrno = TSDB_CODE_SUCCESS;

SCMCreateStreamReq createStreamReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {

@@ -781,7 +786,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (pStream != NULL) {
if (createStreamReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name);
code = 0;
goto _OVER;
} else {
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;

@@ -804,8 +808,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}

code = checkForNumOfStreams(pMnode, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
if (checkForNumOfStreams(pMnode, &streamObj) < 0) {
goto _OVER;
}

@@ -868,8 +871,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);

code = TSDB_CODE_ACTION_IN_PROGRESS;

SName dbname = {0};
tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);

@@ -886,7 +887,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}

_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
}

@@ -896,7 +897,8 @@ _OVER:
if (sql != NULL) {
taosMemoryFreeClear(sql);
}
return code;

return terrno;
}

int64_t mndStreamGenChkpId(SMnode *pMnode) {

@@ -1344,7 +1346,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
}

// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq);

@ -1568,6 +1570,123 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static void setTaskAttrInResBlock(SStreamObj* pStream, SStreamTask* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
// stream name
|
||||
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
|
||||
|
||||
// task id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
||||
char idstr[128] = {0};
|
||||
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
|
||||
idstr[2] = '0';
|
||||
idstr[3] = 'x';
|
||||
varDataSetLen(idstr, len + 2);
|
||||
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
||||
|
||||
// node type
|
||||
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
varDataSetLen(nodeType, 5);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
if (pTask->info.nodeId > 0) {
|
||||
memcpy(varDataVal(nodeType), "vnode", 5);
|
||||
} else {
|
||||
memcpy(varDataVal(nodeType), "snode", 5);
|
||||
}
|
||||
colDataSetVal(pColInfo, numOfRows, nodeType, false);
|
||||
|
||||
// node id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
int64_t nodeId = TMAX(pTask->info.nodeId, 0);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
|
||||
|
||||
// level
|
||||
char level[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
memcpy(varDataVal(level), "source", 6);
|
||||
varDataSetLen(level, 6);
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||
memcpy(varDataVal(level), "agg", 3);
|
||||
varDataSetLen(level, 3);
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
memcpy(varDataVal(level), "sink", 4);
|
||||
varDataSetLen(level, 4);
|
||||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
|
||||
|
||||
// status
|
||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
|
||||
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||
if (pe == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
const char *pStatus = streamTaskGetStatusStr(pe->status);
|
||||
STR_TO_VARSTR(status, pStatus);
|
||||
|
||||
// status
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
|
||||
|
||||
// stage
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
|
||||
|
||||
// input queue
|
||||
char vbuf[30] = {0};
|
||||
char buf[25] = {0};
|
||||
const char *queueInfoStr = "%4.2fMiB (%5.2f%)";
|
||||
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
|
||||
STR_TO_VARSTR(vbuf, buf);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
||||
|
||||
// output queue
|
||||
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
|
||||
// STR_TO_VARSTR(vbuf, buf);
|
||||
|
||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
const char *sinkStr = "%.2fMiB";
|
||||
sprintf(buf, sinkStr, pe->sinkDataSize);
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
// offset info
|
||||
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
||||
sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd);
|
||||
}
|
||||
|
||||
STR_TO_VARSTR(vbuf, buf);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
||||
}
|
||||
|
||||
static int32_t getNumOfTasks(SArray* pTaskList) {
|
||||
int32_t numOfLevels = taosArrayGetSize(pTaskList);
|
||||
|
||||
int32_t count = 0;
|
||||
for (int32_t i = 0; i < numOfLevels; i++) {
|
||||
SArray *pLevel = taosArrayGetP(pTaskList, i);
|
||||
count += taosArrayGetSize(pLevel);
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
SMnode * pMnode = pReq->info.node;
|
||||
SSdb * pSdb = pMnode->pSdb;
|
||||
|
@ -1583,137 +1702,25 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
// lock
|
||||
taosRLockLatch(&pStream->lock);
|
||||
|
||||
// count task num
|
||||
int32_t sz = taosArrayGetSize(pStream->tasks);
|
||||
|
||||
int32_t count = 0;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
count += taosArrayGetSize(pLevel);
|
||||
}
|
||||
|
||||
int32_t count = getNumOfTasks(pStream->tasks);
|
||||
if (numOfRows + count > rowsCapacity) {
|
||||
blockDataEnsureCapacity(pBlock, numOfRows + count);
|
||||
}
|
||||
|
||||
// add row for each task
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t levelCnt = taosArrayGetSize(pLevel);
|
||||
|
||||
for (int32_t j = 0; j < levelCnt; j++) {
|
||||
int32_t numOfLevels = taosArrayGetSize(pLevel);
|
||||
for (int32_t j = 0; j < numOfLevels; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
|
||||
SColumnInfoData *pColInfo;
|
||||
int32_t cols = 0;
|
||||
|
||||
// stream name
|
||||
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
|
||||
|
||||
// task id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
||||
char idstr[128] = {0};
|
||||
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
|
||||
idstr[2] = '0';
|
||||
idstr[3] = 'x';
|
||||
varDataSetLen(idstr, len + 2);
|
||||
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
||||
|
||||
// node type
|
||||
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
varDataSetLen(nodeType, 5);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
if (pTask->info.nodeId > 0) {
|
||||
memcpy(varDataVal(nodeType), "vnode", 5);
|
||||
} else {
|
||||
memcpy(varDataVal(nodeType), "snode", 5);
|
||||
}
|
||||
colDataSetVal(pColInfo, numOfRows, nodeType, false);
|
||||
|
||||
// node id
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
int64_t nodeId = TMAX(pTask->info.nodeId, 0);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
|
||||
|
||||
// level
|
||||
char level[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
memcpy(varDataVal(level), "source", 6);
|
||||
varDataSetLen(level, 6);
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||
memcpy(varDataVal(level), "agg", 3);
|
||||
varDataSetLen(level, 3);
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
memcpy(varDataVal(level), "sink", 4);
|
||||
varDataSetLen(level, 4);
|
||||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)level, false);
|
||||
|
||||
// status
|
||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||
if (pe == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const char *pStatus = streamTaskGetStatusStr(pe->status);
|
||||
STR_TO_VARSTR(status, pStatus);
|
||||
|
||||
// status
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
|
||||
|
||||
// stage
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
|
||||
|
||||
// input queue
|
||||
char vbuf[30] = {0};
|
||||
char buf[25] = {0};
|
||||
const char *queueInfoStr = "%4.2fMiB (%5.2f%)";
|
||||
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
|
||||
STR_TO_VARSTR(vbuf, buf);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
||||
|
||||
// output queue
|
||||
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
|
||||
// STR_TO_VARSTR(vbuf, buf);
|
||||
|
||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
const char *sinkStr = "%.2fMiB";
|
||||
sprintf(buf, sinkStr, pe->sinkDataSize);
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
// offset info
|
||||
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
||||
sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd);
|
||||
}
|
||||
|
||||
STR_TO_VARSTR(vbuf, buf);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
|
||||
|
||||
setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
|
||||
numOfRows++;
|
||||
}
|
||||
}
|
||||
|
||||
// unlock
|
||||
taosRUnLockLatch(&pStream->lock);
|
||||
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
|
@@ -1822,7 +1829,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}

// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;

@@ -1957,7 +1964,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}

// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;

@@ -2704,7 +2711,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t le
return TSDB_CODE_SUCCESS;
}

int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
STrans* pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);

@@ -2722,7 +2729,7 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
break;
}

bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, false);
if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans",
pStream->name, pStream->sourceDb, pStream->targetDb);

@ -2740,6 +2747,91 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
|
||||
int32_t numOfLevels = taosArrayGetSize(pLevel);
|
||||
for (int32_t j = 0; j < numOfLevels; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
if (pTask->id.taskId == pId->taskId) {
|
||||
return pTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
|
||||
if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
|
||||
if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
|
||||
int32_t numOfReady = 0;
|
||||
int32_t numOfTotal = 0;
|
||||
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
|
||||
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
|
||||
if (pTaskEntry->id.streamId == pId->streamId) {
|
||||
numOfTotal++;
|
||||
|
||||
if (pTaskEntry->id.taskId != pId->taskId) {
|
||||
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
|
||||
if (pEntry->status == TASK_STATUS__READY) {
|
||||
numOfReady++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfReady > 0) {
|
||||
mDebug("stream:0x%" PRIx64
|
||||
" %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task",
|
||||
pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// currently only handle the sink task
|
||||
// 1. sink task, drop related fill-history task msg is missing
|
||||
// 2. other tasks are in ready state for at least 3 * hb_interval
|
||||
static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *pTaskEntry, SStreamObj *pStream) {
|
||||
SStreamTask *pTask = mndGetStreamTask(&pTaskEntry->id, pStream);
|
||||
if (pTask == NULL) {
|
||||
mError("failed to get the stream task:0x%x, may have been dropped", (int32_t) pTaskEntry->id.taskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SVDropHTaskReq *pReq = rpcMallocCont(sizeof(SVDropHTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in drop related fill-history task, size:%" PRIzu ", code:%s", sizeof(SVDropHTaskReq),
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SRpcMsg msg = {.info.noResp = 1};
|
||||
|
||||
initRpcMsg(&msg, TDMT_STREAM_HTASK_DROP, pReq, sizeof(SVDropHTaskReq));
|
||||
|
||||
mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId);
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
tmsgSendReq(&epset, &msg);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
||||
int32_t num = taosArrayGetSize(pNodeList);
|
||||
mInfo("set node expired for %d nodes", num);
|
||||
|
@ -2768,7 +2860,7 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
|
|||
for(int32_t j = 0; j < numOfNodes; ++j) {
|
||||
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
|
||||
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
|
||||
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
|
||||
mInfo("vgId:%d stage updated from %" PRId64 " to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
|
||||
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
|
||||
|
||||
pNodeEntry->stageUpdated = true;
|
||||
|
@ -2789,6 +2881,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
||||
|
||||
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
|
||||
streamMetaClearHbMsg(&req);
|
||||
tDecoderClear(&decoder);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
|
@ -2815,6 +2908,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
|
||||
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
||||
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
||||
|
||||
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
||||
if (pTaskEntry == NULL) {
|
||||
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
|
||||
|
@ -2823,7 +2917,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
|
||||
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
|
||||
updateStageInfo(pTaskEntry, p->stage);
|
||||
// NOTE: uncomment this when merging 3.0 to main.
|
||||
if(pTaskEntry->nodeId == SNODE_HANDLE) {
|
||||
// snodeChanged = true;
|
||||
}
|
||||
} else {
|
||||
// task is idle for more than 50 sec.
|
||||
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
|
||||
if (!pTaskEntry->inputQChanging) {
|
||||
pTaskEntry->inputQUnchangeCounter++;
|
||||
} else {
|
||||
pTaskEntry->inputQChanging = false;
|
||||
}
|
||||
} else {
|
||||
pTaskEntry->inputQChanging = true;
|
||||
pTaskEntry->inputQUnchangeCounter = 0;
|
||||
}
|
||||
|
||||
streamTaskStatusCopy(pTaskEntry, p);
|
||||
if (p->activeCheckpointId != 0) {
|
||||
if (activeCheckpointId != 0) {
|
||||
|
@ -2838,9 +2948,28 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
pTaskEntry->status = p->status;
|
||||
if (p->status == pTaskEntry->status) {
|
||||
pTaskEntry->statusLastDuration++;
|
||||
} else {
|
||||
pTaskEntry->status = p->status;
|
||||
pTaskEntry->statusLastDuration = 0;
|
||||
}
|
||||
|
||||
if (p->status != TASK_STATUS__READY) {
|
||||
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
|
||||
|
||||
if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) {
|
||||
bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo);
|
||||
if(drop) {
|
||||
SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId);
|
||||
if (pStreamObj == NULL) {
|
||||
mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid);
|
||||
} else {
|
||||
mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj);
|
||||
mndReleaseStream(pMnode, pStreamObj);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2862,8 +2991,22 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMetaClearHbMsg(&req);
|
||||
|
||||
taosArrayDestroy(req.pTaskStatus);
|
||||
taosArrayDestroy(req.pUpdateNodes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
||||
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
||||
if (pStream->uid == streamId) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return pStream;
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
|
@@ -35,17 +35,15 @@ int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pS
}

int32_t clearFinishedTrans(SMnode* pMnode) {
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
size_t keyLen = 0;
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
void* pIter = NULL;

taosThreadMutexLock(&execInfo.lock);

void* pIter = NULL;
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter;
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter;

// let's clear the finished trans
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
if (pTrans == NULL) {
void* pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name

@@ -60,44 +58,55 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
}

size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
for (int32_t i = 0; i < num; ++i) {
SKeyInfo* pKey = taosArrayGet(pList, i);
taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen);
}

mDebug("clear %d finished stream-trans, remained:%d", (int32_t) num, taosHashGetSize(execInfo.transMgmt.pDBTrans));
taosThreadMutexUnlock(&execInfo.lock);
mDebug("clear %d finished stream-trans, remained:%d", (int32_t)num, taosHashGetSize(execInfo.transMgmt.pDBTrans));

terrno = TSDB_CODE_SUCCESS;
taosArrayDestroy(pList);
return 0;
}

bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) {
clearFinishedTrans(pMnode);
bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb, bool lock) {
if (lock) {
taosThreadMutexLock(&execInfo.lock);
}

taosThreadMutexLock(&execInfo.lock);
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) {
taosThreadMutexUnlock(&execInfo.lock);
if (lock) {
taosThreadMutexUnlock(&execInfo.lock);
}
return false;
}

clearFinishedTrans(pMnode);

SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb));
if (pEntry != NULL) {
taosThreadMutexUnlock(&execInfo.lock);
if (lock) {
taosThreadMutexUnlock(&execInfo.lock);
}
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true;
}

pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb));
if (pEntry != NULL) {
taosThreadMutexUnlock(&execInfo.lock);
if (lock) {
taosThreadMutexUnlock(&execInfo.lock);
}
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true;
}

taosThreadMutexUnlock(&execInfo.lock);
if (lock) {
taosThreadMutexUnlock(&execInfo.lock);
}

return false;
}

@@ -834,7 +834,7 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
if (mndCheckTransConflict(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
return terrno;
}

return 0;

@@ -93,7 +93,11 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_RSMA2_DIR "rsma2"
#define VNODE_TQ_STREAM "stream"

#if SUSPEND_RESUME_TEST // only for test purpose
#define VNODE_BUFPOOL_SEGMENTS 1
#else
#define VNODE_BUFPOOL_SEGMENTS 3
#endif

#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"

@@ -232,6 +236,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg);

int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
int32_t tqRestartStreamTasks(STQ* pTq);

@@ -2017,3 +2017,37 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
SVDropHTaskReq* pReq = (SVDropHTaskReq*) pMsg->pCont;

SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process drop fill-history task req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}

tqDebug("s-task:%s receive drop fill-history msg from mnode", pTask->id.idStr);
if (pTask->hTaskInfo.id.taskId == 0) {
tqError("vgId:%d s-task:%s not have related fill-history task", pMeta->vgId, pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ASSERT(status == TASK_STATUS__STREAM_SCAN_HISTORY);

streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);

SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);

// clear the scheduler status
streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

@ -48,7 +48,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan
|
|||
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||
STsdbReader* pReader);
|
||||
|
||||
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
|
||||
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
|
||||
int8_t* pLevel);
|
||||
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
|
||||
|
@ -58,6 +58,7 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbRea
|
|||
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
|
||||
|
||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||
|
||||
|
@ -168,7 +169,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCostSummary* pCost = &pReader->cost;
|
||||
SReadCostSummary* pCost = &pReader->cost;
|
||||
|
||||
pIter->pLastBlockReader->uid = 0;
|
||||
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
|
||||
|
@ -291,11 +292,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
|
|||
}
|
||||
|
||||
static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
|
||||
int32_t code = -1;
|
||||
qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
code = taosThreadMutexInit(&pReader->readerMutex, NULL);
|
||||
|
||||
int32_t code = taosThreadMutexInit(&pReader->readerMutex, NULL);
|
||||
qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
return code;
|
||||
|
@ -324,22 +321,14 @@ static int32_t tsdbAcquireReader(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
|
||||
int32_t code = -1;
|
||||
qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
code = taosThreadMutexTryLock(&pReader->readerMutex);
|
||||
|
||||
int32_t code = taosThreadMutexTryLock(&pReader->readerMutex);
|
||||
qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbReleaseReader(STsdbReader* pReader) {
|
||||
int32_t code = -1;
|
||||
qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
code = taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
|
||||
int32_t code = taosThreadMutexUnlock(&pReader->readerMutex);
|
||||
qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
||||
|
||||
return code;
|
||||
|
@ -432,6 +421,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
|
|||
}
|
||||
|
||||
tsdbInitReaderLock(pReader);
|
||||
tsem_init(&pReader->resumeAfterSuspend, 0, 0);
|
||||
|
||||
*ppReader = pReader;
|
||||
return code;
|
||||
|
@ -1015,8 +1005,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
|||
// check if current block are all handled
|
||||
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
|
||||
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
if (outOfTimeWindow(ts,
|
||||
&pReader->info.window)) { // the remain data has out of query time window, ignore current block
|
||||
if (outOfTimeWindow(ts, &pReader->info.window)) {
|
||||
// the remain data has out of query time window, ignore current block
|
||||
setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
|
||||
}
|
||||
} else {
|
||||
|
@ -1123,16 +1113,12 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
|
|||
}
|
||||
|
||||
int32_t step = asc ? 1 : -1;
|
||||
// *nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||
STableDataBlockIdx* pTableDataBlockIdx =
|
||||
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
|
||||
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
|
||||
memcpy(pRecord, &p->record, sizeof(SBrinRecord));
|
||||
|
||||
*nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||
|
||||
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1376,23 +1362,19 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS;
}

SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;

int64_t st = taosGetTimestampUs();
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);

blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]);
pBlock->info.id.uid = pBlockScanInfo->uid;
double el = (taosGetTimestampUs() - st) / 1000.0;
updateComposedBlockInfo(pReader, el, pBlockScanInfo);

setComposedBlockFlag(pReader, true);

double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
" - %" PRId64 ", uid:%" PRIu64 ", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pBlockScanInfo->uid, pReader->idStr);

pReader->cost.buildmemBlock += elapsedTime;
pReader->cost.buildmemBlock += el;
return code;
}
@ -2293,13 +2275,12 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
return code;
}

static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;

pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
pResBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);

setComposedBlockFlag(pReader, true);

pReader->cost.composedBlocks += 1;
@ -2356,7 +2337,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pBlockScanInfo = *pReader->status.pTableIter;
if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order);
return code;
}
}
@ -2436,7 +2416,7 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1;
}

int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost) {
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost) {
int32_t code = 0;
int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData);
if (newDelDataInFile == 0 &&
@ -2935,6 +2915,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList;

tsdbDebug("seq load data blocks from cache, %s", pReader->idStr);

while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
@ -3043,6 +3025,8 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;

tsdbDebug("seq load data blocks from stt files %s", pReader->idStr);

while (1) {
terrno = 0;
@ -3774,7 +3758,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
int32_t code = TSDB_CODE_SUCCESS;

do {
// SRow* pTSRow = NULL;
TSDBROW row = {.type = -1};
bool freeTSRow = false;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
@ -3783,6 +3766,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
}

if (row.type == TSDBROW_ROW_FMT) {
int64_t ts = row.pTSRow->ts;
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);

if (freeTSRow) {
@ -3792,13 +3776,17 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
if (code) {
return code;
}

pBlockScanInfo->lastProcKey = ts;
} else {
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) {
break;
}
pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow];
}

// no data in buffer, return immediately
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
break;
@ -4107,7 +4095,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbDataFileReaderClose(&pReader->pFileReader);
}

SCostSummary* pCost = &pReader->cost;
SReadCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) {
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
@ -4122,6 +4110,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL;

tsem_destroy(&pReader->resumeAfterSuspend);
tsdbReleaseReader(pReader);
tsdbUninitReaderLock(pReader);

@ -4154,6 +4143,8 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
STableBlockScanInfo* pBlockScanInfo = NULL;

pReader->status.suspendInvoked = true; // record the suspend status

if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo != NULL) {
@ -4167,84 +4158,34 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {

tsdbDataFileReaderClose(&pReader->pFileReader);

SCostSummary* pCost = &pReader->cost;
SReadCostSummary* pCost = &pReader->cost;
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;

while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

pInfo->iterInit = false;
pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false;

if (pInfo->iter.iter != NULL) {
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
}

if (pInfo->iiter.iter != NULL) {
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
}

pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->pFileDelData = taosArrayDestroy(pInfo->pFileDelData);
}
} else {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;

while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;

pInfo->iterInit = false;
pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false;

if (pInfo->iter.iter != NULL) {
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
}

if (pInfo->iiter.iter != NULL) {
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
}

pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
}

pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;

// reset current current table's data block scan info,
pBlockScanInfo->iterInit = false;

pBlockScanInfo->iter.hasVal = false;
pBlockScanInfo->iiter.hasVal = false;
if (pBlockScanInfo->iter.iter != NULL) {
pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter);
}

if (pBlockScanInfo->iiter.iter != NULL) {
pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter);
}

pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
pBlockScanInfo->pBlockIdxList = taosArrayDestroy(pBlockScanInfo->pBlockIdxList);
// TODO: keep skyline for reuse
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
}
}

// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;

int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1:-1;

int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
clearBlockScanInfo(pInfo);
pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step;
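// after clearing each table's scan info above, nextProcKey is advanced one step past
// lastProcKey (the step follows the traversal order), so a resumed scan continues strictly
// after the rows that have already been delivered.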
}

pStatus->uidList.currentIndex = 0;
initReaderStatus(pStatus);

tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
pReader->pReadSnap = NULL;
pReader->flag = READER_STATUS_SUSPEND;

#if SUSPEND_RESUME_TEST
tsem_post(&pReader->resumeAfterSuspend);
#endif

tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
pReader->idStr);
return code;
@ -4399,6 +4340,16 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {

SReaderStatus* pStatus = &pReader->status;

// NOTE: the following code is used to test suspend/resume of the tsdbReader when it blocks the commit.
// The data should be ingested in round-robin and all the child tables should be created before ingesting data.
// The version range of the query is used to verify the correctness of the suspend/resume functions.
// This function will block before loading the SECOND block from the vnode buffer, and restart itself from the sst-files.
#if SUSPEND_RESUME_TEST
if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) {
tsem_wait(&pReader->resumeAfterSuspend);
}
#endif

code = tsdbAcquireReader(pReader);
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
@ -210,6 +210,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
p->iterInit = false;
p->iter.hasVal = false;
p->iiter.hasVal = false;
p->sttKeyInfo.status = STT_FILE_READER_UNINIT;

if (p->iter.iter != NULL) {
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
@ -96,7 +96,7 @@ typedef struct SResultBlockInfo {
int64_t capacity;
} SResultBlockInfo;

typedef struct SCostSummary {
typedef struct SReadCostSummary {
int64_t numOfBlocks;
double blockLoadTime;
double buildmemBlock;
@ -110,7 +110,7 @@ typedef struct SCostSummary {
double createScanInfoList;
double createSkylineIterTime;
double initLastBlockReader;
} SCostSummary;
} SReadCostSummary;

typedef struct STableUidList {
uint64_t* tableUidList; // access table uid list in uid ascending order list
@ -122,12 +122,6 @@ typedef struct {
int32_t numOfSttFiles;
} SBlockNumber;

typedef struct SBlockIndex {
int32_t ordinalIndex;
int64_t inFileOffset;
STimeWindow window; // todo replace it with overlap flag.
} SBlockIndex;

typedef struct SBlockOrderWrapper {
int64_t uid;
int64_t offset;
@ -192,6 +186,7 @@ typedef struct SFileBlockDumpInfo {
} SFileBlockDumpInfo;

typedef struct SReaderStatus {
bool suspendInvoked;
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
@ -220,7 +215,8 @@ struct STsdbReader {
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SCostSummary cost;
tsem_t resumeAfterSuspend;
SReadCostSummary cost;
SHashObj** pIgnoreTables;
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFileReader* pFileReader; // the file reader
@ -595,6 +595,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
tqProcessTaskResetReq(pVnode->pTq, pMsg);
}
} break;
case TDMT_STREAM_HTASK_DROP: {
if (pVnode->restored && vnodeIsLeader(pVnode)) {
tqProcessTaskDropHTask(pVnode->pTq, pMsg);
}
} break;
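// the drop-history-task request is handled only on a vnode that has finished restore and is
// currently the leader; otherwise the message is ignored.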
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
@ -113,25 +113,23 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);

int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);

SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);

int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);

int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);

SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);
@ -152,8 +150,8 @@ int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);

int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

#ifdef __cplusplus
}
@ -158,6 +158,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream

int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
if (code == 0) {
ASSERT(pTask->chkInfo.dispatchCheckpointTrigger == false);
streamDispatchStreamBlock(pTask);
} else {
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
@ -278,6 +279,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
pTask->chkInfo.startTs = 0; // clear the recorded start time
pTask->checkpointNotReadyTasks = 0;
pTask->checkpointAlignCnt = 0;
pTask->chkInfo.dispatchCheckpointTrigger = false;
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
}
@ -158,7 +158,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
}

// todo handle memory error
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
terrno = 0;

if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
@ -593,6 +593,12 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return 0;
}

if (pTask->chkInfo.dispatchCheckpointTrigger) {
stDebug("s-task:%s already send checkpoint trigger, not dispatch anymore", id);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
return 0;
}

ASSERT(pTask->msgInfo.pData == NULL);
stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);

@ -1039,30 +1045,14 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
return 0;
}

static void dispatchDataInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
return;
}

ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__CK) {
stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr);
taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref);
streamDispatchStreamBlock(pTask);
}
}

// this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));

bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) {
pTask->chkInfo.dispatchCheckpointTrigger = true;
}
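// once the checkpoint-trigger dispatch is confirmed, this flag makes streamDispatchStreamBlock()
// return early for this task, so no further data is dispatched until streamTaskClearCheckInfo()
// resets the flag.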
pTask->msgInfo.pData = NULL;
pTask->msgInfo.dispatchMsgType = 0;
@ -1083,13 +1073,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId

// otherwise, continue dispatch the first block to down stream task in pipeline
if (delayDispatch) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref);
if (pTask->msgInfo.pTimer != NULL) {
taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
} else {
pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer);
}
return 0;
} else {
streamDispatchStreamBlock(pTask);
}
@ -48,6 +48,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
return code;
}

// checkpoint trigger will be checked
streamDispatchStreamBlock(pTask);
}

@ -251,14 +252,18 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t*
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);

void* exec = pTask->exec.pExecutor;
bool finished = false;
void* exec = pTask->exec.pExecutor;
bool finished = false;
const char* id = pTask->id.idStr;

qSetStreamOpOpen(exec);
if (!pTask->hTaskInfo.operatorOpen) {
qSetStreamOpOpen(exec);
pTask->hTaskInfo.operatorOpen = true;
}

while (1) {
if (streamTaskShouldPause(pTask)) {
stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr);
stDebug("s-task:%s paused from the scan-history task", id);
// quit from step1, not continue to handle the step2
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
}
@ -266,8 +271,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr,
tstrerror(terrno));
stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno));
continue;
}

@ -280,12 +284,12 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
}

// dispatch the generated results
int32_t code = handleResultBlocks(pTask, pRes, size);
/*int32_t code = */handleResultBlocks(pTask, pRes, size);

int64_t el = taosGetTimestampMs() - st;

// downstream task input queue is full, try in 5sec
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
}
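// this back-off (and the time-slice check below) only applies to a fill-history task
// (info.fillHistory == 1); other tasks keep scanning within this call.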
@ -293,9 +297,9 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
}

if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms",
pTask->id.idStr, pTask->info.fillHistory, el / 1000.0);
if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
}
}
@ -400,7 +404,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 5. save to disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);

// 6. pause allowed.
// 6. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);

@ -542,7 +546,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.
*/
int32_t streamExecForAll(SStreamTask* pTask) {
int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;

// merge multiple input data if possible in the input queue.
@ -653,7 +657,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
while (1) {
int32_t code = streamExecForAll(pTask);
int32_t code = doStreamExecTask(pTask);
if (code < 0) { // todo this status should be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1;
@ -784,7 +784,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
if (tEncodeI32(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI64(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
@ -822,7 +822,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1;
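// stage is (de)serialized as a 64-bit integer; encoder and decoder must use the same width,
// otherwise the remaining fields of the heartbeat entry would be misread.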
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
@ -861,10 +861,18 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
return false;
}

static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) {
taosArrayDestroy(pMsg->pTaskStatus);
taosArrayDestroy(pMsg->pUpdateNodes);
taosArrayDestroy(pIdList);
void streamMetaClearHbMsg(SStreamHbMsg* pMsg) {
if (pMsg == NULL) {
return;
}

if (pMsg->pUpdateNodes != NULL) {
taosArrayDestroy(pMsg->pUpdateNodes);
}

if (pMsg->pTaskStatus != NULL) {
taosArrayDestroy(pMsg->pTaskStatus);
}
}

static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
@ -1041,7 +1049,8 @@ void metaHbToMnode(void* param, void* tmrId) {
}

_end:
clearHbMsg(&hbMsg, pIdList);
streamMetaClearHbMsg(&hbMsg);
taosArrayDestroy(pIdList);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}
@ -221,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*pInput = qItem;
} else {
// merge current block failed, let's handle the already merged blocks.
void* newRet = streamMergeQueueItem(*pInput, qItem);
void* newRet = streamQueueMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
if (terrno != 0) {
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
@ -323,7 +323,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
}

int32_t onNormalTaskReady(SStreamTask* pTask) {
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;

streamTaskSetReady(pTask);
@ -348,7 +348,7 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}

int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;

// set the state to be ready
@ -470,6 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
taosGetTimestampMs(), false);

// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;

SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init,
taosGetTimestampMs(), false);
streamMetaReleaseTask(pTask->pMeta, pHTask);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);

@ -1069,8 +1079,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));

int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);

if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
if (numOfRecv == numOfTotal) {
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;

@ -1084,6 +1095,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);

streamMetaResetStartInfo(pStartInfo);
} else {
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
}

streamMetaWUnLock(pMeta);
@ -448,7 +448,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i

// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);

TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);
@ -457,11 +457,11 @@ void doInitStateTransferTable(void) {
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));

// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false);
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);

// scan-history related event
@ -541,66 +541,71 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
memcpy(output, input + 1, nelements * longBytes);
return nelements * longBytes;
} else if (input[0] == 1) { // Decompress
int64_t *ostream = (int64_t *)output;
if (tsSIMDEnable && tsAVX512Enable) {
tsDecompressTimestampAvx512(input, nelements, output, false);
} else if (tsSIMDEnable && tsAVX2Enable) {
tsDecompressTimestampAvx2(input, nelements, output, false);
} else {
int64_t *ostream = (int64_t *)output;

int32_t ipos = 1, opos = 0;
int8_t nbytes = 0;
int64_t prev_value = 0;
int64_t prev_delta = 0;
int64_t delta_of_delta = 0;
int32_t ipos = 1, opos = 0;
int8_t nbytes = 0;
int64_t prev_value = 0;
int64_t prev_delta = 0;
int64_t delta_of_delta = 0;
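// layout handled by this decoder: after the one-byte header, every flags byte is followed by
// two variable-length payloads; the low nibble of flags gives the byte count of the first
// payload, the high nibble that of the second. Each payload is a zigzag-encoded delta-of-delta,
// except that the very first value of the stream (the opos == 0 branch below) carries the first
// timestamp itself.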
while (1) {
uint8_t flags = input[ipos++];
// Decode dd1
uint64_t dd1 = 0;
nbytes = flags & INT8MASK(4);
if (nbytes == 0) {
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
while (1) {
uint8_t flags = input[ipos++];
// Decode dd1
uint64_t dd1 = 0;
nbytes = flags & INT8MASK(4);
if (nbytes == 0) {
delta_of_delta = 0;
} else {
memcpy(&dd1, input + ipos, nbytes);
if (is_bigendian()) {
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd1, input + ipos, nbytes);
}
delta_of_delta = ZIGZAG_DECODE(int64_t, dd1);
}
delta_of_delta = ZIGZAG_DECODE(int64_t, dd1);
}
ipos += nbytes;
if (opos == 0) {
prev_value = delta_of_delta;
prev_delta = 0;
ostream[opos++] = delta_of_delta;
} else {

ipos += nbytes;
if (opos == 0) {
prev_value = delta_of_delta;
prev_delta = 0;
ostream[opos++] = delta_of_delta;
} else {
prev_delta = delta_of_delta + prev_delta;
prev_value = prev_value + prev_delta;
ostream[opos++] = prev_value;
}
if (opos == nelements) return nelements * longBytes;

// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
if (nbytes == 0) {
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
// zigzag_decoding
delta_of_delta = ZIGZAG_DECODE(int64_t, dd2);
}
ipos += nbytes;
prev_delta = delta_of_delta + prev_delta;
prev_value = prev_value + prev_delta;
ostream[opos++] = prev_value;
if (opos == nelements) return nelements * longBytes;
}
if (opos == nelements) return nelements * longBytes;

// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
if (nbytes == 0) {
delta_of_delta = 0;
} else {
if (is_bigendian()) {
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
// zigzag_decoding
delta_of_delta = ZIGZAG_DECODE(int64_t, dd2);
}
ipos += nbytes;
prev_delta = delta_of_delta + prev_delta;
prev_value = prev_value + prev_delta;
ostream[opos++] = prev_value;
if (opos == nelements) return nelements * longBytes;
}

} else {
ASSERT(0);
return -1;
}

return nelements * longBytes;
}

/* --------------------------------------------Double Compression ---------------------------------------------- */
@ -53,11 +53,8 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
int64_t prev_value = 0;

#if __AVX2__
while (1) {
if (_pos == nelements) break;

uint64_t w = 0;
memcpy(&w, ip, LONG_BYTES);
while (_pos < nelements) {
uint64_t w = *(uint64_t*) ip;
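// the direct 8-byte dereference assumes unaligned 64-bit loads are acceptable, which holds on
// the x86 targets this AVX2 path is built for.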
char selector = (char)(w & INT64MASK(4)); // selector = 4
char bit = bit_per_integer[(int32_t)selector]; // bit = 3
@ -114,7 +111,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
__m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal);
signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask);

// get the four zigzag values here
// get four zigzag values here
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);

// calculate the cumulative sum (prefix sum) for each number
@ -246,6 +243,268 @@ int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelem
// todo add later
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) {
#if __AVX2__
#endif
return 0;
}

int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output,
bool bigEndian) {
#if 0
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
__m128i prevVal = _mm_setzero_si128();
__m128i prevDelta = _mm_setzero_si128();

#if __AVX2__
int32_t batch = nelements >> 1;
int32_t remainder = nelements & 0x01;
__mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff};

int32_t i = 0;
if (batch > 1) {
// first loop
uint8_t flags = input[ipos++];

int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);

__m128i data1;
if (nbytes1 == 0) {
data1 = _mm_setzero_si128();
} else {
memcpy(&data1, (const void*) (input + ipos), nbytes1);
}

__m128i data2;
if (nbytes2 == 0) {
data2 = _mm_setzero_si128();
} else {
memcpy(&data2, (const void*) (input + ipos + nbytes1), nbytes2);
}

data2 = _mm_broadcastq_epi64(data2);
__m128i zzVal = _mm_blend_epi32(data2, data1, 0x03);

// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal);
signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask);

// get two zigzag values here
__m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask);

__m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta);
deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent);

__m128i val = _mm_add_epi64(deltaCurrent, prevVal);
_mm_storeu_si128((__m128i *)&ostream[opos], val);

// keep the previous value
prevVal = _mm_shuffle_epi32 (val, 0xEE);

// keep the previous delta of delta, for the first item
prevDelta = _mm_shuffle_epi32(deltaOfDelta, 0xEE);

opos += 2;
ipos += nbytes1 + nbytes2;
i += 1;
}

// the remain
for(; i < batch; ++i) {
uint8_t flags = input[ipos++];

int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);

// __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos));
// __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1));
__m128i data1;
if (nbytes1 == 0) {
data1 = _mm_setzero_si128();
} else {
int64_t dd = 0;
memcpy(&dd, (const void*) (input + ipos), nbytes1);
data1 = _mm_loadu_si64(&dd);
}

__m128i data2;
if (nbytes2 == 0) {
data2 = _mm_setzero_si128();
} else {
int64_t dd = 0;
memcpy(&dd, (const void*) (input + ipos + nbytes1), nbytes2);
data2 = _mm_loadu_si64(&dd);
}

data2 = _mm_broadcastq_epi64(data2);

__m128i zzVal = _mm_blend_epi32(data2, data1, 0x03);

// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal);
signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask);

// get two zigzag values here
__m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask);

__m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta);
deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent);

__m128i val = _mm_add_epi64(deltaCurrent, prevVal);
_mm_storeu_si128((__m128i *)&ostream[opos], val);

// keep the previous value
prevVal = _mm_shuffle_epi32 (val, 0xEE);

// keep the previous delta of delta
__m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta);
prevDelta = _mm_shuffle_epi32(_mm_add_epi64(delta, prevDelta), 0xEE);

opos += 2;
ipos += nbytes1 + nbytes2;
}

if (remainder > 0) {
uint64_t dd = 0;
uint8_t flags = input[ipos++];

int32_t nbytes = flags & INT8MASK(4);
int64_t deltaOfDelta = 0;
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
// if (is_bigendian()) {
// memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
// } else {
memcpy(&dd, input + ipos, nbytes);
// }
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd);
}

ipos += nbytes;
if (opos == 0) {
ostream[opos++] = deltaOfDelta;
} else {
int64_t prevDeltaX = deltaOfDelta + prevDelta[1];
ostream[opos++] = prevVal[1] + prevDeltaX;
}
}
#endif
#endif
return 0;
}
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool UNUSED_PARAM(bigEndian)) {
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;

#if __AVX512VL__

__m128i prevVal = _mm_setzero_si128();
__m128i prevDelta = _mm_setzero_si128();

int32_t numOfBatch = nelements >> 1;
int32_t remainder = nelements & 0x01;
__mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff};

int32_t i = 0;
if (numOfBatch > 1) {
// first loop
uint8_t flags = input[ipos++];

int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);

__m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos));
__m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1));
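// the masked byte loads above require the AVX-512BW/VL subsets, which is why this body is
// guarded by __AVX512VL__ rather than plain __AVX512F__.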
data2 = _mm_broadcastq_epi64(data2);

__m128i zzVal = _mm_blend_epi32(data2, data1, 0x03);

// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal);
signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask);

// get two zigzag values here
__m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask);

__m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta);
deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent);
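// two-lane prefix sum: _mm_slli_si128(x, 8) moves lane 0 into lane 1, so after the add lane 1
// holds the sum of both lanes while lane 0 is unchanged; adding prevVal below then fills the
// two output slots of this pair in one step.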
__m128i val = _mm_add_epi64(deltaCurrent, prevVal);
_mm_storeu_si128((__m128i *)&ostream[opos], val);

// keep the previous value
prevVal = _mm_shuffle_epi32 (val, 0xEE);

// keep the previous delta of delta, for the first item
prevDelta = _mm_shuffle_epi32(deltaOfDelta, 0xEE);

opos += 2;
ipos += nbytes1 + nbytes2;
i += 1;
}

// the remain
for(; i < numOfBatch; ++i) {
uint8_t flags = input[ipos++];

int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
int8_t nbytes2 = (flags >> 4) & INT8MASK(4);

__m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos));
__m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1));
data2 = _mm_broadcastq_epi64(data2);

__m128i zzVal = _mm_blend_epi32(data2, data1, 0x03);

// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
__m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal);
signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask);

// get two zigzag values here
__m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask);

__m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta);
deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent);

__m128i val = _mm_add_epi64(deltaCurrent, prevVal);
_mm_storeu_si128((__m128i *)&ostream[opos], val);

// keep the previous value
prevVal = _mm_shuffle_epi32 (val, 0xEE);

// keep the previous delta of delta
__m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta);
prevDelta = _mm_shuffle_epi32(_mm_add_epi64(delta, prevDelta), 0xEE);
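// 0xEE replicates the upper 64-bit lane into both lanes, so prevVal and prevDelta carry the
// value and running delta of the second element of this pair into the next iteration.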
opos += 2;
ipos += nbytes1 + nbytes2;
}

if (remainder > 0) {
uint64_t dd = 0;
uint8_t flags = input[ipos++];

int32_t nbytes = flags & INT8MASK(4);
int64_t deltaOfDelta = 0;
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
memcpy(&dd, input + ipos, nbytes);
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd);
}

ipos += nbytes;
if (opos == 0) {
ostream[opos++] = deltaOfDelta;
} else {
int64_t prevDeltaX = deltaOfDelta + prevDelta[1];
ostream[opos++] = prevVal[1] + prevDeltaX;
}
}

#endif
return 0;
}
@ -0,0 +1,94 @@
#include <gtest/gtest.h>
#include <stdlib.h>
#include <tcompression.h>
#include <random>

namespace {} // namespace

TEST(utilTest, decompress_test) {
int64_t tsList[10] = {1700000000, 1700000100, 1700000200, 1700000300, 1700000400,
1700000500, 1700000600, 1700000700, 1700000800, 1700000900};

char* pOutput[10 * sizeof(int64_t)] = {0};
int32_t len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 10, ONE_STAGE_COMP, NULL, 0);

char* decompOutput[10 * 8] = {0};
tsDecompressTimestamp(pOutput, len, 10, decompOutput, sizeof(int64_t)*10, ONE_STAGE_COMP, NULL, 0);
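// the decoded values are expected to match tsList entry by entry; the loops below only print
// them for manual inspection.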
for(int32_t i = 0; i < 10; ++i) {
std::cout<< ((int64_t*)decompOutput)[i] << std::endl;
}

memset(decompOutput, 0, 10*8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 10,
reinterpret_cast<char* const>(decompOutput), false);

for(int32_t i = 0; i < 10; ++i) {
std::cout<<((int64_t*)decompOutput)[i] << std::endl;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int64_t tsList1[7] = {1700000000, 1700000000, 1700000000, 1700000000, 1700000000, 1700000000, 1700000900};
int32_t len1 = tsCompressTimestamp(tsList1, sizeof(tsList1), sizeof(tsList1) / sizeof(tsList1[0]), pOutput, 7, ONE_STAGE_COMP, NULL, 0);

memset(decompOutput, 0, 10*8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 7,
reinterpret_cast<char* const>(decompOutput), false);

for(int32_t i = 0; i < 7; ++i) {
std::cout<<((int64_t*)decompOutput)[i] << std::endl;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int64_t tsList2[1] = {1700000000};
int32_t len2 = tsCompressTimestamp(tsList2, sizeof(tsList2), sizeof(tsList2) / sizeof(tsList2[0]), pOutput, 1, ONE_STAGE_COMP, NULL, 0);

memset(decompOutput, 0, 10*8);
tsDecompressTimestampAvx512(reinterpret_cast<const char* const>(pOutput), 1,
reinterpret_cast<char* const>(decompOutput), false);

for(int32_t i = 0; i < 1; ++i) {
std::cout<<((int64_t*)decompOutput)[i] << std::endl;
}
}

TEST(utilTest, decompress_perf_test) {
int32_t num = 10000;

int64_t* pList = static_cast<int64_t*>(taosMemoryCalloc(num, sizeof(int64_t)));
int64_t iniVal = 1700000000;

uint32_t v = 100;

for(int32_t i = 0; i < num; ++i) {
iniVal += taosRandR(&v)%10;
pList[i] = iniVal;
}

char* px = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t)));
int32_t len = tsCompressTimestamp(pList, num * sizeof(int64_t), num, px, num, ONE_STAGE_COMP, NULL, 0);

char* pOutput = static_cast<char*>(taosMemoryMalloc(num * sizeof(int64_t)));

int64_t st = taosGetTimestampUs();
for(int32_t k = 0; k < 10000; ++k) {
tsDecompressTimestamp(px, len, num, pOutput, sizeof(int64_t) * num, ONE_STAGE_COMP, NULL, 0);
}

int64_t el1 = taosGetTimestampUs() - st;
std::cout << "soft decompress elapsed time:" << el1 << " us" << std::endl;

memset(pOutput, 0, num * sizeof(int64_t));
st = taosGetTimestampUs();
for(int32_t k = 0; k < 10000; ++k) {
tsDecompressTimestampAvx512(px, num, pOutput, false);
}
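// both loops decode the same compressed buffer 10000 times; comparing el1 and el2 shows the
// gain from calling the AVX-512 routine directly instead of the generic decoder.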

int64_t el2 = taosGetTimestampUs() - st;
std::cout << "SIMD decompress elapsed time:" << el2 << " us" << std::endl;

taosMemoryFree(pList);
taosMemoryFree(pOutput);
taosMemoryFree(px);
}
@ -80,6 +80,7 @@ sql use test2;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create stream streams2 trigger at_once ignore expired 0 ignore update 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s);
sleep 1000

sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791212001,2,2,3,1.1);