Merge pull request #23791 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2023-11-24 14:42:43 +08:00 committed by GitHub
commit d5e123db78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 422 additions and 325 deletions

View File

@ -1,13 +0,0 @@
# bdb
ExternalProject_Add(bdb
GIT_REPOSITORY https://github.com/berkeleydb/libdb.git
GIT_TAG v5.3.28
SOURCE_DIR "${TD_CONTRIB_DIR}/bdb"
BINARY_DIR "${TD_CONTRIB_DIR}/bdb"
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND COMMAND ./dist/configure --enable-debug
BUILD_COMMAND "$(MAKE)"
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -1,13 +0,0 @@
# leveldb
ExternalProject_Add(leveldb
GIT_REPOSITORY https://github.com/taosdata-contrib/leveldb.git
GIT_TAG master
SOURCE_DIR "${TD_CONTRIB_DIR}/leveldb"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -1,12 +0,0 @@
# lucene
ExternalProject_Add(lucene
GIT_REPOSITORY https://github.com/yihaoDeng/LucenePlusPlus.git
SOURCE_DIR "${TD_CONTRIB_DIR}/lucene"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -1,12 +0,0 @@
# NuRaft
ExternalProject_Add(NuRaft
GIT_REPOSITORY https://github.com/eBay/NuRaft.git
GIT_TAG v1.3.0
SOURCE_DIR "${TD_CONTRIB_DIR}/nuraft"
BINARY_DIR "${TD_CONTRIB_DIR}/nuraft"
CONFIGURE_COMMAND "./prepare.sh"
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -109,11 +109,6 @@ cat("${TD_SUPPORT_DIR}/zlib_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cJson
cat("${TD_SUPPORT_DIR}/cjson_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# leveldb
if(${BUILD_WITH_LEVELDB})
cat("${TD_SUPPORT_DIR}/leveldb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_LEVELDB})
if (${BUILD_CONTRIB})
if(${BUILD_WITH_ROCKSDB})
cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -132,28 +127,11 @@ else()
endif()
endif()
# canonical-raft
if(${BUILD_WITH_CRAFT})
cat("${TD_SUPPORT_DIR}/craft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE)
endif(${BUILD_WITH_CRAFT})
# traft
if(${BUILD_WITH_TRAFT})
cat("${TD_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE)
endif(${BUILD_WITH_TRAFT})
#libuv
if(${BUILD_WITH_UV})
cat("${TD_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_UV})
# bdb
if(${BUILD_WITH_BDB})
cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_BDB})
# sqlite
if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -178,17 +156,6 @@ elseif(${BUILD_WITH_COS})
endif()
# lucene
if(${BUILD_WITH_LUCENE})
cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_LUCENE)
endif(${BUILD_WITH_LUCENE})
# NuRaft
if(${BUILD_WITH_NURAFT})
cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT})
# crashdump
if(${BUILD_CRASHDUMP})
cat("${TD_SUPPORT_DIR}/crashdump_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
@ -448,23 +415,6 @@ elseif(${BUILD_WITH_COS})
endif()
# lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev
if(${BUILD_WITH_LUCENE})
option(ENABLE_TEST "Enable the tests" OFF)
add_subdirectory(lucene EXCLUDE_FROM_ALL)
target_include_directories(
lucene++
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/lucene/include>
)
endif(${BUILD_WITH_LUCENE})
# NuRaft
if(${BUILD_WITH_NURAFT})
add_subdirectory(nuraft EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_NURAFT})
# pthread
if(${BUILD_PTHREAD})
if ("${CMAKE_BUILD_TYPE}" STREQUAL "")
@ -537,30 +487,6 @@ if(${BUILD_WCWIDTH})
SET_TARGET_PROPERTIES(wcwidth PROPERTIES OUTPUT_NAME wcwidth)
endif(${BUILD_WCWIDTH})
# CRAFT
if(${BUILD_WITH_CRAFT})
add_library(craft STATIC IMPORTED GLOBAL)
set_target_properties(craft PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/craft/.libs/libraft.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/craft/include"
)
# target_link_libraries(craft
# INTERFACE pthread
# )
endif(${BUILD_WITH_CRAFT})
# TRAFT
if(${BUILD_WITH_TRAFT})
add_library(traft STATIC IMPORTED GLOBAL)
set_target_properties(traft PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include"
)
# target_link_libraries(craft
# INTERFACE pthread
# )
endif(${BUILD_WITH_TRAFT})
# LIBUV
if(${BUILD_WITH_UV})
if (TD_WINDOWS)
@ -572,18 +498,6 @@ if(${BUILD_WITH_UV})
add_subdirectory(libuv EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_UV})
# BDB
if(${BUILD_WITH_BDB})
add_library(bdb STATIC IMPORTED GLOBAL)
set_target_properties(bdb PROPERTIES
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb"
)
target_link_libraries(bdb
INTERFACE pthread
)
endif(${BUILD_WITH_BDB})
# SQLite
# see https://stackoverflow.com/questions/8774593/cmake-link-to-external-library#comment58570736_10550334
if(${BUILD_WITH_SQLITE})

View File

@ -3271,7 +3271,7 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq;
typedef struct {
int8_t reserved;

View File

@ -271,6 +271,7 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)

View File

@ -655,17 +655,19 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct STaskStatusEntry {
STaskId id;
int32_t status;
int32_t statusLastDuration; // to record the last duration of current status
int64_t stage;
int32_t nodeId;
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not
double inputQUsed; // in MiB
double inputRate;
double sinkQuota; // existed quota size for sink task
double sinkDataSize; // sink to dest data size
double sinkDataSize; // sink to dst data size
} STaskStatusEntry;
typedef struct SStreamHbMsg {

View File

@ -84,6 +84,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -833,6 +833,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -78,6 +78,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg);
@ -1570,42 +1572,7 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) {
break;
}
// lock
taosRLockLatch(&pStream->lock);
// count task num
int32_t sz = taosArrayGetSize(pStream->tasks);
int32_t count = 0;
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
count += taosArrayGetSize(pLevel);
}
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
// add row for each task
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t levelCnt = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < levelCnt; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
static void setTaskAttrInResBlock(SStreamObj* pStream, SStreamTask* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
SColumnInfoData *pColInfo;
int32_t cols = 0;
@ -1660,11 +1627,11 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
continue;
return;
}
const char *pStatus = streamTaskGetStatusStr(pe->status);
@ -1708,14 +1675,54 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
}
static int32_t getNumOfTasks(SArray* pTaskList) {
int32_t numOfLevels = taosArrayGetSize(pTaskList);
int32_t count = 0;
for (int32_t i = 0; i < numOfLevels; i++) {
SArray *pLevel = taosArrayGetP(pTaskList, i);
count += taosArrayGetSize(pLevel);
}
return count;
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) {
break;
}
// lock
taosRLockLatch(&pStream->lock);
int32_t count = getNumOfTasks(pStream->tasks);
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
// add row for each task
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
numOfRows++;
}
}
// unlock
taosRUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
}
@ -2729,7 +2736,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t le
return TSDB_CODE_SUCCESS;
}
int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
STrans* pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
@ -2765,6 +2772,91 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
return 0;
}
static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) {
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (pTask->id.taskId == pId->taskId) {
return pTask;
}
}
}
return NULL;
}
static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) {
int32_t numOfReady = 0;
int32_t numOfTotal = 0;
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pTaskEntry->id.streamId == pId->streamId) {
numOfTotal++;
if (pTaskEntry->id.taskId != pId->taskId) {
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->status == TASK_STATUS__READY) {
numOfReady++;
}
}
}
}
if (numOfReady > 0) {
mDebug("stream:0x%" PRIx64
" %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history task",
pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
return true;
} else {
return false;
}
}
}
return false;
}
// currently only handle the sink task
// 1. sink task, drop related fill-history task msg is missing
// 2. other tasks are in ready state for at least 3 * hb_interval
static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *pTaskEntry, SStreamObj *pStream) {
SStreamTask *pTask = mndGetStreamTask(&pTaskEntry->id, pStream);
if (pTask == NULL) {
mError("failed to get the stream task:0x%x, may have been dropped", (int32_t) pTaskEntry->id.taskId);
return -1;
}
SVDropHTaskReq *pReq = rpcMallocCont(sizeof(SVDropHTaskReq));
if (pReq == NULL) {
mError("failed to malloc in drop related fill-history task, size:%" PRIzu ", code:%s", sizeof(SVDropHTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SRpcMsg msg = {.info.noResp = 1};
initRpcMsg(&msg, TDMT_STREAM_HTASK_DROP, pReq, sizeof(SVDropHTaskReq));
mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
tmsgSendReq(&epset, &msg);
return TSDB_CODE_SUCCESS;
}
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
mInfo("set node expired for %d nodes", num);
@ -2865,9 +2957,28 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
if (p->status == pTaskEntry->status) {
pTaskEntry->statusLastDuration++;
} else {
pTaskEntry->status = p->status;
pTaskEntry->statusLastDuration = 0;
}
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) {
bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo);
if(drop) {
SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId);
if (pStreamObj == NULL) {
mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid);
} else {
mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj);
mndReleaseStream(pMnode, pStreamObj);
}
}
}
}
}
@ -2894,3 +3005,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosArrayDestroy(req.pUpdateNodes);
return TSDB_CODE_SUCCESS;
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
if (pStream->uid == streamId) {
sdbCancelFetch(pSdb, pIter);
return pStream;
}
}
return NULL;
}

