more work
This commit is contained in:
parent
7c32e099d0
commit
85556a3c90
|
@ -47,7 +47,6 @@ typedef struct SBlockL SBlockL;
|
||||||
typedef struct SColData SColData;
|
typedef struct SColData SColData;
|
||||||
typedef struct SDiskDataHdr SDiskDataHdr;
|
typedef struct SDiskDataHdr SDiskDataHdr;
|
||||||
typedef struct SBlockData SBlockData;
|
typedef struct SBlockData SBlockData;
|
||||||
typedef struct SDiskData SDiskData;
|
|
||||||
typedef struct SDelFile SDelFile;
|
typedef struct SDelFile SDelFile;
|
||||||
typedef struct SHeadFile SHeadFile;
|
typedef struct SHeadFile SHeadFile;
|
||||||
typedef struct SDataFile SDataFile;
|
typedef struct SDataFile SDataFile;
|
||||||
|
@ -152,11 +151,6 @@ SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
|
||||||
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
|
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
|
||||||
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
|
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
|
||||||
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
|
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
|
||||||
// SDiskData
|
|
||||||
int32_t tDiskDataInit(SDiskData *pDiskData);
|
|
||||||
void tDiskDataClear(SDiskData *pDiskData);
|
|
||||||
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg);
|
|
||||||
int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData);
|
|
||||||
// SDiskDataHdr
|
// SDiskDataHdr
|
||||||
int32_t tPutDiskDataHdr(uint8_t *p, void *ph);
|
int32_t tPutDiskDataHdr(uint8_t *p, void *ph);
|
||||||
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
|
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
|
||||||
|
@ -185,6 +179,9 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
|
||||||
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
|
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
|
||||||
int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
|
int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
|
||||||
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
|
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
|
||||||
|
int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,
|
||||||
|
int32_t *szOut, uint8_t **ppBuf);
|
||||||
|
int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf);
|
||||||
// tsdbMemTable ==============================================================================================
|
// tsdbMemTable ==============================================================================================
|
||||||
// SMemTable
|
// SMemTable
|
||||||
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
||||||
|
@ -413,7 +410,6 @@ typedef struct {
|
||||||
int32_t szOffset; // offset size, 0 only for non-variant-length type
|
int32_t szOffset; // offset size, 0 only for non-variant-length type
|
||||||
int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE)
|
int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE)
|
||||||
int32_t offset;
|
int32_t offset;
|
||||||
uint8_t **ppData;
|
|
||||||
} SBlockCol;
|
} SBlockCol;
|
||||||
|
|
||||||
struct SBlockInfo {
|
struct SBlockInfo {
|
||||||
|
@ -588,14 +584,6 @@ struct SDelFWriter {
|
||||||
uint8_t *pBuf1;
|
uint8_t *pBuf1;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SDiskData {
|
|
||||||
SDiskDataHdr hdr;
|
|
||||||
uint8_t **ppKey;
|
|
||||||
SArray *aBlockCol; // SArray<SBlockCol>
|
|
||||||
int32_t nBuf;
|
|
||||||
SArray *aBuf; // SArray<uint8_t*>
|
|
||||||
};
|
|
||||||
|
|
||||||
struct SDataFWriter {
|
struct SDataFWriter {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
SDFileSet wSet;
|
SDFileSet wSet;
|
||||||
|
@ -612,8 +600,6 @@ struct SDataFWriter {
|
||||||
|
|
||||||
uint8_t *pBuf1;
|
uint8_t *pBuf1;
|
||||||
uint8_t *pBuf2;
|
uint8_t *pBuf2;
|
||||||
|
|
||||||
SDiskData dData;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STsdbReadSnap {
|
struct STsdbReadSnap {
|
||||||
|
|
|
@ -1911,72 +1911,58 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
ASSERT(pBlockData->nRow > 0);
|
ASSERT(pBlockData->nRow > 0);
|
||||||
|
|
||||||
// ================= DATA ====================
|
// ================= DATA ====================
|
||||||
#if 0
|
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT,
|
||||||
// convert
|
.suid = pBlockData->suid,
|
||||||
code = tBlockToDiskData(pBlockData, pDiskData, cmprAlg);
|
.uid = pBlockData->uid,
|
||||||
|
.nRow = pBlockData->nRow,
|
||||||
|
.cmprAlg = cmprAlg};
|
||||||
|
|
||||||
|
SArray *aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol));
|
||||||
|
if (aBlockCol == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// uid
|
||||||
|
if (pBlockData->uid == 0) {
|
||||||
|
ASSERT(toLast);
|
||||||
|
code = tsdbCmprData();
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// version
|
||||||
|
code = tsdbCmprData();
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// write the block
|
// ts
|
||||||
if (toLast) {
|
code = tsdbCmprData();
|
||||||
pBlkInfo->offset = pWriter->fLast.size;
|
|
||||||
} else {
|
|
||||||
pBlkInfo->offset = pWriter->fData.size;
|
|
||||||
}
|
|
||||||
|
|
||||||
// HDR and KEY
|
|
||||||
int32_t size = tPutDiskDataHdr(NULL, &pDiskData->hdr);
|
|
||||||
code = tRealloc(ppBuf, size);
|
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
tPutDiskDataHdr(*ppBuf, &pDiskData->hdr);
|
// columns
|
||||||
|
int32_t offset = 0;
|
||||||
|
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
|
||||||
|
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
||||||
|
|
||||||
TSCKSUM cksm = taosCalcChecksum(0, *ppBuf, size);
|
ASSERT(pColData->flag);
|
||||||
|
|
||||||
int64_t n = taosWriteFile(pFD, *ppBuf, size);
|
if (pColData->flag == HAS_NONE) continue;
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
SBlockCol blockCol = {.cid = pColData->cid,
|
||||||
|
.type = pColData->type,
|
||||||
|
.smaOn = pColData->smaOn,
|
||||||
|
.flag = pColData->flag,
|
||||||
|
.szOrigin = pColData->nData};
|
||||||
|
|
||||||
|
if (pColData->flag != HAS_NULL) {
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosArrayPush(aBlockCol, &blockCol) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
cksm = taosCalcChecksum(cksm, *pDiskData->ppKey, pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey);
|
|
||||||
n = taosWriteFile(pFD, *pDiskData->ppKey, pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey);
|
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
n = taosWriteFile(pFD, &cksm, sizeof(cksm));
|
// write
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
pBlkInfo->szKey = size + pDiskData->hdr.szUid + pDiskData->hdr.szVer + pDiskData->hdr.szKey + sizeof(TSCKSUM);
|
|
||||||
|
|
||||||
// SBlockCol
|
|
||||||
if (pDiskData->hdr.szBlkCol == 0) {
|
|
||||||
pBlkInfo->szBlock = pBlkInfo->szKey;
|
|
||||||
goto _write_sma;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tRealloc(ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM));
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
n = 0;
|
|
||||||
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) {
|
|
||||||
n += tPutBlockCol(*ppBuf + n, taosArrayGet(pDiskData->aBlockCol, iBlockCol));
|
|
||||||
}
|
|
||||||
taosCalcChecksumAppend(0, *ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM));
|
|
||||||
|
|
||||||
n = taosWriteFile(pFD, *ppBuf, pDiskData->hdr.szBlkCol + sizeof(TSCKSUM));
|
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(pDiskData->aBlockCol); iBlockCol++) {
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// ================= SMA ====================
|
// ================= SMA ====================
|
||||||
if (pSmaInfo) {
|
if (pSmaInfo) {
|
||||||
|
|
|
@ -1509,219 +1509,6 @@ int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) {
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
// SDiskData ==============================
|
|
||||||
static int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,
|
|
||||||
int32_t *szOut, uint8_t **ppBuf) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
ASSERT(szIn > 0 && ppOut);
|
|
||||||
|
|
||||||
if (cmprAlg == NO_COMPRESSION) {
|
|
||||||
code = tRealloc(ppOut, nOut + szIn);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
memcpy(*ppOut + nOut, pIn, szIn);
|
|
||||||
*szOut = szIn;
|
|
||||||
} else {
|
|
||||||
int32_t size = szIn + COMP_OVERFLOW_BYTES;
|
|
||||||
|
|
||||||
code = tRealloc(ppOut, nOut + size);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
if (cmprAlg == TWO_STAGE_COMP) {
|
|
||||||
ASSERT(ppBuf);
|
|
||||||
code = tRealloc(ppBuf, size);
|
|
||||||
if (code) goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
*szOut =
|
|
||||||
tDataTypes[type].compFunc(pIn, szIn, szIn / tDataTypes[type].bytes, *ppOut + nOut, size, cmprAlg, *ppBuf, size);
|
|
||||||
if (*szOut <= 0) {
|
|
||||||
code = TSDB_CODE_COMPRESS_ERROR;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
int32_t n = 0;
|
|
||||||
// bitmap
|
|
||||||
if (pColData->flag != HAS_VALUE) {
|
|
||||||
code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg,
|
|
||||||
pBlockCol->ppData, n, &pBlockCol->szBitmap, ppBuf);
|
|
||||||
if (code) goto _exit;
|
|
||||||
} else {
|
|
||||||
pBlockCol->szBitmap = 0;
|
|
||||||
}
|
|
||||||
n += pBlockCol->szBitmap;
|
|
||||||
|
|
||||||
// offset
|
|
||||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
|
||||||
code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg,
|
|
||||||
pBlockCol->ppData, n, &pBlockCol->szOffset, ppBuf);
|
|
||||||
if (code) goto _exit;
|
|
||||||
} else {
|
|
||||||
pBlockCol->szOffset = 0;
|
|
||||||
}
|
|
||||||
n += pBlockCol->szOffset;
|
|
||||||
|
|
||||||
// value
|
|
||||||
if (pColData->flag != (HAS_NULL | HAS_NONE)) {
|
|
||||||
code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, pBlockCol->ppData, n,
|
|
||||||
&pBlockCol->szValue, ppBuf);
|
|
||||||
if (code) goto _exit;
|
|
||||||
} else {
|
|
||||||
pBlockCol->szValue = 0;
|
|
||||||
}
|
|
||||||
n += pBlockCol->szValue;
|
|
||||||
|
|
||||||
// checksum
|
|
||||||
n += sizeof(TSCKSUM);
|
|
||||||
taosCalcChecksumAppend(0, *ppBuf, n);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbDecmprData() {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDiskDataInit(SDiskData *pDiskData) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
pDiskData->aBlockCol = taosArrayInit(0, sizeof(SBlockCol));
|
|
||||||
if (pDiskData->aBlockCol == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDiskData->aBuf = taosArrayInit(0, sizeof(uint8_t *));
|
|
||||||
if (pDiskData->aBuf == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tDiskDataClear(SDiskData *pDiskData) {
|
|
||||||
taosArrayDestroy(pDiskData->aBlockCol);
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pDiskData->aBuf); i++) {
|
|
||||||
tFree((uint8_t *)taosArrayGet(pDiskData->aBuf, i));
|
|
||||||
}
|
|
||||||
taosArrayDestroy(pDiskData->aBuf);
|
|
||||||
}
|
|
||||||
|
|
||||||
static uint8_t **tDiskDataAllocBuf(SDiskData *pDiskData) {
|
|
||||||
if (pDiskData->nBuf >= taosArrayGetSize(pDiskData->aBuf)) {
|
|
||||||
uint8_t *p = NULL;
|
|
||||||
if (taosArrayPush(pDiskData->aBuf, &p) == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pDiskData->nBuf < taosArrayGetSize(pDiskData->aBuf));
|
|
||||||
uint8_t **pp = taosArrayGet(pDiskData->aBuf, pDiskData->nBuf);
|
|
||||||
pDiskData->nBuf++;
|
|
||||||
return pp;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
ASSERT(pBlockData->nRow > 0);
|
|
||||||
|
|
||||||
pDiskData->cmprAlg = cmprAlg;
|
|
||||||
pDiskData->nRow = pBlockData->nRow;
|
|
||||||
pDiskData->suid = pBlockData->suid;
|
|
||||||
pDiskData->uid = pBlockData->uid;
|
|
||||||
pDiskData->szUid = 0;
|
|
||||||
pDiskData->szVer = 0;
|
|
||||||
pDiskData->szKey = 0;
|
|
||||||
taosArrayClear(pDiskData->aBlockCol);
|
|
||||||
pDiskData->nBuf = 0;
|
|
||||||
|
|
||||||
{
|
|
||||||
pDiskData->ppKey = tDiskDataAllocBuf(pDiskData);
|
|
||||||
if (pDiskData->ppKey == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t n = 0;
|
|
||||||
// uid
|
|
||||||
if (pDiskData->uid == 0) {
|
|
||||||
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
|
|
||||||
cmprAlg, pDiskData->ppKey, n, &pDiskData->szUid, NULL);
|
|
||||||
if (code) goto _exit;
|
|
||||||
} else {
|
|
||||||
pDiskData->szUid = 0;
|
|
||||||
}
|
|
||||||
n += pDiskData->szUid;
|
|
||||||
|
|
||||||
// version
|
|
||||||
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
|
|
||||||
cmprAlg, pDiskData->ppKey, n, &pDiskData->szVer, NULL);
|
|
||||||
if (code) goto _exit;
|
|
||||||
n += pDiskData->szVer;
|
|
||||||
|
|
||||||
// tskey
|
|
||||||
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
|
|
||||||
cmprAlg, pDiskData->ppKey, &pDiskData->szKey, NULL);
|
|
||||||
if (code) goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// columns
|
|
||||||
int32_t offset = 0;
|
|
||||||
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
|
|
||||||
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
|
||||||
|
|
||||||
if (pColData->flag == HAS_NONE) continue;
|
|
||||||
|
|
||||||
SBlockCol blockCol = {.cid = pColData->cid,
|
|
||||||
.type = pColData->type,
|
|
||||||
.smaOn = pColData->smaOn,
|
|
||||||
.flag = pColData->flag,
|
|
||||||
.szOrigin = pColData->nData};
|
|
||||||
|
|
||||||
if (pColData->flag != HAS_NULL) {
|
|
||||||
// alloc a buffer
|
|
||||||
blockCol.ppData = tDiskDataAllocBuf(pDiskData);
|
|
||||||
if (blockCol.ppData == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// compress
|
|
||||||
code = tsdbCmprColData(pColData, cmprAlg, &blockCol, NULL);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
// update offset
|
|
||||||
blockCol.offset = offset;
|
|
||||||
offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayPush(pDiskData->aBlockCol, &blockCol) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg);
|
|
||||||
int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData);
|
|
||||||
|
|
||||||
// SDiskDataHdr ==============================
|
// SDiskDataHdr ==============================
|
||||||
int32_t tPutDiskDataHdr(uint8_t *p, void *ph) {
|
int32_t tPutDiskDataHdr(uint8_t *p, void *ph) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
|
@ -1927,3 +1714,168 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,
|
||||||
|
int32_t *szOut, uint8_t **ppBuf) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
ASSERT(szIn > 0 && ppOut);
|
||||||
|
|
||||||
|
if (cmprAlg == NO_COMPRESSION) {
|
||||||
|
code = tRealloc(ppOut, nOut + szIn);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
memcpy(*ppOut + nOut, pIn, szIn);
|
||||||
|
*szOut = szIn;
|
||||||
|
} else {
|
||||||
|
int32_t size = szIn + COMP_OVERFLOW_BYTES;
|
||||||
|
|
||||||
|
code = tRealloc(ppOut, nOut + size);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
if (cmprAlg == TWO_STAGE_COMP) {
|
||||||
|
ASSERT(ppBuf);
|
||||||
|
code = tRealloc(ppBuf, size);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
*szOut =
|
||||||
|
tDataTypes[type].compFunc(pIn, szIn, szIn / tDataTypes[type].bytes, *ppOut + nOut, size, cmprAlg, *ppBuf, size);
|
||||||
|
if (*szOut <= 0) {
|
||||||
|
code = TSDB_CODE_COMPRESS_ERROR;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
int32_t n = 0;
|
||||||
|
// bitmap
|
||||||
|
if (pColData->flag != HAS_VALUE) {
|
||||||
|
code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg,
|
||||||
|
pBlockCol->ppData, n, &pBlockCol->szBitmap, ppBuf);
|
||||||
|
if (code) goto _exit;
|
||||||
|
} else {
|
||||||
|
pBlockCol->szBitmap = 0;
|
||||||
|
}
|
||||||
|
n += pBlockCol->szBitmap;
|
||||||
|
|
||||||
|
// offset
|
||||||
|
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||||
|
code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg,
|
||||||
|
pBlockCol->ppData, n, &pBlockCol->szOffset, ppBuf);
|
||||||
|
if (code) goto _exit;
|
||||||
|
} else {
|
||||||
|
pBlockCol->szOffset = 0;
|
||||||
|
}
|
||||||
|
n += pBlockCol->szOffset;
|
||||||
|
|
||||||
|
// value
|
||||||
|
if (pColData->flag != (HAS_NULL | HAS_NONE)) {
|
||||||
|
code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, pBlockCol->ppData, n,
|
||||||
|
&pBlockCol->szValue, ppBuf);
|
||||||
|
if (code) goto _exit;
|
||||||
|
} else {
|
||||||
|
pBlockCol->szValue = 0;
|
||||||
|
}
|
||||||
|
n += pBlockCol->szValue;
|
||||||
|
|
||||||
|
// checksum
|
||||||
|
n += sizeof(TSCKSUM);
|
||||||
|
taosCalcChecksumAppend(0, *ppBuf, n);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
ASSERT(pBlockData->nRow > 0);
|
||||||
|
|
||||||
|
pDiskData->cmprAlg = cmprAlg;
|
||||||
|
pDiskData->nRow = pBlockData->nRow;
|
||||||
|
pDiskData->suid = pBlockData->suid;
|
||||||
|
pDiskData->uid = pBlockData->uid;
|
||||||
|
pDiskData->szUid = 0;
|
||||||
|
pDiskData->szVer = 0;
|
||||||
|
pDiskData->szKey = 0;
|
||||||
|
taosArrayClear(pDiskData->aBlockCol);
|
||||||
|
pDiskData->nBuf = 0;
|
||||||
|
|
||||||
|
{
|
||||||
|
pDiskData->ppKey = tDiskDataAllocBuf(pDiskData);
|
||||||
|
if (pDiskData->ppKey == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t n = 0;
|
||||||
|
// uid
|
||||||
|
if (pDiskData->uid == 0) {
|
||||||
|
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
|
||||||
|
cmprAlg, pDiskData->ppKey, n, &pDiskData->szUid, NULL);
|
||||||
|
if (code) goto _exit;
|
||||||
|
} else {
|
||||||
|
pDiskData->szUid = 0;
|
||||||
|
}
|
||||||
|
n += pDiskData->szUid;
|
||||||
|
|
||||||
|
// version
|
||||||
|
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
|
||||||
|
cmprAlg, pDiskData->ppKey, n, &pDiskData->szVer, NULL);
|
||||||
|
if (code) goto _exit;
|
||||||
|
n += pDiskData->szVer;
|
||||||
|
|
||||||
|
// tskey
|
||||||
|
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
|
||||||
|
cmprAlg, pDiskData->ppKey, &pDiskData->szKey, NULL);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// columns
|
||||||
|
int32_t offset = 0;
|
||||||
|
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
|
||||||
|
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
||||||
|
|
||||||
|
if (pColData->flag == HAS_NONE) continue;
|
||||||
|
|
||||||
|
SBlockCol blockCol = {.cid = pColData->cid,
|
||||||
|
.type = pColData->type,
|
||||||
|
.smaOn = pColData->smaOn,
|
||||||
|
.flag = pColData->flag,
|
||||||
|
.szOrigin = pColData->nData};
|
||||||
|
|
||||||
|
if (pColData->flag != HAS_NULL) {
|
||||||
|
// alloc a buffer
|
||||||
|
blockCol.ppData = tDiskDataAllocBuf(pDiskData);
|
||||||
|
if (blockCol.ppData == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// compress
|
||||||
|
code = tsdbCmprColData(pColData, cmprAlg, &blockCol, NULL);
|
||||||
|
if (code) goto _exit;
|
||||||
|
|
||||||
|
// update offset
|
||||||
|
blockCol.offset = offset;
|
||||||
|
offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosArrayPush(pDiskData->aBlockCol, &blockCol) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue