enh(stream): handle the stream hb.

This commit is contained in:
Haojun Liao 2023-07-30 19:25:00 +08:00
parent 8dfef8768a
commit cc5ff44604
3 changed files with 147 additions and 112 deletions

View File

@ -25,10 +25,12 @@
#define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode;
static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream);
static int32_t setTaskUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
static int32_t updateTaskUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
static void setFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
static void updateFixDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
@ -141,7 +143,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
}
} else {
SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
setFixedDownstreamEpInfo(pTask, pOneSinkTask);
setFixedDownstreamInfo(pTask, pOneSinkTask);
}
return 0;
@ -272,7 +274,7 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamEpInfo(pTask, pSinkTask);
setTaskUpstreamInfo(pSinkTask, pTask);
}
return TSDB_CODE_SUCCESS;
@ -293,27 +295,27 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
return pEpInfo;
}
void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher;
pDispatcher->taskId = pTask->id.taskId;
pDispatcher->nodeId = pTask->info.nodeId;
pDispatcher->epSet = pTask->info.epSet;
void setFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
pDispatcher->taskId = pDownstreamTask->id.taskId;
pDispatcher->nodeId = pDownstreamTask->info.nodeId;
pDispatcher->epSet = pDownstreamTask->info.epSet;
pDstTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
}
int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
int32_t setTaskUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pDownstream->pUpstreamInfoList == NULL) {
pDownstream->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES);
if (pTask->pUpstreamInfoList == NULL) {
pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pDownstream->pUpstreamInfoList, &pEpInfo);
taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo);
return TSDB_CODE_SUCCESS;
}
@ -421,12 +423,12 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
pWindow->skey, pWindow->ekey);
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pDownstreamTask);
setFixedDownstreamInfo(pTask, pDownstreamTask);
if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) {
return -1;
}
return setTaskUpstreamEpInfo(pTask, pDownstreamTask);
return setTaskUpstreamInfo(pDownstreamTask, pTask);
}
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
@ -598,7 +600,7 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst
SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask);
setTaskUpstreamInfo(pSinkTask, pUpstreamTask);
}
}

View File

@ -30,8 +30,21 @@
#define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_HB_INTERVAL 100 // 100 sec
#define MND_STREAM_MAX_NUM 60
typedef struct SNodeEntry {
int32_t vgId;
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SStreamVnodeRevertIndex {
SHashObj* pVnodeMap;
SArray* pVnodeEntryList;
} SStreamVnodeRevertIndex;
static SStreamVnodeRevertIndex execNodeList;
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
@ -49,6 +62,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -85,11 +100,6 @@ int32_t mndInitStream(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask,
SMStreamDoCheckpointMsg *pMsg);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId);
void mndCleanupStream(SMnode *pMnode) {}
@ -838,48 +848,6 @@ _OVER:
return code;
}
// static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamList) {
// void *buf = NULL;
// int32_t tlen = 0;
// int32_t checkpointId = tGenIdPI64();
// SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
// SArray *stream = taosArrayInit(64, sizeof(void *));
// SListIter iter = {0};
// tdListInitIter(pStreamList, &iter, TD_LIST_FORWARD);
// SListNode *pNode = NULL;
// while ((pNode = tdListNext(&iter)) != NULL) {
// char streamName[TSDB_STREAM_FNAME_LEN] = {0};
// tdListNodeGetData(pStreamList, pNode, streamName);
// SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
// taosArrayPush(stream, &pStream);
// }
// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) {
// mndReleaseVgroup(pMnode, pVgObj);
// for (int i = 0; i < taosArrayGetSize(stream); i++) {
// SStreamObj *p = taosArrayGetP(stream, i);
// mndReleaseStream(pMnode, p);
// }
// taosArrayDestroy(stream);
// return -1;
// STransAction action = {0};
// action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
// action.pCont = buf;
// action.contLen = tlen;
// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
// }
// mndReleaseVgroup(pMnode, pVgObj);
// for (int i = 0; i < taosArrayGetSize(stream); i++) {
// SStreamObj *p = taosArrayGetP(stream, i);
// mndReleaseStream(pMnode, p);
// }
// taosArrayDestroy(stream);
// return 0;
// }
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -897,48 +865,6 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
return 0;
}
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask,
SMStreamDoCheckpointMsg *pMsg) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = pMsg->checkpointId;
req.nodeId = pTask->info.nodeId;
req.expireTime = -1;
req.streamId = pTask->id.streamId;
req.taskId = pTask->id.taskId;
int32_t code;
int32_t blen;
tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamCheckpointSourceReq(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pTask->info.nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId) {
SStreamCheckpointSourceReq req = {0};
@ -1807,3 +1733,110 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
// todo: handle the database drop/stream drop case
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SStreamHbMsg req = {0};
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int64_t now = taosGetTimestampSec();
// only handle the vnode transfer case.
SArray* pList = taosArrayInit(4, sizeof(int32_t));
// record the timeout node
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pVnodeEntryList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execNodeList.pVnodeEntryList, i);
if (now - pEntry->hbTimestamp > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
// taosArrayPush(pList, &pEntry);
}
if (pEntry->vgId != req.vgId) {
continue;
}
// check epset to identify whether the node has been transferred to other dnodes.
// 1. if the epset is changed
taosArrayPush(pList, &pEntry);
}
int32_t nodeId = 0;
SEpSet newEpSet = {0};
{//check all streams that involved this vnode
SStreamObj *pStream = NULL;
void* pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
// update the related upstream and downstream tasks
taosRLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for(int32_t j = 0; j < numOfLevels; ++j) {
SArray* pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for(int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask* pTask = taosArrayGetP(pLevel, k);
if (pTask->info.nodeId == nodeId) {
// pTask->info.epSet = 0; set the new epset
continue;
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
// only update the upstream info of the direct downstream tasks
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
// todo extract method
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
if (pVgInfo->vgId == nodeId) {
pVgInfo->epSet = newEpSet;
}
}
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
if (pDispatcher->nodeId == nodeId) {
pDispatcher->epSet = newEpSet;
}
} else {
// do nothing
}
} else if (level == TASK_LEVEL__AGG) {
// update the upstream info
SArray* pupstream = pTask->pUpstreamInfoList;
// for(int32_t i = 0; i < )
} else {
// update the upstream tasks
}
}
}
}
taosRLockLatch(&pStream->lock);
}
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
return TSDB_CODE_SUCCESS;
}

View File

@ -557,10 +557,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
return pEncoder->pos;
}
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp) {
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->numOfTasks) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}