fix(stream): remove related fill-history if task in stop status.
This commit is contained in:
parent
0a814e97b5
commit
80fd3e0445
|
@ -782,7 +782,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
|
||||||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
||||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
||||||
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
||||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock);
|
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt);
|
||||||
|
|
||||||
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||||
|
|
|
@ -142,8 +142,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
|
||||||
if (ppHTask == NULL || *ppHTask == NULL) {
|
if (ppHTask == NULL || *ppHTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
|
tqError(
|
||||||
vgId, req.taskId);
|
"vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
|
||||||
|
"stream task:0x%x",
|
||||||
|
vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
|
||||||
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
|
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
||||||
|
@ -612,23 +614,35 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
|
STaskId hTaskId = {0};
|
||||||
|
|
||||||
int32_t vgId = pMeta->vgId;
|
|
||||||
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
streamMetaWLock(pMeta);
|
||||||
if (pTask != NULL) {
|
|
||||||
// drop the related fill-history task firstly
|
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
|
||||||
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
|
||||||
|
streamMetaAcquireOneTask(*ppTask);
|
||||||
|
SStreamTask* pTask = *ppTask;
|
||||||
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
hTaskId.streamId = pTask->hTaskInfo.id.streamId;
|
||||||
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
|
hTaskId.taskId = pTask->hTaskInfo.id.taskId;
|
||||||
tqDebug("s-task:0x%x vgId:%d drop fill-history task:0x%x firstly", pReq->taskId, vgId,
|
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
|
||||||
(int32_t)pHTaskId->taskId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
|
// drop the related fill-history task firstly
|
||||||
|
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
|
||||||
|
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
|
||||||
|
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
|
||||||
|
}
|
||||||
|
|
||||||
// drop the stream task now
|
// drop the stream task now
|
||||||
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
|
|
@ -766,21 +766,13 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) {
|
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
|
||||||
if (pTask == NULL) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
||||||
if (pTask->info.fillHistory == 0) {
|
if (pTask->info.fillHistory == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaLock) {
|
|
||||||
streamMetaWLock(pMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
||||||
if (ppStreamTask != NULL) {
|
if (ppStreamTask != NULL) {
|
||||||
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
|
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
|
||||||
|
@ -798,10 +790,6 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool
|
||||||
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
|
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaLock) {
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue