commit
807e181b43
|
@ -212,7 +212,7 @@ static void *dnodeProcessVWriteQueue(void *param) {
|
||||||
|
|
||||||
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
|
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
|
||||||
if (pWrite->code <= 0) pWrite->processedCount = 1;
|
if (pWrite->code <= 0) pWrite->processedCount = 1;
|
||||||
if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
|
if (pWrite->code == 0 && pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
|
||||||
|
|
||||||
dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
|
dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
|
||||||
}
|
}
|
||||||
|
@ -225,11 +225,13 @@ static void *dnodeProcessVWriteQueue(void *param) {
|
||||||
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
|
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
|
||||||
if (qtype == TAOS_QTYPE_RPC) {
|
if (qtype == TAOS_QTYPE_RPC) {
|
||||||
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
|
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
|
||||||
} else if (qtype == TAOS_QTYPE_FWD) {
|
|
||||||
vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
|
|
||||||
taosFreeQitem(pWrite);
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
} else {
|
} else {
|
||||||
|
if (qtype == TAOS_QTYPE_FWD) {
|
||||||
|
vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
|
||||||
|
}
|
||||||
|
if (pWrite->rspRet.rsp) {
|
||||||
|
rpcFreeCont(pWrite->rspRet.rsp);
|
||||||
|
}
|
||||||
taosFreeQitem(pWrite);
|
taosFreeQitem(pWrite);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,7 +126,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
|
||||||
// the TSDB repository info
|
// the TSDB repository info
|
||||||
typedef struct STsdbRepoInfo {
|
typedef struct STsdbRepoInfo {
|
||||||
STsdbCfg tsdbCfg;
|
STsdbCfg tsdbCfg;
|
||||||
int64_t version; // version of the repository
|
uint64_t version; // version of the repository
|
||||||
int64_t tsdbTotalDataSize; // the original inserted data size
|
int64_t tsdbTotalDataSize; // the original inserted data size
|
||||||
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
|
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
|
||||||
// TODO: Other informations to add
|
// TODO: Other informations to add
|
||||||
|
@ -136,7 +136,7 @@ STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo);
|
||||||
// the meter information report structure
|
// the meter information report structure
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STableCfg tableCfg;
|
STableCfg tableCfg;
|
||||||
int64_t version;
|
uint64_t version;
|
||||||
int64_t tableTotalDataSize; // In bytes
|
int64_t tableTotalDataSize; // In bytes
|
||||||
int64_t tableTotalDiskSize; // In bytes
|
int64_t tableTotalDiskSize; // In bytes
|
||||||
} STableInfo;
|
} STableInfo;
|
||||||
|
|
|
@ -71,7 +71,7 @@ typedef struct _SSdbTable {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
ESyncRole role;
|
ESyncRole role;
|
||||||
ESdbStatus status;
|
ESdbStatus status;
|
||||||
int64_t version;
|
uint64_t version;
|
||||||
int64_t sync;
|
int64_t sync;
|
||||||
void * wal;
|
void * wal;
|
||||||
SSyncCfg cfg;
|
SSyncCfg cfg;
|
||||||
|
|
|
@ -498,7 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
|
||||||
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
|
||||||
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
|
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
|
||||||
int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
|
int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
|
||||||
if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs;
|
if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs;
|
||||||
sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs);
|
sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs);
|
||||||
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer);
|
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer);
|
||||||
}
|
}
|
||||||
|
@ -575,6 +575,17 @@ static void syncChooseMaster(SSyncNode *pNode) {
|
||||||
if (index == pNode->selfIndex) {
|
if (index == pNode->selfIndex) {
|
||||||
sInfo("vgId:%d, start to work as master", pNode->vgId);
|
sInfo("vgId:%d, start to work as master", pNode->vgId);
|
||||||
nodeRole = TAOS_SYNC_ROLE_MASTER;
|
nodeRole = TAOS_SYNC_ROLE_MASTER;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
for (int32_t i = 0; i < pNode->replica; ++i) {
|
||||||
|
pPeer = pNode->peerInfo[i];
|
||||||
|
if (pPeer->version == nodeVersion) {
|
||||||
|
pPeer->role = TAOS_SYNC_ROLE_SLAVE;
|
||||||
|
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
|
||||||
|
sInfo("%s, it shall work as slave", pPeer->id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
syncResetFlowCtrl(pNode);
|
syncResetFlowCtrl(pNode);
|
||||||
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
(*pNode->notifyRole)(pNode->ahandle, nodeRole);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1209,13 +1220,17 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
|
||||||
int32_t fwdLen;
|
int32_t fwdLen;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
|
|
||||||
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
|
if (pWalHead->version > nodeVersion + 1) {
|
||||||
pWalHead->version, nodeVersion);
|
sError("vgId:%d, hver:%" PRIu64 ", inconsistent with ver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion);
|
||||||
for (int32_t i = 0; i < pNode->replica; ++i) {
|
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
||||||
pPeer = pNode->peerInfo[i];
|
sInfo("vgId:%d, restart connection", pNode->vgId);
|
||||||
syncRestartConnection(pPeer);
|
for (int32_t i = 0; i < pNode->replica; ++i) {
|
||||||
|
pPeer = pNode->peerInfo[i];
|
||||||
|
syncRestartConnection(pPeer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SYN_INVALID_VERSION;
|
return TSDB_CODE_SYN_INVALID_VERSION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue