Merge pull request #11886 from taosdata/feature/3.0_mhli
enh: add flag in syncEnv, to deal with expired timer
This commit is contained in:
commit
f7cd02a970
|
@ -158,6 +158,8 @@ typedef enum {
|
||||||
|
|
||||||
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
|
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
|
||||||
|
|
||||||
|
bool syncEnvIsStart();
|
||||||
|
|
||||||
extern int32_t sDebugFlag;
|
extern int32_t sDebugFlag;
|
||||||
|
|
||||||
//-----------------------------------------
|
//-----------------------------------------
|
||||||
|
|
|
@ -193,6 +193,9 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
|
|
||||||
// sync integration
|
// sync integration
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
|
if (syncEnvIsStart()) {
|
||||||
|
|
||||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||||
assert(pSyncNode != NULL);
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
@ -270,7 +273,9 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
|
} else {
|
||||||
|
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,8 @@ extern "C" {
|
||||||
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
|
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
|
||||||
|
|
||||||
typedef struct SSyncEnv {
|
typedef struct SSyncEnv {
|
||||||
|
uint8_t isStart;
|
||||||
|
|
||||||
// tick timer
|
// tick timer
|
||||||
tmr_h pEnvTickTimer;
|
tmr_h pEnvTickTimer;
|
||||||
int32_t envTickTimerMS;
|
int32_t envTickTimerMS;
|
||||||
|
|
|
@ -26,6 +26,14 @@ static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv);
|
||||||
static void syncEnvTick(void *param, void *tmrId);
|
static void syncEnvTick(void *param, void *tmrId);
|
||||||
// --------------------------------
|
// --------------------------------
|
||||||
|
|
||||||
|
bool syncEnvIsStart() {
|
||||||
|
if (gSyncEnv == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return atomic_load_8(&(gSyncEnv->isStart));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncEnvStart() {
|
int32_t syncEnvStart() {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
taosSeedRand(taosGetTimestampSec());
|
taosSeedRand(taosGetTimestampSec());
|
||||||
|
@ -88,12 +96,15 @@ static SSyncEnv *doSyncEnvStart() {
|
||||||
|
|
||||||
// start tmr thread
|
// start tmr thread
|
||||||
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
|
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
|
||||||
|
|
||||||
|
atomic_store_8(&(pSyncEnv->isStart), 1);
|
||||||
return pSyncEnv;
|
return pSyncEnv;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
|
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
|
||||||
assert(pSyncEnv == gSyncEnv);
|
assert(pSyncEnv == gSyncEnv);
|
||||||
if (pSyncEnv != NULL) {
|
if (pSyncEnv != NULL) {
|
||||||
|
atomic_store_8(&(pSyncEnv->isStart), 0);
|
||||||
taosTmrCleanUp(pSyncEnv->pTimerManager);
|
taosTmrCleanUp(pSyncEnv->pTimerManager);
|
||||||
taosMemoryFree(pSyncEnv);
|
taosMemoryFree(pSyncEnv);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue