fix(stream): remove wait for quit in meta hb timer.

This commit is contained in:
Haojun Liao 2024-08-29 16:00:22 +08:00
parent 5bffb0c675
commit 90bec3cd3f
1 changed files with 26 additions and 28 deletions

View File

@ -25,7 +25,6 @@ int32_t streamMetaId = 0;
struct SMetaHbInfo { struct SMetaHbInfo {
tmr_h hbTmr; tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter; int32_t tickCounter;
int32_t hbCount; int32_t hbCount;
int64_t hbStart; int64_t hbStart;
@ -242,6 +241,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
void streamMetaHbToMnode(void* param, void* tmrId) { void streamMetaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param; int64_t rid = *(int64_t*)param;
int32_t code = 0; int32_t code = 0;
int32_t vgId = 0;
int32_t role = 0;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) { if (pMeta == NULL) {
@ -249,29 +250,41 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
return; return;
} }
vgId = pMeta->vgId;
role = pMeta->role;
// need to stop, stop now // need to stop, stop now
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta if (pMeta->closeFlag) {
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; pMeta->pHbInfo->hbStart = 0;
code = taosReleaseRef(streamMetaId, rid); code = taosReleaseRef(streamMetaId, rid);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
stDebug("vgId:%d jump out of meta timer", pMeta->vgId); stDebug("vgId:%d jump out of meta timer", vgId);
} else { } else {
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
} }
return; return;
} }
// not leader not send msg // not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) { if (pMeta->role != NODE_ROLE_LEADER) {
pMeta->pHbInfo->hbStart = 0;
code = taosReleaseRef(streamMetaId, rid); code = taosReleaseRef(streamMetaId, rid);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
} else { } else {
stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, pMeta->vgId, stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid);
pMeta->role, rid);
} }
return;
}
pMeta->pHbInfo->hbStart = 0; if (!waitForEnoughDuration(pMeta->pHbInfo)) {
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
}
return; return;
} }
@ -280,17 +293,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs(); pMeta->pHbInfo->hbStart = taosGetTimestampMs();
} }
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
}
return;
}
streamMetaRLock(pMeta); streamMetaRLock(pMeta);
code = streamMetaSendHbHelper(pMeta); code = streamMetaSendHbHelper(pMeta);
if (code) { if (code) {
@ -300,10 +302,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr"); "meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
code = taosReleaseRef(streamMetaId, rid);
if (code) { if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
} }
} }
@ -316,7 +318,6 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pInfo->tickCounter = 0; pInfo->tickCounter = 0;
pInfo->stopFlag = 0;
pInfo->msgSendTs = -1; pInfo->msgSendTs = -1;
pInfo->hbCount = 0; pInfo->hbCount = 0;
@ -340,11 +341,8 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping // wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) { if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; taosMsleep(2 * META_HB_CHECK_INTERVAL);
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
taosMsleep(100);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
} }
} }