Merge pull request #26344 from taosdata/fix/3_liaohj

fix(stream): validate the stream hb msg, and discard the invalid hb msg.
This commit is contained in:
Haojun Liao 2024-07-02 09:07:31 +08:00 committed by GitHub
commit 420b59fea5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1112 additions and 410 deletions

View File

@ -3506,12 +3506,6 @@ int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamR
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
void tFreeMDropStreamReq(SMDropStreamReq* pReq);
typedef struct {
int64_t recoverObjUid;
int32_t taskId;
int32_t hasCheckPoint;
} SMVStreamGatherInfoReq;
typedef struct SVUpdateCheckpointInfoReq {
SMsgHead head;
int64_t streamId;
@ -3537,50 +3531,8 @@ typedef struct {
int64_t suid;
} SMqRebVgReq;
static FORCE_INLINE int tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1;
if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1;
if (tEncodeI8(pCoder, pReq->subType) < 0) return -1;
if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1;
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
static FORCE_INLINE int tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1;
if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1;
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
}
}
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq);
int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];

View File

@ -250,7 +250,9 @@
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
@ -331,7 +333,7 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) //1035 1036
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) //unused
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)

View File

@ -29,6 +29,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored);

View File

@ -17,6 +17,7 @@
#define TDENGINE_STREAMMSG_H
#include "tmsg.h"
#include "trpc.h"
#ifdef __cplusplus
extern "C" {
@ -162,6 +163,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
typedef struct SStreamHbMsg {
int32_t vgId;
int32_t msgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
@ -171,6 +173,11 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
typedef struct {
SMsgHead head;
int32_t msgId;
} SMStreamHbRspMsg;
typedef struct SRetrieveChkptTriggerReq {
SMsgHead head;
int64_t streamId;
@ -204,6 +211,28 @@ typedef struct SCheckpointReport {
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq);
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq);
typedef struct SRestoreCheckpointInfo {
SMsgHead head;
int64_t startTs;
int64_t streamId;
int64_t checkpointId; // latest checkpoint id
int32_t taskId;
int32_t nodeId;
} SRestoreCheckpointInfo;
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
typedef struct SRestoreCheckpointInfoRsp {
int64_t streamId;
int64_t checkpointId;
int64_t startTs;
int32_t taskId;
} SRestoreCheckpointInfoRsp;
int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo);
int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo);
typedef struct {
SMsgHead head;
int64_t streamId;
@ -211,6 +240,12 @@ typedef struct {
int32_t reqType;
} SStreamTaskRunReq;
typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
SRpcMsg rsp;
int64_t ts;
} SCheckpointConsensusEntry;
#ifdef __cplusplus
}
#endif

View File

@ -17,8 +17,9 @@
#define TDENGINE_TSTREAM_H
#include "os.h"
#include "streamMsg.h"
#include "streamState.h"
#include "streammsg.h"
#include "streamMsg.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
@ -265,12 +266,13 @@ typedef struct SStreamTaskId {
} SStreamTaskId;
typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId; // latest checkpoint id
int64_t checkpointVer; // latest checkpoint offset in wal
int64_t checkpointTime; // latest checkpoint time
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t startTs;
int64_t checkpointId; // latest checkpoint id
int64_t checkpointVer; // latest checkpoint offset in wal
int64_t checkpointTime; // latest checkpoint time
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
SActiveCheckpointInfo* pActiveInfo;
int64_t msgVer;
} SCheckpointInfo;
@ -523,7 +525,6 @@ typedef struct STaskUpdateEntry {
} STaskUpdateEntry;
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
typedef int32_t (*__stream_task_expand_fn)(struct SStreamTask* pTask);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
@ -613,6 +614,12 @@ typedef struct SStreamTaskState {
char* name;
} SStreamTaskState;
typedef struct SCheckpointConsensusInfo {
SArray* pTaskList;
int64_t checkpointId;
int64_t genTs;
} SCheckpointConsensusInfo;
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
// dispatch related
@ -704,6 +711,7 @@ void streamTaskSetRemoveBackendFiles(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
@ -746,14 +754,15 @@ void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId);
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
void streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn fn);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn fn);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask);
// timer
tmr_h streamTimerGetInstance();
@ -790,6 +799,9 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq);
int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* req);
void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp);
#ifdef __cplusplus
}
#endif

View File

@ -828,7 +828,6 @@ TEST(clientCase, projection_query_tables) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
pRes= taos_query(pConn, "use abc1");
taos_free_result(pRes);

View File

