diff --git a/cmake/bdb_CMakeLists.txt.in b/cmake/bdb_CMakeLists.txt.in deleted file mode 100644 index dd21020105..0000000000 --- a/cmake/bdb_CMakeLists.txt.in +++ /dev/null @@ -1,13 +0,0 @@ - -# bdb -ExternalProject_Add(bdb - GIT_REPOSITORY https://github.com/berkeleydb/libdb.git - GIT_TAG v5.3.28 - SOURCE_DIR "${TD_CONTRIB_DIR}/bdb" - BINARY_DIR "${TD_CONTRIB_DIR}/bdb" - #BUILD_IN_SOURCE TRUE - CONFIGURE_COMMAND COMMAND ./dist/configure --enable-debug - BUILD_COMMAND "$(MAKE)" - INSTALL_COMMAND "" - TEST_COMMAND "" -) diff --git a/cmake/cmake.define b/cmake/cmake.define index 44b36d0efa..5c13181099 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -151,6 +151,7 @@ ELSE () CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2) CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F) CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI) + CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL) IF (COMPILER_SUPPORT_SSE42) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2") @@ -158,11 +159,11 @@ ELSE () ENDIF() IF ("${SIMD_SUPPORT}" MATCHES "true") - IF (COMPILER_SUPPORT_FMA) + IF (COMPILER_SUPPORT_FMA) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma") - ENDIF() - IF (COMPILER_SUPPORT_AVX) + ENDIF() + IF (COMPILER_SUPPORT_AVX) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx") ENDIF() @@ -175,7 +176,13 @@ ELSE () IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi") - MESSAGE(STATUS "avx512 supported by gcc") + MESSAGE(STATUS "avx512f/avx512bmi supported by compiler") + ENDIF() + + IF (COMPILER_SUPPORT_AVX512VL) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512vl") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512vl") + MESSAGE(STATUS "avx512vl supported by compiler") ENDIF() ENDIF() diff --git a/cmake/leveldb_CMakeLists.txt.in b/cmake/leveldb_CMakeLists.txt.in deleted file mode 100644 index 6878711bc1..0000000000 --- a/cmake/leveldb_CMakeLists.txt.in +++ /dev/null @@ -1,13 +0,0 @@ - -# leveldb -ExternalProject_Add(leveldb - GIT_REPOSITORY https://github.com/taosdata-contrib/leveldb.git - GIT_TAG master - SOURCE_DIR "${TD_CONTRIB_DIR}/leveldb" - BINARY_DIR "" - #BUILD_IN_SOURCE TRUE - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" -) \ No newline at end of file diff --git a/cmake/lucene_CMakeLists.txt.in b/cmake/lucene_CMakeLists.txt.in deleted file mode 100644 index 9fd7471705..0000000000 --- a/cmake/lucene_CMakeLists.txt.in +++ /dev/null @@ -1,12 +0,0 @@ - -# lucene -ExternalProject_Add(lucene - GIT_REPOSITORY https://github.com/yihaoDeng/LucenePlusPlus.git - SOURCE_DIR "${TD_CONTRIB_DIR}/lucene" - BINARY_DIR "" - #BUILD_IN_SOURCE TRUE - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" -) diff --git a/cmake/nuraft_CMakeLists.txt.in b/cmake/nuraft_CMakeLists.txt.in deleted file mode 100644 index 593c6fed26..0000000000 --- a/cmake/nuraft_CMakeLists.txt.in +++ /dev/null @@ -1,12 +0,0 @@ - -# NuRaft -ExternalProject_Add(NuRaft - GIT_REPOSITORY https://github.com/eBay/NuRaft.git - GIT_TAG v1.3.0 - SOURCE_DIR "${TD_CONTRIB_DIR}/nuraft" - BINARY_DIR "${TD_CONTRIB_DIR}/nuraft" - CONFIGURE_COMMAND "./prepare.sh" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" - ) \ No newline at end of file diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index c5715bd53f..b786f2df25 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -109,11 +109,6 @@ cat("${TD_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) # cJson cat("${TD_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -# leveldb -if(${BUILD_WITH_LEVELDB}) - cat("${TD_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_LEVELDB}) - if (${BUILD_CONTRIB}) if(${BUILD_WITH_ROCKSDB}) cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -132,28 +127,11 @@ else() endif() endif() -# canonical-raft -if(${BUILD_WITH_CRAFT}) - cat("${TD_SUPPORT_DIR}/craft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE) -endif(${BUILD_WITH_CRAFT}) - -# traft -if(${BUILD_WITH_TRAFT}) - cat("${TD_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE) -endif(${BUILD_WITH_TRAFT}) - #libuv if(${BUILD_WITH_UV}) cat("${TD_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_UV}) -# bdb -if(${BUILD_WITH_BDB}) - cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_BDB}) - # sqlite if(${BUILD_WITH_SQLITE}) cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -178,17 +156,6 @@ elseif(${BUILD_WITH_COS}) endif() -# lucene -if(${BUILD_WITH_LUCENE}) - cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - add_definitions(-DUSE_LUCENE) -endif(${BUILD_WITH_LUCENE}) - -# NuRaft -if(${BUILD_WITH_NURAFT}) - cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_NURAFT}) - # crashdump if(${BUILD_CRASHDUMP}) cat("${TD_SUPPORT_DIR}/crashdump_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -437,23 +404,6 @@ elseif(${BUILD_WITH_COS}) endif() -# lucene -# To support build on ubuntu: sudo apt-get install libboost-all-dev -if(${BUILD_WITH_LUCENE}) - option(ENABLE_TEST "Enable the tests" OFF) - add_subdirectory(lucene EXCLUDE_FROM_ALL) - target_include_directories( - lucene++ - PUBLIC $ - ) - -endif(${BUILD_WITH_LUCENE}) - -# NuRaft -if(${BUILD_WITH_NURAFT}) - add_subdirectory(nuraft EXCLUDE_FROM_ALL) -endif(${BUILD_WITH_NURAFT}) - # pthread if(${BUILD_PTHREAD}) set(CMAKE_BUILD_TYPE debug) @@ -524,30 +474,6 @@ if(${BUILD_WCWIDTH}) SET_TARGET_PROPERTIES(wcwidth PROPERTIES OUTPUT_NAME wcwidth) endif(${BUILD_WCWIDTH}) -# CRAFT -if(${BUILD_WITH_CRAFT}) - add_library(craft STATIC IMPORTED GLOBAL) - set_target_properties(craft PROPERTIES - IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/craft/.libs/libraft.a" - INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/craft/include" - ) - # target_link_libraries(craft - # INTERFACE pthread - # ) -endif(${BUILD_WITH_CRAFT}) - -# TRAFT -if(${BUILD_WITH_TRAFT}) - add_library(traft STATIC IMPORTED GLOBAL) - set_target_properties(traft PROPERTIES - IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a" - INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include" - ) - # target_link_libraries(craft - # INTERFACE pthread - # ) -endif(${BUILD_WITH_TRAFT}) - # LIBUV if(${BUILD_WITH_UV}) if (TD_WINDOWS) @@ -559,18 +485,6 @@ if(${BUILD_WITH_UV}) add_subdirectory(libuv EXCLUDE_FROM_ALL) endif(${BUILD_WITH_UV}) -# BDB -if(${BUILD_WITH_BDB}) - add_library(bdb STATIC IMPORTED GLOBAL) - set_target_properties(bdb PROPERTIES - IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a" - INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb" - ) - target_link_libraries(bdb - INTERFACE pthread - ) -endif(${BUILD_WITH_BDB}) - # SQLite # see https://stackoverflow.com/questions/8774593/cmake-link-to-external-library#comment58570736_10550334 if(${BUILD_WITH_SQLITE}) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e17a72992c..73d1ab2473 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3271,7 +3271,7 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; -} SVPauseStreamTaskReq, SVResetStreamTaskReq; +} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq; typedef struct { int8_t reserved; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4eb8328caa..2d1148a209 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -271,6 +271,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d148d7f85..12f7ffb1cd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -305,6 +305,7 @@ typedef struct SCheckpointInfo { int64_t processedVer; // already processed ver, that has generated results version. int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id + bool dispatchCheckpointTrigger; } SCheckpointInfo; typedef struct SStreamStatus { @@ -390,6 +391,7 @@ typedef struct SHistoryTaskInfo { int32_t retryTimes; int32_t waitInterval; int64_t haltVer; // offset in wal when halt the stream task + bool operatorOpen; // false by default } SHistoryTaskInfo; typedef struct STaskOutputInfo { @@ -654,17 +656,21 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea typedef struct STaskStatusEntry { STaskId id; int32_t status; - int32_t stage; + int32_t statusLastDuration; // to record the last duration of current status + int64_t stage; int32_t nodeId; int64_t verStart; // start version in WAL, only valid for source task int64_t verEnd; // end version in WAL, only valid for source task int64_t processedVer; // only valid for source task + int32_t relatedHTask; // has related fill-history task int64_t activeCheckpointId; // current active checkpoint id bool checkpointFailed; // denote if the checkpoint is failed or not + bool inputQChanging; // inputQ is changing or not + int64_t inputQUnchangeCounter; double inputQUsed; // in MiB double inputRate; double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dest data size + double sinkDataSize; // sink to dst data size } STaskStatusEntry; typedef struct SStreamHbMsg { @@ -676,6 +682,7 @@ typedef struct SStreamHbMsg { int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); +void streamMetaClearHbMsg(SStreamHbMsg* pMsg); typedef struct { int64_t streamId; diff --git a/include/util/tcompression.h b/include/util/tcompression.h index ab0c22fc9b..75ddbb12e7 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -139,6 +139,8 @@ int32_t getWordLength(char type); int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian); +int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian); /************************************************************************* * STREAM COMPRESSION diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index addf0aa629..e78783cf3c 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -126,9 +126,9 @@ void queryCallback(void* param, void* res, int32_t code) { taos_fetch_raw_block_a(res, fetchCallback, param); } -void createNewTable(TAOS* pConn, int32_t index) { +void createNewTable(TAOS* pConn, int32_t index, int32_t numOfRows, int64_t startTs, const char* pVarchar) { char str[1024] = {0}; - sprintf(str, "create table tu%d using st2 tags(%d)", index, index); + sprintf(str, "create table if not exists tu%d using st2 tags(%d)", index, index); TAOS_RES* pRes = taos_query(pConn, str); if (taos_errno(pRes) != 0) { @@ -136,22 +136,43 @@ void createNewTable(TAOS* pConn, int32_t index) { } taos_free_result(pRes); - for (int32_t i = 0; i < 10000; i += 20) { - char sql[1024] = {0}; - sprintf(sql, - "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", - index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, - i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, - i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); - TAOS_RES* p = taos_query(pConn, sql); - if (taos_errno(p) != 0) { - printf("failed to insert data, reason:%s\n", taos_errstr(p)); - } + if (startTs == 0) { + for (int32_t i = 0; i < numOfRows; i += 20) { + char sql[1024] = {0}; + sprintf(sql, + "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", + index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, + i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, + i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); + TAOS_RES* p = taos_query(pConn, sql); + if (taos_errno(p) != 0) { + printf("failed to insert data, reason:%s\n", taos_errstr(p)); + } - taos_free_result(p); + taos_free_result(p); + } + } else { + for (int32_t i = 0; i < numOfRows; i += 20) { + char sql[1024*50] = {0}; + sprintf(sql, + "insert into tu%d values(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, " + "%d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, " + "'%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')", + index, startTs, i, pVarchar, startTs + 1, i + 1, pVarchar, startTs + 2, i + 2, pVarchar, startTs + 3, i + 3, pVarchar, startTs + 4, i + 4, + pVarchar, startTs + 5, i + 5, pVarchar, startTs + 6, i + 6, pVarchar, startTs + 7, i + 7, pVarchar, startTs + 8, i + 8, pVarchar, startTs + 9, i + 9, + pVarchar, startTs + 10, i + 10, pVarchar, startTs + 11, i + 11, pVarchar, startTs + 12, i + 12, pVarchar, startTs + 13, i + 13, pVarchar, startTs + 14, + i + 14, pVarchar, startTs + 15, i + 15, pVarchar, startTs + 16, i + 16, pVarchar, startTs + 17, i + 17, pVarchar, startTs + 18, i + 18, + pVarchar, startTs + 19, i + 19, pVarchar); + TAOS_RES* p = taos_query(pConn, sql); + if (taos_errno(p) != 0) { + printf("failed to insert data, reason:%s\n", taos_errstr(p)); + } + + taos_free_result(p); + } } } @@ -808,14 +829,7 @@ TEST(clientCase, projection_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)"); + pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int, f varchar(4096)) tags(a int)"); if (taos_errno(pRes) != 0) { printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); } @@ -828,28 +842,32 @@ TEST(clientCase, projection_query_tables) { taos_free_result(pRes); int64_t start = 1685959190000; + const char* pstr = + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefgh" + "ijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnop" + "qrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwx" + "yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdef" + "ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz!@#$%^&&*&^^%$#@!qQWERTYUIOPASDFGHJKL:" + "QWERTYUIOP{}"; - int32_t code = -1; - for(int32_t i = 0; i < 1000000; ++i) { - char t[512] = {0}; + for(int32_t i = 0; i < 10000; ++i) { + char str[1024] = {0}; + sprintf(str, "create table if not exists tu%d using st2 tags(%d)", i, i); - sprintf(t, "insert into t1 values(now, %d)", i); - while(1) { - void* p = taos_query(pConn, t); - code = taos_errno(p); - taos_free_result(p); - if (code != 0) { - printf("insert data error, retry\n"); - } else { - break; - } + TAOS_RES* px = taos_query(pConn, str); + if (taos_errno(px) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(px); + } + + for(int32_t j = 0; j < 5000; ++j) { + start += 20; + for (int32_t i = 0; i < 10000; ++i) { + createNewTable(pConn, i, 20, start, pstr); } } - for (int32_t i = 0; i < 1; ++i) { - printf("create table :%d\n", i); - createNewTable(pConn, i); - } // // pRes = taos_query(pConn, "select * from tu"); // if (taos_errno(pRes) != 0) { diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index b29c5c1eb4..85cabb0934 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -83,6 +83,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c4d525a871..836adcd115 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -831,6 +831,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 244a6d08dd..f2b3ce6cac 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -52,7 +52,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb); -bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb); +bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb, bool lock); // for sma // TODO refactor diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0e141f2d68..03d877e98e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -78,6 +78,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); +static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); + static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg); @@ -146,6 +148,7 @@ void mndCleanupStream(SMnode *pMnode) { taosArrayDestroy(execInfo.pTaskList); taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.transMgmt.pDBTrans); +// taosHashCleanup(execInfo.transMgmt.pWaitingList); taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); } @@ -739,13 +742,15 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / if (numOfStream > MND_STREAM_MAX_NUM) { mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); sdbCancelFetch(pMnode->pSdb, pIter); - return TSDB_CODE_MND_TOO_MANY_STREAMS; + terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; + return terrno; } if (pStream->targetStbUid == pStreamObj->targetStbUid) { mError("Cannot write the same stable as other stream:%s", pStream->name); sdbCancelFetch(pMnode->pSdb, pIter); - return TSDB_CODE_MND_INVALID_TARGET_TABLE; + terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; + return terrno; } } @@ -754,11 +759,11 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - int32_t code = -1; SStreamObj *pStream = NULL; SStreamObj streamObj = {0}; char *sql = NULL; int32_t sqlLen = 0; + terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createStreamReq = {0}; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { @@ -781,7 +786,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (pStream != NULL) { if (createStreamReq.igExists) { mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name); - code = 0; goto _OVER; } else { terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; @@ -804,8 +808,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - code = checkForNumOfStreams(pMnode, &streamObj); - if (code != TSDB_CODE_SUCCESS) { + if (checkForNumOfStreams(pMnode, &streamObj) < 0) { goto _OVER; } @@ -868,8 +871,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); - code = TSDB_CODE_ACTION_IN_PROGRESS; - SName dbname = {0}; tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -886,7 +887,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } _OVER: - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); } @@ -896,7 +897,8 @@ _OVER: if (sql != NULL) { taosMemoryFreeClear(sql); } - return code; + + return terrno; } int64_t mndStreamGenChkpId(SMnode *pMnode) { @@ -1344,7 +1346,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); @@ -1568,6 +1570,123 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } +static void setTaskAttrInResBlock(SStreamObj* pStream, SStreamTask* pTask, SSDataBlock* pBlock, int32_t numOfRows) { + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // stream name + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + + // task id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + + char idstr[128] = {0}; + int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); + idstr[2] = '0'; + idstr[3] = 'x'; + varDataSetLen(idstr, len + 2); + colDataSetVal(pColInfo, numOfRows, idstr, false); + + // node type + char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; + varDataSetLen(nodeType, 5); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pTask->info.nodeId > 0) { + memcpy(varDataVal(nodeType), "vnode", 5); + } else { + memcpy(varDataVal(nodeType), "snode", 5); + } + colDataSetVal(pColInfo, numOfRows, nodeType, false); + + // node id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + int64_t nodeId = TMAX(pTask->info.nodeId, 0); + colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false); + + // level + char level[20 + VARSTR_HEADER_SIZE] = {0}; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + memcpy(varDataVal(level), "source", 6); + varDataSetLen(level, 6); + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + memcpy(varDataVal(level), "agg", 3); + varDataSetLen(level, 3); + } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + memcpy(varDataVal(level), "sink", 4); + varDataSetLen(level, 4); + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)level, false); + + // status + char status[20 + VARSTR_HEADER_SIZE] = {0}; + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { + return; + } + + const char *pStatus = streamTaskGetStatusStr(pe->status); + STR_TO_VARSTR(status, pStatus); + + // status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)status, false); + + // stage + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); + + // input queue + char vbuf[30] = {0}; + char buf[25] = {0}; + const char *queueInfoStr = "%4.2fMiB (%5.2f%)"; + sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + + // output queue + // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); + // STR_TO_VARSTR(vbuf, buf); + + // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + const char *sinkStr = "%.2fMiB"; + sprintf(buf, sinkStr, pe->sinkDataSize); + } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + // offset info + const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; + sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd); + } + + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); +} + +static int32_t getNumOfTasks(SArray* pTaskList) { + int32_t numOfLevels = taosArrayGetSize(pTaskList); + + int32_t count = 0; + for (int32_t i = 0; i < numOfLevels; i++) { + SArray *pLevel = taosArrayGetP(pTaskList, i); + count += taosArrayGetSize(pLevel); + } + + return count; +} + static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode * pMnode = pReq->info.node; SSdb * pSdb = pMnode->pSdb; @@ -1583,137 +1702,25 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // lock taosRLockLatch(&pStream->lock); - // count task num - int32_t sz = taosArrayGetSize(pStream->tasks); - - int32_t count = 0; - for (int32_t i = 0; i < sz; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); - count += taosArrayGetSize(pLevel); - } - + int32_t count = getNumOfTasks(pStream->tasks); if (numOfRows + count > rowsCapacity) { blockDataEnsureCapacity(pBlock, numOfRows + count); } // add row for each task - for (int32_t i = 0; i < sz; i++) { + for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); - int32_t levelCnt = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < levelCnt; j++) { + int32_t numOfLevels = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfLevels; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - - SColumnInfoData *pColInfo; - int32_t cols = 0; - - // stream name - char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); - - // task id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - - char idstr[128] = {0}; - int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); - idstr[2] = '0'; - idstr[3] = 'x'; - varDataSetLen(idstr, len + 2); - colDataSetVal(pColInfo, numOfRows, idstr, false); - - // node type - char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; - varDataSetLen(nodeType, 5); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - if (pTask->info.nodeId > 0) { - memcpy(varDataVal(nodeType), "vnode", 5); - } else { - memcpy(varDataVal(nodeType), "snode", 5); - } - colDataSetVal(pColInfo, numOfRows, nodeType, false); - - // node id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - int64_t nodeId = TMAX(pTask->info.nodeId, 0); - colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false); - - // level - char level[20 + VARSTR_HEADER_SIZE] = {0}; - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - memcpy(varDataVal(level), "source", 6); - varDataSetLen(level, 6); - } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - memcpy(varDataVal(level), "agg", 3); - varDataSetLen(level, 3); - } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - memcpy(varDataVal(level), "sink", 4); - varDataSetLen(level, 4); - } - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)level, false); - - // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; - - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); - if (pe == NULL) { - continue; - } - - const char *pStatus = streamTaskGetStatusStr(pe->status); - STR_TO_VARSTR(status, pStatus); - - // status - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)status, false); - - // stage - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); - - // input queue - char vbuf[30] = {0}; - char buf[25] = {0}; - const char *queueInfoStr = "%4.2fMiB (%5.2f%)"; - sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); - STR_TO_VARSTR(vbuf, buf); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - - // output queue - // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); - // STR_TO_VARSTR(vbuf, buf); - - // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); - - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - const char *sinkStr = "%.2fMiB"; - sprintf(buf, sinkStr, pe->sinkDataSize); - } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - // offset info - const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd); - } - - STR_TO_VARSTR(vbuf, buf); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - + setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); numOfRows++; } } // unlock taosRUnLockLatch(&pStream->lock); - sdbRelease(pSdb, pStream); } @@ -1822,7 +1829,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; @@ -1957,7 +1964,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; @@ -2704,7 +2711,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t le return TSDB_CODE_SUCCESS; } -int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { +static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { STrans* pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mInfo("kill checkpoint transId:%d to reset task status", transId); @@ -2722,7 +2729,7 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { break; } - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, false); if (conflict) { mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name, pStream->sourceDb, pStream->targetDb); @@ -2740,6 +2747,91 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { return 0; } +static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) { + for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + + int32_t numOfLevels = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfLevels; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + if (pTask->id.taskId == pId->taskId) { + return pTask; + } + } + } + + return NULL; +} + +static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { + if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { + if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { + int32_t numOfReady = 0; + int32_t numOfTotal = 0; + for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); + if (pTaskEntry->id.streamId == pId->streamId) { + numOfTotal++; + + if (pTaskEntry->id.taskId != pId->taskId) { + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if (pEntry->status == TASK_STATUS__READY) { + numOfReady++; + } + } + } + } + + if (numOfReady > 0) { + mDebug("stream:0x%" PRIx64 + " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task", + pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady); + return true; + } else { + return false; + } + } + } + + return false; +} + +// currently only handle the sink task +// 1. sink task, drop related fill-history task msg is missing +// 2. other tasks are in ready state for at least 3 * hb_interval +static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *pTaskEntry, SStreamObj *pStream) { + SStreamTask *pTask = mndGetStreamTask(&pTaskEntry->id, pStream); + if (pTask == NULL) { + mError("failed to get the stream task:0x%x, may have been dropped", (int32_t) pTaskEntry->id.taskId); + return -1; + } + + SVDropHTaskReq *pReq = rpcMallocCont(sizeof(SVDropHTaskReq)); + if (pReq == NULL) { + mError("failed to malloc in drop related fill-history task, size:%" PRIzu ", code:%s", sizeof(SVDropHTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SRpcMsg msg = {.info.noResp = 1}; + + initRpcMsg(&msg, TDMT_STREAM_HTASK_DROP, pReq, sizeof(SVDropHTaskReq)); + + mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId); + + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + tmsgSendReq(&epset, &msg); + return TSDB_CODE_SUCCESS; +} + int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { int32_t num = taosArrayGetSize(pNodeList); mInfo("set node expired for %d nodes", num); @@ -2768,7 +2860,7 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { for(int32_t j = 0; j < numOfNodes; ++j) { SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j); if (pNodeEntry->nodeId == pTaskEntry->nodeId) { - mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, + mInfo("vgId:%d stage updated from %" PRId64 " to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId); pNodeEntry->stageUpdated = true; @@ -2789,6 +2881,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { tDecoderInit(&decoder, pReq->pCont, pReq->contLen); if (tDecodeStreamHbMsg(&decoder, &req) < 0) { + streamMetaClearHbMsg(&req); tDecoderClear(&decoder); terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -2815,6 +2908,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); @@ -2823,7 +2917,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { updateStageInfo(pTaskEntry, p->stage); + // NOTE: uncomment this when merging 3.0 to main. + if(pTaskEntry->nodeId == SNODE_HANDLE) { +// snodeChanged = true; + } } else { + // task is idle for more than 50 sec. + if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { + if (!pTaskEntry->inputQChanging) { + pTaskEntry->inputQUnchangeCounter++; + } else { + pTaskEntry->inputQChanging = false; + } + } else { + pTaskEntry->inputQChanging = true; + pTaskEntry->inputQUnchangeCounter = 0; + } + streamTaskStatusCopy(pTaskEntry, p); if (p->activeCheckpointId != 0) { if (activeCheckpointId != 0) { @@ -2838,9 +2948,28 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } - pTaskEntry->status = p->status; + if (p->status == pTaskEntry->status) { + pTaskEntry->statusLastDuration++; + } else { + pTaskEntry->status = p->status; + pTaskEntry->statusLastDuration = 0; + } + if (p->status != TASK_STATUS__READY) { mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); + + if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) { + bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo); + if(drop) { + SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId); + if (pStreamObj == NULL) { + mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid); + } else { + mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj); + mndReleaseStream(pMnode, pStreamObj); + } + } + } } } @@ -2862,8 +2991,22 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } taosThreadMutexUnlock(&execInfo.lock); + streamMetaClearHbMsg(&req); - taosArrayDestroy(req.pTaskStatus); - taosArrayDestroy(req.pUpdateNodes); return TSDB_CODE_SUCCESS; } + +SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + + while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { + if (pStream->uid == streamId) { + sdbCancelFetch(pSdb, pIter); + return pStream; + } + } + + return NULL; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 2345de290a..9dd9f64037 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -35,17 +35,15 @@ int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pS } int32_t clearFinishedTrans(SMnode* pMnode) { - SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); size_t keyLen = 0; + SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); + void* pIter = NULL; - taosThreadMutexLock(&execInfo.lock); - - void* pIter = NULL; while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { - SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter; - STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId); + SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter; // let's clear the finished trans + STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId); if (pTrans == NULL) { void* pKey = taosHashGetKey(pEntry, &keyLen); // key is the name of src/dst db name @@ -60,44 +58,55 @@ int32_t clearFinishedTrans(SMnode* pMnode) { } size_t num = taosArrayGetSize(pList); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { SKeyInfo* pKey = taosArrayGet(pList, i); taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen); } - mDebug("clear %d finished stream-trans, remained:%d", (int32_t) num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); - taosThreadMutexUnlock(&execInfo.lock); + mDebug("clear %d finished stream-trans, remained:%d", (int32_t)num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); terrno = TSDB_CODE_SUCCESS; taosArrayDestroy(pList); return 0; } -bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) { - clearFinishedTrans(pMnode); +bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb, bool lock) { + if (lock) { + taosThreadMutexLock(&execInfo.lock); + } - taosThreadMutexLock(&execInfo.lock); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } return false; } + clearFinishedTrans(pMnode); + SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); if (pEntry != NULL) { - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); return true; } pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); if (pEntry != NULL) { - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); return true; } - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } + return false; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c9756ef814..7749decf91 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -834,7 +834,7 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) { if (mndCheckTransConflict(pMnode, pTrans)) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - return -1; + return terrno; } return 0; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index df1720d4a7..f3f84896ad 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -93,7 +93,11 @@ typedef struct SQueryNode SQueryNode; #define VNODE_RSMA2_DIR "rsma2" #define VNODE_TQ_STREAM "stream" +#if SUSPEND_RESUME_TEST // only for test purpose +#define VNODE_BUFPOOL_SEGMENTS 1 +#else #define VNODE_BUFPOOL_SEGMENTS 3 +#endif #define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" @@ -232,6 +236,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg); int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart); int32_t tqRestartStreamTasks(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ff1f08076b..76475d7c60 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -2017,3 +2017,37 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } + +int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { + SVDropHTaskReq* pReq = (SVDropHTaskReq*) pMsg->pCont; + + SStreamMeta* pMeta = pTq->pStreamMeta; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d process drop fill-history task req, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, pReq->taskId); + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s receive drop fill-history msg from mnode", pTask->id.idStr); + if (pTask->hTaskInfo.id.taskId == 0) { + tqError("vgId:%d s-task:%s not have related fill-history task", pMeta->vgId, pTask->id.idStr); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } + + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ASSERT(status == TASK_STATUS__STREAM_SCAN_HISTORY); + + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + + SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); + + // clear the scheduler status + streamTaskSetSchedStatusInactive(pTask); + tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; +} + diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 751df706ab..cb899f9ee8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -48,7 +48,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); -static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); +static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost); static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); @@ -58,6 +58,7 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbRea static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); +static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -168,7 +169,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo return TSDB_CODE_SUCCESS; } - SCostSummary* pCost = &pReader->cost; + SReadCostSummary* pCost = &pReader->cost; pIter->pLastBlockReader->uid = 0; tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); @@ -291,11 +292,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) } static int32_t tsdbInitReaderLock(STsdbReader* pReader) { - int32_t code = -1; - qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - - code = taosThreadMutexInit(&pReader->readerMutex, NULL); - + int32_t code = taosThreadMutexInit(&pReader->readerMutex, NULL); qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; @@ -324,22 +321,14 @@ static int32_t tsdbAcquireReader(STsdbReader* pReader) { } static int32_t tsdbTryAcquireReader(STsdbReader* pReader) { - int32_t code = -1; - qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - - code = taosThreadMutexTryLock(&pReader->readerMutex); - + int32_t code = taosThreadMutexTryLock(&pReader->readerMutex); qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; } static int32_t tsdbReleaseReader(STsdbReader* pReader) { - int32_t code = -1; - qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - - code = taosThreadMutexUnlock(&pReader->readerMutex); - + int32_t code = taosThreadMutexUnlock(&pReader->readerMutex); qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; @@ -432,6 +421,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void } tsdbInitReaderLock(pReader); + tsem_init(&pReader->resumeAfterSuspend, 0, 0); *ppReader = pReader; return code; @@ -1015,8 +1005,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { // check if current block are all handled if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - if (outOfTimeWindow(ts, - &pReader->info.window)) { // the remain data has out of query time window, ignore current block + if (outOfTimeWindow(ts, &pReader->info.window)) { + // the remain data has out of query time window, ignore current block setBlockAllDumped(pDumpInfo, ts, pReader->info.order); } } else { @@ -1123,16 +1113,12 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo } int32_t step = asc ? 1 : -1; - // *nextIndex = pBlockInfo->tbBlockIdx + step; - // *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); memcpy(pRecord, &p->record, sizeof(SBrinRecord)); *nextIndex = pBlockInfo->tbBlockIdx + step; - - // tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk); return true; } @@ -1376,23 +1362,19 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } - SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; - int64_t st = taosGetTimestampUs(); + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); - blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]); - pBlock->info.id.uid = pBlockScanInfo->uid; + double el = (taosGetTimestampUs() - st) / 1000.0; + updateComposedBlockInfo(pReader, el, pBlockScanInfo); - setComposedBlockFlag(pReader, true); - - double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64 " - %" PRId64 ", uid:%" PRIu64 ", %s", - pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, + pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pBlockScanInfo->uid, pReader->idStr); - pReader->cost.buildmemBlock += elapsedTime; + pReader->cost.buildmemBlock += el; return code; } @@ -2293,13 +2275,12 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock return code; } -static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) { +void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; pResBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); - setComposedBlockFlag(pReader, true); pReader->cost.composedBlocks += 1; @@ -2356,7 +2337,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pBlockScanInfo = *pReader->status.pTableIter; if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) { - // setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order); return code; } } @@ -2436,7 +2416,7 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) { return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1; } -int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost) { +int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost) { int32_t code = 0; int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData); if (newDelDataInFile == 0 && @@ -2935,6 +2915,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; + tsdbDebug("seq load data blocks from cache, %s", pReader->idStr); + while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); @@ -3043,6 +3025,8 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SDataBlockIter* pBlockIter = &pReader->status.blockIter; + tsdbDebug("seq load data blocks from stt files %s", pReader->idStr); + while (1) { terrno = 0; @@ -3774,7 +3758,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e int32_t code = TSDB_CODE_SUCCESS; do { - // SRow* pTSRow = NULL; TSDBROW row = {.type = -1}; bool freeTSRow = false; tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); @@ -3783,6 +3766,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e } if (row.type == TSDBROW_ROW_FMT) { + int64_t ts = row.pTSRow->ts;; code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); if (freeTSRow) { @@ -3792,13 +3776,17 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e if (code) { return code; } + + pBlockScanInfo->lastProcKey = ts; } else { code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); if (code) { break; } + pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow]; } + // no data in buffer, return immediately if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { break; @@ -4107,7 +4095,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { tsdbDataFileReaderClose(&pReader->pFileReader); } - SCostSummary* pCost = &pReader->cost; + SReadCostSummary* pCost = &pReader->cost; SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pLastBlockReader != NULL) { SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; @@ -4122,6 +4110,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true); pReader->pReadSnap = NULL; + tsem_destroy(&pReader->resumeAfterSuspend); tsdbReleaseReader(pReader); tsdbUninitReaderLock(pReader); @@ -4154,6 +4143,8 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; STableBlockScanInfo* pBlockScanInfo = NULL; + pReader->status.suspendInvoked = true; // record the suspend status + if (pStatus->loadFromFile) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); if (pBlockInfo != NULL) { @@ -4167,84 +4158,34 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { tsdbDataFileReaderClose(&pReader->pFileReader); - SCostSummary* pCost = &pReader->cost; + SReadCostSummary* pCost = &pReader->cost; pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); - // resetDataBlockScanInfo excluding lastKey - STableBlockScanInfo** p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { - STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; - - pInfo->iterInit = false; - pInfo->iter.hasVal = false; - pInfo->iiter.hasVal = false; - - if (pInfo->iter.iter != NULL) { - pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); - } - - if (pInfo->iiter.iter != NULL) { - pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); - } - - pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - pInfo->pFileDelData = taosArrayDestroy(pInfo->pFileDelData); - } - } else { - // resetDataBlockScanInfo excluding lastKey - STableBlockScanInfo** p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { - STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; - - pInfo->iterInit = false; - pInfo->iter.hasVal = false; - pInfo->iiter.hasVal = false; - - if (pInfo->iter.iter != NULL) { - pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); - } - - if (pInfo->iiter.iter != NULL) { - pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); - } - - pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - } - - pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter; - if (pBlockScanInfo) { - // save lastKey to restore memory iterator - STimeWindow w = pReader->resBlockInfo.pResBlock->info.window; - pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey; - - // reset current current table's data block scan info, - pBlockScanInfo->iterInit = false; - - pBlockScanInfo->iter.hasVal = false; - pBlockScanInfo->iiter.hasVal = false; - if (pBlockScanInfo->iter.iter != NULL) { - pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter); - } - - if (pBlockScanInfo->iiter.iter != NULL) { - pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter); - } - - pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList); - pBlockScanInfo->pBlockIdxList = taosArrayDestroy(pBlockScanInfo->pBlockIdxList); - // TODO: keep skyline for reuse - pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline); - } } + // resetDataBlockScanInfo excluding lastKey + STableBlockScanInfo** p = NULL; + + int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1:-1; + + int32_t iter = 0; + while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { + STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; + clearBlockScanInfo(pInfo); + pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step; + } + + pStatus->uidList.currentIndex = 0; + initReaderStatus(pStatus); + tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); pReader->pReadSnap = NULL; pReader->flag = READER_STATUS_SUSPEND; +#if SUSPEND_RESUME_TEST + tsem_post(&pReader->resumeAfterSuspend); +#endif + tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, pReader->idStr); return code; @@ -4399,6 +4340,16 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { SReaderStatus* pStatus = &pReader->status; + // NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit + // the data should be ingested in round-robin and all the child tables should be createted before ingesting data + // the version range of query will be used to identify the correctness of suspend/resume functions. + // this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files +#if SUSPEND_RESUME_TEST + if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) { + tsem_wait(&pReader->resumeAfterSuspend); + } +#endif + code = tsdbAcquireReader(pReader); qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 305399e0af..24c526a906 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -210,6 +210,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) { p->iterInit = false; p->iter.hasVal = false; p->iiter.hasVal = false; + p->sttKeyInfo.status = STT_FILE_READER_UNINIT; if (p->iter.iter != NULL) { p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 60e6e6960a..709e311ff0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -96,7 +96,7 @@ typedef struct SResultBlockInfo { int64_t capacity; } SResultBlockInfo; -typedef struct SCostSummary { +typedef struct SReadCostSummary { int64_t numOfBlocks; double blockLoadTime; double buildmemBlock; @@ -110,7 +110,7 @@ typedef struct SCostSummary { double createScanInfoList; double createSkylineIterTime; double initLastBlockReader; -} SCostSummary; +} SReadCostSummary; typedef struct STableUidList { uint64_t* tableUidList; // access table uid list in uid ascending order list @@ -122,12 +122,6 @@ typedef struct { int32_t numOfSttFiles; } SBlockNumber; -typedef struct SBlockIndex { - int32_t ordinalIndex; - int64_t inFileOffset; - STimeWindow window; // todo replace it with overlap flag. -} SBlockIndex; - typedef struct SBlockOrderWrapper { int64_t uid; int64_t offset; @@ -192,6 +186,7 @@ typedef struct SFileBlockDumpInfo { } SFileBlockDumpInfo; typedef struct SReaderStatus { + bool suspendInvoked; bool loadFromFile; // check file stage bool composedDataBlock; // the returned data block is a composed block or not SSHashObj* pTableMap; // SHash @@ -220,7 +215,8 @@ struct STsdbReader { int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; - SCostSummary cost; + tsem_t resumeAfterSuspend; + SReadCostSummary cost; SHashObj** pIgnoreTables; SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema SDataFileReader* pFileReader; // the file reader diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 33b4114009..8cbca403e3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -595,6 +595,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg tqProcessTaskResetReq(pVnode->pTq, pMsg); } } break; + case TDMT_STREAM_HTASK_DROP: { + if (pVnode->restored && vnodeIsLeader(pVnode)) { + tqProcessTaskDropHTask(pVnode->pTq, pMsg); + } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 6dd1e5c1c3..b54f19e4ce 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -113,25 +113,23 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32 int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); +int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); +STaskId streamTaskExtractKey(const SStreamTask* pTask); +void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); +void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); +int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); +int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); +SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); -SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); - -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); -int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask); -int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); -STaskId streamTaskExtractKey(const SStreamTask* pTask); -void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); -void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); - SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueProcessSuccess(SStreamQueue* queue); @@ -152,8 +150,8 @@ int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); int deleteCheckpointFile(char* id, char* name); -int32_t onNormalTaskReady(SStreamTask* pTask); -int32_t onScanhistoryTaskReady(SStreamTask* pTask); +int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); +int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6201329b95..031bb812de 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -158,6 +158,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); if (code == 0) { + ASSERT(pTask->chkInfo.dispatchCheckpointTrigger == false); streamDispatchStreamBlock(pTask); } else { stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code)); @@ -278,6 +279,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) { pTask->chkInfo.startTs = 0; // clear the recorded start time pTask->checkpointNotReadyTasks = 0; pTask->checkpointAlignCnt = 0; + pTask->chkInfo.dispatchCheckpointTrigger = false; streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index f6ec6e9fdb..bcda85e7a7 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -158,7 +158,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm } // todo handle memory error -SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { +SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { terrno = 0; if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 1f9b21becc..1306d9f746 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -593,6 +593,12 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } + if (pTask->chkInfo.dispatchCheckpointTrigger) { + stDebug("s-task:%s already send checkpoint trigger, not dispatch anymore", id); + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); + return 0; + } + ASSERT(pTask->msgInfo.pData == NULL); stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status); @@ -1039,30 +1045,14 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } -static void dispatchDataInFuture(void* param, void* tmrId) { - SStreamTask* pTask = param; - if (streamTaskShouldStop(pTask)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); - return; - } - - ETaskStatus status = streamTaskGetStatus(pTask, NULL); - if (status == TASK_STATUS__CK) { - stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr); - taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); - } else { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref); - streamDispatchStreamBlock(pTask); - } -} - // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + if (delayDispatch) { + pTask->chkInfo.dispatchCheckpointTrigger = true; + } pTask->msgInfo.pData = NULL; pTask->msgInfo.dispatchMsgType = 0; @@ -1083,13 +1073,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId // otherwise, continue dispatch the first block to down stream task in pipeline if (delayDispatch) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); - if (pTask->msgInfo.pTimer != NULL) { - taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); - } else { - pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer); - } + return 0; } else { streamDispatchStreamBlock(pTask); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cae537a860..25f32195be 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -48,6 +48,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl return code; } + // checkpoint trigger will be checked streamDispatchStreamBlock(pTask); } @@ -251,14 +252,18 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - void* exec = pTask->exec.pExecutor; - bool finished = false; + void* exec = pTask->exec.pExecutor; + bool finished = false; + const char* id = pTask->id.idStr; - qSetStreamOpOpen(exec); + if (!pTask->hTaskInfo.operatorOpen) { + qSetStreamOpOpen(exec); + pTask->hTaskInfo.operatorOpen = true; + } while (1) { if (streamTaskShouldPause(pTask)) { - stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr); + stDebug("s-task:%s paused from the scan-history task", id); // quit from step1, not continue to handle the step2 return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; } @@ -266,8 +271,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, - tstrerror(terrno)); + stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno)); continue; } @@ -280,12 +284,12 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { } // dispatch the generated results - int32_t code = handleResultBlocks(pTask, pRes, size); + /*int32_t code = */handleResultBlocks(pTask, pRes, size); int64_t el = taosGetTimestampMs() - st; // downstream task input queue is full, try in 5sec - if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; } @@ -293,9 +297,9 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; } - if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { - stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", - pTask->id.idStr, pTask->info.fillHistory, el / 1000.0); + if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { + stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, + pTask->info.fillHistory, el / 1000.0); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } @@ -400,7 +404,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 5. save to disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); - // 6. pause allowed. + // 6. add empty delete block if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); @@ -542,7 +546,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. */ -int32_t streamExecForAll(SStreamTask* pTask) { +int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. @@ -653,7 +657,7 @@ int32_t streamExecTask(SStreamTask* pTask) { int8_t schedStatus = streamTaskSetSchedStatusActive(pTask); if (schedStatus == TASK_SCHED_STATUS__WAITING) { while (1) { - int32_t code = streamExecForAll(pTask); + int32_t code = doStreamExecTask(pTask); if (code < 0) { // todo this status should be removed atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); return -1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ae8c92d48e..85ef11b841 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -784,7 +784,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1; - if (tEncodeI32(pEncoder, ps->stage) < 0) return -1; + if (tEncodeI64(pEncoder, ps->stage) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; @@ -822,7 +822,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &taskId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; @@ -861,10 +861,18 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { return false; } -static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) { - taosArrayDestroy(pMsg->pTaskStatus); - taosArrayDestroy(pMsg->pUpdateNodes); - taosArrayDestroy(pIdList); +void streamMetaClearHbMsg(SStreamHbMsg* pMsg) { + if (pMsg == NULL) { + return; + } + + if (pMsg->pUpdateNodes != NULL) { + taosArrayDestroy(pMsg->pUpdateNodes); + } + + if (pMsg->pTaskStatus != NULL) { + taosArrayDestroy(pMsg->pTaskStatus); + } } static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { @@ -1041,7 +1049,8 @@ void metaHbToMnode(void* param, void* tmrId) { } _end: - clearHbMsg(&hbMsg, pIdList); + streamMetaClearHbMsg(&hbMsg); + taosArrayDestroy(pIdList); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index d19dfc13bf..d1610362f9 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -221,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *pInput = qItem; } else { // merge current block failed, let's handle the already merged blocks. - void* newRet = streamMergeQueueItem(*pInput, qItem); + void* newRet = streamQueueMergeQueueItem(*pInput, qItem); if (newRet == NULL) { if (terrno != 0) { stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 01078713c5..d97355b5a6 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -323,7 +323,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } } -int32_t onNormalTaskReady(SStreamTask* pTask) { +int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { const char* id = pTask->id.idStr; streamTaskSetReady(pTask); @@ -348,7 +348,7 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t onScanhistoryTaskReady(SStreamTask* pTask) { +int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { const char* id = pTask->id.idStr; // set the state to be ready @@ -470,6 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, taosGetTimestampMs(), false); + + // automatically set the related fill-history task to be failed. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pId = &pTask->hTaskInfo.id; + + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); + streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, + taosGetTimestampMs(), false); + streamMetaReleaseTask(pTask->pMeta, pHTask); + } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -1069,8 +1079,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { + if (numOfRecv == numOfTotal) { pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; @@ -1084,6 +1095,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); streamMetaResetStartInfo(pStartInfo); + } else { + stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); } streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 24228c0307..e1000bc68e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -448,7 +448,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i // 2MiB per second for sink task // 50 times sink operator per second - streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr); + streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 1c951e1452..cac3766893 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -457,11 +457,11 @@ void doInitStateTransferTable(void) { streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 06b82f3ba1..833cf048fd 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -541,66 +541,71 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement memcpy(output, input + 1, nelements * longBytes); return nelements * longBytes; } else if (input[0] == 1) { // Decompress - int64_t *ostream = (int64_t *)output; + if (tsSIMDEnable && tsAVX512Enable) { + tsDecompressTimestampAvx512(input, nelements, output, false); + } else if (tsSIMDEnable && tsAVX2Enable) { + tsDecompressTimestampAvx2(input, nelements, output, false); + } else { + int64_t *ostream = (int64_t *)output; - int32_t ipos = 1, opos = 0; - int8_t nbytes = 0; - int64_t prev_value = 0; - int64_t prev_delta = 0; - int64_t delta_of_delta = 0; + int32_t ipos = 1, opos = 0; + int8_t nbytes = 0; + int64_t prev_value = 0; + int64_t prev_delta = 0; + int64_t delta_of_delta = 0; - while (1) { - uint8_t flags = input[ipos++]; - // Decode dd1 - uint64_t dd1 = 0; - nbytes = flags & INT8MASK(4); - if (nbytes == 0) { - delta_of_delta = 0; - } else { - if (is_bigendian()) { - memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + while (1) { + uint8_t flags = input[ipos++]; + // Decode dd1 + uint64_t dd1 = 0; + nbytes = flags & INT8MASK(4); + if (nbytes == 0) { + delta_of_delta = 0; } else { - memcpy(&dd1, input + ipos, nbytes); + if (is_bigendian()) { + memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd1, input + ipos, nbytes); + } + delta_of_delta = ZIGZAG_DECODE(int64_t, dd1); } - delta_of_delta = ZIGZAG_DECODE(int64_t, dd1); - } - ipos += nbytes; - if (opos == 0) { - prev_value = delta_of_delta; - prev_delta = 0; - ostream[opos++] = delta_of_delta; - } else { + + ipos += nbytes; + if (opos == 0) { + prev_value = delta_of_delta; + prev_delta = 0; + ostream[opos++] = delta_of_delta; + } else { + prev_delta = delta_of_delta + prev_delta; + prev_value = prev_value + prev_delta; + ostream[opos++] = prev_value; + } + if (opos == nelements) return nelements * longBytes; + + // Decode dd2 + uint64_t dd2 = 0; + nbytes = (flags >> 4) & INT8MASK(4); + if (nbytes == 0) { + delta_of_delta = 0; + } else { + if (is_bigendian()) { + memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd2, input + ipos, nbytes); + } + // zigzag_decoding + delta_of_delta = ZIGZAG_DECODE(int64_t, dd2); + } + ipos += nbytes; prev_delta = delta_of_delta + prev_delta; prev_value = prev_value + prev_delta; ostream[opos++] = prev_value; + if (opos == nelements) return nelements * longBytes; } - if (opos == nelements) return nelements * longBytes; - - // Decode dd2 - uint64_t dd2 = 0; - nbytes = (flags >> 4) & INT8MASK(4); - if (nbytes == 0) { - delta_of_delta = 0; - } else { - if (is_bigendian()) { - memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); - } else { - memcpy(&dd2, input + ipos, nbytes); - } - // zigzag_decoding - delta_of_delta = ZIGZAG_DECODE(int64_t, dd2); - } - ipos += nbytes; - prev_delta = delta_of_delta + prev_delta; - prev_value = prev_value + prev_delta; - ostream[opos++] = prev_value; - if (opos == nelements) return nelements * longBytes; } - - } else { - ASSERT(0); - return -1; } + + return nelements * longBytes; } /* --------------------------------------------Double Compression ---------------------------------------------- */ diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index f32a4014d6..470d221100 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -53,11 +53,8 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, int64_t prev_value = 0; #if __AVX2__ - while (1) { - if (_pos == nelements) break; - - uint64_t w = 0; - memcpy(&w, ip, LONG_BYTES); + while (_pos < nelements) { + uint64_t w = *(uint64_t*) ip; char selector = (char)(w & INT64MASK(4)); // selector = 4 char bit = bit_per_integer[(int32_t)selector]; // bit = 3 @@ -114,7 +111,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, __m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal); signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask); - // get the four zigzag values here + // get four zigzag values here __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask); // calculate the cumulative sum (prefix sum) for each number @@ -246,6 +243,268 @@ int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelem // todo add later int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) { #if __AVX2__ +#endif + return 0; +} + +int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, + bool bigEndian) { +#if 0 + int64_t *ostream = (int64_t *)output; + int32_t ipos = 1, opos = 0; + __m128i prevVal = _mm_setzero_si128(); + __m128i prevDelta = _mm_setzero_si128(); + +#if __AVX2__ + int32_t batch = nelements >> 1; + int32_t remainder = nelements & 0x01; + __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; + + int32_t i = 0; + if (batch > 1) { + // first loop + uint8_t flags = input[ipos++]; + + int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + int8_t nbytes2 = (flags >> 4) & INT8MASK(4); + + __m128i data1; + if (nbytes1 == 0) { + data1 = _mm_setzero_si128(); + } else { + memcpy(&data1, (const void*) (input + ipos), nbytes1); + } + + __m128i data2; + if (nbytes2 == 0) { + data2 = _mm_setzero_si128(); + } else { + memcpy(&data2, (const void*) (input + ipos + nbytes1), nbytes2); + } + + data2 = _mm_broadcastq_epi64(data2); + __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal); + signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask); + + // get two zigzag values here + __m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask); + + __m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta); + deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent); + + __m128i val = _mm_add_epi64(deltaCurrent, prevVal); + _mm_storeu_si128((__m128i *)&ostream[opos], val); + + // keep the previous value + prevVal = _mm_shuffle_epi32 (val, 0xEE); + + // keep the previous delta of delta, for the first item + prevDelta = _mm_shuffle_epi32(deltaOfDelta, 0xEE); + + opos += 2; + ipos += nbytes1 + nbytes2; + i += 1; + } + + // the remain + for(; i < batch; ++i) { + uint8_t flags = input[ipos++]; + + int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + int8_t nbytes2 = (flags >> 4) & INT8MASK(4); + +// __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); +// __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); + __m128i data1; + if (nbytes1 == 0) { + data1 = _mm_setzero_si128(); + } else { + int64_t dd = 0; + memcpy(&dd, (const void*) (input + ipos), nbytes1); + data1 = _mm_loadu_si64(&dd); + } + + __m128i data2; + if (nbytes2 == 0) { + data2 = _mm_setzero_si128(); + } else { + int64_t dd = 0; + memcpy(&dd, (const void*) (input + ipos + nbytes1), nbytes2); + data2 = _mm_loadu_si64(&dd); + } + + data2 = _mm_broadcastq_epi64(data2); + + __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal); + signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask); + + // get two zigzag values here + __m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask); + + __m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta); + deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent); + + __m128i val = _mm_add_epi64(deltaCurrent, prevVal); + _mm_storeu_si128((__m128i *)&ostream[opos], val); + + // keep the previous value + prevVal = _mm_shuffle_epi32 (val, 0xEE); + + // keep the previous delta of delta + __m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta); + prevDelta = _mm_shuffle_epi32(_mm_add_epi64(delta, prevDelta), 0xEE); + + opos += 2; + ipos += nbytes1 + nbytes2; + } + + if (remainder > 0) { + uint64_t dd = 0; + uint8_t flags = input[ipos++]; + + int32_t nbytes = flags & INT8MASK(4); + int64_t deltaOfDelta = 0; + if (nbytes == 0) { + deltaOfDelta = 0; + } else { + // if (is_bigendian()) { + // memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + // } else { + memcpy(&dd, input + ipos, nbytes); + // } + deltaOfDelta = ZIGZAG_DECODE(int64_t, dd); + } + + ipos += nbytes; + if (opos == 0) { + ostream[opos++] = deltaOfDelta; + } else { + int64_t prevDeltaX = deltaOfDelta + prevDelta[1]; + ostream[opos++] = prevVal[1] + prevDeltaX; + } + } +#endif +#endif + return 0; +} + +int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, + bool UNUSED_PARAM(bigEndian)) { + int64_t *ostream = (int64_t *)output; + int32_t ipos = 1, opos = 0; + +#if __AVX512VL__ + + __m128i prevVal = _mm_setzero_si128(); + __m128i prevDelta = _mm_setzero_si128(); + + int32_t numOfBatch = nelements >> 1; + int32_t remainder = nelements & 0x01; + __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; + + int32_t i = 0; + if (numOfBatch > 1) { + // first loop + uint8_t flags = input[ipos++]; + + int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + int8_t nbytes2 = (flags >> 4) & INT8MASK(4); + + __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); + __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); + data2 = _mm_broadcastq_epi64(data2); + + __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal); + signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask); + + // get two zigzag values here + __m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask); + + __m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta); + deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent); + + __m128i val = _mm_add_epi64(deltaCurrent, prevVal); + _mm_storeu_si128((__m128i *)&ostream[opos], val); + + // keep the previous value + prevVal = _mm_shuffle_epi32 (val, 0xEE); + + // keep the previous delta of delta, for the first item + prevDelta = _mm_shuffle_epi32(deltaOfDelta, 0xEE); + + opos += 2; + ipos += nbytes1 + nbytes2; + i += 1; + } + + // the remain + for(; i < numOfBatch; ++i) { + uint8_t flags = input[ipos++]; + + int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + int8_t nbytes2 = (flags >> 4) & INT8MASK(4); + + __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); + __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); + data2 = _mm_broadcastq_epi64(data2); + + __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal); + signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask); + + // get two zigzag values here + __m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask); + + __m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta); + deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent); + + __m128i val = _mm_add_epi64(deltaCurrent, prevVal); + _mm_storeu_si128((__m128i *)&ostream[opos], val); + + // keep the previous value + prevVal = _mm_shuffle_epi32 (val, 0xEE); + + // keep the previous delta of delta + __m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta); + prevDelta = _mm_shuffle_epi32(_mm_add_epi64(delta, prevDelta), 0xEE); + + opos += 2; + ipos += nbytes1 + nbytes2; + } + + if (remainder > 0) { + uint64_t dd = 0; + uint8_t flags = input[ipos++]; + + int32_t nbytes = flags & INT8MASK(4); + int64_t deltaOfDelta = 0; + if (nbytes == 0) { + deltaOfDelta = 0; + } else { + memcpy(&dd, input + ipos, nbytes); + deltaOfDelta = ZIGZAG_DECODE(int64_t, dd); + } + + ipos += nbytes; + if (opos == 0) { + ostream[opos++] = deltaOfDelta; + } else { + int64_t prevDeltaX = deltaOfDelta + prevDelta[1]; + ostream[opos++] = prevVal[1] + prevDeltaX; + } + } + #endif return 0; } \ No newline at end of file diff --git a/source/util/test/decompressTest.cpp b/source/util/test/decompressTest.cpp new file mode 100644 index 0000000000..caf8df3ba8 --- /dev/null +++ b/source/util/test/decompressTest.cpp @@ -0,0 +1,94 @@ +#include +#include +#include +#include + +namespace {} // namespace + +TEST(utilTest, decompress_test) { + int64_t tsList[10] = {1700000000, 1700000100, 1700000200, 1700000300, 1700000400, + 1700000500, 1700000600, 1700000700, 1700000800, 1700000900}; + + char* pOutput[10 * sizeof(int64_t)] = {0}; + int32_t len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 10, ONE_STAGE_COMP, NULL, 0); + + char* decompOutput[10 * 8] = {0}; + tsDecompressTimestamp(pOutput, len, 10, decompOutput, sizeof(int64_t)*10, ONE_STAGE_COMP, NULL, 0); + + for(int32_t i = 0; i < 10; ++i) { + std::cout<< ((int64_t*)decompOutput)[i] << std::endl; + } + + memset(decompOutput, 0, 10*8); + tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 10, + reinterpret_cast(decompOutput), false); + + for(int32_t i = 0; i < 10; ++i) { + std::cout<<((int64_t*)decompOutput)[i] << std::endl; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + int64_t tsList1[7] = {1700000000, 1700000000, 1700000000, 1700000000, 1700000000, 1700000000, 1700000900}; + int32_t len1 = tsCompressTimestamp(tsList1, sizeof(tsList1), sizeof(tsList1) / sizeof(tsList1[0]), pOutput, 7, ONE_STAGE_COMP, NULL, 0); + + memset(decompOutput, 0, 10*8); + tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 7, + reinterpret_cast(decompOutput), false); + + for(int32_t i = 0; i < 7; ++i) { + std::cout<<((int64_t*)decompOutput)[i] << std::endl; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + int64_t tsList2[1] = {1700000000}; + int32_t len2 = tsCompressTimestamp(tsList2, sizeof(tsList2), sizeof(tsList2) / sizeof(tsList2[0]), pOutput, 1, ONE_STAGE_COMP, NULL, 0); + + memset(decompOutput, 0, 10*8); + tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 1, + reinterpret_cast(decompOutput), false); + + for(int32_t i = 0; i < 1; ++i) { + std::cout<<((int64_t*)decompOutput)[i] << std::endl; + } +} + +TEST(utilTest, decompress_perf_test) { + int32_t num = 10000; + + int64_t* pList = static_cast(taosMemoryCalloc(num, sizeof(int64_t))); + int64_t iniVal = 1700000000; + + uint32_t v = 100; + + for(int32_t i = 0; i < num; ++i) { + iniVal += taosRandR(&v)%10; + pList[i] = iniVal; + } + + char* px = static_cast(taosMemoryMalloc(num * sizeof(int64_t))); + int32_t len = tsCompressTimestamp(pList, num * sizeof(int64_t), num, px, num, ONE_STAGE_COMP, NULL, 0); + + char* pOutput = static_cast(taosMemoryMalloc(num * sizeof(int64_t))); + + int64_t st = taosGetTimestampUs(); + for(int32_t k = 0; k < 10000; ++k) { + tsDecompressTimestamp(px, len, num, pOutput, sizeof(int64_t) * num, ONE_STAGE_COMP, NULL, 0); + } + + int64_t el1 = taosGetTimestampUs() - st; + std::cout << "soft decompress elapsed time:" << el1 << " us" << std::endl; + + memset(pOutput, 0, num * sizeof(int64_t)); + st = taosGetTimestampUs(); + for(int32_t k = 0; k < 10000; ++k) { + tsDecompressTimestampAvx512(px, num, pOutput, false); + } + + int64_t el2 = taosGetTimestampUs() - st; + std::cout << "SIMD decompress elapsed time:" << el2 << " us" << std::endl; + + taosMemoryFree(pList); + taosMemoryFree(pOutput); + taosMemoryFree(px); +} + diff --git a/tests/script/tsim/stream/basic4.sim b/tests/script/tsim/stream/basic4.sim index b4e3d62545..d2bf321ad5 100644 --- a/tests/script/tsim/stream/basic4.sim +++ b/tests/script/tsim/stream/basic4.sim @@ -80,6 +80,7 @@ sql use test2; sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create stream streams2 trigger at_once ignore expired 0 ignore update 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s); +sleep 1000 sql insert into t1 values(1648791211000,1,2,3,1.0); sql insert into t1 values(1648791212001,2,2,3,1.1);