From 52b3e1be6f5d8e34ff654914d0f819bf5bc65459 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 Jul 2024 14:08:22 +0800 Subject: [PATCH 1/2] fix(stream): drop orphan tasks/reset task in trans done by write-queue. --- include/common/tmsg.h | 16 +- include/common/tmsgdef.h | 6 +- source/common/src/tmsg.c | 49 +- source/dnode/mnode/impl/inc/mndStream.h | 17 +- source/dnode/mnode/impl/src/mndStream.c | 535 ++++-------------- source/dnode/mnode/impl/src/mndStreamHb.c | 232 +++++--- source/dnode/mnode/impl/src/mndStreamUtil.c | 585 +++++++++++++++++++- source/libs/stream/src/streamTask.c | 1 + 8 files changed, 883 insertions(+), 558 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 782b9a072d..4476eee447 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1827,12 +1827,18 @@ typedef struct { int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq); // int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq); -typedef struct { - int64_t tick; -} SMStreamTickReq; +typedef struct SOrphanTask { + int64_t streamId; + int32_t taskId; + int32_t nodeId; +} SOrphanTask; -int32_t tSerializeSMStreamTickMsg(void* buf, int32_t bufLen, SMStreamTickReq* pReq); -// int32_t tDeserializeSMStreamTickMsg(void* buf, int32_t bufLen, SMStreamTickReq* pReq); +typedef struct SMStreamDropOrphanMsg { + SArray* pList; // SArray +} SMStreamDropOrphanMsg; + +int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg); +int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg); typedef struct { int32_t id; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3515df3127..b73a15ebcc 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -224,9 +224,9 @@ TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // not used TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL) // not used TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL) @@ -251,6 +251,8 @@ TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_DROP_ORPHANTASKS, "stream-drop-orphan-tasks", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9e663d495c..23356ca7c1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5275,12 +5275,22 @@ int32_t tSerializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) { // return 0; // } -int32_t tSerializeSMStreamTickMsg(void *buf, int32_t bufLen, SMStreamTickReq *pReq) { +int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI64(&encoder, pReq->tick) < 0) return -1; + + int32_t size = taosArrayGetSize(pMsg->pList); + if (tEncodeI32(&encoder, size) < 0) return -1; + + for (int32_t i = 0; i < size; i++) { + SOrphanTask *pTask = taosArrayGet(pMsg->pList, i); + if (tEncodeI64(&encoder, pTask->streamId) < 0) return -1; + if (tEncodeI32(&encoder, pTask->taskId) < 0) return -1; + if (tEncodeI32(&encoder, pTask->nodeId) < 0) return -1; + } + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -5288,17 +5298,34 @@ int32_t tSerializeSMStreamTickMsg(void *buf, int32_t bufLen, SMStreamTickReq *pR return tlen; } -// int32_t tDeserializeSMStreamTickMsg(void *buf, int32_t bufLen, SMStreamTickReq *pReq) { -// SDecoder decoder = {0}; -// tDecoderInit(&decoder, buf, bufLen); +int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); -// if (tStartDecode(&decoder) < 0) return -1; -// if (tDecodeI64(&decoder, &pReq->tick) < 0) return -1; -// tEndDecode(&decoder); + if (tStartDecode(&decoder) < 0) return -1; -// tDecoderClear(&decoder); -// return 0; -// } + int32_t num = 0; + if (tDecodeI32(&decoder, &num) < 0) return -1; + + if (num > 0) { + pMsg->pList = taosArrayInit(num, sizeof(SOrphanTask)); + if (NULL == pMsg->pList) return -1; + for (int32_t i = 0; i < num; ++i) { + SOrphanTask info = {0}; + if (tDecodeI64(&decoder, &info.streamId) < 0) return -1; + if (tDecodeI32(&decoder, &info.taskId) < 0) return -1; + if (tDecodeI32(&decoder, &info.nodeId) < 0) return -1; + + if (taosArrayPush(pMsg->pList, &info) == NULL) { + return -1; + } + } + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} int32_t tEncodeSReplica(SEncoder *pEncoder, SReplica *pReplica) { if (tEncodeI32(pEncoder, pReplica->id) < 0) return -1; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index bd0d97e34d..d713de5158 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -52,6 +52,11 @@ typedef struct SStreamTransMgmt { SHashObj *pDBTrans; } SStreamTransMgmt; +typedef struct SStreamTaskResetMsg { + int64_t streamId; + int32_t transId; +} SStreamTaskResetMsg; + typedef struct SStreamExecInfo { bool initTaskList; SArray *pNodeList; @@ -63,6 +68,7 @@ typedef struct SStreamExecInfo { SHashObj *pTransferStateStreams; SHashObj *pChkptStreams; SHashObj *pStreamConsensus; + SArray *pKilledChkptTrans; // SArray } SStreamExecInfo; extern SStreamExecInfo execInfo; @@ -75,12 +81,6 @@ typedef struct SNodeEntry { int64_t hbTimestamp; // second } SNodeEntry; -typedef struct SOrphanTask { - int64_t streamId; - int32_t taskId; - int32_t nodeId; -} SOrphanTask; - typedef struct { SMsgHead head; } SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp, SMStreamReqConsensChkptRsp; @@ -152,6 +152,11 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreChec void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); +int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); +int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows); + +int32_t mndProcessResetStatusReq(SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e20529f4b6..297b747ea0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -61,9 +61,7 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); - - -static int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); +static int32_t mndProcessDropOrphanTaskReq(SRpcMsg* pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); @@ -121,6 +119,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); @@ -154,6 +154,7 @@ int32_t mndInitStream(SMnode *pMnode) { void mndCleanupStream(SMnode *pMnode) { taosArrayDestroy(execInfo.pTaskList); taosArrayDestroy(execInfo.pNodeList); + taosArrayDestroy(execInfo.pKilledChkptTrans); taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.pTransferStateStreams); @@ -271,38 +272,12 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pSdb, pStream); } -static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { - int8_t status = atomic_load_8(&pStream->status); - if (status == STREAM_STATUS__NORMAL) { - strcpy(dst, "ready"); - } else if (status == STREAM_STATUS__STOP) { - strcpy(dst, "stop"); - } else if (status == STREAM_STATUS__FAILED) { - strcpy(dst, "failed"); - } else if (status == STREAM_STATUS__RECOVER) { - strcpy(dst, "recover"); - } else if (status == STREAM_STATUS__PAUSE) { - strcpy(dst, "paused"); - } -} - SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; } SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; } int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; } int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; } int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; } -static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) { - int8_t trigger = pStream->conf.trigger; - if (trigger == STREAM_TRIGGER_AT_ONCE) { - strcpy(dst, "at once"); - } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) { - strcpy(dst, "window close"); - } else if (trigger == STREAM_TRIGGER_MAX_DELAY) { - strcpy(dst, "max delay"); - } -} - static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 || pCreate->targetStbFullName[0] == 0) { @@ -1365,8 +1340,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMDropStreamReq dropReq = {0}; if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { mError("invalid drop stream msg recv, discarded"); - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + TAOS_RETURN(code); } mDebug("recv drop stream:%s msg", dropReq.name); @@ -1379,10 +1354,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { tFreeMDropStreamReq(&dropReq); return 0; } else { - terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + code = TSDB_CODE_MND_STREAM_NOT_EXIST; mError("stream:%s not exist failed to drop it", dropReq.name); tFreeMDropStreamReq(&dropReq); - return -1; + TAOS_RETURN(code); } } @@ -1399,11 +1374,11 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { sdbCancelFetch(pMnode->pSdb, pIter); tFreeMDropStreamReq(&dropReq); - terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED; + code = TSDB_CODE_TSMA_MUST_BE_DROPPED; mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma", dropReq.name, pStream->uid, tstrerror(terrno)); - return -1; + TAOS_RETURN(code); } if (pSma) { @@ -1425,7 +1400,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (conflict) { sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); - return -1; + return terrno; } STrans *pTrans = NULL; @@ -1434,26 +1409,35 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, terrstr()); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); - return -1; + TAOS_RETURN(code); } code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid); - - // drop all tasks - if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { - mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, terrstr()); + if (code) { + mError("failed to register drop stream trans, code:%s", tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); tFreeMDropStreamReq(&dropReq); - return -1; + TAOS_RETURN(code); + } + + // drop all tasks + code = mndStreamSetDropAction(pMnode, pTrans, pStream); + if (code) { + mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, tstrerror(code)); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + tFreeMDropStreamReq(&dropReq); + TAOS_RETURN(code); } // drop stream - if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) { + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED); + if (code) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); tFreeMDropStreamReq(&dropReq); - return -1; + TAOS_RETURN(code); } code = mndTransPrepare(pMnode, pTrans); @@ -1462,7 +1446,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); tFreeMDropStreamReq(&dropReq); - return -1; + TAOS_RETURN(code); } // kill the related checkpoint trans @@ -1488,7 +1472,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (code == 0) { return TSDB_CODE_ACTION_IN_PROGRESS; } else { - return code; + TAOS_RETURN(code); } } @@ -1570,15 +1554,6 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) return 0; } -static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) { - memset(pBuf, 0, bufLen); - pBuf[2] = '0'; - pBuf[3] = 'x'; - - int32_t len = tintToHex(id, &pBuf[4]); - varDataSetLen(pBuf, len + 2); -} - static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1606,379 +1581,6 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) { - int32_t code = 0; - int32_t cols = 0; - int32_t lino = 0; - - char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - - code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); - TSDB_CHECK_CODE(code, lino, _end); - - // create time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false); - TSDB_CHECK_CODE(code, lino, _end); - - // stream id - char buf[128] = {0}; - int64ToHexStr(pStream->uid, buf, tListLen(buf)); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, buf, false); - TSDB_CHECK_CODE(code, lino, _end); - - // related fill-history stream id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - if (pStream->hTaskUid != 0) { - int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf)); - code = colDataSetVal(pColInfo, numOfRows, buf, false); - } else { - code = colDataSetVal(pColInfo, numOfRows, buf, true); - } - TSDB_CHECK_CODE(code, lino, _end); - - // related fill-history stream id - char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql)); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); - TSDB_CHECK_CODE(code, lino, _end); - - char status[20 + VARSTR_HEADER_SIZE] = {0}; - char status2[20] = {0}; - mndShowStreamStatus(status2, pStream); - STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); - TSDB_CHECK_CODE(code, lino, _end); - - char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB)); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false); - TSDB_CHECK_CODE(code, lino, _end); - - char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB)); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false); - TSDB_CHECK_CODE(code, lino, _end); - - if (pStream->targetSTbName[0] == 0) { - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, NULL, true); - } else { - char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB)); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false); - } - TSDB_CHECK_CODE(code, lino, _end); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false); - TSDB_CHECK_CODE(code, lino, _end); - - char trigger[20 + VARSTR_HEADER_SIZE] = {0}; - char trigger2[20] = {0}; - mndShowStreamTrigger(trigger2, pStream); - STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger)); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); - TSDB_CHECK_CODE(code, lino, _end); - - // sink_quota - char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; - sinkQuota[0] = '0'; - char dstStr[20] = {0}; - STR_TO_VARSTR(dstStr, sinkQuota) - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); - TSDB_CHECK_CODE(code, lino, _end); - - // checkpoint interval - char tmp[20 + VARSTR_HEADER_SIZE] = {0}; - sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval); - varDataSetLen(tmp, strlen(varDataVal(tmp))); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false); - TSDB_CHECK_CODE(code, lino, _end); - - // checkpoint backup type - char backup[20 + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(backup, "none") - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false); - TSDB_CHECK_CODE(code, lino, _end); - - // history scan idle - char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; - strcpy(scanHistoryIdle, "100a"); - - memset(dstStr, 0, tListLen(dstStr)); - STR_TO_VARSTR(dstStr, scanHistoryIdle) - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); - - _end: - return code; -} - -static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) { - SColumnInfoData *pColInfo; - int32_t cols = 0; - int32_t code = 0; - int32_t lino = 0; - - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - - STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); - if (pe == NULL) { - mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64 - " no valid status/stage info", - id.taskId, pStream->name, pStream->uid, pStream->createTime); - return TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - // stream name - char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); - TSDB_CHECK_CODE(code, lino, _end); - - // task id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - - char idstr[128] = {0}; - int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr)); - code = colDataSetVal(pColInfo, numOfRows, idstr, false); - TSDB_CHECK_CODE(code, lino, _end); - - // node type - char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; - varDataSetLen(nodeType, 5); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - if (pTask->info.nodeId > 0) { - memcpy(varDataVal(nodeType), "vnode", 5); - } else { - memcpy(varDataVal(nodeType), "snode", 5); - } - code = colDataSetVal(pColInfo, numOfRows, nodeType, false); - TSDB_CHECK_CODE(code, lino, _end); - - // node id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - int64_t nodeId = TMAX(pTask->info.nodeId, 0); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false); - TSDB_CHECK_CODE(code, lino, _end); - - // level - char level[20 + VARSTR_HEADER_SIZE] = {0}; - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - memcpy(varDataVal(level), "source", 6); - varDataSetLen(level, 6); - } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - memcpy(varDataVal(level), "agg", 3); - varDataSetLen(level, 3); - } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - memcpy(varDataVal(level), "sink", 4); - varDataSetLen(level, 4); - } - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false); - TSDB_CHECK_CODE(code, lino, _end); - - // status - char status[20 + VARSTR_HEADER_SIZE] = {0}; - - const char *pStatus = streamTaskGetStatusStr(pe->status); - STR_TO_VARSTR(status, pStatus); - - // status - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false); - TSDB_CHECK_CODE(code, lino, _end); - - // stage - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); - TSDB_CHECK_CODE(code, lino, _end); - - // input queue - char vbuf[40] = {0}; - char buf[38] = {0}; - const char *queueInfoStr = "%4.2f MiB (%6.2f%)"; - snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate); - STR_TO_VARSTR(vbuf, buf); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - TSDB_CHECK_CODE(code, lino, _end); - - // input total - const char *formatTotalMb = "%7.2f MiB"; - const char *formatTotalGb = "%7.2f GiB"; - if (pe->procsTotal < 1024) { - snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal); - } else { - snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024); - } - - memset(vbuf, 0, tListLen(vbuf)); - STR_TO_VARSTR(vbuf, buf); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - TSDB_CHECK_CODE(code, lino, _end); - - // process throughput - const char *formatKb = "%7.2f KiB/s"; - const char *formatMb = "%7.2f MiB/s"; - if (pe->procsThroughput < 1024) { - snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput); - } else { - snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024); - } - - memset(vbuf, 0, tListLen(vbuf)); - STR_TO_VARSTR(vbuf, buf); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - TSDB_CHECK_CODE(code, lino, _end); - - // output total - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - colDataSetNULL(pColInfo, numOfRows); - } else { - sprintf(buf, formatTotalMb, pe->outputTotal); - memset(vbuf, 0, tListLen(vbuf)); - STR_TO_VARSTR(vbuf, buf); - - code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - TSDB_CHECK_CODE(code, lino, _end); - } - - // output throughput - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - colDataSetNULL(pColInfo, numOfRows); - } else { - if (pe->outputThroughput < 1024) { - snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput); - } else { - snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024); - } - - memset(vbuf, 0, tListLen(vbuf)); - STR_TO_VARSTR(vbuf, buf); - - code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - TSDB_CHECK_CODE(code, lino, _end); - } - - // output queue - // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); - // STR_TO_VARSTR(vbuf, buf); - - // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); - - // info - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - const char *sinkStr = "%.2f MiB"; - snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize); - } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - // offset info - const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); - } else { - memset(buf, 0, tListLen(buf)); - } - - STR_TO_VARSTR(vbuf, buf); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); - TSDB_CHECK_CODE(code, lino, _end); - - // start_time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false); - TSDB_CHECK_CODE(code, lino, _end); - - // start id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false); - TSDB_CHECK_CODE(code, lino, _end); - - // start ver - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false); - TSDB_CHECK_CODE(code, lino, _end); - - // checkpoint time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - if (pe->checkpointInfo.latestTime != 0) { - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false); - } else { - code = colDataSetVal(pColInfo, numOfRows, 0, true); - } - TSDB_CHECK_CODE(code, lino, _end); - - // checkpoint_id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false); - TSDB_CHECK_CODE(code, lino, _end); - - // checkpoint version - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false); - TSDB_CHECK_CODE(code, lino, _end); - - // checkpoint size - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetNULL(pColInfo, numOfRows); - - // checkpoint backup status - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, 0, true); - TSDB_CHECK_CODE(code, lino, _end); - - // ds_err_info - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, 0, true); - TSDB_CHECK_CODE(code, lino, _end); - - // history_task_id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - if (pe->hTaskId != 0) { - int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr)); - code = colDataSetVal(pColInfo, numOfRows, idstr, false); - } else { - code = colDataSetVal(pColInfo, numOfRows, 0, true); - } - TSDB_CHECK_CODE(code, lino, _end); - - // history_task_status - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - code = colDataSetVal(pColInfo, numOfRows, 0, true); - TSDB_CHECK_CODE(code, lino, _end); - - _end: - return code; -} - static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -3141,3 +2743,80 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, return TSDB_CODE_ACTION_IN_PROGRESS; } + +static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = 0; + SOrphanTask *pTask = NULL; + int32_t i = 0; + + SMStreamDropOrphanMsg msg = {0}; + code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg); + if (code) { + return code; + } + + int32_t numOfTasks = taosArrayGetSize(msg.pList); + if (numOfTasks == 0) { + mDebug("no orphan tasks to drop, no need to create trans"); + return code; + } + + mDebug("create trans to drop %d orphan tasks", numOfTasks); + + i = 0; + while (i < numOfTasks && ((pTask = taosArrayGet(msg.pList, i)) == NULL)) { + i += 1; + } + + if (pTask == NULL) { + mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task"); + return TSDB_CODE_SUCCESS; + } + + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); + if (conflict) { + return -1; + } + + SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; + STrans *pTrans = NULL; + code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); + if (pTrans == NULL || code != 0) { + mError("failed to create trans to drop orphan tasks since %s", terrstr()); + return code; + } + + code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); + if (code) { + return code; + } + + // drop all tasks + if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) { + mError("failed to create trans to drop orphan tasks since %s", terrstr()); + mndTransDrop(pTrans); + return code; + } + + // drop stream + if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) { + mndTransDrop(pTrans); + return code; + } + + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return code; + } + + if (code == TSDB_CODE_SUCCESS) { + mDebug("create drop %d orphan tasks trans succ", numOfTasks); + } + + mndTransDrop(pTrans); + return code; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 507cafabe5..11556a212d 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,12 +22,12 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode); +static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode); +static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList); +static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId); static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo); -static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); -static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList); static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); @@ -37,6 +37,10 @@ void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j); + if (pNodeEntry == NULL) { + continue; + } + if (pNodeEntry->nodeId == pTaskEntry->nodeId) { mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId); @@ -52,7 +56,7 @@ void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { SFailedCheckpointInfo *p = taosArrayGet(pList, i); - if (p->transId == pInfo->transId) { + if (p && (p->transId == pInfo->transId)) { return; } } @@ -104,15 +108,110 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - return TSDB_CODE_ACTION_IN_PROGRESS; + return code; } -int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { - int32_t code = TSDB_CODE_SUCCESS; - mndKillTransImpl(pMnode, transId, ""); +int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId) { + int32_t size = sizeof(SStreamTaskResetMsg); + int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans); + for(int32_t i = 0; i < num; ++i) { + SStreamTaskResetMsg* p = taosArrayGet(execInfo.pKilledChkptTrans, i); + if (p == NULL) { + continue; + } + + if (p->transId == transId && p->streamId == streamId) { + mDebug("already reset stream:0x%" PRIx64 ", not send reset-msg again for transId:%d", streamId, transId); + return TSDB_CODE_SUCCESS; + } + } + + if (num >= 10) { + taosArrayRemove(execInfo.pKilledChkptTrans, 0); // remove this first, append new reset trans in the tail + } + + SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId}; + + void *px = taosArrayPush(execInfo.pKilledChkptTrans, &p); + if (px == NULL) { + mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId, num - 1); + return terrno; + } + + SStreamTaskResetMsg *pReq = rpcMallocCont(size); + if (pReq == NULL) { + return terrno; + } + + pReq->streamId = streamId; + pReq->transId = transId; + + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size}; + int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (code) { + mError("failed to put reset-task msg into write queue, code:%s", tstrerror(code)); + } else { + mDebug("send reset task status msg for transId:%d succ", transId); + } + + return code; +} + +int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode) { // here reuse the doCheckpointmsg + int32_t size = sizeof(SMStreamDoCheckpointMsg); + void *pMsg = rpcMallocCont(size); + if (pMsg == NULL) { + return terrno; + } + + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size}; + int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (code) { + mError("failed to put update-checkpoint-info msg into write queue, code:%s", tstrerror(code)); + } else { + mDebug("send update checkpoint-info msg succ"); + } + + return code; +} + +int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList) { + SMStreamDropOrphanMsg msg = {.pList = pList}; + + int32_t num = taosArrayGetSize(pList); + int32_t contLen = tSerializeDropOrphanTaskMsg(NULL, 0, &msg); + if (contLen <= 0) { + return terrno; + } + + void *pReq = rpcMallocCont(contLen); + if (pReq == NULL) { + return terrno; + } + + (void)tSerializeDropOrphanTaskMsg(pReq, contLen, &msg); + + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen}; + int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (code) { + mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code)); + } else { + mDebug("send drop %d orphan tasks msg succ", num); + } + + return code; +} + +int32_t mndProcessResetStatusReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = TSDB_CODE_SUCCESS; SStreamObj *pStream = NULL; - code = mndGetStreamObj(pMnode, streamId, &pStream); + + SStreamTaskResetMsg* pMsg = pReq->pCont; + mndKillTransImpl(pMnode, pMsg->transId, ""); + + code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream); if (pStream == NULL || code != 0) { code = TSDB_CODE_STREAM_TASK_NOT_EXIST; mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); @@ -123,7 +222,7 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t t pStream->sourceDb, pStream->targetSTbName); } else { mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, - pStream->uid, transId); + pStream->uid, pMsg->transId); code = mndCreateStreamResetStatusTrans(pMnode, pStream); } } @@ -138,6 +237,10 @@ int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { for (int k = 0; k < num; ++k) { int32_t *pVgId = taosArrayGet(pNodeList, k); + if (pVgId == NULL) { + continue; + } + mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); bool setFlag = false; @@ -145,8 +248,7 @@ int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { for (int i = 0; i < numOfNodes; ++i) { SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); - - if (pNodeEntry->nodeId == *pVgId) { + if ((pNodeEntry) && (pNodeEntry->nodeId == *pVgId)) { mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); pNodeEntry->stageUpdated = true; setFlag = true; @@ -162,52 +264,6 @@ int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { return TSDB_CODE_SUCCESS; } -int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) { - SOrphanTask *pTask = taosArrayGet(pList, 0); - - // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); - if (conflict) { - return -1; - } - - SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; - STrans *pTrans = NULL; - int32_t code = - doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); - if (pTrans == NULL || code != 0) { - mError("failed to create trans to drop orphan tasks since %s", terrstr()); - return code; - } - - code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); - if (code) { - return code; - } - // drop all tasks - if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, pList)) < 0) { - mError("failed to create trans to drop orphan tasks since %s", terrstr()); - mndTransDrop(pTrans); - return code; - } - - // drop stream - if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) { - mndTransDrop(pTrans); - return code; - } - - code = mndTransPrepare(pMnode, pTrans); - if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return code; - } - - mndTransDrop(pTrans); - return code; -} - int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; @@ -259,7 +315,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamHbMsg req = {0}; SArray *pFailedChkpt = NULL; SArray *pOrphanTasks = NULL; - int32_t code = 0; + int32_t code = 0; if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if (suspendAllStreams(pMnode, &pReq->info) < 0) { @@ -273,8 +329,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (tDecodeStreamHbMsg(&decoder, &req) < 0) { tCleanupStreamHbMsg(&req); tDecoderClear(&decoder); - code = terrno = TSDB_CODE_INVALID_MSG; - return code; + TAOS_RETURN(TSDB_CODE_INVALID_MSG); } tDecoderClear(&decoder); @@ -282,6 +337,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); + if (pFailedChkpt == NULL || pOrphanTasks == NULL) { + taosArrayDestroy(pFailedChkpt); + taosArrayDestroy(pOrphanTasks); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } streamMutexLock(&execInfo.lock); @@ -289,12 +349,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { mError("vgId:%d not exists in nodeList buf, discarded", req.vgId); - code = terrno = TSDB_CODE_INVALID_MSG; doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); - return code; + TAOS_RETURN(TSDB_CODE_INVALID_MSG); } int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); @@ -306,15 +365,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { bool snodeChanged = false; for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + if (p == NULL) { + continue; + } STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { - mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); + mError("s-task:0x%" PRIx64 " not found in mnode task list, added into orphan task list", p->id.taskId); SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; void* px = taosArrayPush(pOrphanTasks, &oTask); if (px == NULL) { - mError("Failed to put task into list, taskId:0x%" PRIx64, p->id.taskId); + mError("failed to put task into list, taskId:0x%" PRIx64, p->id.taskId); } continue; } @@ -331,13 +393,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); if (code) { - code = TSDB_CODE_STREAM_TASK_NOT_EXIST; continue; } int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); - SCheckpointConsensusInfo *pInfo = NULL; + SCheckpointConsensusInfo *pInfo = NULL; code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo); if (code == 0) { mndAddConsensusTasks(pInfo, &cp); @@ -357,7 +418,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { streamTaskStatusCopy(pTaskEntry, p); if ((pChkInfo->activeId != 0) && pChkInfo->failed) { - mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, + mError("stream task:0x%" PRIx64 " checkpointId:%" PRId64 " transId:%d failed, kill it", p->id.taskId, pChkInfo->activeId, pChkInfo->activeTransId); SFailedCheckpointInfo info = { @@ -372,13 +433,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } - if (p->status == pTaskEntry->status) { - pTaskEntry->statusLastDuration++; - } else { - pTaskEntry->status = p->status; - pTaskEntry->statusLastDuration = 0; - } - if (p->status != TASK_STATUS__READY) { mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); } @@ -391,7 +445,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pMnode != NULL) { SArray *p = NULL; - code = mndTakeVgroupSnapshot(pMnode, &allReady, &p); taosArrayDestroy(p); if (code) { @@ -405,10 +458,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal for (int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) { SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i); + if (pInfo == NULL) { + continue; + } + mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", pInfo->checkpointId, pInfo->transId); - code = mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId); + code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId); if (code) { mError("failed to create reset task trans, code:%s", tstrerror(code)); } @@ -420,11 +477,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors. if (taosArrayGetSize(pOrphanTasks) > 0) { - code = mndDropOrphanTasks(pMnode, pOrphanTasks); + code = mndSendDropOrphanTasksMsg(pMnode, pOrphanTasks); + if (code) { + mError("failed to send drop orphan tasks msg, code:%s, try next time", tstrerror(code)); + } } if (pMnode != NULL) { // make sure that the unit test case can work - mndStreamStartUpdateCheckpointInfo(pMnode); + mndStreamSendUpdateChkptInfoMsg(pMnode); } streamMutexUnlock(&execInfo.lock); @@ -435,22 +495,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { return code; } -void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg - SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - if (pMsg != NULL) { - int32_t size = sizeof(SMStreamDoCheckpointMsg); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size}; - int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - if (code) { - mError("failed to put into write Queue, code:%s", tstrerror(code)); - } - } -} - bool validateHbMsg(const SArray *pNodeList, int32_t vgId) { for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { SNodeEntry *pEntry = taosArrayGet(pNodeList, i); - if (pEntry->nodeId == vgId) { + if ((pEntry) && (pEntry->nodeId == vgId)) { return true; } } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 548eb118c7..6640841e5a 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -17,6 +17,8 @@ #include "mndTrans.h" #include "tmisce.h" #include "mndVgroup.h" +#include "mndStb.h" +#include "mndDb.h" struct SStreamTaskIter { SStreamObj *pStream; @@ -31,7 +33,6 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId); int32_t createStreamTaskIter(SStreamObj* pStream, SStreamTaskIter** pIter) { *pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); if (*pIter == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -96,6 +97,9 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { *allReady = true; SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); + if (pVgroupList == NULL) { + return terrno; + } while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -511,6 +515,10 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) { for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SOrphanTask* pTask = taosArrayGet(pList, i); + if (pTask == NULL) { + return terrno; + } + int32_t code = doSetDropActionFromId(pMnode, pTrans, pTask); if (code != 0) { return code; @@ -530,8 +538,8 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang pMsg->transId = transId; pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); if (pMsg->pNodeList == NULL) { - mError("failed to prepare node list, code:out of memory"); - code = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to prepare node list, code:%s", tstrerror(terrno)); + code = terrno; } if (code == 0) { @@ -561,7 +569,6 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha void *buf = taosMemoryMalloc(tlen); if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosArrayDestroy(req.pNodeList); return terrno; } @@ -655,9 +662,8 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgro static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + tstrerror(terrno)); return terrno; } @@ -734,6 +740,14 @@ int32_t mndInitExecInfo() { execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); + execInfo.pKilledChkptTrans = taosArrayInit(4, sizeof(SStreamTaskResetMsg)); + + if (execInfo.pTaskList == NULL || execInfo.pTaskMap == NULL || execInfo.transMgmt.pDBTrans == NULL || + execInfo.pTransferStateStreams == NULL || execInfo.pChkptStreams == NULL || execInfo.pStreamConsensus == NULL || + execInfo.pNodeList == NULL || execInfo.pKilledChkptTrans == NULL) { + mError("failed to initialize the stream runtime env, code:%s", tstrerror(terrno)); + return terrno; + } taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); @@ -743,14 +757,25 @@ int32_t mndInitExecInfo() { void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry)); + if (pValidList == NULL) { // not continue + return; + } + int32_t size = taosArrayGetSize(pNodeSnapshot); int32_t oldSize = taosArrayGetSize(execInfo.pNodeList); for (int32_t i = 0; i < oldSize; ++i) { SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i); + if (p == NULL) { + continue; + } for (int32_t j = 0; j < size; ++j) { SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j); + if (pEntry == NULL) { + continue; + } + if (pEntry->nodeId == p->nodeId) { void* px = taosArrayPush(pValidList, p); if (px == NULL) { @@ -781,6 +806,10 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); + if (pId == NULL) { + continue; + } + if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) { taosArrayRemove(pExecNode->pTaskList, k); @@ -796,6 +825,10 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) { for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) { STaskId *pId = taosArrayGet(pTaskIds, i); + if (pId == NULL) { + continue; + } + int32_t code = doRemoveTasks(pExecInfo, pId); if (code) { mError("failed to remove task in buffer list, 0x%"PRIx64, pId->taskId); @@ -843,6 +876,10 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) { for (int32_t i = 0; i < num; ++i) { SNodeEntry *pEntry = taosArrayGet(pList, i); + if (pEntry == NULL) { + continue; + } + if (pEntry->nodeId == nodeId) { return true; } @@ -853,12 +890,22 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) { int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); + if (pRemovedTasks == NULL) { + return terrno; + } int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); for (int32_t i = 0; i < numOfTask; ++i) { STaskId *pId = taosArrayGet(execInfo.pTaskList, i); + if (pId == NULL) { + continue; + } STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if (pEntry == NULL) { + continue; + } + if (pEntry->nodeId == SNODE_HANDLE) { continue; } @@ -902,6 +949,10 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas int32_t size = taosArrayGetSize(*pReqTaskList); for(int32_t i = 0; i < size; ++i) { STaskChkptInfo* pInfo = taosArrayGet(*pReqTaskList, i); + if (pInfo == NULL) { + continue; + } + if (pInfo->taskId == pTask->id.taskId) { pReq->checkpointId = pInfo->checkpointId; pReq->checkpointVer = pInfo->version; @@ -965,8 +1016,11 @@ int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; void *pIter = NULL; - SArray *pDropped = taosArrayInit(4, sizeof(int64_t)); int32_t code = 0; + SArray *pDropped = taosArrayInit(4, sizeof(int64_t)); + if (pDropped == NULL) { + return terrno; + } mDebug("start to scan checkpoint report info"); @@ -974,11 +1028,15 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { SArray *pList = *(SArray **)pIter; STaskChkptInfo *pInfo = taosArrayGet(pList, 0); - SStreamObj *pStream = NULL; + if (pInfo == NULL) { + continue; + } + + SStreamObj *pStream = NULL; code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); if (pStream == NULL || code != 0) { mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId); - void* p = taosArrayPush(pDropped, &pInfo->streamId); + void *p = taosArrayPush(pDropped, &pInfo->streamId); if (p == NULL) { mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId); } @@ -1022,10 +1080,14 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { int32_t size = taosArrayGetSize(pDropped); if (size > 0) { for (int32_t i = 0; i < size; ++i) { - int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i); - code = taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); + int64_t* pStreamId = (int64_t *)taosArrayGet(pDropped, i); + if (pStreamId == NULL) { + continue; + } + + code = taosHashRemove(execInfo.pChkptStreams, pStreamId, sizeof(*pStreamId)); if (code) { - mError("failed to remove stream in buf:0x%"PRIx64, streamId); + mError("failed to remove stream in buf:0x%"PRIx64, *pStreamId); } } @@ -1051,14 +1113,14 @@ static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStream int32_t blen; tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code); if (code < 0) { - return terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t tlen = sizeof(SMsgHead) + blen; void *pBuf = taosMemoryMalloc(tlen); if (pBuf == NULL) { - return terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; } void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); @@ -1160,6 +1222,10 @@ int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTask .streamId = streamId, }; + if (p.pTaskList == NULL) { + return terrno; + } + int32_t code = taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p)); if (code == 0) { void *pChkptInfo = (SCheckpointConsensusInfo *)taosHashGet(pHash, &streamId, sizeof(streamId)); @@ -1167,6 +1233,7 @@ int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTask } else { *pInfo = NULL; } + return code; } @@ -1178,6 +1245,10 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) { SCheckpointConsensusEntry *p = taosArrayGet(pInfo->pTaskList, i); + if (p == NULL) { + continue; + } + if (p->req.taskId == info.req.taskId) { mDebug("s-task:0x%x already in consensus-checkpointId list for stream:0x%" PRIx64 ", update ts %" PRId64 "->%" PRId64 " total existed:%d", @@ -1218,5 +1289,491 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { mError("failed to remove stream:0x%"PRIx64" in consensus-checkpointId list, remain:%d", streamId, numOfStreams); } + return code; +} + +static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { + int8_t status = atomic_load_8(&pStream->status); + if (status == STREAM_STATUS__NORMAL) { + strcpy(dst, "ready"); + } else if (status == STREAM_STATUS__STOP) { + strcpy(dst, "stop"); + } else if (status == STREAM_STATUS__FAILED) { + strcpy(dst, "failed"); + } else if (status == STREAM_STATUS__RECOVER) { + strcpy(dst, "recover"); + } else if (status == STREAM_STATUS__PAUSE) { + strcpy(dst, "paused"); + } +} + +static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) { + int8_t trigger = pStream->conf.trigger; + if (trigger == STREAM_TRIGGER_AT_ONCE) { + strcpy(dst, "at once"); + } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) { + strcpy(dst, "window close"); + } else if (trigger == STREAM_TRIGGER_MAX_DELAY) { + strcpy(dst, "max delay"); + } +} + +static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) { + memset(pBuf, 0, bufLen); + pBuf[2] = '0'; + pBuf[3] = 'x'; + + int32_t len = tintToHex(id, &pBuf[4]); + varDataSetLen(pBuf, len + 2); +} + +int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) { + int32_t code = 0; + int32_t cols = 0; + int32_t lino = 0; + + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + TSDB_CHECK_CODE(code, lino, _end); + + // create time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false); + TSDB_CHECK_CODE(code, lino, _end); + + // stream id + char buf[128] = {0}; + int64ToHexStr(pStream->uid, buf, tListLen(buf)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + code = colDataSetVal(pColInfo, numOfRows, buf, false); + TSDB_CHECK_CODE(code, lino, _end); + + // related fill-history stream id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + if (pStream->hTaskUid != 0) { + int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf)); + code = colDataSetVal(pColInfo, numOfRows, buf, false); + } else { + code = colDataSetVal(pColInfo, numOfRows, buf, true); + } + TSDB_CHECK_CODE(code, lino, _end); + + // related fill-history stream id + char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); + TSDB_CHECK_CODE(code, lino, _end); + + char status[20 + VARSTR_HEADER_SIZE] = {0}; + char status2[20] = {0}; + mndShowStreamStatus(status2, pStream); + STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); + TSDB_CHECK_CODE(code, lino, _end); + + char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false); + TSDB_CHECK_CODE(code, lino, _end); + + char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false); + TSDB_CHECK_CODE(code, lino, _end); + + if (pStream->targetSTbName[0] == 0) { + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, NULL, true); + } else { + char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false); + } + TSDB_CHECK_CODE(code, lino, _end); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false); + TSDB_CHECK_CODE(code, lino, _end); + + char trigger[20 + VARSTR_HEADER_SIZE] = {0}; + char trigger2[20] = {0}; + mndShowStreamTrigger(trigger2, pStream); + STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + TSDB_CHECK_CODE(code, lino, _end); + + // sink_quota + char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; + sinkQuota[0] = '0'; + char dstStr[20] = {0}; + STR_TO_VARSTR(dstStr, sinkQuota) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + TSDB_CHECK_CODE(code, lino, _end); + + // checkpoint interval + char tmp[20 + VARSTR_HEADER_SIZE] = {0}; + sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval); + varDataSetLen(tmp, strlen(varDataVal(tmp))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false); + TSDB_CHECK_CODE(code, lino, _end); + + // checkpoint backup type + char backup[20 + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(backup, "none") + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false); + TSDB_CHECK_CODE(code, lino, _end); + + // history scan idle + char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; + strcpy(scanHistoryIdle, "100a"); + + memset(dstStr, 0, tListLen(dstStr)); + STR_TO_VARSTR(dstStr, scanHistoryIdle) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + +_end: + if (code) { + mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code)); + } + return code; +} + +int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows) { + SColumnInfoData *pColInfo = NULL; + int32_t cols = 0; + int32_t code = 0; + int32_t lino = 0; + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { + mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64 + " no valid status/stage info", + id.taskId, pStream->name, pStream->uid, pStream->createTime); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + // stream name + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + TSDB_CHECK_CODE(code, lino, _end); + + // task id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + char idstr[128] = {0}; + int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr)); + code = colDataSetVal(pColInfo, numOfRows, idstr, false); + TSDB_CHECK_CODE(code, lino, _end); + + // node type + char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; + varDataSetLen(nodeType, 5); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + if (pTask->info.nodeId > 0) { + memcpy(varDataVal(nodeType), "vnode", 5); + } else { + memcpy(varDataVal(nodeType), "snode", 5); + } + code = colDataSetVal(pColInfo, numOfRows, nodeType, false); + TSDB_CHECK_CODE(code, lino, _end); + + // node id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + int64_t nodeId = TMAX(pTask->info.nodeId, 0); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false); + TSDB_CHECK_CODE(code, lino, _end); + + // level + char level[20 + VARSTR_HEADER_SIZE] = {0}; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + memcpy(varDataVal(level), "source", 6); + varDataSetLen(level, 6); + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + memcpy(varDataVal(level), "agg", 3); + varDataSetLen(level, 3); + } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + memcpy(varDataVal(level), "sink", 4); + varDataSetLen(level, 4); + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)level, false); + TSDB_CHECK_CODE(code, lino, _end); + + // status + char status[20 + VARSTR_HEADER_SIZE] = {0}; + + const char *pStatus = streamTaskGetStatusStr(pe->status); + STR_TO_VARSTR(status, pStatus); + + // status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false); + TSDB_CHECK_CODE(code, lino, _end); + + // stage + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); + TSDB_CHECK_CODE(code, lino, _end); + + // input queue + char vbuf[40] = {0}; + char buf[38] = {0}; + const char *queueInfoStr = "%4.2f MiB (%6.2f%)"; + snprintf(buf, tListLen(buf), queueInfoStr, pe->inputQUsed, pe->inputRate); + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + TSDB_CHECK_CODE(code, lino, _end); + + // input total + const char *formatTotalMb = "%7.2f MiB"; + const char *formatTotalGb = "%7.2f GiB"; + if (pe->procsTotal < 1024) { + snprintf(buf, tListLen(buf), formatTotalMb, pe->procsTotal); + } else { + snprintf(buf, tListLen(buf), formatTotalGb, pe->procsTotal / 1024); + } + + memset(vbuf, 0, tListLen(vbuf)); + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + TSDB_CHECK_CODE(code, lino, _end); + + // process throughput + const char *formatKb = "%7.2f KiB/s"; + const char *formatMb = "%7.2f MiB/s"; + if (pe->procsThroughput < 1024) { + snprintf(buf, tListLen(buf), formatKb, pe->procsThroughput); + } else { + snprintf(buf, tListLen(buf), formatMb, pe->procsThroughput / 1024); + } + + memset(vbuf, 0, tListLen(vbuf)); + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + TSDB_CHECK_CODE(code, lino, _end); + + // output total + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + colDataSetNULL(pColInfo, numOfRows); + } else { + sprintf(buf, formatTotalMb, pe->outputTotal); + memset(vbuf, 0, tListLen(vbuf)); + STR_TO_VARSTR(vbuf, buf); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + TSDB_CHECK_CODE(code, lino, _end); + } + + // output throughput + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + colDataSetNULL(pColInfo, numOfRows); + } else { + if (pe->outputThroughput < 1024) { + snprintf(buf, tListLen(buf), formatKb, pe->outputThroughput); + } else { + snprintf(buf, tListLen(buf), formatMb, pe->outputThroughput / 1024); + } + + memset(vbuf, 0, tListLen(vbuf)); + STR_TO_VARSTR(vbuf, buf); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + TSDB_CHECK_CODE(code, lino, _end); + } + + // output queue + // sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate); + // STR_TO_VARSTR(vbuf, buf); + + // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + + // info + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + const char *sinkStr = "%.2f MiB"; + snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize); + } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + // offset info + const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; + snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); + } else { + memset(buf, 0, tListLen(buf)); + } + + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + TSDB_CHECK_CODE(code, lino, _end); + + // start_time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startTime, false); + TSDB_CHECK_CODE(code, lino, _end); + + // start id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointId, false); + TSDB_CHECK_CODE(code, lino, _end); + + // start ver + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->startCheckpointVer, false); + TSDB_CHECK_CODE(code, lino, _end); + + // checkpoint time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + if (pe->checkpointInfo.latestTime != 0) { + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false); + } else { + code = colDataSetVal(pColInfo, numOfRows, 0, true); + } + TSDB_CHECK_CODE(code, lino, _end); + + // checkpoint_id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestId, false); + TSDB_CHECK_CODE(code, lino, _end); + + // checkpoint version + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestVer, false); + TSDB_CHECK_CODE(code, lino, _end); + + // checkpoint size + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + colDataSetNULL(pColInfo, numOfRows); + + // checkpoint backup status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, 0, true); + TSDB_CHECK_CODE(code, lino, _end); + + // ds_err_info + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, 0, true); + TSDB_CHECK_CODE(code, lino, _end); + + // history_task_id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + if (pe->hTaskId != 0) { + int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr)); + code = colDataSetVal(pColInfo, numOfRows, idstr, false); + } else { + code = colDataSetVal(pColInfo, numOfRows, 0, true); + } + TSDB_CHECK_CODE(code, lino, _end); + + // history_task_status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); + + code = colDataSetVal(pColInfo, numOfRows, 0, true); + TSDB_CHECK_CODE(code, lino, _end); + + _end: + if (code) { + mError("error happens during build task attr result blocks, lino:%d, code:%s", lino, tstrerror(code)); + } return code; } \ No newline at end of file diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 99528d01b0..86090fed43 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -872,6 +872,7 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->checkpointInfo = pSrc->checkpointInfo; pDst->startCheckpointId = pSrc->startCheckpointId; pDst->startCheckpointVer = pSrc->startCheckpointVer; + pDst->status = pSrc->status; pDst->startTime = pSrc->startTime; pDst->hTaskId = pSrc->hTaskId; From 715b6428aaae8a0d10b1e35cab8599f9a0a657ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 Jul 2024 16:33:44 +0800 Subject: [PATCH 2/2] fix(stream): update the merge result check. --- source/libs/stream/src/streamData.c | 8 ++++++-- source/libs/stream/src/streamQueue.c | 11 ++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 00a62d4773..57e5322e38 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -219,13 +219,17 @@ int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem; - (void) taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); + void* px = taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); + if (px == NULL) { + return terrno; + } + taosArrayDestroy(pBlockSrc->blocks); streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); taosFreeQitem(pElem); *pRes = dst; - return TSDB_CODE_SUCCESS; + return code; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 537062b04e..5e538c1e42 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -231,13 +231,14 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte if (*pInput == NULL) { ASSERT((*numOfBlocks) == 0); *pInput = qItem; - } else { - // merge current block failed, let's handle the already merged blocks. + } else { // merge current block failed, let's handle the already merged blocks. void* newRet = NULL; int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, - tstrerror(terrno)); + if (newRet == NULL) { + if (code) { + stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, + tstrerror(code)); + } *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) {