more code
This commit is contained in:
parent
a53575a152
commit
8bd8ff1523
|
@ -74,6 +74,7 @@ typedef struct SLDataIter SLDataIter;
|
||||||
typedef struct SDiskCol SDiskCol;
|
typedef struct SDiskCol SDiskCol;
|
||||||
typedef struct SDiskData SDiskData;
|
typedef struct SDiskData SDiskData;
|
||||||
typedef struct SDiskDataBuilder SDiskDataBuilder;
|
typedef struct SDiskDataBuilder SDiskDataBuilder;
|
||||||
|
typedef struct SBlkInfo SBlkInfo;
|
||||||
|
|
||||||
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
||||||
#define TSDB_MAX_SUBBLOCKS 8
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
|
@ -173,7 +174,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut
|
||||||
int32_t aBufN[]);
|
int32_t aBufN[]);
|
||||||
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]);
|
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]);
|
||||||
// SDiskDataHdr
|
// SDiskDataHdr
|
||||||
int32_t tPutDiskDataHdr(uint8_t *p, void *ph);
|
int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr);
|
||||||
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
|
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
|
||||||
// SDelIdx
|
// SDelIdx
|
||||||
int32_t tPutDelIdx(uint8_t *p, void *ph);
|
int32_t tPutDelIdx(uint8_t *p, void *ph);
|
||||||
|
@ -270,6 +271,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p
|
||||||
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
|
int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
|
||||||
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
|
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
|
||||||
int8_t cmprAlg, int8_t toLast);
|
int8_t cmprAlg, int8_t toLast);
|
||||||
|
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo);
|
||||||
|
|
||||||
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
|
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
|
||||||
// SDataFReader
|
// SDataFReader
|
||||||
|
@ -328,7 +330,6 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
|
||||||
uint8_t calcSma);
|
uint8_t calcSma);
|
||||||
int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder);
|
int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder);
|
||||||
int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId);
|
int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId);
|
||||||
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo *pBlkInfo);
|
|
||||||
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo **ppBlkInfo);
|
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo **ppBlkInfo);
|
||||||
|
|
||||||
// structs =======================
|
// structs =======================
|
||||||
|
@ -451,14 +452,16 @@ struct SSmaInfo {
|
||||||
int32_t size;
|
int32_t size;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
struct SBlkInfo {
|
||||||
int64_t minUid;
|
int64_t minUid;
|
||||||
int64_t maxUid;
|
int64_t maxUid;
|
||||||
TSDBKEY minKey;
|
TSKEY minKey;
|
||||||
TSDBKEY maxKey;
|
TSKEY maxKey;
|
||||||
int64_t minVer;
|
int64_t minVer;
|
||||||
int64_t maxVer;
|
int64_t maxVer;
|
||||||
} SBlkInfo;
|
TSDBKEY minTKey;
|
||||||
|
TSDBKEY maxTKey;
|
||||||
|
};
|
||||||
|
|
||||||
struct SDataBlk {
|
struct SDataBlk {
|
||||||
TSDBKEY minKey;
|
TSDBKEY minKey;
|
||||||
|
|
|
@ -542,6 +542,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
taosArrayClear(pCommitter->dWriter.aSttBlk);
|
taosArrayClear(pCommitter->dWriter.aSttBlk);
|
||||||
tMapDataReset(&pCommitter->dWriter.mBlock);
|
tMapDataReset(&pCommitter->dWriter.mBlock);
|
||||||
tBlockDataReset(&pCommitter->dWriter.bData);
|
tBlockDataReset(&pCommitter->dWriter.bData);
|
||||||
|
tDiskDataBuilderClear(pCommitter->dWriter.pBuilder);
|
||||||
|
|
||||||
// open iter
|
// open iter
|
||||||
code = tsdbOpenCommitIter(pCommitter);
|
code = tsdbOpenCommitIter(pCommitter);
|
||||||
|
@ -655,21 +656,21 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde
|
||||||
|
|
||||||
if (pBuilder->nRow == 0) return code;
|
if (pBuilder->nRow == 0) return code;
|
||||||
|
|
||||||
SSttBlk sttBlk = {.suid = pBuilder->suid,
|
|
||||||
.minUid = 0, // todo
|
|
||||||
.maxUid = 0, // todo
|
|
||||||
.minKey = 0, // todo
|
|
||||||
.maxKey = 0, // todo
|
|
||||||
.minVer = 0, // todo
|
|
||||||
.maxVer = 0, // todo
|
|
||||||
.nRow = pBuilder->nRow};
|
|
||||||
|
|
||||||
// gnrt
|
// gnrt
|
||||||
// code = tGnrtDiskData(pBuilder, &pBuilder->dd);
|
const SDiskData *pDiskData;
|
||||||
|
const SBlkInfo *pBlkInfo;
|
||||||
|
code = tGnrtDiskData(pBuilder, &pDiskData, &pBlkInfo);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
SSttBlk sttBlk = {.suid = pBuilder->suid,
|
||||||
|
.minUid = pBlkInfo->minUid,
|
||||||
|
.maxUid = pBlkInfo->maxUid,
|
||||||
|
.minKey = pBlkInfo->minKey,
|
||||||
|
.maxKey = pBlkInfo->maxKey,
|
||||||
|
.minVer = pBlkInfo->minVer,
|
||||||
|
.maxVer = pBlkInfo->maxVer};
|
||||||
// write
|
// write
|
||||||
// code = tsdbWriteDiskData(pWriter, &pBuilder->dd);
|
code = tsdbWriteDiskData(pWriter, pDiskData, &sttBlk.bInfo, NULL);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// push
|
// push
|
||||||
|
@ -678,8 +679,8 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear (todo)
|
// clear
|
||||||
// tDiskDataBuilderClear(pBuilder);
|
tDiskDataBuilderClear(pBuilder);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -498,10 +498,12 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
|
||||||
pBuilder->calcSma = calcSma;
|
pBuilder->calcSma = calcSma;
|
||||||
pBuilder->bi = (SBlkInfo){.minUid = INT64_MAX,
|
pBuilder->bi = (SBlkInfo){.minUid = INT64_MAX,
|
||||||
.maxUid = INT64_MIN,
|
.maxUid = INT64_MIN,
|
||||||
.minKey = TSDBKEY_MAX,
|
.minKey = TSKEY_MAX,
|
||||||
.maxKey = TSDBKEY_MIN,
|
.maxKey = TSKEY_MIN,
|
||||||
.minVer = VERSION_MAX,
|
.minVer = VERSION_MAX,
|
||||||
.maxVer = VERSION_MIN};
|
.maxVer = VERSION_MIN,
|
||||||
|
.minTKey = TSDBKEY_MAX,
|
||||||
|
.maxTKey = TSDBKEY_MIN};
|
||||||
|
|
||||||
if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code;
|
if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code;
|
||||||
code = tCompressStart(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
|
code = tCompressStart(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
|
||||||
|
@ -551,6 +553,7 @@ int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
pBuilder->suid = 0;
|
pBuilder->suid = 0;
|
||||||
pBuilder->uid = 0;
|
pBuilder->uid = 0;
|
||||||
|
pBuilder->nRow = 0;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -560,7 +563,9 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS
|
||||||
ASSERT(pBuilder->suid || pBuilder->uid);
|
ASSERT(pBuilder->suid || pBuilder->uid);
|
||||||
ASSERT(pId->suid == pBuilder->suid);
|
ASSERT(pId->suid == pBuilder->suid);
|
||||||
|
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY kRow = TSDBROW_KEY(pRow);
|
||||||
|
if (tsdbKeyCmprFn(&pBuilder->bi.minTKey, &kRow) > 0) pBuilder->bi.minTKey = kRow;
|
||||||
|
if (tsdbKeyCmprFn(&pBuilder->bi.maxTKey, &kRow) < 0) pBuilder->bi.maxTKey = kRow;
|
||||||
|
|
||||||
// uid
|
// uid
|
||||||
if (pBuilder->uid && pBuilder->uid != pId->uid) {
|
if (pBuilder->uid && pBuilder->uid != pId->uid) {
|
||||||
|
@ -579,16 +584,16 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS
|
||||||
if (pBuilder->bi.maxUid < pId->uid) pBuilder->bi.maxUid = pId->uid;
|
if (pBuilder->bi.maxUid < pId->uid) pBuilder->bi.maxUid = pId->uid;
|
||||||
|
|
||||||
// version
|
// version
|
||||||
code = tCompress(pBuilder->pVerC, &key.version, sizeof(int64_t));
|
code = tCompress(pBuilder->pVerC, &kRow.version, sizeof(int64_t));
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
if (pBuilder->bi.minVer > key.version) pBuilder->bi.minVer = key.version;
|
if (pBuilder->bi.minVer > kRow.version) pBuilder->bi.minVer = kRow.version;
|
||||||
if (pBuilder->bi.maxVer < key.version) pBuilder->bi.maxVer = key.version;
|
if (pBuilder->bi.maxVer < kRow.version) pBuilder->bi.maxVer = kRow.version;
|
||||||
|
|
||||||
// TSKEY
|
// TSKEY
|
||||||
code = tCompress(pBuilder->pKeyC, &key.ts, sizeof(int64_t));
|
code = tCompress(pBuilder->pKeyC, &kRow.ts, sizeof(int64_t));
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
if (tsdbKeyCmprFn(&pBuilder->bi.minKey, &key) > 0) pBuilder->bi.minKey = key;
|
if (pBuilder->bi.minKey > kRow.ts) pBuilder->bi.minKey = kRow.ts;
|
||||||
if (tsdbKeyCmprFn(&pBuilder->bi.maxKey, &key) < 0) pBuilder->bi.maxKey = key;
|
if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts;
|
||||||
|
|
||||||
SRowIter iter = {0};
|
SRowIter iter = {0};
|
||||||
tRowIterInit(&iter, pRow, pTSchema);
|
tRowIterInit(&iter, pRow, pTSchema);
|
||||||
|
|
|
@ -522,9 +522,6 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
|
||||||
|
|
||||||
// write
|
// write
|
||||||
if (pSmaInfo->size) {
|
if (pSmaInfo->size) {
|
||||||
code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size);
|
code = tsdbWriteFile(pWriter->pSmaFD, pWriter->fSma.size, pWriter->aBuf[0], pSmaInfo->size);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
|
@ -607,12 +604,20 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, SDiskData *pDiskData, SBlockInfo *pBlkInfo) {
|
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, const SDiskData *pDiskData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
STsdbFD *pFD = pWriter->pDataFD; // todo
|
STsdbFD *pFD = NULL;
|
||||||
int64_t offset = pWriter->fData.size;
|
if (pSmaInfo) {
|
||||||
|
pFD = pWriter->pDataFD;
|
||||||
|
pBlkInfo->offset = pWriter->fData.size;
|
||||||
|
} else {
|
||||||
|
pFD = pWriter->pSttFD;
|
||||||
|
pBlkInfo->offset = pWriter->fStt[pWriter->wSet.nSttF - 1].size;
|
||||||
|
}
|
||||||
|
pBlkInfo->szBlock = 0;
|
||||||
|
pBlkInfo->szKey = 0;
|
||||||
|
|
||||||
// hdr
|
// hdr
|
||||||
int32_t n = tPutDiskDataHdr(NULL, &pDiskData->hdr);
|
int32_t n = tPutDiskDataHdr(NULL, &pDiskData->hdr);
|
||||||
|
@ -621,26 +626,30 @@ int32_t tsdbWriteDiskData(SDataFWriter *pWriter, SDiskData *pDiskData, SBlockInf
|
||||||
|
|
||||||
tPutDiskDataHdr(pWriter->aBuf[0], &pDiskData->hdr);
|
tPutDiskDataHdr(pWriter->aBuf[0], &pDiskData->hdr);
|
||||||
|
|
||||||
code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], n);
|
code = tsdbWriteFile(pFD, pBlkInfo->offset, pWriter->aBuf[0], n);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
offset += n;
|
pBlkInfo->szKey += n;
|
||||||
|
pBlkInfo->szBlock += n;
|
||||||
|
|
||||||
// uid + ver + key
|
// uid + ver + key
|
||||||
if (pDiskData->hdr.szUid) {
|
if (pDiskData->pUid) {
|
||||||
code = tsdbWriteFile(pFD, offset, pDiskData->pUid, pDiskData->hdr.szUid);
|
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pUid, pDiskData->hdr.szUid);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
offset += pDiskData->hdr.szUid;
|
pBlkInfo->szKey += pDiskData->hdr.szUid;
|
||||||
|
pBlkInfo->szBlock += pDiskData->hdr.szUid;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbWriteFile(pFD, offset, pDiskData->pVer, pDiskData->hdr.szVer);
|
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pVer, pDiskData->hdr.szVer);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
offset += pDiskData->hdr.szVer;
|
pBlkInfo->szKey += pDiskData->hdr.szVer;
|
||||||
|
pBlkInfo->szBlock += pDiskData->hdr.szVer;
|
||||||
|
|
||||||
code = tsdbWriteFile(pFD, offset, pDiskData->pKey, pDiskData->hdr.szKey);
|
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskData->pKey, pDiskData->hdr.szKey);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
offset += pDiskData->hdr.szKey;
|
pBlkInfo->szKey += pDiskData->hdr.szKey;
|
||||||
|
pBlkInfo->szBlock += pDiskData->hdr.szKey;
|
||||||
|
|
||||||
// SBlockCol
|
// aBlockCol
|
||||||
if (pDiskData->hdr.szBlkCol) {
|
if (pDiskData->hdr.szBlkCol) {
|
||||||
code = tRealloc(&pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
|
code = tRealloc(&pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -648,48 +657,75 @@ int32_t tsdbWriteDiskData(SDataFWriter *pWriter, SDiskData *pDiskData, SBlockInf
|
||||||
n = 0;
|
n = 0;
|
||||||
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
|
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
|
||||||
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
|
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
|
||||||
|
n += tPutBlockCol(pWriter->aBuf[0] + n, pDiskCol);
|
||||||
n += tPutBlockCol(pWriter->aBuf[0] + n, &pDiskCol->bCol);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(n == pDiskData->hdr.szBlkCol);
|
ASSERT(n == pDiskData->hdr.szBlkCol);
|
||||||
|
|
||||||
code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
|
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
offset += pDiskData->hdr.szBlkCol;
|
pBlkInfo->szBlock += pDiskData->hdr.szBlkCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pData
|
// aDiskCol
|
||||||
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
|
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
|
||||||
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
|
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
|
||||||
|
|
||||||
if (pDiskCol->bCol.flag == HAS_NULL) continue;
|
if (pDiskCol->pBit) {
|
||||||
|
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pBit, pDiskCol->bCol.szBitmap);
|
||||||
if (pDiskCol->bCol.szBitmap) {
|
|
||||||
code = tsdbWriteFile(pFD, offset, pDiskCol->pBit, pDiskCol->bCol.szBitmap);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
offset += pDiskCol->bCol.szBitmap;
|
|
||||||
|
pBlkInfo->szBlock += pDiskCol->bCol.szBitmap;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDiskCol->bCol.szOffset) {
|
if (pDiskCol->pOff) {
|
||||||
code = tsdbWriteFile(pFD, offset, pDiskCol->pOff, pDiskCol->bCol.szOffset);
|
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pOff, pDiskCol->bCol.szOffset);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
offset += pDiskCol->bCol.szOffset;
|
|
||||||
|
pBlkInfo->szBlock += pDiskCol->bCol.szOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDiskCol->bCol.szValue) {
|
if (pDiskCol->pVal) {
|
||||||
code = tsdbWriteFile(pFD, offset, pDiskCol->pVal, pDiskCol->bCol.szValue);
|
code = tsdbWriteFile(pFD, pBlkInfo->offset + pBlkInfo->szBlock, pDiskCol->pVal, pDiskCol->bCol.szValue);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
offset += pDiskCol->bCol.szValue;
|
|
||||||
|
pBlkInfo->szBlock += pDiskCol->bCol.szValue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSmaInfo) {
|
||||||
|
pWriter->fData.size += pBlkInfo->szBlock;
|
||||||
|
} else {
|
||||||
|
pWriter->fStt[pWriter->wSet.nSttF - 1].size += pBlkInfo->szBlock;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSmaInfo->offset = 0;
|
||||||
|
pSmaInfo->size = 0;
|
||||||
|
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
|
||||||
|
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pDiskCol->bCol.type)) continue;
|
||||||
|
if (pDiskCol->bCol.flag == HAS_NULL || pDiskCol->bCol.flag == (HAS_NULL | HAS_NONE)) continue;
|
||||||
|
if (!pDiskCol->bCol.smaOn) continue;
|
||||||
|
|
||||||
|
code = tRealloc(&pWriter->aBuf[0], pSmaInfo->size + tPutColumnDataAgg(NULL, &pDiskCol->agg));
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
pSmaInfo->size += tPutColumnDataAgg(pWriter->aBuf[0] + pSmaInfo->size, &pDiskCol->agg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSmaInfo->size) {
|
||||||
|
pSmaInfo->offset = pWriter->fSma.size;
|
||||||
|
|
||||||
|
code = tsdbWriteFile(pWriter->pSmaFD, pSmaInfo->offset, pWriter->aBuf[0], pSmaInfo->size);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
pWriter->fSma.size += pSmaInfo->size;
|
||||||
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
} else {
|
|
||||||
tsdbDebug("vgId:%d %s", TD_VID(pWriter->pTsdb->pVnode), __func__);
|
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1454,9 +1454,8 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
// SDiskDataHdr ==============================
|
// SDiskDataHdr ==============================
|
||||||
int32_t tPutDiskDataHdr(uint8_t *p, void *ph) {
|
int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
|
|
||||||
|
|
||||||
n += tPutU32(p ? p + n : p, pHdr->delimiter);
|
n += tPutU32(p ? p + n : p, pHdr->delimiter);
|
||||||
n += tPutU32v(p ? p + n : p, pHdr->fmtVer);
|
n += tPutU32v(p ? p + n : p, pHdr->fmtVer);
|
||||||
|
|
Loading…
Reference in New Issue