diff --git a/cmake/bdb_CMakeLists.txt.in b/cmake/bdb_CMakeLists.txt.in deleted file mode 100644 index dd21020105..0000000000 --- a/cmake/bdb_CMakeLists.txt.in +++ /dev/null @@ -1,13 +0,0 @@ - -# bdb -ExternalProject_Add(bdb - GIT_REPOSITORY https://github.com/berkeleydb/libdb.git - GIT_TAG v5.3.28 - SOURCE_DIR "${TD_CONTRIB_DIR}/bdb" - BINARY_DIR "${TD_CONTRIB_DIR}/bdb" - #BUILD_IN_SOURCE TRUE - CONFIGURE_COMMAND COMMAND ./dist/configure --enable-debug - BUILD_COMMAND "$(MAKE)" - INSTALL_COMMAND "" - TEST_COMMAND "" -) diff --git a/cmake/leveldb_CMakeLists.txt.in b/cmake/leveldb_CMakeLists.txt.in deleted file mode 100644 index 6878711bc1..0000000000 --- a/cmake/leveldb_CMakeLists.txt.in +++ /dev/null @@ -1,13 +0,0 @@ - -# leveldb -ExternalProject_Add(leveldb - GIT_REPOSITORY https://github.com/taosdata-contrib/leveldb.git - GIT_TAG master - SOURCE_DIR "${TD_CONTRIB_DIR}/leveldb" - BINARY_DIR "" - #BUILD_IN_SOURCE TRUE - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" -) \ No newline at end of file diff --git a/cmake/lucene_CMakeLists.txt.in b/cmake/lucene_CMakeLists.txt.in deleted file mode 100644 index 9fd7471705..0000000000 --- a/cmake/lucene_CMakeLists.txt.in +++ /dev/null @@ -1,12 +0,0 @@ - -# lucene -ExternalProject_Add(lucene - GIT_REPOSITORY https://github.com/yihaoDeng/LucenePlusPlus.git - SOURCE_DIR "${TD_CONTRIB_DIR}/lucene" - BINARY_DIR "" - #BUILD_IN_SOURCE TRUE - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" -) diff --git a/cmake/nuraft_CMakeLists.txt.in b/cmake/nuraft_CMakeLists.txt.in deleted file mode 100644 index 593c6fed26..0000000000 --- a/cmake/nuraft_CMakeLists.txt.in +++ /dev/null @@ -1,12 +0,0 @@ - -# NuRaft -ExternalProject_Add(NuRaft - GIT_REPOSITORY https://github.com/eBay/NuRaft.git - GIT_TAG v1.3.0 - SOURCE_DIR "${TD_CONTRIB_DIR}/nuraft" - BINARY_DIR "${TD_CONTRIB_DIR}/nuraft" - CONFIGURE_COMMAND "./prepare.sh" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" - ) \ No newline at end of file diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 1b0a091e9d..bb18516ba4 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -109,11 +109,6 @@ cat("${TD_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) # cJson cat("${TD_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -# leveldb -if(${BUILD_WITH_LEVELDB}) - cat("${TD_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_LEVELDB}) - if (${BUILD_CONTRIB}) if(${BUILD_WITH_ROCKSDB}) cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -132,28 +127,11 @@ else() endif() endif() -# canonical-raft -if(${BUILD_WITH_CRAFT}) - cat("${TD_SUPPORT_DIR}/craft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE) -endif(${BUILD_WITH_CRAFT}) - -# traft -if(${BUILD_WITH_TRAFT}) - cat("${TD_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE) -endif(${BUILD_WITH_TRAFT}) - #libuv if(${BUILD_WITH_UV}) cat("${TD_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_UV}) -# bdb -if(${BUILD_WITH_BDB}) - cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_BDB}) - # sqlite if(${BUILD_WITH_SQLITE}) cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -178,17 +156,6 @@ elseif(${BUILD_WITH_COS}) endif() -# lucene -if(${BUILD_WITH_LUCENE}) - cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) - add_definitions(-DUSE_LUCENE) -endif(${BUILD_WITH_LUCENE}) - -# NuRaft -if(${BUILD_WITH_NURAFT}) - cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_NURAFT}) - # crashdump if(${BUILD_CRASHDUMP}) cat("${TD_SUPPORT_DIR}/crashdump_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -448,23 +415,6 @@ elseif(${BUILD_WITH_COS}) endif() -# lucene -# To support build on ubuntu: sudo apt-get install libboost-all-dev -if(${BUILD_WITH_LUCENE}) - option(ENABLE_TEST "Enable the tests" OFF) - add_subdirectory(lucene EXCLUDE_FROM_ALL) - target_include_directories( - lucene++ - PUBLIC $ - ) - -endif(${BUILD_WITH_LUCENE}) - -# NuRaft -if(${BUILD_WITH_NURAFT}) - add_subdirectory(nuraft EXCLUDE_FROM_ALL) -endif(${BUILD_WITH_NURAFT}) - # pthread if(${BUILD_PTHREAD}) if ("${CMAKE_BUILD_TYPE}" STREQUAL "") @@ -537,30 +487,6 @@ if(${BUILD_WCWIDTH}) SET_TARGET_PROPERTIES(wcwidth PROPERTIES OUTPUT_NAME wcwidth) endif(${BUILD_WCWIDTH}) -# CRAFT -if(${BUILD_WITH_CRAFT}) - add_library(craft STATIC IMPORTED GLOBAL) - set_target_properties(craft PROPERTIES - IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/craft/.libs/libraft.a" - INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/craft/include" - ) - # target_link_libraries(craft - # INTERFACE pthread - # ) -endif(${BUILD_WITH_CRAFT}) - -# TRAFT -if(${BUILD_WITH_TRAFT}) - add_library(traft STATIC IMPORTED GLOBAL) - set_target_properties(traft PROPERTIES - IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a" - INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include" - ) - # target_link_libraries(craft - # INTERFACE pthread - # ) -endif(${BUILD_WITH_TRAFT}) - # LIBUV if(${BUILD_WITH_UV}) if (TD_WINDOWS) @@ -572,18 +498,6 @@ if(${BUILD_WITH_UV}) add_subdirectory(libuv EXCLUDE_FROM_ALL) endif(${BUILD_WITH_UV}) -# BDB -if(${BUILD_WITH_BDB}) - add_library(bdb STATIC IMPORTED GLOBAL) - set_target_properties(bdb PROPERTIES - IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a" - INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb" - ) - target_link_libraries(bdb - INTERFACE pthread - ) -endif(${BUILD_WITH_BDB}) - # SQLite # see https://stackoverflow.com/questions/8774593/cmake-link-to-external-library#comment58570736_10550334 if(${BUILD_WITH_SQLITE}) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e17a72992c..73d1ab2473 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3271,7 +3271,7 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; -} SVPauseStreamTaskReq, SVResetStreamTaskReq; +} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq; typedef struct { int8_t reserved; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4eb8328caa..2d1148a209 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -271,6 +271,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ad2e9514d1..cf9fc1d826 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -655,17 +655,19 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea typedef struct STaskStatusEntry { STaskId id; int32_t status; + int32_t statusLastDuration; // to record the last duration of current status int64_t stage; int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task - int64_t activeCheckpointId; // current active checkpoint id - bool checkpointFailed; // denote if the checkpoint is failed or not - double inputQUsed; // in MiB + int64_t verStart; // start version in WAL, only valid for source task + int64_t verEnd; // end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + int32_t relatedHTask; // has related fill-history task + int64_t activeCheckpointId; // current active checkpoint id + bool checkpointFailed; // denote if the checkpoint is failed or not + double inputQUsed; // in MiB double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dest data size + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index cd81b9873f..6de29f8513 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -84,6 +84,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c4c8a51d6b..a535ab17d7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -833,6 +833,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d2f0a13038..db013017e3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -78,6 +78,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); +static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); + static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg); @@ -1570,6 +1572,123 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } +static void setTaskAttrInResBlock(SStreamObj* pStream, SStreamTask* pTask, SSDataBlock* pBlock, int32_t numOfRows) { + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // stream name + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + + // task id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + + char idstr[128] = {0}; + int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); + idstr[2] = '0'; + idstr[3] = 'x'; + varDataSetLen(idstr, len + 2); + colDataSetVal(pColInfo, numOfRows, idstr, false); + + // node type + char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; + varDataSetLen(nodeType, 5); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pTask->info.nodeId > 0) { + memcpy(varDataVal(nodeType), "vnode", 5); + } else { + memcpy(varDataVal(nodeType), "snode", 5); + } + colDataSetVal(pColInfo, numOfRows, nodeType, false); + + // node id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + int64_t nodeId = TMAX(pTask->info.nodeId, 0); + colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false); + + // level + char level[20 + VARSTR_HEADER_SIZE] = {0}; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + memcpy(varDataVal(level), "source", 6); + varDataSetLen(level, 6); + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + memcpy(varDataVal(level), "agg", 3); + varDataSetLen(level, 3); + } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + memcpy(varDataVal(level), "sink", 4); + varDataSetLen(level, 4); + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)level, false); + + // status + char status[20 + VARSTR_HEADER_SIZE] = {0}; + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { + return; + } + + const char *pStatus = streamTaskGetStatusStr(pe->status); + STR_TO_VARSTR(status, pStatus); + + // status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)status, false); + + // stage + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); + + // input queue + char vbuf[30] = {0}; + char buf[25] = {0}; + const char *queueInfoStr = "%4.2fMiB (%5.2f%)"; + sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + + // output queue + // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); + // STR_TO_VARSTR(vbuf, buf); + + // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + const char *sinkStr = "%.2fMiB"; + sprintf(buf, sinkStr, pe->sinkDataSize); + } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + // offset info + const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; + sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd); + } + + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); +} + +static int32_t getNumOfTasks(SArray* pTaskList) { + int32_t numOfLevels = taosArrayGetSize(pTaskList); + + int32_t count = 0; + for (int32_t i = 0; i < numOfLevels; i++) { + SArray *pLevel = taosArrayGetP(pTaskList, i); + count += taosArrayGetSize(pLevel); + } + + return count; +} + static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode * pMnode = pReq->info.node; SSdb * pSdb = pMnode->pSdb; @@ -1585,137 +1704,25 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // lock taosRLockLatch(&pStream->lock); - // count task num - int32_t sz = taosArrayGetSize(pStream->tasks); - - int32_t count = 0; - for (int32_t i = 0; i < sz; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); - count += taosArrayGetSize(pLevel); - } - + int32_t count = getNumOfTasks(pStream->tasks); if (numOfRows + count > rowsCapacity) { blockDataEnsureCapacity(pBlock, numOfRows + count); } // add row for each task - for (int32_t i = 0; i < sz; i++) { + for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); - int32_t levelCnt = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < levelCnt; j++) { + int32_t numOfLevels = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfLevels; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - - SColumnInfoData *pColInfo; - int32_t cols = 0; - - // stream name - char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); - - // task id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - - char idstr[128] = {0}; - int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); - idstr[2] = '0'; - idstr[3] = 'x'; - varDataSetLen(idstr, len + 2); - colDataSetVal(pColInfo, numOfRows, idstr, false); - - // node type - char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; - varDataSetLen(nodeType, 5); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - if (pTask->info.nodeId > 0) { - memcpy(varDataVal(nodeType), "vnode", 5); - } else { - memcpy(varDataVal(nodeType), "snode", 5); - } - colDataSetVal(pColInfo, numOfRows, nodeType, false); - - // node id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - int64_t nodeId = TMAX(pTask->info.nodeId, 0); - colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false); - - // level - char level[20 + VARSTR_HEADER_SIZE] = {0}; - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - memcpy(varDataVal(level), "source", 6); - varDataSetLen(level, 6); - } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - memcpy(varDataVal(level), "agg", 3); - varDataSetLen(level, 3); - } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - memcpy(varDataVal(level), "sink", 4); - varDataSetLen(level, 4); - } - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)level, false); - - // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; - - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); - if (pe == NULL) { - continue; - } - - const char *pStatus = streamTaskGetStatusStr(pe->status); - STR_TO_VARSTR(status, pStatus); - - // status - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)status, false); - - // stage - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); - - // input queue - char vbuf[30] = {0}; - char buf[25] = {0}; - const char *queueInfoStr = "%4.2fMiB (%5.2f%)"; - sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); - STR_TO_VARSTR(vbuf, buf); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - - // output queue - // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); - // STR_TO_VARSTR(vbuf, buf); - - // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); - - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - const char *sinkStr = "%.2fMiB"; - sprintf(buf, sinkStr, pe->sinkDataSize); - } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - // offset info - const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd); - } - - STR_TO_VARSTR(vbuf, buf); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - + setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); numOfRows++; } } // unlock taosRUnLockLatch(&pStream->lock); - sdbRelease(pSdb, pStream); } @@ -2729,7 +2736,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t le return TSDB_CODE_SUCCESS; } -int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { +static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { STrans* pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mInfo("kill checkpoint transId:%d to reset task status", transId); @@ -2765,6 +2772,91 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { return 0; } +static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) { + for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + + int32_t numOfLevels = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfLevels; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + if (pTask->id.taskId == pId->taskId) { + return pTask; + } + } + } + + return NULL; +} + +static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { + if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { + if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) { + int32_t numOfReady = 0; + int32_t numOfTotal = 0; + for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); + if (pTaskEntry->id.streamId == pId->streamId) { + numOfTotal++; + + if (pTaskEntry->id.taskId != pId->taskId) { + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if (pEntry->status == TASK_STATUS__READY) { + numOfReady++; + } + } + } + } + + if (numOfReady > 0) { + mDebug("stream:0x%" PRIx64 + " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task", + pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady); + return true; + } else { + return false; + } + } + } + + return false; +} + +// currently only handle the sink task +// 1. sink task, drop related fill-history task msg is missing +// 2. other tasks are in ready state for at least 3 * hb_interval +static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *pTaskEntry, SStreamObj *pStream) { + SStreamTask *pTask = mndGetStreamTask(&pTaskEntry->id, pStream); + if (pTask == NULL) { + mError("failed to get the stream task:0x%x, may have been dropped", (int32_t) pTaskEntry->id.taskId); + return -1; + } + + SVDropHTaskReq *pReq = rpcMallocCont(sizeof(SVDropHTaskReq)); + if (pReq == NULL) { + mError("failed to malloc in drop related fill-history task, size:%" PRIzu ", code:%s", sizeof(SVDropHTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SRpcMsg msg = {.info.noResp = 1}; + + initRpcMsg(&msg, TDMT_STREAM_HTASK_DROP, pReq, sizeof(SVDropHTaskReq)); + + mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId); + + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + tmsgSendReq(&epset, &msg); + return TSDB_CODE_SUCCESS; +} + int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { int32_t num = taosArrayGetSize(pNodeList); mInfo("set node expired for %d nodes", num); @@ -2865,9 +2957,28 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } - pTaskEntry->status = p->status; + if (p->status == pTaskEntry->status) { + pTaskEntry->statusLastDuration++; + } else { + pTaskEntry->status = p->status; + pTaskEntry->statusLastDuration = 0; + } + if (p->status != TASK_STATUS__READY) { mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); + + if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) { + bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo); + if(drop) { + SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId); + if (pStreamObj == NULL) { + mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid); + } else { + mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj); + mndReleaseStream(pMnode, pStreamObj); + } + } + } } } @@ -2894,3 +3005,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosArrayDestroy(req.pUpdateNodes); return TSDB_CODE_SUCCESS; } + +SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + + while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { + if (pStream->uid == streamId) { + sdbCancelFetch(pSdb, pIter); + return pStream; + } + } + + return NULL; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index df1720d4a7..16379db053 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -232,6 +232,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg); int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart); int32_t tqRestartStreamTasks(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7be2a357d3..9d16402ee6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -2014,3 +2014,34 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } + +int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { + SVDropHTaskReq* pReq = (SVDropHTaskReq*) pMsg->pCont; + + SStreamMeta* pMeta = pTq->pStreamMeta; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d process drop fill-history task req, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, pReq->taskId); + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s receive drop fill-history msg from mnode", pTask->id.idStr); + if (pTask->hTaskInfo.id.taskId == 0) { + tqError("vgId:%d s-task:%s not have related fill-history task", pMeta->vgId, pTask->id.idStr); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } + + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ASSERT(status == TASK_STATUS__STREAM_SCAN_HISTORY); + + streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); + + SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); + + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; +} + diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 33b4114009..8cbca403e3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -595,6 +595,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg tqProcessTaskResetReq(pVnode->pTq, pMsg); } } break; + case TDMT_STREAM_HTASK_DROP: { + if (pVnode->restored && vnodeIsLeader(pVnode)) { + tqProcessTaskDropHTask(pVnode->pTq, pMsg); + } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4800e62109..afb6b9227d 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -113,28 +113,24 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32 int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); - -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, - int32_t* blockSize); -int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); -void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); -const char* streamQueueItemGetTypeStr(int32_t type); - -SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); - -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, - int32_t* pLen); -int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); -int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); -int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); -int32_t streamTransferStateToStreamTask(SStreamTask* pTask); - -void streamClearChkptReadyMsg(SStreamTask* pTask); - int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); +int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); +int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); + +void streamClearChkptReadyMsg(SStreamTask* pTask); +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, + int32_t* blockSize); +int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); +void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); +const char* streamQueueItemGetTypeStr(int32_t type); +SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); + +int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); +int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); +int32_t streamTransferStateToStreamTask(SStreamTask* pTask); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); @@ -156,8 +152,8 @@ int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); int deleteCheckpointFile(char* id, char* name); -int32_t onNormalTaskReady(SStreamTask* pTask); -int32_t onScanhistoryTaskReady(SStreamTask* pTask); +int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); +int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index f6ec6e9fdb..bcda85e7a7 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -158,7 +158,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm } // todo handle memory error -SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { +SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { terrno = 0; if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index da077c67f8..b0ba78ca6e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -958,7 +958,9 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, } void streamClearChkptReadyMsg(SStreamTask* pTask) { - if (pTask->pReadyMsgList == NULL) return; + if (pTask->pReadyMsgList == NULL) { + return; + } for (int i = 0; i < taosArrayGetSize(pTask->pReadyMsgList); i++) { SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 43875319b7..8b14846414 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -401,7 +401,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 5. save to disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); - // 6. pause allowed. + // 6. add empty delete block if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index d19dfc13bf..d1610362f9 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -221,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *pInput = qItem; } else { // merge current block failed, let's handle the already merged blocks. - void* newRet = streamMergeQueueItem(*pInput, qItem); + void* newRet = streamQueueMergeQueueItem(*pInput, qItem); if (newRet == NULL) { if (terrno != 0) { stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index f042687942..469813defc 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -323,7 +323,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ } } -int32_t onNormalTaskReady(SStreamTask* pTask) { +int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { const char* id = pTask->id.idStr; streamTaskSetReady(pTask); @@ -348,7 +348,7 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t onScanhistoryTaskReady(SStreamTask* pTask) { +int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { const char* id = pTask->id.idStr; // set the state to be ready diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c93d658adb..beaab0a415 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -451,7 +451,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i // 2MiB per second for sink task // 50 times sink operator per second - streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr); + streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); TdThreadMutexAttr attr = {0}; int code = taosThreadMutexAttrInit(&attr); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 1c951e1452..cac3766893 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -457,11 +457,11 @@ void doInitStateTransferTable(void) { streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 06b82f3ba1..09f50d0f6d 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -280,8 +280,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha #endif } -/* ----------------------------------------------Bool Compression - * ---------------------------------------------- */ +/* ----------------------------------------------Bool Compression ---------------------------------------------- */ // TODO: You can also implement it using RLE method. int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output) { int32_t pos = -1; @@ -387,8 +386,7 @@ int32_t tsDecompressBoolRLEImp(const char *const input, const int32_t nelements, } #endif -/* ----------------------------------------------String Compression - * ---------------------------------------------- */ +/* ----------------------------------------------String Compression ---------------------------------------------- */ // Note: the size of the output must be larger than input_size + 1 and // LZ4_compressBound(size) + 1; // >= max(input_size, LZ4_compressBound(input_size)) + 1; @@ -430,8 +428,7 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c } } -/* --------------------------------------------Timestamp Compression - * ---------------------------------------------- */ +/* --------------------------------------------Timestamp Compression ---------------------------------------------- */ // TODO: Take care here, we assumes little endian encoding. int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) { int32_t _pos = 1; @@ -751,8 +748,7 @@ int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements, return nelements * DOUBLE_BYTES; } -/* --------------------------------------------Float Compression - * ---------------------------------------------- */ +/* --------------------------------------------Float Compression ---------------------------------------------- */ void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) { uint8_t nbytes = (flag & INT8MASK(3)) + 1; int32_t nshift = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index f32a4014d6..68841941db 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -50,7 +50,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, const char *ip = input + 1; int32_t count = 0; int32_t _pos = 0; - int64_t prev_value = 0; + int64_t prevValue = 0; #if __AVX2__ while (1) { @@ -80,13 +80,13 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, if (selector == 0 || selector == 1) { if (tsSIMDEnable && tsAVX2Enable) { for (int32_t i = 0; i < batch; ++i) { - __m256i prev = _mm256_set1_epi64x(prev_value); + __m256i prev = _mm256_set1_epi64x(prevValue); _mm256_storeu_si256((__m256i *)&p[_pos], prev); _pos += 4; } for (int32_t i = 0; i < remain; ++i) { - p[_pos++] = prev_value; + p[_pos++] = prevValue; } } else if (tsSIMDEnable && tsAVX512Enable) { #if __AVX512F__ @@ -94,7 +94,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, #endif } else { // alternative implementation without SIMD instructions. for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = prev_value; + p[_pos++] = prevValue; v += bit; } } @@ -118,16 +118,16 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask); // calculate the cumulative sum (prefix sum) for each number - // decode[0] = prev_value + final[0] - // decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1] - // decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2] - // decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3] + // decode[0] = prevValue + final[0] + // decode[1] = decode[0] + final[1] -----> prevValue + final[0] + final[1] + // decode[2] = decode[1] + final[2] -----> prevValue + final[0] + final[1] + final[2] + // decode[3] = decode[2] + final[3] -----> prevValue + final[0] + final[1] + final[2] + final[3] // 1, 2, 3, 4 //+ 0, 1, 0, 3 // 1, 3, 3, 7 // shift and add for the first round - __m128i prev = _mm_set1_epi64x(prev_value); + __m128i prev = _mm_set1_epi64x(prevValue); __m256i x = _mm256_slli_si256(delta, 8); delta = _mm256_add_epi64(delta, x); @@ -148,16 +148,16 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, _mm_storeu_si128((__m128i *)&p[_pos + 2], secPart); shiftBits = _mm256_add_epi64(shiftBits, inc); - prev_value = p[_pos + 3]; + prevValue = p[_pos + 3]; _pos += 4; } // handle the remain value for (int32_t i = 0; i < remain; i++) { zigzag_value = ((w >> (v + (batch * bit * 4))) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + prevValue += ZIGZAG_DECODE(int64_t, zigzag_value); - p[_pos++] = prev_value; + p[_pos++] = prevValue; v += bit; } } else if (tsSIMDEnable && tsAVX512Enable) { @@ -167,9 +167,9 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, } else { // alternative implementation without SIMD instructions. for (int32_t i = 0; i < elems && count < nelements; i++, count++) { zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + prevValue += ZIGZAG_DECODE(int64_t, zigzag_value); - p[_pos++] = prev_value; + p[_pos++] = prevValue; v += bit; } } @@ -180,14 +180,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, if (selector == 0 || selector == 1) { for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int32_t)prev_value; + p[_pos++] = (int32_t)prevValue; } } else { for (int32_t i = 0; i < elems && count < nelements; i++, count++) { zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + prevValue += ZIGZAG_DECODE(int64_t, zigzag_value); - p[_pos++] = (int32_t)prev_value; + p[_pos++] = (int32_t)prevValue; v += bit; } } @@ -197,14 +197,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, if (selector == 0 || selector == 1) { for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int16_t)prev_value; + p[_pos++] = (int16_t)prevValue; } } else { for (int32_t i = 0; i < elems && count < nelements; i++, count++) { zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + prevValue += ZIGZAG_DECODE(int64_t, zigzag_value); - p[_pos++] = (int16_t)prev_value; + p[_pos++] = (int16_t)prevValue; v += bit; } } @@ -215,14 +215,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, if (selector == 0 || selector == 1) { for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int8_t)prev_value; + p[_pos++] = (int8_t)prevValue; } } else { for (int32_t i = 0; i < elems && count < nelements; i++, count++) { zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + prevValue += ZIGZAG_DECODE(int64_t, zigzag_value); - p[_pos++] = (int8_t)prev_value; + p[_pos++] = (int8_t)prevValue; v += bit; } } @@ -246,6 +246,77 @@ int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelem // todo add later int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) { #if __AVX2__ +#endif + return 0; +} + +int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian) { + int64_t *ostream = (int64_t *)output; + int32_t ipos = 1, opos = 0; + int8_t nbytes = 0; + + int64_t prevValue = 0; + int64_t prevDelta = 0; + + int64_t deltaOfDelta = 0; + int32_t longBytes = LONG_BYTES; + +#if __AVX2__ + + int32_t batch = nelements >> 2; + int32_t remainder = nelements & 0x1; + + while (1) { + uint8_t flags = input[ipos++]; + + // Decode dd1 + uint64_t dd1 = 0; + nbytes = flags & INT8MASK(4); + + if (nbytes == 0) { + deltaOfDelta = 0; + } else { + if (bigEndian) { + memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd1, input + ipos, nbytes); + } + deltaOfDelta = ZIGZAG_DECODE(int64_t, dd1); + } + + ipos += nbytes; + prevDelta += deltaOfDelta; + prevValue += prevDelta; + ostream[opos++] = prevValue; + + if (opos == nelements) { + return nelements * longBytes; + } + + // Decode dd2 + uint64_t dd2 = 0; + nbytes = (flags >> 4) & INT8MASK(4); + if (nbytes == 0) { + deltaOfDelta = 0; + } else { + if (bigEndian) { + memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd2, input + ipos, nbytes); + } + // zigzag_decoding + deltaOfDelta = ZIGZAG_DECODE(int64_t, dd2); + } + + ipos += nbytes; + prevDelta += deltaOfDelta; + prevValue += prevDelta; + ostream[opos++] = prevValue; + + if (opos == nelements) { + return nelements * longBytes; + } + } #endif return 0; } \ No newline at end of file