This commit is contained in:
Shengliang Guan 2021-01-20 20:08:59 +08:00
parent b01addfd65
commit 9ce66f4a5e
1 changed files with 173 additions and 83 deletions

View File

@ -13,13 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tsdbint.h"
// Sync handle
typedef struct {
STsdbRepo *pRepo;
SRtn rtn;
int socketFd;
int32_t socketFd;
void * pBuf;
SMFile * pmf;
SMFile mf;
@ -29,22 +32,22 @@ typedef struct {
#define SYNC_BUFFER(sh) ((sh)->pBuf)
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd);
static void tsdbDestroySyncH(SSyncH *pSyncH);
static int tsdbSyncSendMeta(SSyncH *pSynch);
static int tsdbSyncRecvMeta(SSyncH *pSynch);
static int tsdbSendMetaInfo(SSyncH *pSynch);
static int tsdbRecvMetaInfo(SSyncH *pSynch);
static int tsdbSendDecision(SSyncH *pSynch, bool toSend);
static int tsdbRecvDecision(SSyncH *pSynch, bool *toSend);
static int tsdbSyncSendDFileSetArray(SSyncH *pSynch);
static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch);
static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2);
static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet);
static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet);
static int tsdbRecvDFileSetInfo(SSyncH *pSynch);
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int32_t socketFd);
static void tsdbDestroySyncH(SSyncH *pSyncH);
static int32_t tsdbSyncSendMeta(SSyncH *pSynch);
static int32_t tsdbSyncRecvMeta(SSyncH *pSynch);
static int32_t tsdbSendMetaInfo(SSyncH *pSynch);
static int32_t tsdbRecvMetaInfo(SSyncH *pSynch);
static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend);
static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend);
static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch);
static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch);
static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2);
static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet);
static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet);
static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch);
int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) {
int32_t tsdbSyncSend(STsdbRepo *pRepo, int32_t socketFd) {
SSyncH synch = {0};
tsdbInitSyncH(&synch, pRepo, socketFd);
@ -52,12 +55,12 @@ int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) {
sem_wait(&(pRepo->readyToCommit));
if (tsdbSyncSendMeta(&synch) < 0) {
tsdbError("vgId:%d failed to send meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d, failed to send metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (tsdbSyncSendDFileSetArray(&synch) < 0) {
tsdbError("vgId:%d failed to send data file set array since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d, failed to send filesets since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
@ -72,19 +75,19 @@ _err:
return -1;
}
int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd) {
int32_t tsdbSyncRecv(STsdbRepo *pRepo, int32_t socketFd) {
SSyncH synch;
tsdbInitSyncH(&synch, pRepo, socketFd);
tsdbStartFSTxn(pRepo, 0, 0);
if (tsdbSyncRecvMeta(&synch) < 0) {
tsdbError("vgId:%d failed to sync recv meta file from remote since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d, failed to recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (tsdbSyncRecvDFileSetArray(&synch) < 0) {
tsdbError("vgId:%d failed to sync recv data file set from remote since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d, failed to recv filesets since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
@ -100,7 +103,7 @@ _err:
return -1;
}
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd) {
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int32_t socketFd) {
pSyncH->pRepo = pRepo;
pSyncH->socketFd = socketFd;
tsdbGetRtnSnap(pRepo, &(pSyncH->rtn));
@ -108,90 +111,109 @@ static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd) {
static void tsdbDestroySyncH(SSyncH *pSyncH) { taosTZfree(pSyncH->pBuf); }
// ============ SYNC META API
static int tsdbSyncSendMeta(SSyncH *pSynch) {
static int32_t tsdbSyncSendMeta(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
bool toSendMeta = false;
SMFile mf;
// Send meta info to remote
if (tsdbSendMetaInfo(pSynch) < 0) {
tsdbError("vgId:%d failed to send meta file info since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d, failed to send metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (pRepo->fs->cstatus->pmf == NULL) {
// No meta file, not need to wait to retrieve meta file
tsdbInfo("vgId:%d, metafile not exist, no need to send", REPO_ID(pRepo));
return 0;
}
if (tsdbRecvDecision(pSynch, &toSendMeta) < 0) {
tsdbError("vgId:%d failed to recv send file decision since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d, failed to recv decision while send meta since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (toSendMeta) {
tsdbInfo("vgId:%d, metafile will be sent", REPO_ID(pRepo));
tsdbInitMFileEx(&mf, pRepo->fs->cstatus->pmf);
if (tsdbOpenMFile(&mf, O_RDONLY) < 0) {
tsdbError("vgId:%d failed to open meta file while sync send meta since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d, failed to open file while send metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&mf), 0, mf.info.size) < mf.info.size) {
tsdbError("vgId:%d failed to copy meta file to remote since %s", REPO_ID(pRepo), tstrerror(terrno));
int32_t writeLen = mf.info.size;
int32_t ret = taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&mf), 0, writeLen);
if (ret != writeLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to send metafile since %s, ret:%d writeLen:%d", REPO_ID(pRepo), tstrerror(terrno), ret,
writeLen);
tsdbCloseMFile(&mf);
return -1;
}
tsdbCloseMFile(&mf);
tsdbInfo("vgId:%d, metafile is sent, size:%d", REPO_ID(pRepo), writeLen);
} else {
tsdbInfo("vgId:%d, metafile is same, no need to send", REPO_ID(pRepo));
}
return 0;
}
static int tsdbSyncRecvMeta(SSyncH *pSynch) {
static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
SMFile * pLMFile = pRepo->fs->cstatus->pmf;
// Recv meta info from remote
if (tsdbRecvMetaInfo(pSynch) < 0) {
tsdbError("vgId:%d failed to recv meta info from remote since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbError("vgId:%d, failed to recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
// No meta file, do nothing (rm local meta file)
if (pSynch->pmf == NULL) {
tsdbInfo("vgId:%d, metafile not exist in remote, no need to recv", REPO_ID(pRepo));
return 0;
}
if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) {
// Local has no meta file or has a different meta file, need to copy from remote
if (tsdbSendDecision(pSynch, true) < 0) {
// TODO
tsdbError("vgId:%d, failed to send decision while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
tsdbInfo("vgId:%d, metafile will be received", REPO_ID(pRepo));
// Recv from remote
SMFile mf;
SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID};
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateMFile(&mf, false) < 0) {
// TODO
tsdbError("vgId:%d, failed to create file while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (taosCopyFds(pSynch->socketFd, TSDB_FILE_FD(&mf), pSynch->pmf->info.size) < pSynch->pmf->info.size) {
// TODO
int32_t readLen = pSynch->pmf->info.size;
int32_t ret = taosCopyFds(pSynch->socketFd, TSDB_FILE_FD(&mf), readLen);
if (ret != readLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to recv metafile since %s, ret:%d readLen:%d", REPO_ID(pRepo), tstrerror(terrno), ret,
readLen);
tsdbCloseMFile(&mf);
tsdbRemoveMFile(&mf);
return -1;
}
tsdbInfo("vgId:%d, metafile is received, size:%d", REPO_ID(pRepo), readLen);
tsdbCloseMFile(&mf);
tsdbUpdateMFile(REPO_FS(pRepo), &mf);
} else {
tsdbInfo("vgId:%d, metafile is same, no need to recv", REPO_ID(pRepo));
if (tsdbSendDecision(pSynch, false) < 0) {
// TODO
tsdbError("vgId:%d, failed to send decision while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
@ -199,7 +221,7 @@ static int tsdbSyncRecvMeta(SSyncH *pSynch) {
return 0;
}
static int tsdbSendMetaInfo(SSyncH *pSynch) {
static int32_t tsdbSendMetaInfo(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
uint32_t tlen = 0;
SMFile * pMFile = pRepo->fs->cstatus->pmf;
@ -209,6 +231,7 @@ static int tsdbSendMetaInfo(SSyncH *pSynch) {
}
if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) {
tsdbError("vgId:%d, failed to makeroom while send metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
@ -220,45 +243,55 @@ static int tsdbSendMetaInfo(SSyncH *pSynch) {
taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen);
}
if (taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen + sizeof(uint32_t)) < tlen + sizeof(uint32_t)) {
tsdbError("vgId:%d failed to send sync meta file info to remote since %s", REPO_ID(pRepo), strerror(errno));
int32_t writeLen = tlen + sizeof(uint32_t);
int32_t ret = taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), writeLen);
if (ret != writeLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to send metainfo since %s, ret:%d writeLen:%d", REPO_ID(pRepo), tstrerror(terrno), ret,
writeLen);
return -1;
}
return 0;
}
static int tsdbRecvMetaInfo(SSyncH *pSynch) {
uint32_t tlen;
char buf[64] = "\0";
static int32_t tsdbRecvMetaInfo(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
uint32_t tlen = 0;
char buf[64] = {0};
if (taosReadMsg(pSynch->socketFd, buf, sizeof(tlen)) < sizeof(tlen)) {
// TODO
int32_t readLen = sizeof(uint32_t);
int32_t ret = taosReadMsg(pSynch->socketFd, buf, readLen);
if (ret != readLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to recv metainfo len, ret:%d readLen:%d", REPO_ID(pRepo), ret, readLen);
return -1;
}
taosDecodeFixedU32(buf, &tlen);
tsdbInfo("vgId:%d, metainfo len:%d is received", REPO_ID(pRepo), tlen);
if (tlen == 0) {
pSynch->pmf = NULL;
return 0;
}
if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) {
tsdbError("vgId:%d, failed to makeroom while recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen) < tlen) {
// TODO
ret = taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen);
if (ret != tlen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to recv metainfo, ret:%d readLen:%d", REPO_ID(pRepo), ret, tlen);
return -1;
}
tsdbInfo("vgId:%d, metainfo is received", REPO_ID(pRepo));
if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) {
// TODO
terrno = TSDB_CODE_TDB_MESSED_MSG;
tsdbError("vgId:%d, failed to checksum while recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
@ -268,34 +301,38 @@ static int tsdbRecvMetaInfo(SSyncH *pSynch) {
return 0;
}
static int tsdbSendDecision(SSyncH *pSynch, bool toSend) {
uint8_t decision = toSend;
static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend) {
STsdbRepo *pRepo = pSynch->pRepo;
uint8_t decision = toSend;
if (taosWriteMsg(pSynch->socketFd, (void *)(&decision), sizeof(decision)) < sizeof(decision)) {
// TODO
int32_t writeLen = sizeof(uint8_t);
int32_t ret = taosWriteMsg(pSynch->socketFd, (void *)(&decision), writeLen);
if (ret != writeLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to send decison, ret:%d writeLen:%d", REPO_ID(pRepo), ret, writeLen);
return -1;
}
return 0;
}
static int tsdbRecvDecision(SSyncH *pSynch, bool *toSend) {
uint8_t decision;
static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend) {
STsdbRepo *pRepo = pSynch->pRepo;
uint8_t decision = 0;
if (taosReadMsg(pSynch->socketFd, (void *)(&decision), sizeof(decision)) < sizeof(decision)) {
// TODO
int32_t readLen = sizeof(uint8_t);
int32_t ret = taosReadMsg(pSynch->socketFd, (void *)(&decision), readLen);
if (ret != readLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to recv decison, ret:%d readLen:%d", REPO_ID(pRepo), ret, readLen);
return -1;
}
*toSend = decision;
return 0;
}
// ========== SYNC DATA FILE SET ARRAY API
static int tsdbSyncSendDFileSetArray(SSyncH *pSynch) {
static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdbFS * pfs = REPO_FS(pRepo);
SFSIter fsiter;
@ -306,12 +343,13 @@ static int tsdbSyncSendDFileSetArray(SSyncH *pSynch) {
do {
pSet = tsdbFSIterNext(&fsiter);
if (tsdbSyncSendDFileSet(pSynch, pSet) < 0) {
// TODO
tsdbError("vgId:%d, failed to send fileset:%d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
// No more file set to send, jut break
if (pSet == NULL) {
tsdbInfo("vgId:%d, no filesets any more", REPO_ID(pRepo));
break;
}
} while (true);
@ -319,7 +357,7 @@ static int tsdbSyncSendDFileSetArray(SSyncH *pSynch) {
return 0;
}
static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
STsdbFS * pfs = REPO_FS(pRepo);
SFSIter fsiter;
@ -329,34 +367,44 @@ static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
pLSet = tsdbFSIterNext(&fsiter);
if (tsdbRecvDFileSetInfo(pSynch) < 0) {
// TODO
tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
while (true) {
if (pLSet == NULL && pSynch->pdf == NULL) break;
if (pLSet == NULL && pSynch->pdf == NULL) {
tsdbInfo("vgId:%d, all filesets is disposed", REPO_ID(pRepo));
break;
} else {
tsdbInfo("vgId:%d, fileset local:%d remote:%d, will be disposed", REPO_ID(pRepo), pLSet != NULL ? pLSet->fid : -1,
pSynch->pdf != NULL ? pSynch->pdf->fid : -1);
}
if (pLSet && (pSynch->pdf == NULL || pLSet->fid < pSynch->pdf->fid)) {
// remote not has pLSet->fid set, just remove local (do nothing to remote the fset)
tsdbInfo("vgId:%d, fileset:%d smaller than remote:%d, remove it", REPO_ID(pRepo), pLSet->fid, pSynch->pdf->fid);
pLSet = tsdbFSIterNext(&fsiter);
} else {
if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf)) {
// Just keep local files and notify remote not to send
tsdbInfo("vgId:%d, fileset:%d is same and no need to recv", REPO_ID(pRepo), pLSet->fid);
if (tsdbUpdateDFileSet(pfs, pLSet) < 0) {
// TODO
tsdbError("vgId:%d, failed to update fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (tsdbSendDecision(pSynch, false) < 0) {
// TODO
tsdbError("vgId:%d, filed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
} else {
// Need to copy from remote
tsdbInfo("vgId:%d, fileset:%d will be received", REPO_ID(pRepo), pSynch->pdf->fid);
// Notify remote to send there file here
if (tsdbSendDecision(pSynch, true) < 0) {
// TODO
tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
@ -367,6 +415,7 @@ static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
tfsAllocDisk(tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d, failed allc disk since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
@ -374,34 +423,45 @@ static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
// Create new FSET
if (tsdbCreateDFileSet(&fset, false) < 0) {
// TODO
tsdbError("vgId:%d, failed to create fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file
SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file
if (taosCopyFds(pSynch->socketFd, pDFile->fd, pRDFile->info.size) < pRDFile->info.size) {
// TODO
tsdbInfo("vgId:%d, file:%s will be received, osize:%" PRIu64 " rsize:%" PRIu64, REPO_ID(pRepo),
pDFile->f.aname, pDFile->info.size, pRDFile->info.size);
int32_t writeLen = pRDFile->info.size;
int32_t ret = taosCopyFds(pSynch->socketFd, pDFile->fd, writeLen);
if (ret != writeLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to recv file:%s since %s, ret:%d writeLen:%d", REPO_ID(pRepo), pDFile->f.aname,
tstrerror(terrno), ret, writeLen);
tsdbCloseDFileSet(&fset);
tsdbRemoveDFileSet(&fset);
return -1;
}
// Update new file info
pDFile->info = pRDFile->info;
tsdbInfo("vgId:%d, file:%s is received, size:%d", REPO_ID(pRepo), pDFile->f.aname, writeLen);
}
tsdbCloseDFileSet(&fset);
if (tsdbUpdateDFileSet(pfs, &fset) < 0) {
// TODO
tsdbInfo("vgId:%d, fileset:%d failed to update since %s", REPO_ID(pRepo), fset.fid, tstrerror(terrno));
return -1;
}
tsdbInfo("vgId:%d, fileset:%d is received", REPO_ID(pRepo), pSynch->pdf->fid);
}
// Move forward
if (tsdbRecvDFileSetInfo(pSynch) < 0) {
// TODO
tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
@ -455,11 +515,12 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) {
return true;
}
static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
STsdbRepo *pRepo = pSynch->pRepo;
bool toSend = false;
if (tsdbSendDFileSetInfo(pSynch, pSet) < 0) {
// TODO
tsdbError("vgId:%d, failed to send fileset:%d info since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
@ -469,37 +530,56 @@ static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
}
if (tsdbRecvDecision(pSynch, &toSend) < 0) {
tsdbError("vgId:%d, failed to recv decision while send fileset:%d since %s", REPO_ID(pRepo), pSet->fid,
tstrerror(terrno));
return -1;
}
if (toSend) {
tsdbInfo("vgId:%d, fileset:%d will be sent", REPO_ID(pRepo), pSet->fid);
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype);
if (tsdbOpenDFile(&df, O_RDONLY) < 0) {
tsdbError("vgId:%d, failed to file:%s since %s", REPO_ID(pRepo), df.f.aname, tstrerror(terrno));
return -1;
}
if (taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&df), 0, df.info.size) < df.info.size) {
int32_t writeLen = df.info.size;
tsdbInfo("vgId:%d, file:%s will be sent, size:%d", REPO_ID(pRepo), df.f.aname, writeLen);
int32_t ret = taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&df), 0, writeLen);
if (ret != writeLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to send file:%s since %s, ret:%d writeLen:%d", REPO_ID(pRepo), df.f.aname,
tstrerror(terrno), ret, writeLen);
tsdbCloseDFile(&df);
return -1;
}
tsdbInfo("vgId:%d, file:%s is sent", REPO_ID(pRepo), df.f.aname);
tsdbCloseDFile(&df);
}
tsdbInfo("vgId:%d, fileset:%d is sent", REPO_ID(pRepo), pSet->fid);
} else {
tsdbInfo("vgId:%d, fileset:%d is same, no need to send", REPO_ID(pRepo), pSet->fid);
}
return 0;
}
static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) {
uint32_t tlen = 0;
static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) {
STsdbRepo *pRepo = pSynch->pRepo;
uint32_t tlen = 0;
if (pSet) {
tlen = tsdbEncodeDFileSetEx(NULL, pSet) + sizeof(TSCKSUM);
}
if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen + sizeof(tlen)) < 0) {
tsdbError("vgId:%d, failed to makeroom while send fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
@ -511,42 +591,52 @@ static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) {
taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen);
}
if (taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen + sizeof(tlen)) < tlen + sizeof(tlen)) {
// TODO
int32_t writeLen = tlen + sizeof(uint32_t);
int32_t ret = taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), writeLen);
if (ret != writeLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to send fileinfo, ret:%d writeLen:%d", REPO_ID(pRepo), ret, writeLen);
return -1;
}
return 0;
}
static int tsdbRecvDFileSetInfo(SSyncH *pSynch) {
uint32_t tlen;
char buf[64] = "\0";
static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
uint32_t tlen;
char buf[64] = {0};
if (taosReadMsg(pSynch->socketFd, buf, sizeof(tlen)) < sizeof(tlen)) {
int32_t readLen = sizeof(uint32_t);
int32_t ret = taosReadMsg(pSynch->socketFd, buf, readLen);
if (ret != readLen) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
taosDecodeFixedU32(buf, &tlen);
tsdbInfo("vgId:%d, fileinfo len:%d is received", REPO_ID(pRepo), tlen);
if (tlen == 0) {
pSynch->pdf = NULL;
return 0;
}
if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) {
tsdbError("vgId:%d, failed to makeroom while recv fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
if (taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen) < tlen) {
ret = taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen);
if (ret != tlen) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbError("vgId:%d, failed to recv fileinfo, ret:%d readLen:%d", REPO_ID(pRepo), ret, tlen);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) {
terrno = TSDB_CODE_TDB_MESSED_MSG;
tsdbError("vgId:%d, failed to checksum while recv fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}