fix(stream): wakeup the hb timer every 200ms, to speeup the close db procedure.

This commit is contained in:
Haojun Liao 2023-08-24 17:54:50 +08:00
parent caf20d7b5d
commit 70396aa5a5
3 changed files with 35 additions and 16 deletions

View File

@ -369,10 +369,11 @@ struct SStreamTask {
SSHashObj* pNameMap; SSHashObj* pNameMap;
}; };
typedef struct SMgmtInfo { typedef struct SMetaHbInfo {
SEpSet epset; tmr_h hbTmr;
int32_t mnodeId; int32_t stopFlag;
} SMgmtInfo; int32_t tickCounter;
} SMetaHbInfo;
// meta // meta
typedef struct SStreamMeta { typedef struct SStreamMeta {
@ -393,11 +394,9 @@ typedef struct SStreamMeta {
int64_t streamBackendRid; int64_t streamBackendRid;
SHashObj* pTaskBackendUnique; SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
tmr_h hbTmr; SMetaHbInfo hbInfo;
int32_t closedTask;
int32_t killed; int32_t chkptNotReadyTasks;
int32_t closedTask;
int32_t chkptNotReadyTasks;
int64_t chkpId; int64_t chkpId;
SArray* chkpSaved; SArray* chkpSaved;

View File

@ -218,8 +218,9 @@ void tqNotifyClose(STQ* pTq) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
pMeta->killed = STREAM_META_WILL_STOP; // wait for the stream meta hb function stopping
while(pMeta->killed != STREAM_META_OK_TO_STOP) { pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
while(pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100); taosMsleep(100);
tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
} }

View File

@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <common/tmisce.h> #include "tmisce.h"
#include "executor.h" #include "executor.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "streamInt.h" #include "streamInt.h"
@ -21,6 +21,9 @@
#include "tstream.h" #include "tstream.h"
#include "ttimer.h" #include "ttimer.h"
#define META_HB_CHECK_INTERVAL 200
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0; int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0; int32_t streamBackendCfWrapperId = 0;
@ -89,7 +92,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->stage = stage; pMeta->stage = stage;
// send heartbeat every 5sec. // send heartbeat every 5sec.
pMeta->hbTmr = taosTmrStart(metaHbToMnode, 5000, pMeta, streamEnv.timer); pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer);
pMeta->hbInfo.tickCounter = 0;
pMeta->hbInfo.stopFlag = 0;
pMeta->pTaskBackendUnique = pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
@ -605,16 +610,30 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
return 0; return 0;
} }
static bool readyToSendHb(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) {
// reset the counter
pInfo->tickCounter = 0;
return true;
}
return false;
}
void metaHbToMnode(void* param, void* tmrId) { void metaHbToMnode(void* param, void* tmrId) {
SStreamMeta* pMeta = param; SStreamMeta* pMeta = param;
SStreamHbMsg hbMsg = {0}; SStreamHbMsg hbMsg = {0};
if (pMeta->killed == STREAM_META_WILL_STOP) { // need to stop, stop now
pMeta->killed = STREAM_META_OK_TO_STOP; if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) {
pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP;
qDebug("vgId:%d jump out of meta timer", pMeta->vgId); qDebug("vgId:%d jump out of meta timer", pMeta->vgId);
return; return;
} }
if (!readyToSendHb(&pMeta->hbInfo)) {
return;
}
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
@ -679,5 +698,5 @@ void metaHbToMnode(void* param, void* tmrId) {
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg); tmsgSendReq(&epset, &msg);
taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, &pMeta->hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer, &pMeta->hbInfo.hbTmr);
} }