@ -151,9 +151,9 @@ void startRsync() {
// start rsync service to backup checkpoint
code = system(cmd);
if (code != 0) {
uError("[rsync] start server failed, code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
uError("[rsync] cmd:%s start server failed, code:%d," ERRNO_ERR_FORMAT, cmd, code, ERRNO_ERR_DATA);
} else {
uDebug("[rsync] start server successful");
uInfo("[rsync] cmd:%s start server successful", cmd);
}
}

View File

@ -9296,6 +9296,52 @@ int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
}
void tDeleteSTqCheckInfo(STqCheckInfo *pInfo) { taosArrayDestroy(pInfo->colIdList); }
int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1;
if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1;
if (tEncodeI8(pCoder, pReq->subType) < 0) return -1;
if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1;
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1;
if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1;
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
}
}
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
int32_t nUid = taosArrayGetSize(pRes->uidList);

View File

@ -242,6 +242,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -96,6 +96,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -972,6 +972,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -34,6 +34,7 @@ extern "C" {
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
#define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update"
#define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen"
typedef struct SStreamTransInfo {
int64_t startTime;
@ -61,6 +62,7 @@ typedef struct SStreamExecInfo {
TdThreadMutex lock;
SHashObj *pTransferStateStreams;
SHashObj *pChkptStreams;
SHashObj *pStreamConsensus;
} SStreamExecInfo;
extern SStreamExecInfo execInfo;
@ -81,7 +83,7 @@ typedef struct SOrphanTask {
typedef struct {
SMsgHead head;
} SMStreamHbRspMsg, SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp;
} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp;
typedef struct STaskChkptInfo {
int32_t nodeId;
@ -131,6 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
int32_t mndSendConsensusCheckpointIdRsp(SArray* pList, int64_t checkpointId);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
@ -142,6 +146,14 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInf
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg);
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo *pInfo, SStreamObj *pStream);
bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numOfTasks, int32_t* pTotal);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId);
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);
#ifdef __cplusplus
}
#endif

View File

@ -59,6 +59,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
@ -117,6 +118,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_CONSEN, mndProcessConsensusCheckpointId);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
@ -150,6 +152,7 @@ void mndCleanupStream(SMnode *pMnode) {
taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosHashCleanup(execInfo.pTransferStateStreams);
taosHashCleanup(execInfo.pChkptStreams);
taosHashCleanup(execInfo.pStreamConsensus);
taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup");
}
@ -1233,6 +1236,9 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
sdbRelease(pSdb, p);
// clear the consensus checkpoint info
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, p->uid);
if (code != -1) {
started += 1;
@ -2513,7 +2519,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
return 0;
}
static void doAddTaskInfo(SArray *pList, SCheckpointReport *pReport) {
static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pReport) {
bool existed = false;
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
STaskChkptInfo *p = taosArrayGet(pList, i);
@ -2584,12 +2590,12 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo));
doAddTaskInfo(pList, &req);
doAddReportStreamTask(pList, &req);
taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
} else {
doAddTaskInfo(*pReqTaskList, &req);
doAddReportStreamTask(*pReqTaskList, &req);
}
int32_t total = taosArrayGetSize(*pReqTaskList);
@ -2597,25 +2603,6 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
" will be issued soon",
req.streamId, pStream->name, total, req.checkpointId);
// if (pStream != NULL) {
// bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
// if (conflict) {
// mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId);
// } else {
// int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList);
// if (code == TSDB_CODE_SUCCESS) { // remove this entry
// taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
//
// int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
// mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId,
// numOfStreams);
// } else {
// mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet",
// req.streamId);
// }
// }
// }
}
if (pStream != NULL) {
@ -2637,6 +2624,101 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
return 0;
}
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SDecoder decoder = {0};
SRestoreCheckpointInfo req = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeRestoreCheckpointInfo(&decoder, &req)) {
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
mError("invalid task consensus-checkpoint msg received");
return -1;
}
tDecoderClear(&decoder);
mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64,
req.nodeId, req.streamId, req.taskId, req.checkpointId);
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
taosThreadMutexLock(&execInfo.lock);
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
if (pStream == NULL) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock);
return -1;
} else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
req.streamId, req.taskId);
}
}
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId);
int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream);
if (ckId != -1) { // consensus checkpoint id already exist
SRpcMsg rsp = {0};
rsp.code = 0;
rsp.info = pReq->info;
rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId);
mDebug("stream:0x%" PRIx64 " consensus checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId);
doSendConsensusCheckpointRsp(&req, &rsp, ckId);
taosThreadMutexUnlock(&execInfo.lock);
pReq->info.handle = NULL; // disable auto rsp
return TSDB_CODE_SUCCESS;
}
mndAddConsensusTasks(pInfo, &req, pReq);
int32_t total = 0;
if (mndAllTaskSendCheckpointId(pInfo, numOfTasks, &total)) { // all tasks has send the reqs
// start transaction to set the checkpoint id
int64_t checkpointId = mndGetConsensusCheckpointId(pInfo, pStream);
mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64
" will be issued soon",
req.streamId, pStream->name, numOfTasks, checkpointId);
// start the checkpoint consensus trans
int32_t code = mndSendConsensusCheckpointIdRsp(pInfo->pTaskList, checkpointId);
if (code == TSDB_CODE_SUCCESS) {
mndClearConsensusRspEntry(pInfo);
mDebug("clear all waiting for rsp entry for stream:0x%" PRIx64, req.streamId);
} else {
mDebug("stream:0x%" PRIx64 " not start send consensus-checkpointId msg, due to not all task ready", req.streamId);
}
} else {
mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks);
}
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
}
taosThreadMutexUnlock(&execInfo.lock);
pReq->info.handle = NULL; // disable auto rsp
return 0;
}
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
int32_t code = mndProcessCreateStreamReq(pReq);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -22,9 +22,18 @@ typedef struct SFailedCheckpointInfo {
int32_t transId;
} SFailedCheckpointInfo;
static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode);
static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo);
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList);
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId);
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
@ -39,7 +48,7 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
}
}
static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SFailedCheckpointInfo *p = taosArrayGet(pList, i);
@ -86,7 +95,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS;
mndKillTransImpl(pMnode, transId, "");
@ -110,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
return code;
}
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
mInfo("set node expired for %d nodes", num);
@ -133,15 +142,14 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
}
if (!setFlag) {
mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist, update it", *pVgId);
ASSERT(0);
mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist", *pVgId);
return TSDB_CODE_FAILED;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) {
int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) {
SOrphanTask *pTask = taosArrayGet(pList, 0);
// check if it is conflict with other trans in both sourceDb and targetDb.
@ -238,7 +246,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
tDecoderClear(&decoder);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId);
pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
@ -246,6 +254,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
mndInitStreamExecInfo(pMnode, &execInfo);
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
mError("invalid hbMsg from vgId:%d, discarded", req.vgId);
terrno = TSDB_CODE_INVALID_MSG;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
taosThreadMutexUnlock(&execInfo.lock);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return -1;
}
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) {
@ -335,21 +353,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
taosThreadMutexUnlock(&execInfo.lock);
tCleanupStreamHbMsg(&req);
taosArrayDestroy(pFailedChkpt);
taosArrayDestroy(pOrphanTasks);
{
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(req.vgId);
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}
terrno = TSDB_CODE_SUCCESS;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return TSDB_CODE_SUCCESS;
}
@ -360,4 +368,33 @@ void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doC
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
}
bool validateHbMsg(const SArray *pNodeList, int32_t vgId) {
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeEntry *pEntry = taosArrayGet(pNodeList, i);
if (pEntry->nodeId == vgId) {
return true;
}
}
return false;
}
void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks) {
tCleanupStreamHbMsg(pReq);
taosArrayDestroy(pFailedChkptList);
taosArrayDestroy(pOrphanTasks);
}
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) {
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMStreamHbRspMsg *pMsg = rsp.pCont;
pMsg->head.vgId = htonl(vgId);
pMsg->msgId = msgId;
tmsgSendRsp(&rsp);
pRpcInfo->handle = NULL; // disable auto rsp
}

