enh(stream): handle the stream hb.

This commit is contained in:
Haojun Liao 2023-07-31 14:07:18 +08:00
parent cc5ff44604
commit e61aa83594
4 changed files with 114 additions and 96 deletions

View File

@ -630,6 +630,10 @@ void streamTaskHalt(SStreamTask* pTask);
void streamTaskResumeFromHalt(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
@ -651,7 +655,7 @@ void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
// save to b-tree meta store
// save to stream meta store
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);

View File

@ -25,12 +25,8 @@
#define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode;
static int32_t setTaskUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
static int32_t updateTaskUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
static void updateFixDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
@ -143,7 +139,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
}
} else {
SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
setFixedDownstreamInfo(pTask, pOneSinkTask);
streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
}
return 0;
@ -274,51 +270,12 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamInfo(pSinkTask, pTask);
streamTaskSetUpstreamInfo(pSinkTask, pTask);
}
return TSDB_CODE_SUCCESS;
}
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pEpInfo->childId = pTask->info.selfChildId;
pEpInfo->epSet = pTask->info.epSet;
pEpInfo->nodeId = pTask->info.nodeId;
pEpInfo->taskId = pTask->id.taskId;
return pEpInfo;
}
void setFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
pDispatcher->taskId = pDownstreamTask->id.taskId;
pDispatcher->nodeId = pDownstreamTask->info.nodeId;
pDispatcher->epSet = pDownstreamTask->info.epSet;
pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
}
int32_t setTaskUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pTask->pUpstreamInfoList == NULL) {
pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo);
return TSDB_CODE_SUCCESS;
}
static SArray* addNewTaskList(SArray* pTasksList) {
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pTasksList, &pTaskList);
@ -423,12 +380,12 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
pWindow->skey, pWindow->ekey);
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamInfo(pTask, pDownstreamTask);
streamTaskSetFixedDownstreamInfo(pTask, pDownstreamTask);
if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) {
return -1;
}
return setTaskUpstreamInfo(pDownstreamTask, pTask);
return streamTaskSetUpstreamInfo(pDownstreamTask, pTask);
}
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
@ -600,7 +557,7 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst
SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamInfo(pSinkTask, pUpstreamTask);
streamTaskSetUpstreamInfo(pSinkTask, pUpstreamTask);
}
}

View File

@ -34,14 +34,14 @@
#define MND_STREAM_HB_INTERVAL 100 // 100 sec
typedef struct SNodeEntry {
int32_t vgId;
int32_t nodeId;
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SStreamVnodeRevertIndex {
SHashObj* pVnodeMap;
SArray* pVnodeEntryList;
SArray* pNodeEntryList;
} SStreamVnodeRevertIndex;
static SStreamVnodeRevertIndex execNodeList;
@ -1017,10 +1017,11 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
pTask = taosArrayGetP(pLevel, j);
if (pTask->info.fillHistory == 1) {
continue;
}
@ -1083,6 +1084,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
}
return 0;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -1756,13 +1758,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray* pList = taosArrayInit(4, sizeof(int32_t));
// record the timeout node
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pVnodeEntryList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execNodeList.pVnodeEntryList, i);
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
if (now - pEntry->hbTimestamp > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
// taosArrayPush(pList, &pEntry);
}
if (pEntry->vgId != req.vgId) {
if (pEntry->nodeId != req.vgId) {
continue;
}
@ -1774,9 +1776,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
int32_t nodeId = 0;
SEpSet newEpSet = {0};
{//check all streams that involved this vnode
{ // check all streams that involved this vnode
SStreamObj *pStream = NULL;
void* pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
@ -1784,59 +1786,38 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
// update the related upstream and downstream tasks
taosRLockLatch(&pStream->lock);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for(int32_t j = 0; j < numOfLevels; ++j) {
SArray* pLevel = taosArrayGetP(pStream->tasks, j);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for(int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask* pTask = taosArrayGetP(pLevel, k);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
if (pTask->info.nodeId == nodeId) {
// pTask->info.epSet = 0; set the new epset
//pTask->info.epSet = 0; set the new epset
continue;
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
// only update the upstream info of the direct downstream tasks
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
// todo extract method
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
if (pVgInfo->vgId == nodeId) {
pVgInfo->epSet = newEpSet;
}
}
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
if (pDispatcher->nodeId == nodeId) {
pDispatcher->epSet = newEpSet;
}
} else {
// do nothing
}
} else if (level == TASK_LEVEL__AGG) {
// update the upstream info
SArray* pupstream = pTask->pUpstreamInfoList;
// for(int32_t i = 0; i < )
} else {
// update the upstream tasks
}
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, &newEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, &newEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, &newEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, &newEpSet);
}
}
}
taosRLockLatch(&pStream->lock);
taosWUnLockLatch(&pStream->lock);
}
}
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
return TSDB_CODE_SUCCESS;
}

View File

@ -321,3 +321,79 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
}
}
}
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pEpInfo->childId = pTask->info.selfChildId;
pEpInfo->epSet = pTask->info.epSet;
pEpInfo->nodeId = pTask->info.nodeId;
pEpInfo->taskId = pTask->id.taskId;
return pEpInfo;
}
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pTask->pUpstreamInfoList == NULL) {
pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo);
return TSDB_CODE_SUCCESS;
}
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
if (pInfo->nodeId == nodeId) {
pInfo->epSet = *pEpSet;
break;
}
}
}
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
pDispatcher->taskId = pDownstreamTask->id.taskId;
pDispatcher->nodeId = pDownstreamTask->info.nodeId;
pDispatcher->epSet = pDownstreamTask->info.epSet;
pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
}
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
int8_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
if (pVgInfo->vgId == nodeId) {
pVgInfo->epSet = *pEpSet;
qDebug("s-task:0x%x update the dispatch info, nodeId:%d", pTask->id.taskId, nodeId);
break;
}
}
} else if (type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher;
if (pDispatcher->nodeId == nodeId) {
pDispatcher->epSet = *pEpSet;
qDebug("s-task:0x%x update the dispatch info, nodeId:%d", pTask->id.taskId, nodeId);
}
} else {
// do nothing
}
}