feature(stream): handle task update in tq module.

This commit is contained in:
Haojun Liao 2023-08-02 10:33:41 +08:00
parent e1e5e9bb0f
commit 13543f7daa
8 changed files with 70 additions and 77 deletions

View File

@ -2965,20 +2965,6 @@ typedef struct {
int8_t reserved;
} SVPauseStreamTaskRsp;
typedef struct {
SMsgHead head;
int32_t taskId;
int32_t nodeId;
SEpSet epset;
} SVStreamTaskUpdateReq;
typedef struct {
int8_t reserved;
} SVStreamTaskUpdateRsp;
int32_t tSerializeVTaskUpdateReq(void* buf, int32_t bufLen, const SVStreamTaskUpdateReq* pReq);
int32_t tDeserializeVTaskUpdateReq(void* buf, int32_t bufLen, SVStreamTaskUpdateReq* pReq);
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;

View File

@ -551,12 +551,14 @@ int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteH
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
SEpSet epset;
} SStreamTaskUpdateInfo;
} SStreamTaskUpdateMsg;
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg);
int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg);
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateMsg* pMsg);
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateMsg* pMsg);
typedef struct {
int64_t streamId;

View File

@ -7919,24 +7919,6 @@ int32_t tDeserializeSMResumeStreamReq(void *buf, int32_t bufLen, SMResumeStreamR
return 0;
}
int32_t tSerializeVTaskUpdateReq(void *buf, int32_t bufLen, const SVStreamTaskUpdateReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
// if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
// if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
// if (tEncodeI8(&encoder, pReq->igUntreated) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeVTaskUpdateReq(void* buf, int32_t bufLen, SVStreamTaskUpdateReq* pReq) {
return 0;
}
int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);

View File

@ -1728,7 +1728,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
static int32_t doBuildStreamTaskUpdateMsg(void** pBuf, int32_t* pLen, int32_t nodeId, const SEpSet* pEpset) {
SStreamTaskUpdateInfo req = {0};
SStreamTaskUpdateMsg req = {0};
req.nodeId = nodeId;
req.epset = *pEpset;

View File

@ -223,6 +223,7 @@ int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckStreamStatus(STQ* pTq);
int tqCommit(STQ*);

View File

@ -1739,7 +1739,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
}
tDecoderClear(&decoder);
// todo handle this bug: task not in ready state.
// todo handle the case when the task not in ready state, and the checkpoint msg is arrived.
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
@ -1811,43 +1811,59 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
return code;
}
int32_t tqProcessTaskUpdateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
// SStreamTaskUpdateInfo* pReq = (SVPauseStreamTaskReq*)msg;
//
// SStreamMeta* pMeta = pTq->pStreamMeta;
// SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
// if (pTask == NULL) {
// tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
// pReq->taskId);
//
// // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
// return TSDB_CODE_SUCCESS;
// }
//
// tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
// streamTaskPause(pTask);
//
// SStreamTask* pHistoryTask = NULL;
// if (pTask->historyTaskId.taskId != 0) {
// pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
// if (pHistoryTask == NULL) {
// tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success",
// pMeta->vgId, pTask->historyTaskId.taskId);
//
// streamMetaReleaseTask(pMeta, pTask);
//
// // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
// return TSDB_CODE_SUCCESS;
// }
//
// tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
// streamTaskPause(pHistoryTask);
// }
//
// streamMetaReleaseTask(pMeta, pTask);
// if (pHistoryTask != NULL) {
// streamMetaReleaseTask(pMeta, pHistoryTask);
// }
//
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamTaskUpdateMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(code));
return code;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
req.taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
// streamTaskUpdateEpInfo(pTask);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) {
tqError(
"vgId:%d failed to acquire fill-history task:0x%x when handling task update, it may have been dropped "
"already",
pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
// streamTaskUpdateEpInfo(pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask);
if (pHistoryTask != NULL) {
streamMetaReleaseTask(pMeta, pHistoryTask);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -665,6 +665,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_UPDATE:
return tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessStreamTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
default:

View File

@ -959,16 +959,20 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
return 0;
}
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg) {
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateMsg* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg) {
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateMsg* pMsg) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pMsg->epset) < 0) return -1;
tEndDecode(pDecoder);