View File

@ -575,10 +575,12 @@ void mndInitExecInfo() {
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
}
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
@ -817,4 +819,150 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
taosArrayDestroy(pDropped);
return TSDB_CODE_SUCCESS;
}
}
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) {
int32_t code = 0;
int32_t blen;
SRestoreCheckpointInfoRsp req = {
.streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId, .startTs = pInfo->startTs};
tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeRestoreCheckpointInfoRsp(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)pMsg->pCont;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pInfo->nodeId);
tEncoderClear(&encoder);
tmsgSendRsp(pMsg);
return code;
}
int32_t mndSendConsensusCheckpointIdRsp(SArray* pInfoList, int64_t checkpointId) {
for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) {
SCheckpointConsensusEntry* pInfo = taosArrayGet(pInfoList, i);
doSendConsensusCheckpointRsp(&pInfo->req, &pInfo->rsp, checkpointId);
}
return 0;
}
SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) {
void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
if (pInfo != NULL) {
return (SCheckpointConsensusInfo*)pInfo;
}
SCheckpointConsensusInfo p = {
.genTs = -1, .checkpointId = -1, .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry))};
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
return pChkptInfo;
}
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
// discard the msg may lead to the lost of connections.
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg) {
SCheckpointConsensusEntry info = {0};
memcpy(&info.req, pRestoreInfo, sizeof(info.req));
info.rsp.code = 0;
info.rsp.info = pMsg->info;
info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
info.rsp.pCont = rpcMallocCont(info.rsp.contLen);
SMsgHead *pHead = info.rsp.pCont;
pHead->vgId = htonl(pRestoreInfo->nodeId);
taosArrayPush(pInfo->pTaskList, &info);
}
static int32_t entryComparFn(const void* p1, const void* p2) {
const SCheckpointConsensusEntry* pe1 = p1;
const SCheckpointConsensusEntry* pe2 = p2;
if (pe1->req.taskId == pe2->req.taskId) {
return 0;
}
return pe1->req.taskId < pe2->req.taskId? -1:1;
}
bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTasks, int32_t* pTotal) {
int32_t numOfExisted = taosArrayGetSize(pInfo->pTaskList);
if (numOfExisted < numOfTasks) {
if (pTotal != NULL) {
*pTotal = numOfExisted;
}
return false;
}
taosArraySort(pInfo->pTaskList, entryComparFn);
int32_t num = 1;
int32_t taskId = ((SCheckpointConsensusEntry*)taosArrayGet(pInfo->pTaskList, 0))->req.taskId;
for(int32_t i = 1; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
SCheckpointConsensusEntry* pe = taosArrayGet(pInfo->pTaskList, i);
if (pe->req.taskId != taskId) {
num += 1;
taskId = pe->req.taskId;
}
}
if (pTotal != NULL) {
*pTotal = num;
}
ASSERT(num <= numOfTasks);
return num == numOfTasks;
}
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) {
if (pInfo->genTs > 0) {
ASSERT(pInfo->checkpointId > 0);
return pInfo->checkpointId;
}
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
if (!mndAllTaskSendCheckpointId(pInfo, numOfTasks, NULL)) {
return -1;
}
int64_t checkpointId = INT64_MAX;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, i);
if (pEntry->req.checkpointId < checkpointId) {
checkpointId = pEntry->req.checkpointId;
mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name,
pEntry->req.taskId, pEntry->req.nodeId, pEntry->req.checkpointId);
}
}
pInfo->checkpointId = checkpointId;
pInfo->genTs = taosGetTimestampMs();
return checkpointId;
}
void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
pInfo->pTaskList = taosArrayDestroy(pInfo->pTaskList);
}
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
taosHashRemove(pHash, &streamId, sizeof(streamId));
int32_t numOfStreams = taosHashGetSize(pHash);
mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d", streamId,
numOfStreams);
return TSDB_CODE_SUCCESS;
}

