enh(stream): stop stream task before redistribute vnodes.

This commit is contained in:
Haojun Liao 2023-08-11 23:46:53 +08:00
parent 87c7e4c0fe
commit f24b22000f
9 changed files with 125 additions and 2 deletions

View File

@ -265,6 +265,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)

View File

@ -202,6 +202,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -76,6 +76,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -745,6 +745,7 @@ SArray *vmGetMsgHandles() {
// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;

View File

@ -33,6 +33,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStopInvolvedStreamTasks(SMnode *pMnode, int32_t vgId, STrans *pTrans);
// for sma
// TODO refactor
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);

View File

@ -91,6 +91,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
@ -1819,6 +1820,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_
pAction->msgType = msgType;
}
// todo extract method: traverse stream tasks
// build trans to update the epset
static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update");
@ -2088,6 +2090,76 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}
int32_t mndBuildUpdateTaskStatusTrans(SStreamObj* pStream, STrans* pTrans) {
pStream->status = STREAM_STATUS__STOP;
int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_STOP, &pTask->info.epSet);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__STOP) {
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
}
}
}
return 0;
}
int32_t mndStopInvolvedStreamTasks(SMnode *pMnode, int32_t vgId, STrans *pTrans) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
const char *p = strdup(pVgroup->dbName);
mndReleaseVgroup(pMnode, pVgroup);
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
if (strcmp(pStream->targetDb, p) == 0 || strcmp(pStream->sourceDb, p) == 0) {
int32_t code = mndBuildUpdateTaskStatusTrans(pStream, pTrans);
// mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid);
if (code != TSDB_CODE_SUCCESS) {
// todo
}
}
mndReleaseStream(pMnode, pStream);
}
return 0;
}
// todo: this process should be executed by the write queue worker of the mnode
//int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// SMnode *pMnode = pReq->info.node;

View File

@ -1680,6 +1680,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
pNew1->memUsed += vgMem;
}
// mndStopInvolvedStreamTasks(pMnode, pVgroup->vgId, pTrans);
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER;
}

View File

@ -1031,7 +1031,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
" child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
@ -1889,3 +1889,47 @@ _end:
tmsgSendRsp(&rsp);
return code;
}
int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process stop req, failed to acquire task:0x%x, it may have been dropped already", vgId,
pReq->taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive stop msg from mnode", pTask->id.idStr);
streamTaskStop(pTask);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskStop(pHistoryTask);
streamMetaReleaseTask(pMeta, pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask);
// tDecoderClear(&decoder);
tmsgSendRsp(&rsp);
return 0;
}

View File

@ -20,7 +20,6 @@
#define MIN_STREAM_EXEC_BATCH_NUM 4
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static int32_t updateCheckPointInfo(SStreamTask* pTask);
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
bool streamTaskShouldStop(const SStreamStatus* pStatus) {