Merge pull request #2783 from taosdata/feature/syncFC
first draft for sync flow control
This commit is contained in:
commit
10e2b030f3
|
@ -78,6 +78,9 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
|
|||
// when role is changed, call this to notify app
|
||||
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
|
||||
|
||||
// if a number of retrieving data failed, call this to start flow control
|
||||
typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds);
|
||||
|
||||
// when data file is synced successfully, notity app
|
||||
typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
|
||||
|
||||
|
@ -93,6 +96,7 @@ typedef struct {
|
|||
FWriteToCache writeToCache;
|
||||
FConfirmForward confirmForward;
|
||||
FNotifyRole notifyRole;
|
||||
FNotifyFlowCtrl notifyFlowCtrl;
|
||||
FNotifyFileSynced notifyFileSynced;
|
||||
} SSyncInfo;
|
||||
|
||||
|
|
|
@ -125,6 +125,8 @@ typedef struct SsyncPeer {
|
|||
uint64_t sversion; // track the peer version in retrieve process
|
||||
int syncFd;
|
||||
int peerFd; // forward FD
|
||||
int numOfRetrieves; // number of retrieves tried
|
||||
int fileChanged; // a flag to indicate file is changed during retrieving process
|
||||
void *timer;
|
||||
void *pConn;
|
||||
int notifyFd;
|
||||
|
@ -152,6 +154,7 @@ typedef struct SSyncNode {
|
|||
FWriteToCache writeToCache;
|
||||
FConfirmForward confirmForward;
|
||||
FNotifyRole notifyRole;
|
||||
FNotifyFlowCtrl notifyFlowCtrl;
|
||||
FNotifyFileSynced notifyFileSynced;
|
||||
pthread_mutex_t mutex;
|
||||
} SSyncNode;
|
||||
|
|
|
@ -137,6 +137,7 @@ void *syncStart(const SSyncInfo *pInfo)
|
|||
pNode->writeToCache = pInfo->writeToCache;
|
||||
pNode->notifyRole = pInfo->notifyRole;
|
||||
pNode->confirmForward = pInfo->confirmForward;
|
||||
pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
|
||||
pNode->notifyFileSynced = pInfo->notifyFileSynced;
|
||||
|
||||
pNode->selfIndex = -1;
|
||||
|
@ -530,6 +531,16 @@ void syncBroadcastStatus(SSyncNode *pNode)
|
|||
}
|
||||
}
|
||||
|
||||
static void syncResetFlowCtrl(SSyncNode *pNode) {
|
||||
|
||||
for (int i = 0; i < pNode->replica; ++i) {
|
||||
pNode->peerInfo[i]->numOfRetrieves = 0;
|
||||
}
|
||||
|
||||
if (pNode->notifyFlowCtrl)
|
||||
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
|
||||
}
|
||||
|
||||
static void syncChooseMaster(SSyncNode *pNode) {
|
||||
SSyncPeer *pPeer;
|
||||
int onlineNum = 0;
|
||||
|
@ -575,6 +586,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
|
|||
if (index == pNode->selfIndex) {
|
||||
sInfo("vgId:%d, start to work as master", pNode->vgId);
|
||||
nodeRole = TAOS_SYNC_ROLE_MASTER;
|
||||
syncResetFlowCtrl(pNode);
|
||||
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
||||
} else {
|
||||
pPeer = pNode->peerInfo[index];
|
||||
|
@ -706,6 +718,9 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
|
|||
|
||||
if (peerOldRole != newRole || nodeRole != selfOldRole)
|
||||
syncBroadcastStatus(pNode);
|
||||
|
||||
if (nodeRole != TAOS_SYNC_ROLE_MASTER)
|
||||
syncResetFlowCtrl(pNode);
|
||||
}
|
||||
|
||||
static void syncRestartPeer(SSyncPeer *pPeer) {
|
||||
|
|
|
@ -83,6 +83,7 @@ static int syncAreFilesModified(SSyncPeer *pPeer)
|
|||
int code = 0;
|
||||
if (len >0) {
|
||||
sDebug("%s, processed file is changed", pPeer->id);
|
||||
pPeer->fileChanged = 1;
|
||||
code = 1;
|
||||
}
|
||||
|
||||
|
@ -454,9 +455,11 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer)
|
|||
|
||||
void *syncRetrieveData(void *param)
|
||||
{
|
||||
SSyncPeer *pPeer = (SSyncPeer *)param;
|
||||
SSyncPeer *pPeer = (SSyncPeer *)param;
|
||||
SSyncNode *pNode = pPeer->pSyncNode;
|
||||
taosBlockSIGPIPE();
|
||||
|
||||
pPeer->fileChanged = 0;
|
||||
pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
|
||||
if (pPeer->syncFd < 0) {
|
||||
sError("%s, failed to open socket to sync", pPeer->id);
|
||||
|
@ -471,6 +474,18 @@ void *syncRetrieveData(void *param)
|
|||
}
|
||||
}
|
||||
|
||||
if (pPeer->fileChanged) {
|
||||
// if file is changed 3 times continuously, start flow control
|
||||
pPeer->numOfRetrieves++;
|
||||
if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl)
|
||||
(*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2));
|
||||
} else {
|
||||
pPeer->numOfRetrieves = 0;
|
||||
if (pNode->notifyFlowCtrl)
|
||||
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
|
||||
}
|
||||
|
||||
pPeer->fileChanged = 0;
|
||||
tclose(pPeer->notifyFd);
|
||||
tclose(pPeer->syncFd);
|
||||
syncDecPeerRef(pPeer);
|
||||
|
|
|
@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
|
|||
typedef struct {
|
||||
int32_t vgId; // global vnode group ID
|
||||
int32_t refCount; // reference count
|
||||
int32_t delay;
|
||||
int8_t status;
|
||||
int8_t role;
|
||||
int8_t accessState;
|
||||
|
|
|
@ -44,6 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status);
|
|||
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
|
||||
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
|
||||
static void vnodeNotifyRole(void *ahandle, int8_t role);
|
||||
static void vnodeCtrlFlow(void *handle, int32_t mseconds);
|
||||
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
|
||||
|
||||
int32_t vnodeInitResources() {
|
||||
|
@ -277,6 +278,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
syncInfo.writeToCache = vnodeWriteToQueue;
|
||||
syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp;
|
||||
syncInfo.notifyRole = vnodeNotifyRole;
|
||||
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
|
||||
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
|
||||
pVnode->sync = syncStart(&syncInfo);
|
||||
|
||||
|
@ -549,6 +551,13 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
|||
cqStop(pVnode->cq);
|
||||
}
|
||||
|
||||
static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) {
|
||||
SVnodeObj *pVnode = ahandle;
|
||||
if (pVnode->delay != mseconds)
|
||||
vInfo("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds);
|
||||
pVnode->delay = mseconds;
|
||||
}
|
||||
|
||||
static int vnodeResetTsdb(SVnodeObj *pVnode)
|
||||
{
|
||||
char rootDir[128] = "\0";
|
||||
|
|
|
@ -78,6 +78,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
|||
// assign version
|
||||
pVnode->version++;
|
||||
pHead->version = pVnode->version;
|
||||
if (pVnode->delay) usleep(pVnode->delay*1000);
|
||||
|
||||
} else { // from wal or forward
|
||||
// for data from WAL or forward, version may be smaller
|
||||
if (pHead->version <= pVnode->version) return 0;
|
||||
|
|
Loading…
Reference in New Issue