View File

@ -25,7 +25,7 @@
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
// clang-format on
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
if (code != TSDB_CODE_SUCCESS) {
@ -71,8 +71,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
startRsync();
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE,
taosGetTimestampMs(), tqStartTaskCompleteCallback);
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
@ -157,6 +156,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
case TDMT_STREAM_TASK_UPDATE_CHKPT:
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen);
case TDMT_MND_STREAM_CHKPT_CONSEN_RSP:
return tqStreamProcessConsensusChkptRsp(pSnode->pMeta, pMsg);
default:
ASSERT(0);
}

View File

@ -262,6 +262,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);

View File

@ -139,9 +139,11 @@ void tqClose(STQ* pTq) {
taosHashCleanup(pTq->pCheckInfo);
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
int32_t vgId = pTq->pStreamMeta->vgId;
streamMetaClose(pTq->pStreamMeta);
qDebug("end to close tq");
qDebug("vgId:%d end to close tq", vgId);
taosMemoryFree(pTq);
}
@ -1030,7 +1032,6 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
//
return 0;
}
@ -1274,3 +1275,7 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessConsensusChkptRsp(pTq->pStreamMeta, pMsg);
}

View File

@ -121,7 +121,6 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
@ -132,6 +131,26 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
}
int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
if (pTask == NULL) {
tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
int32_t code = streamTaskSendRestoreChkptMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -191,7 +210,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaInitUpdateTaskList(pMeta, req.transId);
}
} else {
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d, recorded update transId:%d", idstr,
vgId, req.transId, pMeta->updateInfo.transId);
}
// duplicate update epset msg received, discard this redundant message
@ -660,8 +680,11 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
if (ppTask != NULL && (*ppTask) != NULL) {
streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq);
} else { // failed to get the task.
tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already",
vgId, pReq->taskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqError(
"vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
"dropped already",
vgId, pReq->taskId, numOfTasks);
}
streamMetaWUnLock(pMeta);
@ -712,9 +735,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
if (isLeader && !tsDisableStream) {
streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
streamMetaStartAllTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
}
@ -730,10 +753,10 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t vgId = pMeta->vgId;
if (type == STREAM_EXEC_T_START_ONE_TASK) {
streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId, tqExpandStreamTask);
streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
return 0;
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
streamMetaStartAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
restartStreamTasks(pMeta, isLeader);
@ -861,7 +884,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
} else if (pState->state == TASK_STATUS__UNINIT) {
tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
ASSERT(pTask->status.downstreamReady == 0);
tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
} else {
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
}
@ -1070,7 +1093,9 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return streamProcessHeartbeatRsp(pMeta, pMsg->pCont);
}
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
@ -1087,6 +1112,71 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
}
streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
int64_t now = taosGetTimestampMs();
SRestoreCheckpointInfoRsp req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
rsp.info.handle = NULL;
if (tDecodeRestoreCheckpointInfoRsp(&decoder, &req) < 0) {
// rsp.code = TSDB_CODE_MSG_DECODE_ERROR; // disable it temporarily
tqError("vgId:%d failed to decode restore task checkpointId, code:%s", vgId, tstrerror(rsp.code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, req.taskId);
streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return TSDB_CODE_SUCCESS;
}
// discard the rsp from before restart
if (req.startTs < pTask->execInfo.created) {
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
" from task createTs:%" PRId64 ", discard",
pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs);
streamMetaAddFailedTaskSelf(pTask, now);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);
taosThreadMutexLock(&pTask->lock);
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
if ((pTask->chkInfo.checkpointId != req.checkpointId) && req.checkpointId != 0) {
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
pTask->chkInfo.checkpointId, req.checkpointId);
pTask->chkInfo.checkpointId = req.checkpointId;
tqSetRestoreVersionInfo(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
if (pMeta->role == NODE_ROLE_LEADER) {
/*code = */ tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
} else {
tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -647,6 +647,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
tqProcessTaskResetReq(pVnode->pTq, pMsg);
}
} break;
case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: {
if (pVnode->restored) {
tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg);
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {

View File

@ -65,6 +65,11 @@ struct SActiveCheckpointInfo {
tmr_h pSendReadyMsgTmr;
};
struct SConsensusCheckpoint {
int8_t inProcess;
};
typedef struct {
int8_t type;
SSDataBlock* pBlock;
@ -157,6 +162,7 @@ extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;
extern int32_t streamMetaId;
int32_t streamTimerInit();
void streamTimerCleanUp();
@ -211,7 +217,13 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key);
void streamMetaRemoveDB(void* arg, char* key);
void streamMetaHbToMnode(void* param, void* tmrId);
SMetaHbInfo* createMetaHbInfo(int64_t* pRid);
void* destroyMetaHbInfo(SMetaHbInfo* pInfo);
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta);
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount);
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();

