fix(stream): remove the related history task correctly.

This commit is contained in:
Haojun Liao 2024-06-11 18:35:59 +08:00
parent b22679c941
commit a41a04dc37
5 changed files with 26 additions and 11 deletions

View File

@ -3473,8 +3473,6 @@ typedef struct SVUpdateCheckpointInfoReq {
int64_t checkpointTs;
int32_t transId;
int8_t dropRelHTask;
int64_t hStreamId;
int64_t hTaskId;
} SVUpdateCheckpointInfoReq;
typedef struct {

View File

@ -734,6 +734,9 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs);
void streamMetaClearUpdateTaskList(SStreamMeta* pMeta);
void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);

View File

@ -185,7 +185,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
SStreamTask* pTask = *ppTask;
const char* idstr = pTask->id.idStr;
if (pMeta->updateInfo.transId != req.transId) {
if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) {
if (req.transId < pMeta->updateInfo.transId) {
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
pMeta->updateInfo.transId, req.transId);
@ -197,13 +197,12 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
} else {
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
vgId, req.transId, pMeta->updateInfo.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = req.transId;
streamMetaInitUpdateTaskList(pMeta, req.transId);
}
} else {
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
streamMetaInitUpdateTaskList(pMeta, req.transId);
}
// duplicate update epset msg received, discard this redundant message
@ -280,6 +279,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
// persist to disk
}
streamMetaClearUpdateTaskList(pMeta);
if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0;

View File

@ -418,6 +418,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
int32_t code = 0;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
STaskId hTaskId = {0};
taosThreadMutexLock(&pTask->lock);
@ -433,12 +434,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
// drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
}
streamMetaWLock(pMeta);
}
@ -476,8 +476,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
}
if (pReq->dropRelHTask) {
hTaskId = pTask->hTaskInfo.id;
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
pReq->taskId, vgId, pReq->hTaskId);
pReq->taskId, vgId, hTaskId.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
}
@ -498,9 +499,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
// drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t) pReq->hTaskId, numOfTasks);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
(int32_t)hTaskId.taskId, numOfTasks);
}
streamMetaWLock(pMeta);

View File

@ -372,6 +372,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc;
pMeta->stage = stage;
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
pMeta->updateInfo.transId = -1;
pMeta->startInfo.completeFn = fn;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
@ -1740,4 +1741,14 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
id, vgId, transId, el);
}
}
void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) {
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = -1;
}
void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = transId;
}