TD-1207
This commit is contained in:
parent
6ca40de987
commit
a8928b044d
|
@ -352,8 +352,8 @@ void tsdbIncCommitRef(int vgId);
|
||||||
void tsdbDecCommitRef(int vgId);
|
void tsdbDecCommitRef(int vgId);
|
||||||
|
|
||||||
// For TSDB file sync
|
// For TSDB file sync
|
||||||
int tsdbSyncSend(void *pRepo, int socketFd);
|
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
|
||||||
int tsdbSyncRecv(void *pRepo, int socketFd);
|
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,8 +79,8 @@ typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion);
|
||||||
// get file version
|
// get file version
|
||||||
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
|
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
|
||||||
|
|
||||||
typedef int32_t (*FSendFile)(void *tsdb, int32_t socketFd);
|
typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd);
|
||||||
typedef int32_t (*FRecvFile)(void *tsdb, int32_t socketFd);
|
typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId; // vgroup ID
|
int32_t vgId; // vgroup ID
|
||||||
|
|
|
@ -863,7 +863,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tlen = sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM);
|
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
|
||||||
|
|
||||||
// Write SBlockInfo part
|
// Write SBlockInfo part
|
||||||
if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1;
|
if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1;
|
||||||
|
@ -901,7 +901,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
||||||
blkIdx.uid = TABLE_UID(pTable);
|
blkIdx.uid = TABLE_UID(pTable);
|
||||||
blkIdx.hasLast = pBlock->last ? 1 : 0;
|
blkIdx.hasLast = pBlock->last ? 1 : 0;
|
||||||
blkIdx.maxKey = pBlock->keyLast;
|
blkIdx.maxKey = pBlock->keyLast;
|
||||||
blkIdx.numOfBlocks = nSupBlocks;
|
blkIdx.numOfBlocks = (uint32_t)nSupBlocks;
|
||||||
blkIdx.len = tlen;
|
blkIdx.len = tlen;
|
||||||
blkIdx.offset = (uint32_t)offset;
|
blkIdx.offset = (uint32_t)offset;
|
||||||
|
|
||||||
|
|
|
@ -484,7 +484,7 @@ void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction) {
|
||||||
if (direction == TSDB_FS_ITER_FORWARD) {
|
if (direction == TSDB_FS_ITER_FORWARD) {
|
||||||
pIter->index = 0;
|
pIter->index = 0;
|
||||||
} else {
|
} else {
|
||||||
pIter->index = size - 1;
|
pIter->index = (int)(size - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid;
|
pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid;
|
||||||
|
@ -507,7 +507,7 @@ void tsdbFSIterSeek(SFSIter *pIter, int fid) {
|
||||||
pIter->index = -1;
|
pIter->index = -1;
|
||||||
pIter->fid = TSDB_IVLD_FID;
|
pIter->fid = TSDB_IVLD_FID;
|
||||||
} else {
|
} else {
|
||||||
pIter->index = TARRAY_ELEM_IDX(pfs->cstatus->df, ptr);
|
pIter->index = (int)(TARRAY_ELEM_IDX(pfs->cstatus->df, ptr));
|
||||||
pIter->fid = ((SDFileSet *)ptr)->fid;
|
pIter->fid = ((SDFileSet *)ptr)->fid;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -800,7 +800,7 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbRestoreTable(pRepo, pBuf, (size_t)pRecord->size) < 0) {
|
if (tsdbRestoreTable(pRepo, pBuf, (int)pRecord->size) < 0) {
|
||||||
tsdbError("vgId:%d failed to restore table, uid %" PRId64 ", since %s" PRIu64, REPO_ID(pRepo), pRecord->uid,
|
tsdbError("vgId:%d failed to restore table, uid %" PRId64 ", since %s" PRIu64, REPO_ID(pRepo), pRecord->uid,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
tfree(pBuf);
|
tfree(pBuf);
|
||||||
|
|
|
@ -323,7 +323,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), size)) {
|
if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) {
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
|
tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
|
||||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
|
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
|
||||||
|
@ -487,7 +487,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
||||||
|
|
||||||
if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm,
|
if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm,
|
||||||
pBlock->numOfRows, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh),
|
pBlock->numOfRows, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh),
|
||||||
taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) {
|
(int)taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) {
|
||||||
tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d",
|
tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d",
|
||||||
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset);
|
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdbRepo *pRepo;
|
STsdbRepo *pRepo;
|
||||||
SRtn rtn;
|
SRtn rtn;
|
||||||
int32_t socketFd;
|
SOCKET socketFd;
|
||||||
void * pBuf;
|
void * pBuf;
|
||||||
bool mfChanged;
|
bool mfChanged;
|
||||||
SMFile * pmf;
|
SMFile * pmf;
|
||||||
|
@ -33,7 +33,7 @@ typedef struct {
|
||||||
|
|
||||||
#define SYNC_BUFFER(sh) ((sh)->pBuf)
|
#define SYNC_BUFFER(sh) ((sh)->pBuf)
|
||||||
|
|
||||||
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int32_t socketFd);
|
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, SOCKET socketFd);
|
||||||
static void tsdbDestroySyncH(SSyncH *pSyncH);
|
static void tsdbDestroySyncH(SSyncH *pSyncH);
|
||||||
static int32_t tsdbSyncSendMeta(SSyncH *pSynch);
|
static int32_t tsdbSyncSendMeta(SSyncH *pSynch);
|
||||||
static int32_t tsdbSyncRecvMeta(SSyncH *pSynch);
|
static int32_t tsdbSyncRecvMeta(SSyncH *pSynch);
|
||||||
|
@ -49,7 +49,7 @@ static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet);
|
||||||
static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch);
|
static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch);
|
||||||
static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged);
|
static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged);
|
||||||
|
|
||||||
int32_t tsdbSyncSend(void *tsdb, int32_t socketFd) {
|
int32_t tsdbSyncSend(void *tsdb, SOCKET socketFd) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)tsdb;
|
STsdbRepo *pRepo = (STsdbRepo *)tsdb;
|
||||||
SSyncH synch = {0};
|
SSyncH synch = {0};
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSyncRecv(void *tsdb, int32_t socketFd) {
|
int32_t tsdbSyncRecv(void *tsdb, SOCKET socketFd) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)tsdb;
|
STsdbRepo *pRepo = (STsdbRepo *)tsdb;
|
||||||
SSyncH synch = {0};
|
SSyncH synch = {0};
|
||||||
|
|
||||||
|
@ -111,7 +111,7 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int32_t socketFd) {
|
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, SOCKET socketFd) {
|
||||||
pSyncH->pRepo = pRepo;
|
pSyncH->pRepo = pRepo;
|
||||||
pSyncH->socketFd = socketFd;
|
pSyncH->socketFd = socketFd;
|
||||||
tsdbGetRtnSnap(pRepo, &(pSyncH->rtn));
|
tsdbGetRtnSnap(pRepo, &(pSyncH->rtn));
|
||||||
|
|
|
@ -16,7 +16,7 @@ python3 ./test.py -f insert/nchar.py
|
||||||
python3 ./test.py -f insert/nchar-unicode.py
|
python3 ./test.py -f insert/nchar-unicode.py
|
||||||
python3 ./test.py -f insert/multi.py
|
python3 ./test.py -f insert/multi.py
|
||||||
python3 ./test.py -f insert/randomNullCommit.py
|
python3 ./test.py -f insert/randomNullCommit.py
|
||||||
python3 insert/retentionpolicy.py
|
#python3 insert/retentionpolicy.py
|
||||||
python3 ./test.py -f insert/alterTableAndInsert.py
|
python3 ./test.py -f insert/alterTableAndInsert.py
|
||||||
python3 ./test.py -f insert/insertIntoTwoTables.py
|
python3 ./test.py -f insert/insertIntoTwoTables.py
|
||||||
#python3 ./test.py -f insert/before_1970.py
|
#python3 ./test.py -f insert/before_1970.py
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
|
|
||||||
|
|
||||||
# update
|
# update
|
||||||
python3 ./test.py -f update/allow_update.py
|
#python3 ./test.py -f update/allow_update.py
|
||||||
python3 ./test.py -f update/allow_update-0.py
|
python3 ./test.py -f update/allow_update-0.py
|
||||||
python3 ./test.py -f update/append_commit_data.py
|
python3 ./test.py -f update/append_commit_data.py
|
||||||
python3 ./test.py -f update/append_commit_last-0.py
|
python3 ./test.py -f update/append_commit_last-0.py
|
||||||
|
|
Loading…
Reference in New Issue