TD-2083
This commit is contained in:
parent
52e99a8a2b
commit
0edf4333f3
|
@ -106,7 +106,7 @@ typedef struct {
|
||||||
int8_t nacks;
|
int8_t nacks;
|
||||||
int8_t confirmed;
|
int8_t confirmed;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
uint64_t time;
|
int64_t time;
|
||||||
} SFwdInfo;
|
} SFwdInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -1204,14 +1204,17 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
|
||||||
|
|
||||||
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
||||||
|
|
||||||
if (pSyncFwds) {;
|
if (pSyncFwds) {
|
||||||
uint64_t time = taosGetTimestampMs();
|
int64_t time = taosGetTimestampMs();
|
||||||
|
|
||||||
if (pSyncFwds->fwds > 0) {
|
if (pSyncFwds->fwds > 0) {
|
||||||
pthread_mutex_lock(&(pNode->mutex));
|
pthread_mutex_lock(&(pNode->mutex));
|
||||||
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
|
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
|
||||||
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
|
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
|
||||||
if (time - pFwdInfo->time < 2000) break;
|
if (ABS(time - pFwdInfo->time) < 2000) break;
|
||||||
|
|
||||||
|
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
|
||||||
|
pFwdInfo->version, time, pFwdInfo->time);
|
||||||
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1219,12 +1222,9 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
|
||||||
pthread_mutex_unlock(&(pNode->mutex));
|
pthread_mutex_unlock(&(pNode->mutex));
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, (void *)pNode->rid, tsSyncTmrCtrl);
|
taosReleaseRef(tsSyncRefId, rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosReleaseRef(tsSyncRefId, rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) {
|
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) {
|
||||||
SSyncPeer *pPeer;
|
SSyncPeer *pPeer;
|
||||||
SSyncHead *pSyncHead;
|
SSyncHead *pSyncHead;
|
||||||
|
|
Loading…
Reference in New Issue