fix(tsdb): fix deadlock when stopping reader.
This commit is contained in:
parent
c581432054
commit
241f8b3d2e
|
@ -824,14 +824,17 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
|
||||||
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
|
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
|
||||||
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
|
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
|
||||||
|
|
||||||
taosWLockLatch(&pTaskInfo->lock);
|
while(1) {
|
||||||
while (qTaskIsExecuting(pTaskInfo)) {
|
taosWLockLatch(&pTaskInfo->lock);
|
||||||
taosMsleep(10);
|
if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
|
||||||
|
taosWUnLockLatch(&pTaskInfo->lock);
|
||||||
|
taosMsleep(100);
|
||||||
|
} else { // not running now
|
||||||
|
pTaskInfo->code = rspCode;
|
||||||
|
taosWUnLockLatch(&pTaskInfo->lock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pTaskInfo->code = rspCode;
|
|
||||||
taosWUnLockLatch(&pTaskInfo->lock);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
|
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
|
||||||
|
|
|
@ -1631,6 +1631,8 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||||
streamId, taskId, pMeta->vgId);
|
streamId, taskId, pMeta->vgId);
|
||||||
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
|
|
Loading…
Reference in New Issue