View File

@ -232,6 +232,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg);
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
int32_t tqRestartStreamTasks(STQ* pTq);

View File

@ -2014,3 +2014,34 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
SVDropHTaskReq* pReq = (SVDropHTaskReq*) pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process drop fill-history task req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive drop fill-history msg from mnode", pTask->id.idStr);
if (pTask->hTaskInfo.id.taskId == 0) {
tqError("vgId:%d s-task:%s not have related fill-history task", pMeta->vgId, pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
ASSERT(status == TASK_STATUS__STREAM_SCAN_HISTORY);
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -595,6 +595,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
tqProcessTaskResetReq(pVnode->pTq, pMsg);
}
} break;
case TDMT_STREAM_HTASK_DROP: {
if (pVnode->restored && vnodeIsLeader(pVnode)) {
tqProcessTaskDropHTask(pVnode->pTq, pMsg);
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {

View File

@ -113,29 +113,25 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
void streamClearChkptReadyMsg(SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
void streamClearChkptReadyMsg(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);
@ -156,8 +152,8 @@ int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);
#ifdef __cplusplus
}

View File

@ -158,7 +158,7 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
}
// todo handle memory error
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
terrno = 0;
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {

View File

@ -958,7 +958,9 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
}
void streamClearChkptReadyMsg(SStreamTask* pTask) {
if (pTask->pReadyMsgList == NULL) return;
if (pTask->pReadyMsgList == NULL) {
return;
}
for (int i = 0; i < taosArrayGetSize(pTask->pReadyMsgList); i++) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);

View File

@ -401,7 +401,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 5. save to disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
// 6. pause allowed.
// 6. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);

View File

@ -221,7 +221,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*pInput = qItem;
} else {
// merge current block failed, let's handle the already merged blocks.
void* newRet = streamMergeQueueItem(*pInput, qItem);
void* newRet = streamQueueMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
if (terrno != 0) {
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,

View File

@ -323,7 +323,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
}
int32_t onNormalTaskReady(SStreamTask* pTask) {
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
streamTaskSetReady(pTask);
@ -348,7 +348,7 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
// set the state to be ready

View File

@ -451,7 +451,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
// 2MiB per second for sink task
// 50 times sink operator per second
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);

View File

@ -457,11 +457,11 @@ void doInitStateTransferTable(void) {
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false);
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event

View File

@ -280,8 +280,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
#endif
}
/* ----------------------------------------------Bool Compression
* ---------------------------------------------- */
/* ----------------------------------------------Bool Compression ---------------------------------------------- */
// TODO: You can also implement it using RLE method.
int32_t tsCompressBoolImp(const char *const input, const int32_t nelements, char *const output) {
int32_t pos = -1;
@ -387,8 +386,7 @@ int32_t tsDecompressBoolRLEImp(const char *const input, const int32_t nelements,
}
#endif
/* ----------------------------------------------String Compression
* ---------------------------------------------- */
/* ----------------------------------------------String Compression ---------------------------------------------- */
// Note: the size of the output must be larger than input_size + 1 and
// LZ4_compressBound(size) + 1;
// >= max(input_size, LZ4_compressBound(input_size)) + 1;
@ -430,8 +428,7 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c
}
}
/* --------------------------------------------Timestamp Compression
* ---------------------------------------------- */
/* --------------------------------------------Timestamp Compression ---------------------------------------------- */
// TODO: Take care here, we assumes little endian encoding.
int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
int32_t _pos = 1;
@ -751,8 +748,7 @@ int32_t tsDecompressDoubleImp(const char *const input, const int32_t nelements,
return nelements * DOUBLE_BYTES;
}
/* --------------------------------------------Float Compression
* ---------------------------------------------- */
/* --------------------------------------------Float Compression ---------------------------------------------- */
void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *const pos) {
uint8_t nbytes = (flag & INT8MASK(3)) + 1;
int32_t nshift = (FLOAT_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);

View File

@ -50,7 +50,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
const char *ip = input + 1;
int32_t count = 0;
int32_t _pos = 0;
int64_t prev_value = 0;
int64_t prevValue = 0;
#if __AVX2__
while (1) {
@ -80,13 +80,13 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
if (tsSIMDEnable && tsAVX2Enable) {
for (int32_t i = 0; i < batch; ++i) {
__m256i prev = _mm256_set1_epi64x(prev_value);
__m256i prev = _mm256_set1_epi64x(prevValue);
_mm256_storeu_si256((__m256i *)&p[_pos], prev);
_pos += 4;
}
for (int32_t i = 0; i < remain; ++i) {
p[_pos++] = prev_value;
p[_pos++] = prevValue;
}
} else if (tsSIMDEnable && tsAVX512Enable) {
#if __AVX512F__
@ -94,7 +94,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
#endif
} else { // alternative implementation without SIMD instructions.
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = prev_value;
p[_pos++] = prevValue;
v += bit;
}
}
@ -118,16 +118,16 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);
// calculate the cumulative sum (prefix sum) for each number
// decode[0] = prev_value + final[0]
// decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3]
// decode[0] = prevValue + final[0]
// decode[1] = decode[0] + final[1] -----> prevValue + final[0] + final[1]
// decode[2] = decode[1] + final[2] -----> prevValue + final[0] + final[1] + final[2]
// decode[3] = decode[2] + final[3] -----> prevValue + final[0] + final[1] + final[2] + final[3]
// 1, 2, 3, 4
//+ 0, 1, 0, 3
// 1, 3, 3, 7
// shift and add for the first round
__m128i prev = _mm_set1_epi64x(prev_value);
__m128i prev = _mm_set1_epi64x(prevValue);
__m256i x = _mm256_slli_si256(delta, 8);
delta = _mm256_add_epi64(delta, x);
@ -148,16 +148,16 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
_mm_storeu_si128((__m128i *)&p[_pos + 2], secPart);
shiftBits = _mm256_add_epi64(shiftBits, inc);
prev_value = p[_pos + 3];
prevValue = p[_pos + 3];
_pos += 4;
}
// handle the remain value
for (int32_t i = 0; i < remain; i++) {
zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
p[_pos++] = prevValue;
v += bit;
}
} else if (tsSIMDEnable && tsAVX512Enable) {
@ -167,9 +167,9 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
} else { // alternative implementation without SIMD instructions.
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = prev_value;
p[_pos++] = prevValue;
v += bit;
}
}
@ -180,14 +180,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int32_t)prev_value;
p[_pos++] = (int32_t)prevValue;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int32_t)prev_value;
p[_pos++] = (int32_t)prevValue;
v += bit;
}
}
@ -197,14 +197,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int16_t)prev_value;
p[_pos++] = (int16_t)prevValue;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int16_t)prev_value;
p[_pos++] = (int16_t)prevValue;
v += bit;
}
}
@ -215,14 +215,14 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = (int8_t)prev_value;
p[_pos++] = (int8_t)prevValue;
}
} else {
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
zigzag_value = ((w >> v) & mask);
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
prevValue += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int8_t)prev_value;
p[_pos++] = (int8_t)prevValue;
v += bit;
}
}
@ -249,3 +249,74 @@ int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelemen
#endif
return 0;
}
int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian) {
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
int8_t nbytes = 0;
int64_t prevValue = 0;
int64_t prevDelta = 0;
int64_t deltaOfDelta = 0;
int32_t longBytes = LONG_BYTES;
#if __AVX2__
int32_t batch = nelements >> 2;
int32_t remainder = nelements & 0x1;
while (1) {
uint8_t flags = input[ipos++];
// Decode dd1
uint64_t dd1 = 0;
nbytes = flags & INT8MASK(4);
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
if (bigEndian) {
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd1, input + ipos, nbytes);
}
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd1);
}
ipos += nbytes;
prevDelta += deltaOfDelta;
prevValue += prevDelta;
ostream[opos++] = prevValue;
if (opos == nelements) {
return nelements * longBytes;
}
// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
if (bigEndian) {
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
// zigzag_decoding
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd2);
}
ipos += nbytes;
prevDelta += deltaOfDelta;
prevValue += prevDelta;
ostream[opos++] = prevValue;
if (opos == nelements) {
return nelements * longBytes;
}
}
#endif
return 0;
}