enh(stream): update the stream task.

This commit is contained in:
Haojun Liao 2023-08-01 18:14:58 +08:00
parent 164bfd5408
commit ad4c5916e7
3 changed files with 55 additions and 10 deletions

View File

@ -532,6 +532,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct {
int32_t vgId;
SEpSet epset;
int32_t numOfTasks;
} SStreamHbMsg;

View File

@ -27,6 +27,7 @@
#include "mndVgroup.h"
#include "parser.h"
#include "tname.h"
#include "tmisce.h"
#define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_RESERVE_SIZE 64
@ -1783,12 +1784,13 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3
return -1;
}
mDebug("start to build stream:0x%"PRIx64" task DAG update", pStream->uid);
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid);
ASSERT(0);
// mndTransSetDbName(pTrans, "stream-task-update", "checkpoint");
// mndTransSetDbName(pTrans, "stream-task-update", "checkpoint");
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to build stream:0x%"PRIx64" task DAG update, code:%s", pStream->uid, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mError("failed to build stream:0x%" PRIx64 " task DAG update, code:%s", pStream->uid,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndTransDrop(pTrans);
return -1;
}
@ -1818,7 +1820,6 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
}
@ -1843,6 +1844,35 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3
return TSDB_CODE_SUCCESS;
}
static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEpSet) {
int32_t numOfLevels = 0;
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
if (pTask->info.nodeId == nodeId) {
pTask->info.epSet = *pEpSet;
continue;
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
}
}
}
return 0;
}
// todo: handle the database drop/stream drop case
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
@ -1862,7 +1892,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
int64_t now = taosGetTimestampSec();
// only handle the vnode transfer case.
// timeout list
bool nodeChanged = false;
SArray* pList = taosArrayInit(4, sizeof(int32_t));
// record the timeout node
@ -1876,13 +1907,25 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
continue;
}
pEntry->hbTimestamp = now;
// check epset to identify whether the node has been transferred to other dnodes.
// 1. if the epset is changed
taosArrayPush(pList, &pEntry);
// 1. node the epset is changed, which means the node transfer has occurred for this node.
if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
nodeChanged = true;
break;
}
}
int32_t nodeId = 0;
SEpSet newEpSet = {0};
// todo handle the node timeout case.
// handle the node changed case
if (!nodeChanged) {
return TSDB_CODE_SUCCESS;
}
int32_t nodeId = req.vgId;
SEpSet newEpSet = req.epset;
{ // check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
@ -1936,7 +1979,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// }
// }
// mndTransDrop(pTrans);
return code;
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
return TSDB_CODE_SUCCESS;

View File

@ -553,6 +553,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pReq->epset) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -561,6 +562,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pReq->epset) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}