enh(stream): start task async.

This commit is contained in:
Haojun Liao 2024-01-05 17:55:39 +08:00
parent 1912de9b49
commit 0c4b91dc72
6 changed files with 91 additions and 19 deletions

View File

@ -18,6 +18,7 @@
// message process
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart);
int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId);
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored);
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
@ -27,7 +28,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);

View File

@ -52,9 +52,10 @@ extern "C" {
#define STREAM_EXEC_T_EXTRACT_WAL_DATA (-1)
#define STREAM_EXEC_T_START_ALL_TASKS (-2)
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-3)
#define STREAM_EXEC_T_STOP_ALL_TASKS (-4)
#define STREAM_EXEC_T_RESUME_TASK (-5)
#define STREAM_EXEC_T_START_ONE_TASK (-3)
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
#define STREAM_EXEC_T_RESUME_TASK (-6)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
@ -880,6 +881,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool i
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -193,7 +193,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_DEPLOY: {
void * pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return tqStreamTaskProcessDeployReq(pSnode->pMeta, -1, pReq, len, true, true);
return tqStreamTaskProcessDeployReq(pSnode->pMeta, &pSnode->msgCb, -1, pReq, len, true, true);
}
case TDMT_STREAM_TASK_DROP:

View File

@ -887,7 +887,8 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, sversion, msg, msgLen, vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored);
return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, sversion, msg, msgLen,
vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored);
}
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {

View File

@ -50,6 +50,33 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
return 0;
}
int32_t tqStreamOneTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start task:0x%x, code:%s", vgId, taskId, terrstr());
return -1;
}
tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = STREAM_EXEC_T_START_ONE_TASK;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
return 0;
}
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -507,8 +534,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
return code;
}
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader,
bool restored) {
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored) {
int32_t code = 0;
int32_t vgId = pMeta->vgId;
@ -560,18 +587,19 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
// only handled in the leader node
if (isLeader) {
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId);
if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = TASK_EVENT_INIT;
streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
if (restored) {
SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId);
if (p != NULL && (p->info.fillHistory == 0)) {
tqStreamOneTaskStartAsync(pMeta, cb, streamId, taskId);
}
if (p != NULL) {
streamMetaReleaseTask(pMeta, p);
}
} else {
tqWarn("s-task:0x%x not launched since vnode(vgId:%d) not ready", taskId, vgId);
}
if (p != NULL) {
streamMetaReleaseTask(pMeta, p);
}
} else {
tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
}
@ -696,7 +724,10 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t type = pReq->reqType;
int32_t vgId = pMeta->vgId;
if (type == STREAM_EXEC_T_START_ALL_TASKS) {
if (type == STREAM_EXEC_T_START_ONE_TASK) {
streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
return 0;
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
streamMetaStartAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
@ -755,7 +786,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
} else {
streamMetaWUnLock(pMeta);
tqDebug("vgId:%d start all tasks completed", pMeta->vgId);
tqDebug("vgId:%d start all tasks completed in callbackFn", pMeta->vgId);
}
return TSDB_CODE_SUCCESS;

View File

@ -1506,4 +1506,40 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
stInfo("vgId:%d start tasks completed", pMeta->vgId);
taosArrayDestroy(pTaskList);
return code;
}
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId);
streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// todo: may be we should find the related fill-history task and set it failed.
// fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
ASSERT(pTask->status.downstreamReady == 0);
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, pInfo->init, pInfo->start, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
streamMetaUpdateTaskDownstreamStatus(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false);
}
}
return ret;
}