Merge pull request #23523 from taosdata/fix/liaohj
fix(stream): fix deadlock in transfer state.
This commit is contained in:
commit
70388bafd2
|
@ -804,6 +804,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||
#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102)
|
||||
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
|
||||
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
|
||||
|
||||
// TDLite
|
||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
||||
|
|
|
@ -1164,10 +1164,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
ASSERT(pTask->status.pauseAllowed == true);
|
||||
}
|
||||
|
||||
streamScanHistoryData(pTask);
|
||||
|
||||
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
|
||||
|
@ -1821,7 +1817,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
if (pMeta->updateInfo.transId != req.transId) {
|
||||
pMeta->updateInfo.transId = req.transId;
|
||||
tqDebug("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
|
||||
tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
|
||||
// info needs to be kept till the new trans to update the nodeEp arrived.
|
||||
taosHashClear(pMeta->updateInfo.pTasks);
|
||||
} else {
|
||||
|
@ -1903,13 +1899,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
pMeta->startInfo.startAllTasksFlag = 0;
|
||||
streamMetaWUnLock(pMeta);
|
||||
} else {
|
||||
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
|
||||
tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId);
|
||||
terrno = 0;
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
while (streamMetaTaskInTimer(pMeta)) {
|
||||
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
|
@ -1931,11 +1927,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||
vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
|
||||
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
|
||||
tqResetStreamTaskStatus(pTq);
|
||||
tqLaunchStreamTaskAsync(pTq);
|
||||
} else {
|
||||
vInfo("vgId:%d, follower node not start stream tasks", vgId);
|
||||
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
|
|
@ -285,13 +285,14 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
|||
|
||||
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||
if (pStreamTask == NULL) {
|
||||
stError(
|
||||
"s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
|
||||
"fill-history task",
|
||||
pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId);
|
||||
id, (int32_t) pTask->streamTaskId.taskId);
|
||||
|
||||
// 1. free it and remove fill-history task from disk meta-store
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||
|
@ -304,7 +305,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
streamMetaWUnLock(pMeta);
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
} else {
|
||||
stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
|
||||
stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id,
|
||||
pStreamTask->id.idStr);
|
||||
}
|
||||
|
||||
|
@ -318,7 +319,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
} else {
|
||||
ASSERT(status == TASK_STATUS__READY|| status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
|
||||
streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
|
||||
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, id);
|
||||
}
|
||||
|
||||
// wait for the stream task to handle all in the inputQ, and to be idle
|
||||
|
@ -328,7 +329,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
|
||||
// start the task state transfer procedure.
|
||||
char* p = NULL;
|
||||
streamTaskGetStatus(pStreamTask, &p);
|
||||
status = streamTaskGetStatus(pStreamTask, &p);
|
||||
if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) {
|
||||
stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||
}
|
||||
|
||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
// update the scan data range for source task.
|
||||
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
||||
|
@ -347,22 +354,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
streamTaskReleaseState(pTask);
|
||||
streamTaskReloadState(pStreamTask);
|
||||
|
||||
// 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
|
||||
// pause, since the pause allowed attribute is not set yet.
|
||||
streamTaskResume(pStreamTask); // todo refactor: use streamTaskResume.
|
||||
// 3. resume the state of stream task, after this function, the stream task will run immediately.
|
||||
streamTaskResume(pStreamTask);
|
||||
|
||||
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
|
||||
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id);
|
||||
|
||||
// 4. free it and remove fill-history task from disk meta-store
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||
|
||||
// 5. save to disk
|
||||
streamMetaWLock(pMeta);
|
||||
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// 7. pause allowed.
|
||||
streamTaskEnablePause(pStreamTask);
|
||||
// 6. pause allowed.
|
||||
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
|
||||
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||
|
||||
|
|
|
@ -260,13 +260,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
|
|||
}
|
||||
}
|
||||
|
||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
|
||||
while (pMeta->streamBackend == NULL) {
|
||||
taosMsleep(100);
|
||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
|
||||
if (pMeta->streamBackend == NULL) {
|
||||
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) {
|
||||
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
|
||||
}
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||
|
|
|
@ -665,7 +665,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu
|
|||
// stream
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state transfer")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS,"Invalid task status to proceed")
|
||||
|
||||
// TDLite
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
||||
|
|
Loading…
Reference in New Issue