View File

@ -1893,8 +1893,12 @@ void* taskDbAddRef(void* pTaskDb) {
STaskDbWrapper* pBackend = pTaskDb;
return taosAcquireRef(taskDbWrapperId, pBackend->refId);
}
void taskDbRemoveRef(void* pTaskDb) {
if (pTaskDb == NULL) return;
if (pTaskDb == NULL) {
return;
}
STaskDbWrapper* pBackend = pTaskDb;
taosReleaseRef(taskDbWrapperId, pBackend->refId);
}

View File

@ -447,14 +447,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
// if (restored && (pStatus->state != TASK_STATUS__CK)) {
// stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%"
// PRId64
// " failed",
// pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
// taosThreadMutexUnlock(&pTask->lock);
// return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
// }
if (restored && (pStatus->state != TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) {
stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" failed",
pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (!restored) { // during restore procedure, do update checkpoint-info
stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
@ -1108,3 +1107,51 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
s3DeleteObjects((const char**)&tmp, 1);
return 0;
}
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
int32_t code;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
ASSERT(pTask->pBackend == NULL);
SRestoreCheckpointInfo req = {
.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId,
.nodeId = vgId,
.checkpointId = pInfo->checkpointId,
.startTs = pTask->execInfo.created,
};
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code);
if (code < 0) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeRestoreCheckpointInfo(&encoder, &req)) < 0) {
rpcFreeCont(buf);
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen);
stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId,
pInfo->checkpointId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
}

View File

