fix(stream): fix deadlock when shutting down all tasks.

This commit is contained in:
Haojun Liao 2023-10-24 19:11:24 +08:00
parent 9a1a8feec2
commit 1ccbb77981
4 changed files with 20 additions and 11 deletions

View File

@ -181,11 +181,6 @@ typedef struct {
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SStreamRefDataBlock; } SStreamRefDataBlock;
typedef struct {
int8_t type;
SSDataBlock* pBlock;
} SStreamTrigger;
typedef struct SStreamQueueNode SStreamQueueNode; typedef struct SStreamQueueNode SStreamQueueNode;
struct SStreamQueueNode { struct SStreamQueueNode {
@ -804,6 +799,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -1924,8 +1924,15 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
} else { } else {
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
terrno = 0; terrno = 0;
taosWUnLockLatch(&pMeta->lock);
while (streamMetaTaskInTimer(pMeta)) {
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
taosWLockLatch(&pMeta->lock);
int32_t code = streamMetaReopen(pMeta); int32_t code = streamMetaReopen(pMeta);
if (code != 0) { if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId); tqError("vgId:%d failed to reopen stream meta", vgId);

View File

@ -52,6 +52,11 @@ extern "C" {
#define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) #define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
typedef struct {
int8_t type;
SSDataBlock* pBlock;
} SStreamTrigger;
typedef struct SStreamGlobalEnv { typedef struct SStreamGlobalEnv {
int8_t inited; int8_t inited;
void* timer; void* timer;

View File

@ -274,6 +274,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
} }
void streamMetaClear(SStreamMeta* pMeta) { void streamMetaClear(SStreamMeta* pMeta) {
// remove all existed tasks in this vnode
void* pIter = NULL; void* pIter = NULL;
while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) { while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) {
SStreamTask* p = *(SStreamTask**)pIter; SStreamTask* p = *(SStreamTask**)pIter;
@ -694,7 +695,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
stError( stError(
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
"stream manually", vgId, tsDataDir); "stream manually",
vgId, tsDataDir);
return -1; return -1;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -855,7 +857,7 @@ void metaHbToMnode(void* param, void* tmrId) {
} }
// not leader not send msg // not leader not send msg
if (pMeta->role == NODE_ROLE_FOLLOWER) { if (pMeta->role != NODE_ROLE_LEADER) {
stInfo("vgId:%d follower not send hb to mnode", pMeta->vgId); stInfo("vgId:%d follower not send hb to mnode", pMeta->vgId);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0; pMeta->pHbInfo->hbStart = 0;
@ -978,9 +980,8 @@ void metaHbToMnode(void* param, void* tmrId) {
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
} }
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false; bool inTimer = false;
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
void* pIter = NULL; void* pIter = NULL;
@ -1034,7 +1035,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
stDebug("vgId:%d start to check all tasks", vgId); stDebug("vgId:%d start to check all tasks", vgId);
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (hasStreamTaskInTimer(pMeta)) { while (streamMetaTaskInTimer(pMeta)) {
stDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); stDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100); taosMsleep(100);
} }