fix(stream): not handle the check msg for follower tasks.

This commit is contained in:
Haojun Liao 2023-09-19 09:43:14 +08:00
parent e6fb9ee46c
commit b95ad74c7f
5 changed files with 30 additions and 14 deletions

View File

@ -661,6 +661,7 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
int32_t streamSetStatusUnint(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta);

View File

@ -877,6 +877,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTaskCheckReq req;
SDecoder decoder;
@ -897,10 +898,17 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
// only the leader node handle the check request
if (!pMeta->leader) {
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
return -1;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
@ -912,7 +920,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
}
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -95,7 +95,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
}
pTask->taskExecInfo.init = taosGetTimestampMs();
tqDebug("s-task:%s set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
tqDebug("s-task:%s start check downstream tasks, set the init ts:%"PRId64, pTask->id.idStr, pTask->taskExecInfo.init);
streamSetStatusNormal(pTask);
streamTaskCheckDownstream(pTask);
@ -111,12 +111,9 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
// taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
// taosWUnLockLatch(&pMeta->lock);
return 0;
}
@ -124,7 +121,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
// taosWUnLockLatch(&pMeta->lock);
return -1;
}
@ -135,8 +131,6 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
// taosWUnLockLatch(&pMeta->lock);
return 0;
}

View File

@ -750,7 +750,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
ASSERT(pMeta->numOfStreamTasks <= numOfTasks);
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
qDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);

View File

@ -205,21 +205,22 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL);
const char* id = pTask->id.idStr;
if (stage == -1) {
qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr,
qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", id,
upstreamTaskId, stage);
return 0;
}
if (pInfo->stage == -1) {
pInfo->stage = stage;
qDebug("s-task:%s receive check msg from upstream task:0x%x, init stage value:%" PRId64, pTask->id.idStr,
qDebug("s-task:%s receive check msg from upstream task:0x%x for the time, init stage value:%" PRId64, id,
upstreamTaskId, stage);
}
if (pInfo->stage < stage) {
qError("s-task:%s receive msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
pTask->id.idStr, upstreamTaskId, vgId, stage, pInfo->stage);
id, upstreamTaskId, vgId, stage, pInfo->stage);
}
return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0;
@ -355,6 +356,18 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
}
}
int32_t streamSetStatusUnint(SStreamTask* pTask) {
int32_t status = atomic_load_8(&pTask->status.taskStatus);
if (status == TASK_STATUS__DROPPING) {
qError("s-task:%s cannot be set uninit, since in dropping state", pTask->id.idStr);
return -1;
} else {
qDebug("s-task:%s set task status to be uninit, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__UNINIT);
return 0;
}
}
// source
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);