@ -0,0 +1,342 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamInt.h"
#include "tmisce.h"
#include "tref.h"
#include "tstream.h"
#include "ttimer.h"
#include "wal.h"
int32_t streamMetaId = 0;
struct SMetaHbInfo {
tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter;
int32_t hbCount;
int64_t hbStart;
int64_t msgSendTs;
SStreamHbMsg hbMsg;
};
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0;
return true;
}
return false;
}
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) {
return true;
}
}
return false;
}
static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
SStreamMeta* pMeta = pTask->pMeta;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
for (int j = 0; j < num; ++j) {
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j);
bool exist = existInHbMsg(pMsg, pTaskEpset);
if (!exist) {
taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
stDebug("vgId:%d nodeId:%d added into hbMsg update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
(int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
}
}
taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList);
taosThreadMutexUnlock(&pTask->lock);
}
static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) {
int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, pMsg, tlen, code);
if (code < 0) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
return TSDB_CODE_FAILED;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_FAILED;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) {
rpcFreeCont(buf);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
return TSDB_CODE_FAILED;
}
tEncoderClear(&encoder);
stDebug("vgId:%d send hb to mnode, numOfTasks:%d msgId:%d", pMeta->vgId, pMsg->numOfTasks, pMsg->msgId);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
tmsgSendReq(pEpset, &msg);
return TSDB_CODE_SUCCESS;
}
// NOTE: this task should be executed within the SStreamMeta lock region.
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
SEpSet epset = {0};
bool hasMnodeEpset = false;
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SMetaHbInfo* pInfo = pMeta->pHbInfo;
// not recv the hb msg rsp yet, send current hb msg again
if (pInfo->msgSendTs > 0) {
stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId,
pInfo->hbCount);
for(int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
continue;
}
if ((*pTask)->info.fillHistory == 1) {
continue;
}
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
break;
}
pInfo->msgSendTs = taosGetTimestampMs();
doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset);
return TSDB_CODE_SUCCESS;
}
SStreamHbMsg* pMsg = &pInfo->hbMsg;
stDebug("vgId:%d build stream hbMsg, leader:%d msgId:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER),
pMeta->pHbInfo->hbCount);
pMsg->vgId = pMeta->vgId;
pMsg->msgId = pMeta->pHbInfo->hbCount;
pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
if (pMsg->pTaskStatus == NULL || pMsg->pUpdateNodes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
continue;
}
// not report the status of fill-history task
if ((*pTask)->info.fillHistory == 1) {
continue;
}
STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask);
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo;
if (p->activeId != 0) {
entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = p->activeId;
entry.checkpointInfo.activeTransId = p->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo",
(*pTask)->id.idStr, p->transId);
taosThreadMutexLock(&(*pTask)->lock);
streamTaskClearCheckInfo((*pTask), true);
taosThreadMutexUnlock(&(*pTask)->lock);
}
}
if ((*pTask)->exec.pWalReader != NULL) {
entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
if (entry.processedVer < 0) {
entry.processedVer = (*pTask)->chkInfo.processedVer;
}
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
}
addUpdateNodeIntoHbMsg(*pTask, pMsg);
taosArrayPush(pMsg->pTaskStatus, &entry);
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasMnodeEpset = true;
}
}
pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus);
if (hasMnodeEpset) {
pInfo->msgSendTs = taosGetTimestampMs();
doSendHbMsgInfo(pMsg, pMeta, &epset);
} else {
stDebug("vgId:%d no tasks or no mnd epset, not send stream hb to mnode", pMeta->vgId);
tCleanupStreamHbMsg(&pInfo->hbMsg);
pInfo->msgSendTs = -1;
}
return TSDB_CODE_SUCCESS;
}
void streamMetaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid);
return;
}
// need to stop, stop now
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
return;
}
// not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
return;
}
// set the hb start time
if (pMeta->pHbInfo->hbStart == 0) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
streamMetaRLock(pMeta);
streamMetaSendHbHelper(pMeta);
streamMetaRUnLock(pMeta);
taosTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}
SMetaHbInfo* createMetaHbInfo(int64_t* pRid) {
SMetaHbInfo* pInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return pInfo;
}
pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pInfo->tickCounter = 0;
pInfo->stopFlag = 0;
pInfo->msgSendTs = -1;
pInfo->hbCount = 0;
return pInfo;
}
void* destroyMetaHbInfo(SMetaHbInfo* pInfo) {
if (pInfo != NULL) {
tCleanupStreamHbMsg(&pInfo->hbMsg);
if (pInfo->hbTmr != NULL) {
taosTmrStop(pInfo->hbTmr);
pInfo->hbTmr = NULL;
}
taosMemoryFree(pInfo);
}
return NULL;
}
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
}
}
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount) {
*pStartTs = 0;
*pSendCount = 0;
if (pInfo == NULL) {
return;
}
*pStartTs = pInfo->hbStart;
*pSendCount = pInfo->hbCount;
}
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId);
SMetaHbInfo* pInfo = pMeta->pHbInfo;
streamMetaRLock(pMeta);
// current waiting rsp recved
if (pRsp->msgId == pInfo->hbCount) {
tCleanupStreamHbMsg(&pInfo->hbMsg);
stDebug("vgId:%d hbMsg msgId:%d sendTs:%" PRId64 " recved confirmed", pMeta->vgId, pRsp->msgId, pInfo->msgSendTs);
pInfo->hbCount += 1;
pInfo->msgSendTs = -1;
} else {
stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId);
}
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}

View File

