refact: change sst to stt
This commit is contained in:
parent
07c19b736c
commit
e70db79d10
|
@ -43,14 +43,14 @@ typedef struct STbDataIter STbDataIter;
|
||||||
typedef struct SMapData SMapData;
|
typedef struct SMapData SMapData;
|
||||||
typedef struct SBlockIdx SBlockIdx;
|
typedef struct SBlockIdx SBlockIdx;
|
||||||
typedef struct SDataBlk SDataBlk;
|
typedef struct SDataBlk SDataBlk;
|
||||||
typedef struct SSstBlk SSstBlk;
|
typedef struct SSttBlk SSttBlk;
|
||||||
typedef struct SColData SColData;
|
typedef struct SColData SColData;
|
||||||
typedef struct SDiskDataHdr SDiskDataHdr;
|
typedef struct SDiskDataHdr SDiskDataHdr;
|
||||||
typedef struct SBlockData SBlockData;
|
typedef struct SBlockData SBlockData;
|
||||||
typedef struct SDelFile SDelFile;
|
typedef struct SDelFile SDelFile;
|
||||||
typedef struct SHeadFile SHeadFile;
|
typedef struct SHeadFile SHeadFile;
|
||||||
typedef struct SDataFile SDataFile;
|
typedef struct SDataFile SDataFile;
|
||||||
typedef struct SSstFile SSstFile;
|
typedef struct SSttFile SSttFile;
|
||||||
typedef struct SSmaFile SSmaFile;
|
typedef struct SSmaFile SSmaFile;
|
||||||
typedef struct SDFileSet SDFileSet;
|
typedef struct SDFileSet SDFileSet;
|
||||||
typedef struct SDataFWriter SDataFWriter;
|
typedef struct SDataFWriter SDataFWriter;
|
||||||
|
@ -69,8 +69,8 @@ typedef struct SLDataIter SLDataIter;
|
||||||
|
|
||||||
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
||||||
#define TSDB_MAX_SUBBLOCKS 8
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
#define TSDB_MAX_SST_FILE 16
|
#define TSDB_MAX_STT_FILE 16
|
||||||
#define TSDB_DEFAULT_SST_FILE 8
|
#define TSDB_DEFAULT_STT_FILE 8
|
||||||
#define TSDB_FHDR_SIZE 512
|
#define TSDB_FHDR_SIZE 512
|
||||||
#define TSDB_DEFAULT_PAGE_SIZE 4096
|
#define TSDB_DEFAULT_PAGE_SIZE 4096
|
||||||
|
|
||||||
|
@ -130,9 +130,9 @@ int32_t tPutDataBlk(uint8_t *p, void *ph);
|
||||||
int32_t tGetDataBlk(uint8_t *p, void *ph);
|
int32_t tGetDataBlk(uint8_t *p, void *ph);
|
||||||
int32_t tDataBlkCmprFn(const void *p1, const void *p2);
|
int32_t tDataBlkCmprFn(const void *p1, const void *p2);
|
||||||
bool tDataBlkHasSma(SDataBlk *pDataBlk);
|
bool tDataBlkHasSma(SDataBlk *pDataBlk);
|
||||||
// SSstBlk
|
// SSttBlk
|
||||||
int32_t tPutSstBlk(uint8_t *p, void *ph);
|
int32_t tPutSttBlk(uint8_t *p, void *ph);
|
||||||
int32_t tGetSstBlk(uint8_t *p, void *ph);
|
int32_t tGetSttBlk(uint8_t *p, void *ph);
|
||||||
// SBlockIdx
|
// SBlockIdx
|
||||||
int32_t tPutBlockIdx(uint8_t *p, void *ph);
|
int32_t tPutBlockIdx(uint8_t *p, void *ph);
|
||||||
int32_t tGetBlockIdx(uint8_t *p, void *ph);
|
int32_t tGetBlockIdx(uint8_t *p, void *ph);
|
||||||
|
@ -228,7 +228,7 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2);
|
||||||
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype);
|
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype);
|
||||||
int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile);
|
int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile);
|
||||||
int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile);
|
int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile);
|
||||||
int32_t tPutSstFile(uint8_t *p, SSstFile *pSstFile);
|
int32_t tPutSttFile(uint8_t *p, SSttFile *pSttFile);
|
||||||
int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile);
|
int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile);
|
||||||
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile);
|
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile);
|
||||||
int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile);
|
int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile);
|
||||||
|
@ -237,7 +237,7 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet);
|
||||||
|
|
||||||
void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]);
|
void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]);
|
||||||
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]);
|
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]);
|
||||||
void tsdbSstFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSstFile *pSstF, char fname[]);
|
void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]);
|
||||||
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]);
|
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]);
|
||||||
// SDelFile
|
// SDelFile
|
||||||
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
|
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
|
||||||
|
@ -263,7 +263,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
|
||||||
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
|
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
|
||||||
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
|
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
|
||||||
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx);
|
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx);
|
||||||
int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk);
|
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
|
||||||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
|
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
|
||||||
int8_t cmprAlg, int8_t toLast);
|
int8_t cmprAlg, int8_t toLast);
|
||||||
|
|
||||||
|
@ -273,10 +273,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
||||||
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
||||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
|
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
|
||||||
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
|
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
|
||||||
int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk);
|
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk);
|
||||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
|
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
|
||||||
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
|
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
|
||||||
int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData);
|
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData);
|
||||||
// SDelFWriter
|
// SDelFWriter
|
||||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
|
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
|
||||||
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
|
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
|
||||||
|
@ -448,7 +448,7 @@ struct SDataBlk {
|
||||||
SSmaInfo smaInfo;
|
SSmaInfo smaInfo;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SSstBlk {
|
struct SSttBlk {
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
int64_t minUid;
|
int64_t minUid;
|
||||||
int64_t maxUid;
|
int64_t maxUid;
|
||||||
|
@ -550,7 +550,7 @@ struct SDataFile {
|
||||||
int64_t size;
|
int64_t size;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SSstFile {
|
struct SSttFile {
|
||||||
volatile int32_t nRef;
|
volatile int32_t nRef;
|
||||||
|
|
||||||
int64_t commitID;
|
int64_t commitID;
|
||||||
|
@ -571,8 +571,8 @@ struct SDFileSet {
|
||||||
SHeadFile *pHeadF;
|
SHeadFile *pHeadF;
|
||||||
SDataFile *pDataF;
|
SDataFile *pDataF;
|
||||||
SSmaFile *pSmaF;
|
SSmaFile *pSmaF;
|
||||||
uint8_t nSstF;
|
uint8_t nSttF;
|
||||||
SSstFile *aSstF[TSDB_MAX_SST_FILE];
|
SSttFile *aSttF[TSDB_MAX_STT_FILE];
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SRowIter {
|
struct SRowIter {
|
||||||
|
@ -617,12 +617,12 @@ struct SDataFWriter {
|
||||||
STsdbFD *pHeadFD;
|
STsdbFD *pHeadFD;
|
||||||
STsdbFD *pDataFD;
|
STsdbFD *pDataFD;
|
||||||
STsdbFD *pSmaFD;
|
STsdbFD *pSmaFD;
|
||||||
STsdbFD *pSstFD;
|
STsdbFD *pSttFD;
|
||||||
|
|
||||||
SHeadFile fHead;
|
SHeadFile fHead;
|
||||||
SDataFile fData;
|
SDataFile fData;
|
||||||
SSmaFile fSma;
|
SSmaFile fSma;
|
||||||
SSstFile fSst[TSDB_MAX_SST_FILE];
|
SSttFile fStt[TSDB_MAX_STT_FILE];
|
||||||
|
|
||||||
uint8_t *aBuf[4];
|
uint8_t *aBuf[4];
|
||||||
};
|
};
|
||||||
|
@ -633,7 +633,7 @@ struct SDataFReader {
|
||||||
STsdbFD *pHeadFD;
|
STsdbFD *pHeadFD;
|
||||||
STsdbFD *pDataFD;
|
STsdbFD *pDataFD;
|
||||||
STsdbFD *pSmaFD;
|
STsdbFD *pSmaFD;
|
||||||
STsdbFD *aSstFD[TSDB_MAX_SST_FILE];
|
STsdbFD *aSttFD[TSDB_MAX_STT_FILE];
|
||||||
uint8_t *aBuf[3];
|
uint8_t *aBuf[3];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -32,12 +32,12 @@ typedef struct {
|
||||||
STbDataIter iter;
|
STbDataIter iter;
|
||||||
}; // memory data iter
|
}; // memory data iter
|
||||||
struct {
|
struct {
|
||||||
int32_t iSst;
|
int32_t iStt;
|
||||||
SArray *aSstBlk;
|
SArray *aSttBlk;
|
||||||
int32_t iSstBlk;
|
int32_t iSttBlk;
|
||||||
SBlockData bData;
|
SBlockData bData;
|
||||||
int32_t iRow;
|
int32_t iRow;
|
||||||
}; // sst file data iter
|
}; // stt file data iter
|
||||||
};
|
};
|
||||||
} SDataIter;
|
} SDataIter;
|
||||||
|
|
||||||
|
@ -71,13 +71,13 @@ typedef struct {
|
||||||
SDataIter *pIter;
|
SDataIter *pIter;
|
||||||
SRBTree rbt;
|
SRBTree rbt;
|
||||||
SDataIter dataIter;
|
SDataIter dataIter;
|
||||||
SDataIter aDataIter[TSDB_MAX_SST_FILE];
|
SDataIter aDataIter[TSDB_MAX_STT_FILE];
|
||||||
int8_t toLastOnly;
|
int8_t toLastOnly;
|
||||||
};
|
};
|
||||||
struct {
|
struct {
|
||||||
SDataFWriter *pWriter;
|
SDataFWriter *pWriter;
|
||||||
SArray *aBlockIdx; // SArray<SBlockIdx>
|
SArray *aBlockIdx; // SArray<SBlockIdx>
|
||||||
SArray *aSstBlk; // SArray<SSstBlk>
|
SArray *aSttBlk; // SArray<SSttBlk>
|
||||||
SMapData mBlock; // SMapData<SDataBlk>
|
SMapData mBlock; // SMapData<SDataBlk>
|
||||||
SBlockData bData;
|
SBlockData bData;
|
||||||
SBlockData bDatal;
|
SBlockData bDatal;
|
||||||
|
@ -428,21 +428,21 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
|
||||||
pCommitter->toLastOnly = 0;
|
pCommitter->toLastOnly = 0;
|
||||||
SDataFReader *pReader = pCommitter->dReader.pReader;
|
SDataFReader *pReader = pCommitter->dReader.pReader;
|
||||||
if (pReader) {
|
if (pReader) {
|
||||||
if (pReader->pSet->nSstF >= pCommitter->maxLast) {
|
if (pReader->pSet->nSttF >= pCommitter->maxLast) {
|
||||||
int8_t iIter = 0;
|
int8_t iIter = 0;
|
||||||
for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
|
||||||
pIter = &pCommitter->aDataIter[iIter];
|
pIter = &pCommitter->aDataIter[iIter];
|
||||||
pIter->type = LAST_DATA_ITER;
|
pIter->type = LAST_DATA_ITER;
|
||||||
pIter->iSst = iSst;
|
pIter->iStt = iStt;
|
||||||
|
|
||||||
code = tsdbReadSstBlk(pCommitter->dReader.pReader, iSst, pIter->aSstBlk);
|
code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
if (taosArrayGetSize(pIter->aSstBlk) == 0) continue;
|
if (taosArrayGetSize(pIter->aSttBlk) == 0) continue;
|
||||||
|
|
||||||
pIter->iSstBlk = 0;
|
pIter->iSttBlk = 0;
|
||||||
SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, 0);
|
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
|
||||||
code = tsdbReadSstBlock(pCommitter->dReader.pReader, iSst, pSstBlk, &pIter->bData);
|
code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
pIter->iRow = 0;
|
pIter->iRow = 0;
|
||||||
|
@ -454,9 +454,9 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
|
||||||
iIter++;
|
iIter++;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
|
||||||
SSstFile *pSstFile = pReader->pSet->aSstF[iSst];
|
SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
|
||||||
if (pSstFile->size > pSstFile->offset) {
|
if (pSttFile->size > pSttFile->offset) {
|
||||||
pCommitter->toLastOnly = 1;
|
pCommitter->toLastOnly = 1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -512,34 +512,34 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
SHeadFile fHead = {.commitID = pCommitter->commitID};
|
SHeadFile fHead = {.commitID = pCommitter->commitID};
|
||||||
SDataFile fData = {.commitID = pCommitter->commitID};
|
SDataFile fData = {.commitID = pCommitter->commitID};
|
||||||
SSmaFile fSma = {.commitID = pCommitter->commitID};
|
SSmaFile fSma = {.commitID = pCommitter->commitID};
|
||||||
SSstFile fSst = {.commitID = pCommitter->commitID};
|
SSttFile fStt = {.commitID = pCommitter->commitID};
|
||||||
SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
|
SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
|
||||||
if (pRSet) {
|
if (pRSet) {
|
||||||
ASSERT(pRSet->nSstF <= pCommitter->maxLast);
|
ASSERT(pRSet->nSttF <= pCommitter->maxLast);
|
||||||
fData = *pRSet->pDataF;
|
fData = *pRSet->pDataF;
|
||||||
fSma = *pRSet->pSmaF;
|
fSma = *pRSet->pSmaF;
|
||||||
wSet.diskId = pRSet->diskId;
|
wSet.diskId = pRSet->diskId;
|
||||||
if (pRSet->nSstF < pCommitter->maxLast) {
|
if (pRSet->nSttF < pCommitter->maxLast) {
|
||||||
for (int32_t iSst = 0; iSst < pRSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) {
|
||||||
wSet.aSstF[iSst] = pRSet->aSstF[iSst];
|
wSet.aSttF[iStt] = pRSet->aSttF[iStt];
|
||||||
}
|
}
|
||||||
wSet.nSstF = pRSet->nSstF + 1;
|
wSet.nSttF = pRSet->nSttF + 1;
|
||||||
} else {
|
} else {
|
||||||
wSet.nSstF = 1;
|
wSet.nSttF = 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SDiskID did = {0};
|
SDiskID did = {0};
|
||||||
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
|
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
|
||||||
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
|
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
|
||||||
wSet.diskId = did;
|
wSet.diskId = did;
|
||||||
wSet.nSstF = 1;
|
wSet.nSttF = 1;
|
||||||
}
|
}
|
||||||
wSet.aSstF[wSet.nSstF - 1] = &fSst;
|
wSet.aSttF[wSet.nSttF - 1] = &fStt;
|
||||||
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
|
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
taosArrayClear(pCommitter->dWriter.aBlockIdx);
|
taosArrayClear(pCommitter->dWriter.aBlockIdx);
|
||||||
taosArrayClear(pCommitter->dWriter.aSstBlk);
|
taosArrayClear(pCommitter->dWriter.aSttBlk);
|
||||||
tMapDataReset(&pCommitter->dWriter.mBlock);
|
tMapDataReset(&pCommitter->dWriter.mBlock);
|
||||||
tBlockDataReset(&pCommitter->dWriter.bData);
|
tBlockDataReset(&pCommitter->dWriter.bData);
|
||||||
tBlockDataReset(&pCommitter->dWriter.bDatal);
|
tBlockDataReset(&pCommitter->dWriter.bDatal);
|
||||||
|
@ -610,7 +610,7 @@ _err:
|
||||||
|
|
||||||
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
|
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSstBlk blockL;
|
SSttBlk blockL;
|
||||||
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
|
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
|
||||||
|
|
||||||
ASSERT(pBlockData->nRow > 0);
|
ASSERT(pBlockData->nRow > 0);
|
||||||
|
@ -635,8 +635,8 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
|
||||||
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1);
|
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// push SSstBlk
|
// push SSttBlk
|
||||||
if (taosArrayPush(pCommitter->dWriter.aSstBlk, &blockL) == NULL) {
|
if (taosArrayPush(pCommitter->dWriter.aSttBlk, &blockL) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -658,8 +658,8 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
|
||||||
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
|
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// write aSstBlk
|
// write aSttBlk
|
||||||
code = tsdbWriteSstBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSstBlk);
|
code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// update file header
|
// update file header
|
||||||
|
@ -757,7 +757,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
||||||
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
|
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
|
||||||
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
||||||
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
||||||
pCommitter->maxLast = TSDB_DEFAULT_SST_FILE; // TODO: make it as a config
|
pCommitter->maxLast = TSDB_DEFAULT_STT_FILE; // TODO: make it as a config
|
||||||
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
||||||
if (pCommitter->aTbDataP == NULL) {
|
if (pCommitter->aTbDataP == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -787,10 +787,10 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
||||||
// merger
|
// merger
|
||||||
for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) {
|
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_FILE; iStt++) {
|
||||||
SDataIter *pIter = &pCommitter->aDataIter[iSst];
|
SDataIter *pIter = &pCommitter->aDataIter[iStt];
|
||||||
pIter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
|
pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||||
if (pIter->aSstBlk == NULL) {
|
if (pIter->aSttBlk == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -806,8 +806,8 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCommitter->dWriter.aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
|
pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||||
if (pCommitter->dWriter.aSstBlk == NULL) {
|
if (pCommitter->dWriter.aSttBlk == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -829,15 +829,15 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
|
||||||
tBlockDataDestroy(&pCommitter->dReader.bData, 1);
|
tBlockDataDestroy(&pCommitter->dReader.bData, 1);
|
||||||
|
|
||||||
// merger
|
// merger
|
||||||
for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) {
|
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_FILE; iStt++) {
|
||||||
SDataIter *pIter = &pCommitter->aDataIter[iSst];
|
SDataIter *pIter = &pCommitter->aDataIter[iStt];
|
||||||
taosArrayDestroy(pIter->aSstBlk);
|
taosArrayDestroy(pIter->aSttBlk);
|
||||||
tBlockDataDestroy(&pIter->bData, 1);
|
tBlockDataDestroy(&pIter->bData, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// writer
|
// writer
|
||||||
taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
|
taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
|
||||||
taosArrayDestroy(pCommitter->dWriter.aSstBlk);
|
taosArrayDestroy(pCommitter->dWriter.aSttBlk);
|
||||||
tMapDataClear(&pCommitter->dWriter.mBlock);
|
tMapDataClear(&pCommitter->dWriter.mBlock);
|
||||||
tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
|
tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
|
||||||
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
|
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
|
||||||
|
@ -1052,11 +1052,11 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
|
||||||
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
|
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
|
||||||
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
||||||
} else {
|
} else {
|
||||||
pIter->iSstBlk++;
|
pIter->iSttBlk++;
|
||||||
if (pIter->iSstBlk < taosArrayGetSize(pIter->aSstBlk)) {
|
if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) {
|
||||||
SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk);
|
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
||||||
|
|
||||||
code = tsdbReadSstBlock(pCommitter->dReader.pReader, pIter->iSst, pSstBlk, &pIter->bData);
|
code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
||||||
pIter->iRow = 0;
|
pIter->iRow = 0;
|
||||||
|
|
|
@ -113,7 +113,7 @@ _err:
|
||||||
// taosRemoveFile(fname);
|
// taosRemoveFile(fname);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// // sst
|
// // stt
|
||||||
// if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) {
|
// if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) {
|
||||||
// if (pFrom->pLastF->size > pTo->pLastF->size) {
|
// if (pFrom->pLastF->size > pTo->pLastF->size) {
|
||||||
// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE);
|
// code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE);
|
||||||
|
@ -143,7 +143,7 @@ _err:
|
||||||
// tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname);
|
// tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname);
|
||||||
// taosRemoveFile(fname);
|
// taosRemoveFile(fname);
|
||||||
|
|
||||||
// // sst
|
// // stt
|
||||||
// tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname);
|
// tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname);
|
||||||
// taosRemoveFile(fname);
|
// taosRemoveFile(fname);
|
||||||
|
|
||||||
|
@ -258,8 +258,8 @@ void tsdbFSDestroy(STsdbFS *pFS) {
|
||||||
taosMemoryFree(pSet->pHeadF);
|
taosMemoryFree(pSet->pHeadF);
|
||||||
taosMemoryFree(pSet->pDataF);
|
taosMemoryFree(pSet->pDataF);
|
||||||
taosMemoryFree(pSet->pSmaF);
|
taosMemoryFree(pSet->pSmaF);
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
taosMemoryFree(pSet->aSstF[iSst]);
|
taosMemoryFree(pSet->aSttF[iStt]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -328,14 +328,14 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sst ===========
|
// stt ===========
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
|
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||||
if (taosStatFile(fname, &size, NULL)) {
|
if (taosStatFile(fname, &size, NULL)) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (size != LOGIC_TO_FILE_SIZE(pSet->aSstF[iSst]->size, TSDB_DEFAULT_PAGE_SIZE)) {
|
if (size != LOGIC_TO_FILE_SIZE(pSet->aSttF[iStt]->size, TSDB_DEFAULT_PAGE_SIZE)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -519,10 +519,10 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
|
||||||
ASSERT(pSet->pSmaF->nRef == 1);
|
ASSERT(pSet->pSmaF->nRef == 1);
|
||||||
taosMemoryFree(pSet->pSmaF);
|
taosMemoryFree(pSet->pSmaF);
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
ASSERT(pSet->aSstF[iSst]->nRef == 1);
|
ASSERT(pSet->aSttF[iStt]->nRef == 1);
|
||||||
taosMemoryFree(pSet->aSstF[iSst]);
|
taosMemoryFree(pSet->aSttF[iStt]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -579,14 +579,14 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
||||||
}
|
}
|
||||||
*fSet.pSmaF = *pSet->pSmaF;
|
*fSet.pSmaF = *pSet->pSmaF;
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
for (fSet.nSstF = 0; fSet.nSstF < pSet->nSstF; fSet.nSstF++) {
|
for (fSet.nSttF = 0; fSet.nSttF < pSet->nSttF; fSet.nSttF++) {
|
||||||
fSet.aSstF[fSet.nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||||
if (fSet.aSstF[fSet.nSstF] == NULL) {
|
if (fSet.aSttF[fSet.nSttF] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
*fSet.aSstF[fSet.nSstF] = *pSet->aSstF[fSet.nSstF];
|
*fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF];
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
|
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
|
||||||
|
@ -639,28 +639,28 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
||||||
*pDFileSet->pHeadF = *pSet->pHeadF;
|
*pDFileSet->pHeadF = *pSet->pHeadF;
|
||||||
*pDFileSet->pDataF = *pSet->pDataF;
|
*pDFileSet->pDataF = *pSet->pDataF;
|
||||||
*pDFileSet->pSmaF = *pSet->pSmaF;
|
*pDFileSet->pSmaF = *pSet->pSmaF;
|
||||||
// sst
|
// stt
|
||||||
if (pSet->nSstF > pDFileSet->nSstF) {
|
if (pSet->nSttF > pDFileSet->nSttF) {
|
||||||
ASSERT(pSet->nSstF == pDFileSet->nSstF + 1);
|
ASSERT(pSet->nSttF == pDFileSet->nSttF + 1);
|
||||||
|
|
||||||
pDFileSet->aSstF[pDFileSet->nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
pDFileSet->aSttF[pDFileSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||||
if (pDFileSet->aSstF[pDFileSet->nSstF] == NULL) {
|
if (pDFileSet->aSttF[pDFileSet->nSttF] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
*pDFileSet->aSstF[pDFileSet->nSstF] = *pSet->aSstF[pSet->nSstF - 1];
|
*pDFileSet->aSttF[pDFileSet->nSttF] = *pSet->aSttF[pSet->nSttF - 1];
|
||||||
pDFileSet->nSstF++;
|
pDFileSet->nSttF++;
|
||||||
} else if (pSet->nSstF < pDFileSet->nSstF) {
|
} else if (pSet->nSttF < pDFileSet->nSttF) {
|
||||||
ASSERT(pSet->nSstF == 1);
|
ASSERT(pSet->nSttF == 1);
|
||||||
for (int32_t iSst = 1; iSst < pDFileSet->nSstF; iSst++) {
|
for (int32_t iStt = 1; iStt < pDFileSet->nSttF; iStt++) {
|
||||||
taosMemoryFree(pDFileSet->aSstF[iSst]);
|
taosMemoryFree(pDFileSet->aSttF[iStt]);
|
||||||
}
|
}
|
||||||
|
|
||||||
*pDFileSet->aSstF[0] = *pSet->aSstF[0];
|
*pDFileSet->aSttF[0] = *pSet->aSttF[0];
|
||||||
pDFileSet->nSstF = 1;
|
pDFileSet->nSttF = 1;
|
||||||
} else {
|
} else {
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
*pDFileSet->aSstF[iSst] = *pSet->aSstF[iSst];
|
*pDFileSet->aSttF[iStt] = *pSet->aSttF[iStt];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -668,8 +668,8 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pSet->nSstF == 1);
|
ASSERT(pSet->nSttF == 1);
|
||||||
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSstF = 1};
|
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSttF = 1};
|
||||||
|
|
||||||
// head
|
// head
|
||||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||||
|
@ -695,13 +695,13 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
|
||||||
}
|
}
|
||||||
*fSet.pSmaF = *pSet->pSmaF;
|
*fSet.pSmaF = *pSet->pSmaF;
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
fSet.aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||||
if (fSet.aSstF[0] == NULL) {
|
if (fSet.aSttF[0] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
*fSet.aSstF[0] = *pSet->aSstF[0];
|
*fSet.aSttF[0] = *pSet->aSttF[0];
|
||||||
|
|
||||||
if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) {
|
if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -869,81 +869,81 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||||
pSetOld->pSmaF->size = pSetNew->pSmaF->size;
|
pSetOld->pSmaF->size = pSetNew->pSmaF->size;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
if (sameDisk) {
|
if (sameDisk) {
|
||||||
if (pSetNew->nSstF > pSetOld->nSstF) {
|
if (pSetNew->nSttF > pSetOld->nSttF) {
|
||||||
ASSERT(pSetNew->nSstF = pSetOld->nSstF + 1);
|
ASSERT(pSetNew->nSttF = pSetOld->nSttF + 1);
|
||||||
pSetOld->aSstF[pSetOld->nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||||
if (pSetOld->aSstF[pSetOld->nSstF] == NULL) {
|
if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
*pSetOld->aSstF[pSetOld->nSstF] = *pSetNew->aSstF[pSetOld->nSstF];
|
*pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF];
|
||||||
pSetOld->aSstF[pSetOld->nSstF]->nRef = 1;
|
pSetOld->aSttF[pSetOld->nSttF]->nRef = 1;
|
||||||
pSetOld->nSstF++;
|
pSetOld->nSttF++;
|
||||||
} else if (pSetNew->nSstF < pSetOld->nSstF) {
|
} else if (pSetNew->nSttF < pSetOld->nSttF) {
|
||||||
ASSERT(pSetNew->nSstF == 1);
|
ASSERT(pSetNew->nSttF == 1);
|
||||||
for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||||
SSstFile *pSstFile = pSetOld->aSstF[iSst];
|
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||||
nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
|
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
|
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||||
taosRemoveFile(fname);
|
taosRemoveFile(fname);
|
||||||
taosMemoryFree(pSstFile);
|
taosMemoryFree(pSttFile);
|
||||||
}
|
}
|
||||||
pSetOld->aSstF[iSst] = NULL;
|
pSetOld->aSttF[iStt] = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSetOld->nSstF = 1;
|
pSetOld->nSttF = 1;
|
||||||
pSetOld->aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||||
if (pSetOld->aSstF[0] == NULL) {
|
if (pSetOld->aSttF[0] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
*pSetOld->aSstF[0] = *pSetNew->aSstF[0];
|
*pSetOld->aSttF[0] = *pSetNew->aSttF[0];
|
||||||
pSetOld->aSstF[0]->nRef = 1;
|
pSetOld->aSttF[0]->nRef = 1;
|
||||||
} else {
|
} else {
|
||||||
for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||||
if (pSetOld->aSstF[iSst]->commitID != pSetNew->aSstF[iSst]->commitID) {
|
if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
|
||||||
SSstFile *pSstFile = pSetOld->aSstF[iSst];
|
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||||
nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
|
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
|
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||||
taosRemoveFile(fname);
|
taosRemoveFile(fname);
|
||||||
taosMemoryFree(pSstFile);
|
taosMemoryFree(pSttFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||||
if (pSetOld->aSstF[iSst] == NULL) {
|
if (pSetOld->aSttF[iStt] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
*pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst];
|
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||||
pSetOld->aSstF[iSst]->nRef = 1;
|
pSetOld->aSttF[iStt]->nRef = 1;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pSetOld->aSstF[iSst]->size == pSetOld->aSstF[iSst]->size);
|
ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
|
||||||
ASSERT(pSetOld->aSstF[iSst]->offset == pSetOld->aSstF[iSst]->offset);
|
ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pSetOld->nSstF == pSetNew->nSstF);
|
ASSERT(pSetOld->nSttF == pSetNew->nSttF);
|
||||||
for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||||
SSstFile *pSstFile = pSetOld->aSstF[iSst];
|
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||||
nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
|
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
|
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||||
taosRemoveFile(fname);
|
taosRemoveFile(fname);
|
||||||
taosMemoryFree(pSstFile);
|
taosMemoryFree(pSttFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||||
if (pSetOld->aSstF[iSst] == NULL) {
|
if (pSetOld->aSttF[iStt] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
*pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst];
|
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||||
pSetOld->aSstF[iSst]->nRef = 1;
|
pSetOld->aSttF[iStt]->nRef = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -977,12 +977,12 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||||
taosMemoryFree(pSetOld->pSmaF);
|
taosMemoryFree(pSetOld->pSmaF);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int8_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
|
for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||||
nRef = atomic_sub_fetch_32(&pSetOld->aSstF[iSst]->nRef, 1);
|
nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSstF[iSst], fname);
|
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname);
|
||||||
taosRemoveFile(fname);
|
taosRemoveFile(fname);
|
||||||
taosMemoryFree(pSetOld->aSstF[iSst]);
|
taosMemoryFree(pSetOld->aSttF[iStt]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -990,7 +990,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
_add_new:
|
_add_new:
|
||||||
fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSstF = 1};
|
fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1};
|
||||||
|
|
||||||
// head
|
// head
|
||||||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||||
|
@ -1019,15 +1019,15 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
|
||||||
*fSet.pSmaF = *pSetNew->pSmaF;
|
*fSet.pSmaF = *pSetNew->pSmaF;
|
||||||
fSet.pSmaF->nRef = 1;
|
fSet.pSmaF->nRef = 1;
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
ASSERT(pSetNew->nSstF == 1);
|
ASSERT(pSetNew->nSttF == 1);
|
||||||
fSet.aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
|
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||||
if (fSet.aSstF[0] == NULL) {
|
if (fSet.aSttF[0] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
*fSet.aSstF[0] = *pSetNew->aSstF[0];
|
*fSet.aSttF[0] = *pSetNew->aSttF[0];
|
||||||
fSet.aSstF[0]->nRef = 1;
|
fSet.aSttF[0]->nRef = 1;
|
||||||
|
|
||||||
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
|
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1075,8 +1075,8 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
|
||||||
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
|
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
|
||||||
ASSERT(nRef > 0);
|
ASSERT(nRef > 0);
|
||||||
|
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
nRef = atomic_fetch_add_32(&pSet->aSstF[iSst]->nRef, 1);
|
nRef = atomic_fetch_add_32(&pSet->aSttF[iStt]->nRef, 1);
|
||||||
ASSERT(nRef > 0);
|
ASSERT(nRef > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1134,14 +1134,14 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
|
||||||
taosMemoryFree(pSet->pSmaF);
|
taosMemoryFree(pSet->pSmaF);
|
||||||
}
|
}
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
nRef = atomic_sub_fetch_32(&pSet->aSstF[iSst]->nRef, 1);
|
nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1);
|
||||||
ASSERT(nRef >= 0);
|
ASSERT(nRef >= 0);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
|
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||||
taosRemoveFile(fname);
|
taosRemoveFile(fname);
|
||||||
taosMemoryFree(pSet->aSstF[iSst]);
|
taosMemoryFree(pSet->aSttF[iStt]);
|
||||||
/* code */
|
/* code */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,22 +53,22 @@ static int32_t tGetDataFile(uint8_t *p, SDataFile *pDataFile) {
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tPutSstFile(uint8_t *p, SSstFile *pSstFile) {
|
int32_t tPutSttFile(uint8_t *p, SSttFile *pSttFile) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
|
|
||||||
n += tPutI64v(p ? p + n : p, pSstFile->commitID);
|
n += tPutI64v(p ? p + n : p, pSttFile->commitID);
|
||||||
n += tPutI64v(p ? p + n : p, pSstFile->size);
|
n += tPutI64v(p ? p + n : p, pSttFile->size);
|
||||||
n += tPutI64v(p ? p + n : p, pSstFile->offset);
|
n += tPutI64v(p ? p + n : p, pSttFile->offset);
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tGetSstFile(uint8_t *p, SSstFile *pSstFile) {
|
static int32_t tGetSttFile(uint8_t *p, SSttFile *pSttFile) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
|
|
||||||
n += tGetI64v(p + n, &pSstFile->commitID);
|
n += tGetI64v(p + n, &pSttFile->commitID);
|
||||||
n += tGetI64v(p + n, &pSstFile->size);
|
n += tGetI64v(p + n, &pSttFile->size);
|
||||||
n += tGetI64v(p + n, &pSstFile->offset);
|
n += tGetI64v(p + n, &pSttFile->offset);
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
@ -102,9 +102,9 @@ void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF,
|
||||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pDataF->commitID, ".data");
|
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pDataF->commitID, ".data");
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbSstFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSstFile *pSstF, char fname[]) {
|
void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]) {
|
||||||
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
|
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
|
||||||
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSstF->commitID, ".sst");
|
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSttF->commitID, ".stt");
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) {
|
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) {
|
||||||
|
@ -194,10 +194,10 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) {
|
||||||
n += tPutDataFile(p ? p + n : p, pSet->pDataF);
|
n += tPutDataFile(p ? p + n : p, pSet->pDataF);
|
||||||
n += tPutSmaFile(p ? p + n : p, pSet->pSmaF);
|
n += tPutSmaFile(p ? p + n : p, pSet->pSmaF);
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
n += tPutU8(p ? p + n : p, pSet->nSstF);
|
n += tPutU8(p ? p + n : p, pSet->nSttF);
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
n += tPutSstFile(p ? p + n : p, pSet->aSstF[iSst]);
|
n += tPutSttFile(p ? p + n : p, pSet->aSttF[iStt]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
|
@ -234,15 +234,15 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
|
||||||
pSet->pSmaF->nRef = 1;
|
pSet->pSmaF->nRef = 1;
|
||||||
n += tGetSmaFile(p + n, pSet->pSmaF);
|
n += tGetSmaFile(p + n, pSet->pSmaF);
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
n += tGetU8(p + n, &pSet->nSstF);
|
n += tGetU8(p + n, &pSet->nSttF);
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
pSet->aSstF[iSst] = (SSstFile *)taosMemoryCalloc(1, sizeof(SSstFile));
|
pSet->aSttF[iStt] = (SSttFile *)taosMemoryCalloc(1, sizeof(SSttFile));
|
||||||
if (pSet->aSstF[iSst] == NULL) {
|
if (pSet->aSttF[iStt] == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pSet->aSstF[iSst]->nRef = 1;
|
pSet->aSttF[iStt]->nRef = 1;
|
||||||
n += tGetSstFile(p + n, pSet->aSstF[iSst]);
|
n += tGetSttFile(p + n, pSet->aSttF[iStt]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
|
|
|
@ -18,12 +18,12 @@
|
||||||
// SLDataIter =================================================
|
// SLDataIter =================================================
|
||||||
struct SLDataIter {
|
struct SLDataIter {
|
||||||
SRBTreeNode node;
|
SRBTreeNode node;
|
||||||
SSstBlk *pSstBlk;
|
SSttBlk *pSttBlk;
|
||||||
SDataFReader *pReader;
|
SDataFReader *pReader;
|
||||||
int32_t iSst;
|
int32_t iStt;
|
||||||
int8_t backward;
|
int8_t backward;
|
||||||
SArray *aSstBlk;
|
SArray *aSttBlk;
|
||||||
int32_t iSstBlk;
|
int32_t iSttBlk;
|
||||||
SBlockData bData[2];
|
SBlockData bData[2];
|
||||||
int32_t loadIndex;
|
int32_t loadIndex;
|
||||||
int32_t iRow;
|
int32_t iRow;
|
||||||
|
@ -40,7 +40,7 @@ static SBlockData *getNextBlock(SLDataIter *pIter) {
|
||||||
return getCurrentBlock(pIter);
|
return getCurrentBlock(pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid,
|
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pRange) {
|
STimeWindow *pTimeWindow, SVersionRange *pRange) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
||||||
|
@ -53,10 +53,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
(*pIter)->timeWindow = *pTimeWindow;
|
(*pIter)->timeWindow = *pTimeWindow;
|
||||||
(*pIter)->verRange = *pRange;
|
(*pIter)->verRange = *pRange;
|
||||||
(*pIter)->pReader = pReader;
|
(*pIter)->pReader = pReader;
|
||||||
(*pIter)->iSst = iSst;
|
(*pIter)->iStt = iStt;
|
||||||
(*pIter)->backward = backward;
|
(*pIter)->backward = backward;
|
||||||
(*pIter)->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
|
(*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||||
if ((*pIter)->aSstBlk == NULL) {
|
if ((*pIter)->aSttBlk == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -71,18 +71,18 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbReadSstBlk(pReader, iSst, (*pIter)->aSstBlk);
|
code = tsdbReadSttBlk(pReader, iStt, (*pIter)->aSttBlk);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t size = taosArrayGetSize((*pIter)->aSstBlk);
|
size_t size = taosArrayGetSize((*pIter)->aSttBlk);
|
||||||
|
|
||||||
// find the start block
|
// find the start block
|
||||||
int32_t index = -1;
|
int32_t index = -1;
|
||||||
if (!backward) { // asc
|
if (!backward) { // asc
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i);
|
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
|
||||||
if (p->minUid <= uid && p->maxUid >= uid) {
|
if (p->minUid <= uid && p->maxUid >= uid) {
|
||||||
index = i;
|
index = i;
|
||||||
break;
|
break;
|
||||||
|
@ -90,7 +90,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
}
|
}
|
||||||
} else { // desc
|
} else { // desc
|
||||||
for (int32_t i = size - 1; i >= 0; --i) {
|
for (int32_t i = size - 1; i >= 0; --i) {
|
||||||
SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i);
|
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
|
||||||
if (p->minUid <= uid && p->maxUid >= uid) {
|
if (p->minUid <= uid && p->maxUid >= uid) {
|
||||||
index = i;
|
index = i;
|
||||||
break;
|
break;
|
||||||
|
@ -98,9 +98,9 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pIter)->iSstBlk = index;
|
(*pIter)->iSttBlk = index;
|
||||||
if (index != -1) {
|
if (index != -1) {
|
||||||
(*pIter)->pSstBlk = taosArrayGet((*pIter)->aSstBlk, (*pIter)->iSstBlk);
|
(*pIter)->pSttBlk = taosArrayGet((*pIter)->aSttBlk, (*pIter)->iSttBlk);
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -110,18 +110,18 @@ _exit:
|
||||||
void tLDataIterClose(SLDataIter *pIter) {
|
void tLDataIterClose(SLDataIter *pIter) {
|
||||||
tBlockDataDestroy(&pIter->bData[0], 1);
|
tBlockDataDestroy(&pIter->bData[0], 1);
|
||||||
tBlockDataDestroy(&pIter->bData[1], 1);
|
tBlockDataDestroy(&pIter->bData[1], 1);
|
||||||
taosArrayDestroy(pIter->aSstBlk);
|
taosArrayDestroy(pIter->aSttBlk);
|
||||||
taosMemoryFree(pIter);
|
taosMemoryFree(pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tLDataIterNextBlock(SLDataIter *pIter) {
|
void tLDataIterNextBlock(SLDataIter *pIter) {
|
||||||
int32_t step = pIter->backward ? -1 : 1;
|
int32_t step = pIter->backward ? -1 : 1;
|
||||||
pIter->iSstBlk += step;
|
pIter->iSttBlk += step;
|
||||||
|
|
||||||
int32_t index = -1;
|
int32_t index = -1;
|
||||||
size_t size = taosArrayGetSize(pIter->aSstBlk);
|
size_t size = taosArrayGetSize(pIter->aSttBlk);
|
||||||
for (int32_t i = pIter->iSstBlk; i < size && i >= 0; i += step) {
|
for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
|
||||||
SSstBlk *p = taosArrayGet(pIter->aSstBlk, i);
|
SSttBlk *p = taosArrayGet(pIter->aSttBlk, i);
|
||||||
if ((!pIter->backward) && p->minUid > pIter->uid) {
|
if ((!pIter->backward) && p->minUid > pIter->uid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -137,9 +137,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
pIter->pSstBlk = NULL;
|
pIter->pSttBlk = NULL;
|
||||||
} else {
|
} else {
|
||||||
pIter->pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk);
|
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,16 +212,16 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
|
||||||
int32_t step = pIter->backward ? -1 : 1;
|
int32_t step = pIter->backward ? -1 : 1;
|
||||||
|
|
||||||
// no qualified last file block in current file, no need to fetch row
|
// no qualified last file block in current file, no need to fetch row
|
||||||
if (pIter->pSstBlk == NULL) {
|
if (pIter->pSttBlk == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t iBlockL = pIter->iSstBlk;
|
int32_t iBlockL = pIter->iSttBlk;
|
||||||
SBlockData *pBlockData = getCurrentBlock(pIter);
|
SBlockData *pBlockData = getCurrentBlock(pIter);
|
||||||
|
|
||||||
if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
|
if (pBlockData->nRow == 0 && pIter->pSttBlk != NULL) { // current block not loaded yet
|
||||||
pBlockData = getNextBlock(pIter);
|
pBlockData = getNextBlock(pIter);
|
||||||
code = tsdbReadSstBlock(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
|
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -236,16 +236,16 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
|
||||||
|
|
||||||
if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
|
if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
|
||||||
tLDataIterNextBlock(pIter);
|
tLDataIterNextBlock(pIter);
|
||||||
if (pIter->pSstBlk == NULL) { // no more data
|
if (pIter->pSttBlk == NULL) { // no more data
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (iBlockL != pIter->iSstBlk) {
|
if (iBlockL != pIter->iSttBlk) {
|
||||||
pBlockData = getNextBlock(pIter);
|
pBlockData = getNextBlock(pIter);
|
||||||
code = tsdbReadSstBlock(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData);
|
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -262,7 +262,7 @@ _exit:
|
||||||
terrno = code;
|
terrno = code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (code == TSDB_CODE_SUCCESS) && (pIter->pSstBlk != NULL);
|
return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
|
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
|
||||||
|
@ -302,8 +302,8 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
struct SLDataIter *pIterList[TSDB_DEFAULT_SST_FILE] = {0};
|
struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0};
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||||
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
|
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
|
|
@ -621,7 +621,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSstF;
|
pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF;
|
||||||
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
|
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
@ -1163,7 +1163,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
|
||||||
bool overlapWithlastBlock = false;
|
bool overlapWithlastBlock = false;
|
||||||
#if 0
|
#if 0
|
||||||
if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
|
if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
|
||||||
SSstBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
|
SSttBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
|
||||||
overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
|
overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -2204,7 +2204,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSstF > 0) {
|
if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
|
||||||
code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
|
code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(pIndexList);
|
taosArrayDestroy(pIndexList);
|
||||||
|
|
|
@ -226,13 +226,13 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
||||||
.pHeadF = &pWriter->fHead,
|
.pHeadF = &pWriter->fHead,
|
||||||
.pDataF = &pWriter->fData,
|
.pDataF = &pWriter->fData,
|
||||||
.pSmaF = &pWriter->fSma,
|
.pSmaF = &pWriter->fSma,
|
||||||
.nSstF = pSet->nSstF};
|
.nSttF = pSet->nSttF};
|
||||||
pWriter->fHead = *pSet->pHeadF;
|
pWriter->fHead = *pSet->pHeadF;
|
||||||
pWriter->fData = *pSet->pDataF;
|
pWriter->fData = *pSet->pDataF;
|
||||||
pWriter->fSma = *pSet->pSmaF;
|
pWriter->fSma = *pSet->pSmaF;
|
||||||
for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int8_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst];
|
pWriter->wSet.aSttF[iStt] = &pWriter->fStt[iStt];
|
||||||
pWriter->fSst[iSst] = *pSet->aSstF[iSst];
|
pWriter->fStt[iStt] = *pSet->aSttF[iStt];
|
||||||
}
|
}
|
||||||
|
|
||||||
// head
|
// head
|
||||||
|
@ -276,15 +276,15 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
|
||||||
pWriter->fSma.size += TSDB_FHDR_SIZE;
|
pWriter->fSma.size += TSDB_FHDR_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0);
|
ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0);
|
||||||
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
|
||||||
tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname);
|
tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname);
|
||||||
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSstFD);
|
code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE);
|
code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE;
|
pWriter->fStt[pWriter->wSet.nSttF - 1].size += TSDB_FHDR_SIZE;
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
return code;
|
return code;
|
||||||
|
@ -312,14 +312,14 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
|
||||||
code = tsdbFsyncFile((*ppWriter)->pSmaFD);
|
code = tsdbFsyncFile((*ppWriter)->pSmaFD);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbFsyncFile((*ppWriter)->pSstFD);
|
code = tsdbFsyncFile((*ppWriter)->pSttFD);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbCloseFile(&(*ppWriter)->pHeadFD);
|
tsdbCloseFile(&(*ppWriter)->pHeadFD);
|
||||||
tsdbCloseFile(&(*ppWriter)->pDataFD);
|
tsdbCloseFile(&(*ppWriter)->pDataFD);
|
||||||
tsdbCloseFile(&(*ppWriter)->pSmaFD);
|
tsdbCloseFile(&(*ppWriter)->pSmaFD);
|
||||||
tsdbCloseFile(&(*ppWriter)->pSstFD);
|
tsdbCloseFile(&(*ppWriter)->pSttFD);
|
||||||
|
|
||||||
for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) {
|
for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) {
|
||||||
tFree((*ppWriter)->aBuf[iBuf]);
|
tFree((*ppWriter)->aBuf[iBuf]);
|
||||||
|
@ -357,10 +357,10 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
|
||||||
code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
|
code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// sst ==============
|
// stt ==============
|
||||||
memset(hdr, 0, TSDB_FHDR_SIZE);
|
memset(hdr, 0, TSDB_FHDR_SIZE);
|
||||||
tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]);
|
tPutSttFile(hdr, &pWriter->fStt[pWriter->wSet.nSttF - 1]);
|
||||||
code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE);
|
code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -454,22 +454,22 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
|
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1];
|
SSttFile *pSttFile = &pWriter->fStt[pWriter->wSet.nSttF - 1];
|
||||||
int64_t size;
|
int64_t size;
|
||||||
int64_t n;
|
int64_t n;
|
||||||
|
|
||||||
// check
|
// check
|
||||||
if (taosArrayGetSize(aSstBlk) == 0) {
|
if (taosArrayGetSize(aSttBlk) == 0) {
|
||||||
pSstFile->offset = pSstFile->size;
|
pSttFile->offset = pSttFile->size;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
// size
|
// size
|
||||||
size = 0;
|
size = 0;
|
||||||
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
|
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSttBlk); iBlockL++) {
|
||||||
size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL));
|
size += tPutSttBlk(NULL, taosArrayGet(aSttBlk, iBlockL));
|
||||||
}
|
}
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
|
@ -478,21 +478,21 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
|
||||||
|
|
||||||
// encode
|
// encode
|
||||||
n = 0;
|
n = 0;
|
||||||
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
|
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSttBlk); iBlockL++) {
|
||||||
n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL));
|
n += tPutSttBlk(pWriter->aBuf[0] + n, taosArrayGet(aSttBlk, iBlockL));
|
||||||
}
|
}
|
||||||
|
|
||||||
// write
|
// write
|
||||||
code = tsdbWriteFile(pWriter->pSstFD, pSstFile->size, pWriter->aBuf[0], size);
|
code = tsdbWriteFile(pWriter->pSttFD, pSttFile->size, pWriter->aBuf[0], size);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// update
|
// update
|
||||||
pSstFile->offset = pSstFile->size;
|
pSttFile->offset = pSttFile->size;
|
||||||
pSstFile->size += size;
|
pSttFile->size += size;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tsdbTrace("vgId:%d tsdb write sst block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
|
tsdbTrace("vgId:%d tsdb write stt block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
|
||||||
pSstFile->offset, size);
|
pSttFile->offset, size);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -546,7 +546,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
ASSERT(pBlockData->nRow > 0);
|
ASSERT(pBlockData->nRow > 0);
|
||||||
|
|
||||||
if (toLast) {
|
if (toLast) {
|
||||||
pBlkInfo->offset = pWriter->fSst[pWriter->wSet.nSstF - 1].size;
|
pBlkInfo->offset = pWriter->fStt[pWriter->wSet.nSttF - 1].size;
|
||||||
} else {
|
} else {
|
||||||
pBlkInfo->offset = pWriter->fData.size;
|
pBlkInfo->offset = pWriter->fData.size;
|
||||||
}
|
}
|
||||||
|
@ -558,7 +558,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// write =================
|
// write =================
|
||||||
STsdbFD *pFD = toLast ? pWriter->pSstFD : pWriter->pDataFD;
|
STsdbFD *pFD = toLast ? pWriter->pSttFD : pWriter->pDataFD;
|
||||||
|
|
||||||
pBlkInfo->szKey = aBufN[3] + aBufN[2];
|
pBlkInfo->szKey = aBufN[3] + aBufN[2];
|
||||||
pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
|
pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
|
||||||
|
@ -585,7 +585,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
|
|
||||||
// update info
|
// update info
|
||||||
if (toLast) {
|
if (toLast) {
|
||||||
pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock;
|
pWriter->fStt[pWriter->wSet.nSttF - 1].size += pBlkInfo->szBlock;
|
||||||
} else {
|
} else {
|
||||||
pWriter->fData.size += pBlkInfo->szBlock;
|
pWriter->fData.size += pBlkInfo->szBlock;
|
||||||
}
|
}
|
||||||
|
@ -664,9 +664,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
||||||
taosCloseFile(&pOutFD);
|
taosCloseFile(&pOutFD);
|
||||||
taosCloseFile(&PInFD);
|
taosCloseFile(&PInFD);
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom);
|
tsdbSttFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSttF[0], fNameFrom);
|
||||||
tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo);
|
tsdbSttFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSttF[0], fNameTo);
|
||||||
|
|
||||||
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||||
if (pOutFD == NULL) {
|
if (pOutFD == NULL) {
|
||||||
|
@ -680,7 +680,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size);
|
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSttF[0]->size);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -750,10 +750,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
||||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
|
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
|
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||||
tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
|
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||||
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSstFD[iSst]);
|
code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -779,10 +779,10 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
|
||||||
// sma
|
// sma
|
||||||
tsdbCloseFile(&(*ppReader)->pSmaFD);
|
tsdbCloseFile(&(*ppReader)->pSmaFD);
|
||||||
|
|
||||||
// sst
|
// stt
|
||||||
for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) {
|
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_FILE; iStt++) {
|
||||||
if ((*ppReader)->aSstFD[iSst]) {
|
if ((*ppReader)->aSttFD[iStt]) {
|
||||||
tsdbCloseFile(&(*ppReader)->aSstFD[iSst]);
|
tsdbCloseFile(&(*ppReader)->aSttFD[iStt]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -835,13 +835,13 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
|
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSstFile *pSstFile = pReader->pSet->aSstF[iSst];
|
SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
|
||||||
int64_t offset = pSstFile->offset;
|
int64_t offset = pSttFile->offset;
|
||||||
int64_t size = pSstFile->size - offset;
|
int64_t size = pSttFile->size - offset;
|
||||||
|
|
||||||
taosArrayClear(aSstBlk);
|
taosArrayClear(aSttBlk);
|
||||||
if (size == 0) return code;
|
if (size == 0) return code;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
|
@ -849,16 +849,16 @@ int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// read
|
// read
|
||||||
code = tsdbReadFile(pReader->aSstFD[iSst], offset, pReader->aBuf[0], size);
|
code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
int64_t n = 0;
|
int64_t n = 0;
|
||||||
while (n < size) {
|
while (n < size) {
|
||||||
SSstBlk sstBlk;
|
SSttBlk sttBlk;
|
||||||
n += tGetSstBlk(pReader->aBuf[0] + n, &sstBlk);
|
n += tGetSttBlk(pReader->aBuf[0] + n, &sttBlk);
|
||||||
|
|
||||||
if (taosArrayPush(aSstBlk, &sstBlk) == NULL) {
|
if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -868,7 +868,7 @@ int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d read sst blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d read stt blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1107,25 +1107,25 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) {
|
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
code = tRealloc(&pReader->aBuf[0], pSstBlk->bInfo.szBlock);
|
code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// read
|
// read
|
||||||
code = tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock);
|
code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// decmpr
|
// decmpr
|
||||||
code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
|
code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d tsdb read sst block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb read stt block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
||||||
if (expLevel < 0) {
|
if (expLevel < 0) {
|
||||||
taosMemoryFree(pSet->pHeadF);
|
taosMemoryFree(pSet->pHeadF);
|
||||||
taosMemoryFree(pSet->pDataF);
|
taosMemoryFree(pSet->pDataF);
|
||||||
taosMemoryFree(pSet->aSstF[0]);
|
taosMemoryFree(pSet->aSttF[0]);
|
||||||
taosMemoryFree(pSet->pSmaF);
|
taosMemoryFree(pSet->pSmaF);
|
||||||
taosArrayRemove(fs.aDFileSet, iSet);
|
taosArrayRemove(fs.aDFileSet, iSet);
|
||||||
iSet--;
|
iSet--;
|
||||||
|
|
|
@ -27,9 +27,9 @@ struct STsdbSnapReader {
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
SDataFReader* pDataFReader;
|
SDataFReader* pDataFReader;
|
||||||
SArray* aBlockIdx; // SArray<SBlockIdx>
|
SArray* aBlockIdx; // SArray<SBlockIdx>
|
||||||
SArray* aSstBlk; // SArray<SSstBlk>
|
SArray* aSstBlk; // SArray<SSttBlk>
|
||||||
SBlockIdx* pBlockIdx;
|
SBlockIdx* pBlockIdx;
|
||||||
SSstBlk* pSstBlk;
|
SSttBlk* pSstBlk;
|
||||||
|
|
||||||
int32_t iBlockIdx;
|
int32_t iBlockIdx;
|
||||||
int32_t iBlockL;
|
int32_t iBlockL;
|
||||||
|
@ -64,7 +64,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
|
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbReadSstBlk(pReader->pDataFReader, 0, pReader->aSstBlk);
|
code = tsdbReadSttBlk(pReader->pDataFReader, 0, pReader->aSstBlk);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// init
|
// init
|
||||||
|
@ -87,7 +87,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->pSstBlk = (SSstBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL);
|
pReader->pSstBlk = (SSttBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL);
|
||||||
if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) {
|
if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) {
|
||||||
// TODO
|
// TODO
|
||||||
break;
|
break;
|
||||||
|
@ -151,7 +151,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||||
// next
|
// next
|
||||||
pReader->iBlockL++;
|
pReader->iBlockL++;
|
||||||
if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) {
|
if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) {
|
||||||
pReader->pSstBlk = (SSstBlk*)taosArrayGetSize(pReader->aSstBlk);
|
pReader->pSstBlk = (SSttBlk*)taosArrayGetSize(pReader->aSstBlk);
|
||||||
} else {
|
} else {
|
||||||
pReader->pSstBlk = NULL;
|
pReader->pSstBlk = NULL;
|
||||||
}
|
}
|
||||||
|
@ -298,7 +298,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pReader->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
|
pReader->aSstBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||||
if (pReader->aSstBlk == NULL) {
|
if (pReader->aSstBlk == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -431,7 +431,7 @@ struct STsdbSnapWriter {
|
||||||
SBlockData* pBlockData;
|
SBlockData* pBlockData;
|
||||||
int32_t iRow;
|
int32_t iRow;
|
||||||
SBlockData bDataR;
|
SBlockData bDataR;
|
||||||
SArray* aSstBlk; // SArray<SSstBlk>
|
SArray* aSstBlk; // SArray<SSttBlk>
|
||||||
int32_t iBlockL;
|
int32_t iBlockL;
|
||||||
SBlockData lDataR;
|
SBlockData lDataR;
|
||||||
|
|
||||||
|
@ -443,7 +443,7 @@ struct STsdbSnapWriter {
|
||||||
|
|
||||||
SMapData mBlockW; // SMapData<SDataBlk>
|
SMapData mBlockW; // SMapData<SDataBlk>
|
||||||
SArray* aBlockIdxW; // SArray<SBlockIdx>
|
SArray* aBlockIdxW; // SArray<SBlockIdx>
|
||||||
SArray* aBlockLW; // SArray<SSstBlk>
|
SArray* aBlockLW; // SArray<SSttBlk>
|
||||||
|
|
||||||
// for del file
|
// for del file
|
||||||
SDelFReader* pDelFReader;
|
SDelFReader* pDelFReader;
|
||||||
|
@ -845,7 +845,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
|
||||||
|
|
||||||
// write remain stuff
|
// write remain stuff
|
||||||
if (taosArrayGetSize(pWriter->aBlockLW) > 0) {
|
if (taosArrayGetSize(pWriter->aBlockLW) > 0) {
|
||||||
code = tsdbWriteSstBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW);
|
code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -911,7 +911,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
||||||
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx);
|
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
code = tsdbReadSstBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk);
|
code = tsdbReadSttBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pWriter->pDataFReader == NULL);
|
ASSERT(pWriter->pDataFReader == NULL);
|
||||||
|
@ -931,25 +931,25 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
|
||||||
// write
|
// write
|
||||||
SHeadFile fHead;
|
SHeadFile fHead;
|
||||||
SDataFile fData;
|
SDataFile fData;
|
||||||
SSstFile fLast;
|
SSttFile fLast;
|
||||||
SSmaFile fSma;
|
SSmaFile fSma;
|
||||||
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSstF[0] = &fLast, .pSmaF = &fSma};
|
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSttF[0] = &fLast, .pSmaF = &fSma};
|
||||||
|
|
||||||
if (pSet) {
|
if (pSet) {
|
||||||
wSet.diskId = pSet->diskId;
|
wSet.diskId = pSet->diskId;
|
||||||
wSet.fid = fid;
|
wSet.fid = fid;
|
||||||
wSet.nSstF = 1;
|
wSet.nSttF = 1;
|
||||||
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
||||||
fData = *pSet->pDataF;
|
fData = *pSet->pDataF;
|
||||||
fLast = (SSstFile){.commitID = pWriter->commitID, .size = 0};
|
fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0};
|
||||||
fSma = *pSet->pSmaF;
|
fSma = *pSet->pSmaF;
|
||||||
} else {
|
} else {
|
||||||
wSet.diskId = (SDiskID){.level = 0, .id = 0};
|
wSet.diskId = (SDiskID){.level = 0, .id = 0};
|
||||||
wSet.fid = fid;
|
wSet.fid = fid;
|
||||||
wSet.nSstF = 1;
|
wSet.nSttF = 1;
|
||||||
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
||||||
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
|
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
|
||||||
fLast = (SSstFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
|
fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
|
||||||
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
|
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1147,7 +1147,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
||||||
code = tBlockDataCreate(&pWriter->bDataR);
|
code = tBlockDataCreate(&pWriter->bDataR);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
pWriter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
|
pWriter->aSstBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||||
if (pWriter->aSstBlk == NULL) {
|
if (pWriter->aSstBlk == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -1161,7 +1161,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
|
||||||
code = tBlockDataCreate(&pWriter->bDataW);
|
code = tBlockDataCreate(&pWriter->bDataW);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
pWriter->aBlockLW = taosArrayInit(0, sizeof(SSstBlk));
|
pWriter->aBlockLW = taosArrayInit(0, sizeof(SSttBlk));
|
||||||
if (pWriter->aBlockLW == NULL) {
|
if (pWriter->aBlockLW == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -214,7 +214,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
|
||||||
|
|
||||||
int32_t tCmprBlockL(void const *lhs, void const *rhs) {
|
int32_t tCmprBlockL(void const *lhs, void const *rhs) {
|
||||||
SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
|
SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
|
||||||
SSstBlk *rBlockL = (SSstBlk *)rhs;
|
SSttBlk *rBlockL = (SSttBlk *)rhs;
|
||||||
|
|
||||||
if (lBlockIdx->suid < rBlockL->suid) {
|
if (lBlockIdx->suid < rBlockL->suid) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -311,41 +311,41 @@ bool tDataBlkHasSma(SDataBlk *pDataBlk) {
|
||||||
return pDataBlk->smaInfo.size > 0;
|
return pDataBlk->smaInfo.size > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// SSstBlk ======================================================
|
// SSttBlk ======================================================
|
||||||
int32_t tPutSstBlk(uint8_t *p, void *ph) {
|
int32_t tPutSttBlk(uint8_t *p, void *ph) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
SSstBlk *pSstBlk = (SSstBlk *)ph;
|
SSttBlk *pSttBlk = (SSttBlk *)ph;
|
||||||
|
|
||||||
n += tPutI64(p ? p + n : p, pSstBlk->suid);
|
n += tPutI64(p ? p + n : p, pSttBlk->suid);
|
||||||
n += tPutI64(p ? p + n : p, pSstBlk->minUid);
|
n += tPutI64(p ? p + n : p, pSttBlk->minUid);
|
||||||
n += tPutI64(p ? p + n : p, pSstBlk->maxUid);
|
n += tPutI64(p ? p + n : p, pSttBlk->maxUid);
|
||||||
n += tPutI64v(p ? p + n : p, pSstBlk->minKey);
|
n += tPutI64v(p ? p + n : p, pSttBlk->minKey);
|
||||||
n += tPutI64v(p ? p + n : p, pSstBlk->maxKey);
|
n += tPutI64v(p ? p + n : p, pSttBlk->maxKey);
|
||||||
n += tPutI64v(p ? p + n : p, pSstBlk->minVer);
|
n += tPutI64v(p ? p + n : p, pSttBlk->minVer);
|
||||||
n += tPutI64v(p ? p + n : p, pSstBlk->maxVer);
|
n += tPutI64v(p ? p + n : p, pSttBlk->maxVer);
|
||||||
n += tPutI32v(p ? p + n : p, pSstBlk->nRow);
|
n += tPutI32v(p ? p + n : p, pSttBlk->nRow);
|
||||||
n += tPutI64v(p ? p + n : p, pSstBlk->bInfo.offset);
|
n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset);
|
||||||
n += tPutI32v(p ? p + n : p, pSstBlk->bInfo.szBlock);
|
n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock);
|
||||||
n += tPutI32v(p ? p + n : p, pSstBlk->bInfo.szKey);
|
n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey);
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tGetSstBlk(uint8_t *p, void *ph) {
|
int32_t tGetSttBlk(uint8_t *p, void *ph) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
SSstBlk *pSstBlk = (SSstBlk *)ph;
|
SSttBlk *pSttBlk = (SSttBlk *)ph;
|
||||||
|
|
||||||
n += tGetI64(p + n, &pSstBlk->suid);
|
n += tGetI64(p + n, &pSttBlk->suid);
|
||||||
n += tGetI64(p + n, &pSstBlk->minUid);
|
n += tGetI64(p + n, &pSttBlk->minUid);
|
||||||
n += tGetI64(p + n, &pSstBlk->maxUid);
|
n += tGetI64(p + n, &pSttBlk->maxUid);
|
||||||
n += tGetI64v(p + n, &pSstBlk->minKey);
|
n += tGetI64v(p + n, &pSttBlk->minKey);
|
||||||
n += tGetI64v(p + n, &pSstBlk->maxKey);
|
n += tGetI64v(p + n, &pSttBlk->maxKey);
|
||||||
n += tGetI64v(p + n, &pSstBlk->minVer);
|
n += tGetI64v(p + n, &pSttBlk->minVer);
|
||||||
n += tGetI64v(p + n, &pSstBlk->maxVer);
|
n += tGetI64v(p + n, &pSttBlk->maxVer);
|
||||||
n += tGetI32v(p + n, &pSstBlk->nRow);
|
n += tGetI32v(p + n, &pSttBlk->nRow);
|
||||||
n += tGetI64v(p + n, &pSstBlk->bInfo.offset);
|
n += tGetI64v(p + n, &pSttBlk->bInfo.offset);
|
||||||
n += tGetI32v(p + n, &pSstBlk->bInfo.szBlock);
|
n += tGetI32v(p + n, &pSttBlk->bInfo.szBlock);
|
||||||
n += tGetI32v(p + n, &pSstBlk->bInfo.szKey);
|
n += tGetI32v(p + n, &pSttBlk->bInfo.szKey);
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue