enh(stream): create task update info

This commit is contained in:
Haojun Liao 2023-08-01 14:04:07 +08:00
parent e61aa83594
commit 75b1520be0
3 changed files with 59 additions and 2 deletions

View File

@ -549,6 +549,14 @@ typedef struct {
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
typedef struct {
int32_t nodeId;
SEpSet epset;
} SStreamTaskUpdateInfo;
int32_t tEncodeTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg);
int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg);
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t downstreamTaskId; int32_t downstreamTaskId;

View File

@ -1776,7 +1776,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
int32_t nodeId = 0; int32_t nodeId = 0;
SEpSet newEpSet = {0}; SEpSet newEpSet = {0};
{ // check all streams that involved this vnode { // check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
@ -1796,7 +1796,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t k = 0; k < numOfTasks; ++k) { for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k); SStreamTask *pTask = taosArrayGetP(pLevel, k);
if (pTask->info.nodeId == nodeId) { if (pTask->info.nodeId == nodeId) {
//pTask->info.epSet = 0; set the new epset pTask->info.epSet = newEpSet;
continue; continue;
} }
@ -1813,6 +1813,39 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
} }
{ // build trans to update the epset
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
// mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId);
mndTransSetDbName(pTrans, "checkpoint", "checkpoint");
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndTransDrop(pTrans);
return -1;
}
void* pBuf = NULL;
int32_t len = 0;
// doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, newEpSet);
STransAction action = {0};
action.epSet = /*mndGetVgroupEpset(pMnode, pVgObj)*/;
action.pCont = pBuf;
action.contLen = len;
action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pBuf);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
} }

View File

@ -958,3 +958,19 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
num); num);
return 0; return 0;
} }
int32_t tEncodeTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pMsg->epset) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}