fix(stream): fix dead lock when shutdown all tasks.

This commit is contained in:
Haojun Liao 2023-10-24 19:11:24 +08:00
parent 6ffb5ee476
commit 8cde39eebd
4 changed files with 19 additions and 12 deletions

View File

@ -181,11 +181,6 @@ typedef struct {
SSDataBlock* pBlock;
} SStreamRefDataBlock;
typedef struct {
int8_t type;
SSDataBlock* pBlock;
} SStreamTrigger;
typedef struct SStreamQueueNode SStreamQueueNode;
struct SStreamQueueNode {
@ -804,6 +799,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -1924,8 +1924,15 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
taosWUnLockLatch(&pMeta->lock);
} else {
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
terrno = 0;
taosWUnLockLatch(&pMeta->lock);
while (streamMetaTaskInTimer(pMeta)) {
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
taosWLockLatch(&pMeta->lock);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);

View File

@ -52,6 +52,11 @@ extern "C" {
#define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct {
int8_t type;
SSDataBlock* pBlock;
} SStreamTrigger;
typedef struct SStreamGlobalEnv {
int8_t inited;
void* timer;

View File

@ -274,6 +274,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
}
void streamMetaClear(SStreamMeta* pMeta) {
// remove all existed tasks in this vnode
void* pIter = NULL;
while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) {
SStreamTask* p = *(SStreamTask**)pIter;
@ -694,8 +695,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask);
stError(
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
"stream "
"manually",
"stream manually",
vgId, tsDataDir);
return -1;
}
@ -857,7 +857,7 @@ void metaHbToMnode(void* param, void* tmrId) {
}
// not leader not send msg
if (pMeta->role == NODE_ROLE_FOLLOWER) {
if (pMeta->role != NODE_ROLE_LEADER) {
stInfo("vgId:%d follower not send hb to mnode", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
@ -980,9 +980,8 @@ void metaHbToMnode(void* param, void* tmrId) {
taosReleaseRef(streamMetaId, rid);
}
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false;
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
@ -1036,7 +1035,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
stDebug("vgId:%d start to check all tasks", vgId);
int64_t st = taosGetTimestampMs();
while (hasStreamTaskInTimer(pMeta)) {
while (streamMetaTaskInTimer(pMeta)) {
stDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}