@ -27,10 +27,8 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0;
int32_t taskDbWrapperId = 0;
static void metaHbToMnode(void* param, void* tmrId);
static int32_t streamMetaBegin(SStreamMeta* pMeta);
static void streamMetaCloseImpl(void* arg);
@ -39,14 +37,6 @@ typedef struct {
SHashObj* pTable;
} SMetaRefMgt;
struct SMetaHbInfo {
tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter;
int32_t hbCount;
int64_t hbStart;
};
typedef struct STaskInitTs {
int64_t start;
int64_t end;
@ -352,12 +342,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
goto _err;
}
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
if (pMeta->pHbInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// task list
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
if (pMeta->pTaskList == NULL) {
@ -400,9 +384,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
pMeta->pHbInfo = createMetaHbInfo(pRid);
if (pMeta->pHbInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
pMeta->bkdChkptMgt = bkdMgtCreate(tpath);
@ -486,7 +473,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
}
void streamMetaClose(SStreamMeta* pMeta) {
stDebug("start to close stream meta");
stDebug("vgId:%d start to close stream meta", pMeta->vgId);
if (pMeta == NULL) {
return;
}
@ -502,11 +489,13 @@ void streamMetaClose(SStreamMeta* pMeta) {
void streamMetaCloseImpl(void* arg) {
SStreamMeta* pMeta = arg;
stDebug("start to do-close stream meta");
if (pMeta == NULL) {
return;
}
int32_t vgId = pMeta->vgId;
stDebug("vgId:%d start to do-close stream meta", vgId);
streamMetaWLock(pMeta);
streamMetaClear(pMeta);
streamMetaWUnLock(pMeta);
@ -526,7 +515,8 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
taosMemoryFree(pMeta->pHbInfo);
pMeta->pHbInfo = destroyMetaHbInfo(pMeta->pHbInfo);
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);
@ -539,7 +529,7 @@ void streamMetaCloseImpl(void* arg) {
taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta);
stDebug("end to close stream meta");
stDebug("vgId:%d end to close stream meta", vgId);
}
// todo let's check the status for each task
@ -901,13 +891,16 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue;
}
stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
// do duplicate task check.
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) {
code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
if (code < 0) {
stError("failed to expand s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
stError("failed to load s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
tFreeStreamTask(pTask);
continue;
}
@ -962,219 +955,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
(void)streamMetaCommit(pMeta);
}
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0;
return true;
}
return false;
}
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) {
return true;
}
}
return false;
}
static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
SStreamMeta* pMeta = pTask->pMeta;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
for (int j = 0; j < num; ++j) {
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j);
bool exist = existInHbMsg(pMsg, pTaskEpset);
if (!exist) {
taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
(int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
}
}
taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList);
taosThreadMutexUnlock(&pTask->lock);
}
static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
SStreamHbMsg hbMsg = {0};
SEpSet epset = {0};
bool hasMnodeEpset = false;
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
continue;
}
// not report the status of fill-history task
if ((*pTask)->info.fillHistory == 1) {
continue;
}
STaskStatusEntry entry = {
.id = id,
.status = streamTaskGetStatus(*pTask)->state,
.nodeId = hbMsg.vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
.startTime = (*pTask)->execInfo.readyTs,
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
.checkpointInfo.latestSize = 0,
.checkpointInfo.remoteBackup = 0,
.hTaskId = (*pTask)->hTaskInfo.id.taskId,
.procsTotal = SIZE_IN_MiB((*pTask)->execInfo.inputDataSize),
.outputTotal = SIZE_IN_MiB((*pTask)->execInfo.outputDataSize),
.procsThroughput = SIZE_IN_KiB((*pTask)->execInfo.procsThroughput),
.outputThroughput = SIZE_IN_KiB((*pTask)->execInfo.outputThroughput),
.startCheckpointId = (*pTask)->execInfo.startCheckpointId,
.startCheckpointVer = (*pTask)->execInfo.startCheckpointVer,
};
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo;
if (p->activeId != 0) {
entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = p->activeId;
entry.checkpointInfo.activeTransId = p->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d, clear the active checkpointInfo",
(*pTask)->id.idStr, p->transId);
taosThreadMutexLock(&(*pTask)->lock);
streamTaskClearCheckInfo((*pTask), true);
taosThreadMutexUnlock(&(*pTask)->lock);
}
}
if ((*pTask)->exec.pWalReader != NULL) {
entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
if (entry.processedVer < 0) {
entry.processedVer = (*pTask)->chkInfo.processedVer;
}
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
}
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasMnodeEpset = true;
}
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
if (hasMnodeEpset) {
int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
goto _end;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
goto _end;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
goto _end;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
pMeta->pHbInfo->hbCount += 1;
stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg);
} else {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
}
_end:
tCleanupStreamHbMsg(&hbMsg);
return TSDB_CODE_SUCCESS;
}
void metaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid);
return;
}
// need to stop, stop now
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
return;
}
// not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
return;
}
// set the hb start time
if (pMeta->pHbInfo->hbStart == 0) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
streamMetaRLock(pMeta);
metaHeartbeatToMnodeImpl(pMeta);
streamMetaRUnLock(pMeta);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false;
streamMetaRLock(pMeta);
@ -1200,14 +980,19 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
int64_t startTs = 0;
int32_t sendCount = 0;
streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
// wait for the stream meta hb function stopping
streamMetaWaitForHbTmrQuit(pMeta);
streamMetaWLock(pMeta);
pMeta->closeFlag = true;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasksMap, pIter);
@ -1222,15 +1007,6 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
streamMetaWUnLock(pMeta);
// wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
}
stDebug("vgId:%d start to check all tasks for closing", vgId);
int64_t st = taosGetTimestampMs();
@ -1247,10 +1023,10 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
metaRefMgtAdd(pMeta->vgId, pRid);
*pRid = pMeta->rid;
metaHbToMnode(pRid, NULL);
streamMetaHbToMnode(pRid, NULL);
}
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
taosHashClear(pStartInfo->pReadyTaskSet);
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0;
@ -1258,6 +1034,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
// reset the sentinel flag value to be 0
pStartInfo->startAllTasks = 0;
stDebug("vgId:%d clear all start-all-task info", vgId);
}
void streamMetaRLock(SStreamMeta* pMeta) {
@ -1336,7 +1113,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
streamMetaReleaseTask(pMeta, pTask);
}
metaHeartbeatToMnodeImpl(pMeta);
streamMetaSendHbHelper(pMeta);
pMeta->sendMsgBeforeClosing = false;
return pTaskList;
}
@ -1386,16 +1163,17 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64
return TSDB_CODE_SUCCESS;
}
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expandFn) {
// restore the checkpoint id by negotiating the latest consensus checkpoint id
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int64_t now = taosGetTimestampMs();
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%" PRId64, vgId, numOfTasks, now);
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%"PRId64, vgId, numOfTasks, now);
if (numOfTasks == 0) {
stInfo("vgId:%d no tasks to be started", pMeta->vgId);
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
return TSDB_CODE_SUCCESS;
}
@ -1406,11 +1184,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
return TSDB_CODE_SUCCESS;
}
// broadcast the check downstream tasks msg
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
numOfTasks = taosArrayGetSize(pTaskList);
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization , when the operation of check downstream tasks status is executed far quickly.
// initialization, when the operation of check downstream tasks status is executed far quickly.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
@ -1420,19 +1198,18 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
continue;
}
if (pTask->pBackend == NULL) { // TODO: add test cases for this
code = expandFn(pTask);
if ((pTask->pBackend == NULL) && (pTask->info.fillHistory == 1 || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
code = pMeta->expandTaskFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
}
} else {
stDebug("s-task:0x%x vgId:%d fill-history task backend has initialized already", pTaskId->taskId, vgId);
}
streamMetaReleaseTask(pMeta, pTask);
}
// Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
@ -1465,16 +1242,28 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
continue;
}
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret;
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret;
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// negotiate the consensus checkpoint id for current task
ASSERT(pTask->pBackend == NULL);
code = streamTaskSendRestoreChkptMsg(pTask);
// this task may has no checkpoint, but others tasks may generate checkpoint already?
streamMetaReleaseTask(pMeta, pTask);
}
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization, when the operation of check downstream tasks status is executed far quickly.
stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
taosArrayDestroy(pTaskList);
return code;
@ -1535,7 +1324,7 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
return true;
}
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) {
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = 0;
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
@ -1556,7 +1345,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
ASSERT(pTask->status.downstreamReady == 0);
if (pTask->pBackend == NULL) {
code = expandFn(pTask);
code = pMeta->expandTaskFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
@ -1638,7 +1427,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
streamMetaResetStartInfo(pStartInfo, pMeta->vgId);
streamMetaWUnLock(pMeta);
pStartInfo->completeFn(pMeta);

View File

@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streammsg.h"
#include "streamMsg.h"
#include "os.h"
#include "tstream.h"
@ -363,6 +363,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
}
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -422,6 +423,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
taosArrayPush(pReq->pUpdateNodes, &vgId);
}
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
@ -432,12 +434,16 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
}
if (pMsg->pUpdateNodes != NULL) {
taosArrayDestroy(pMsg->pUpdateNodes);
pMsg->pUpdateNodes = taosArrayDestroy(pMsg->pUpdateNodes);
}
if (pMsg->pTaskStatus != NULL) {
taosArrayDestroy(pMsg->pTaskStatus);
pMsg->pTaskStatus = taosArrayDestroy(pMsg->pTaskStatus);
}
pMsg->msgId = -1;
pMsg->vgId = -1;
pMsg->numOfTasks = -1;
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
@ -621,4 +627,46 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq
if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pInfo->startTs) < 0) return -1;
if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1;
if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1;
if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pInfo->startTs) < 0) return -1;
if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1;
if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1;
tEndDecode(pCoder);
return 0;
}

View File

@ -273,6 +273,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x start to free task state", taskId);
streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING);
taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL;
}
if (pTask->pNameMap) {
@ -830,6 +831,34 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->hTaskId = pSrc->hTaskId;
}
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
STaskStatusEntry entry = {
.id = streamTaskGetTaskId(pTask),
.status = streamTaskGetStatus(pTask)->state,
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
.startTime = pExecInfo->readyTs,
.checkpointInfo.latestId = pTask->chkInfo.checkpointId,
.checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
.checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
.checkpointInfo.latestSize = 0,
.checkpointInfo.remoteBackup = 0,
.hTaskId = pTask->hTaskInfo.id.taskId,
.procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
.outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
.procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
.outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
.startCheckpointId = pExecInfo->startCheckpointId,
.startCheckpointVer = pExecInfo->startCheckpointVer,
};
return entry;
}
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
SStreamMeta* pMeta = pTask->pMeta;