more work
This commit is contained in:
parent
3698016ad9
commit
605e89b6a1
|
@ -182,8 +182,12 @@ int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
|
|||
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
|
||||
int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,
|
||||
int32_t *szOut, uint8_t **ppBuf);
|
||||
int32_t tsdbDecmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t szOut,
|
||||
uint8_t **ppBuf);
|
||||
int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int8_t nOut,
|
||||
uint8_t **ppBuf);
|
||||
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
|
||||
uint8_t **ppBuf);
|
||||
// tsdbMemTable ==============================================================================================
|
||||
// SMemTable
|
||||
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
||||
|
@ -254,6 +258,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMa
|
|||
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL);
|
||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
|
||||
|
||||
#if 0
|
||||
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
|
||||
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
|
||||
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
|
||||
|
@ -262,6 +267,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl
|
|||
uint8_t **ppBuf2);
|
||||
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
|
||||
uint8_t **ppBuf2);
|
||||
#endif
|
||||
// SDelFWriter
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
|
||||
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
|
||||
|
|
|
@ -677,6 +677,159 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg) {
|
||||
int32_t code = 0;
|
||||
SSmaInfo *pSmaInfo = &pBlock->smaInfo;
|
||||
|
||||
ASSERT(pSmaInfo->size > 0);
|
||||
|
||||
taosArrayClear(aColumnDataAgg);
|
||||
|
||||
// alloc
|
||||
int32_t size = pSmaInfo->size + sizeof(TSCKSUM);
|
||||
code = tRealloc(&pReader->pBuf1, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
int64_t n = taosReadFile(pReader->pSmaFD, pReader->pBuf1, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(pReader->pBuf1, size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// decode
|
||||
n = 0;
|
||||
while (n < pSmaInfo->size) {
|
||||
SColumnDataAgg sma;
|
||||
|
||||
n += tGetColumnDataAgg(pReader->pBuf1 + n, &sma);
|
||||
if (taosArrayPush(aColumnDataAgg, &sma) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, int8_t fromLast, int16_t *aColId,
|
||||
int32_t nColId, SBlockData *pBlockData) {
|
||||
int32_t code = 0;
|
||||
|
||||
tBlockDataReset(pBlockData);
|
||||
|
||||
TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD;
|
||||
|
||||
// seek
|
||||
int64_t n = taosLSeekFile(pFD, pBlkInfo->offset, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// read
|
||||
code = tRealloc(&pReader->pBuf1, pBlkInfo->szBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
n = taosReadFile(pFD, pReader->pBuf1, pBlkInfo->szBlock);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < pBlkInfo->szBlock) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
uint8_t *p = pReader->pBuf1;
|
||||
// check & decode
|
||||
SDiskDataHdr hdr;
|
||||
if (!taosCheckChecksumWhole(p, pBlkInfo->szKey)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
p += tGetDiskDataHdr(p, &hdr);
|
||||
|
||||
tBlockDataSetSchema(pBlockData, NULL, hdr.suid, hdr.uid);
|
||||
pBlockData->nRow = hdr.nRow;
|
||||
|
||||
if (hdr.uid == 0) {
|
||||
ASSERT(hdr.szUid);
|
||||
code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid,
|
||||
sizeof(int64_t) * hdr.nRow, &pReader->pBuf2);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
ASSERT(hdr.szUid == 0);
|
||||
}
|
||||
p += hdr.szUid;
|
||||
|
||||
code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion,
|
||||
sizeof(int64_t) * hdr.nRow, &pReader->pBuf2);
|
||||
if (code) goto _err;
|
||||
p += hdr.szVer;
|
||||
|
||||
code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
|
||||
sizeof(TSKEY) * hdr.nRow, &pReader->pBuf2);
|
||||
if (code) goto _err;
|
||||
p += hdr.szKey;
|
||||
p += sizeof(TSCKSUM);
|
||||
|
||||
// SBlockCol
|
||||
if (hdr.szBlkCol > 0) {
|
||||
if (!taosCheckChecksumWhole(p, hdr.szBlkCol + sizeof(TSCKSUM))) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int32_t iColData = 0;
|
||||
uint8_t *pt = p + hdr.szBlkCol + sizeof(TSCKSUM);
|
||||
n = 0;
|
||||
while (n < hdr.szBlkCol) {
|
||||
SBlockCol blockCol;
|
||||
|
||||
n += tGetBlockCol(p + n, &blockCol);
|
||||
|
||||
ASSERT(blockCol.flag && blockCol.flag != HAS_NONE);
|
||||
|
||||
SColData *pColData;
|
||||
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
|
||||
if (code) goto _err;
|
||||
iColData++;
|
||||
|
||||
tColDataInit(pColData, blockCol.cid, blockCol.type, blockCol.smaOn);
|
||||
|
||||
if (blockCol.flag == HAS_NULL) {
|
||||
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
|
||||
code = tColDataAppendValue(pColData, &COL_VAL_NULL(blockCol.cid, blockCol.type));
|
||||
if (code) goto _err;
|
||||
}
|
||||
} else {
|
||||
code = tsdbDecmprColData(pt + blockCol.offset, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, pReader->pBuf2);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
#if 0
|
||||
|
@ -1350,54 +1503,7 @@ _err:
|
|||
#endif
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg) {
|
||||
int32_t code = 0;
|
||||
SSmaInfo *pSmaInfo = &pBlock->smaInfo;
|
||||
|
||||
ASSERT(pSmaInfo->size > 0);
|
||||
|
||||
taosArrayClear(aColumnDataAgg);
|
||||
|
||||
// alloc
|
||||
int32_t size = pSmaInfo->size + sizeof(TSCKSUM);
|
||||
code = tRealloc(&pReader->pBuf1, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
int64_t n = taosReadFile(pReader->pSmaFD, pReader->pBuf1, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
} else if (n < size) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(pReader->pBuf1, size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// decode
|
||||
n = 0;
|
||||
while (n < pSmaInfo->size) {
|
||||
SColumnDataAgg sma;
|
||||
|
||||
n += tGetColumnDataAgg(pReader->pBuf1 + n, &sma);
|
||||
if (taosArrayPush(aColumnDataAgg, &sma) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
#endif
|
||||
|
||||
// SDataFWriter ====================================================
|
||||
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
|
||||
|
|
|
@ -1751,6 +1751,36 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDecmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t szOut,
|
||||
uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
|
||||
code = tRealloc(ppOut, szOut);
|
||||
if (code) goto _exit;
|
||||
|
||||
if (cmprAlg == NO_COMPRESSION) {
|
||||
ASSERT(szIn == szOut);
|
||||
memcpy(*ppOut, pIn, szOut);
|
||||
} else {
|
||||
if (cmprAlg == TWO_STAGE_COMP) {
|
||||
code = tRealloc(ppBuf, szOut + COMP_OVERFLOW_BYTES);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
int32_t size = tDataTypes[type].decompFunc(pIn, szIn, szOut / tDataTypes[type].bytes, *ppOut, szOut, cmprAlg,
|
||||
*ppBuf, szOut + COMP_OVERFLOW_BYTES);
|
||||
if (size <= 0) {
|
||||
code = TSDB_CODE_COMPRESS_ERROR;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
ASSERT(size == szOut);
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int8_t nOut,
|
||||
uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
|
@ -1793,3 +1823,48 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
|
|||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
|
||||
uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
|
||||
int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
|
||||
if (!taosCheckChecksumWhole(pIn, size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pColData->cid = pBlockCol->cid;
|
||||
pColData->type = pBlockCol->type;
|
||||
pColData->smaOn = pBlockCol->smaOn;
|
||||
pColData->flag = pBlockCol->flag;
|
||||
pColData->nVal = nVal;
|
||||
pColData->nData = pBlockCol->szOrigin;
|
||||
|
||||
uint8_t *p = pIn;
|
||||
// bitmap
|
||||
if (pBlockCol->szBitmap) {
|
||||
code = tsdbDecmprData(p, pBlockCol->szBitmap, TSDB_DATA_TYPE_TINYINT, cmprAlg, &pColData->pBitMap,
|
||||
BIT2_SIZE(pColData->nVal), ppBuf);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
p += pBlockCol->szBitmap;
|
||||
|
||||
// offset
|
||||
if (pBlockCol->szOffset) {
|
||||
code = tsdbDecmprData(p, pBlockCol->szOffset, TSDB_DATA_TYPE_INT, cmprAlg, (uint8_t **)&pColData->aOffset,
|
||||
sizeof(int32_t) * pColData->nVal, ppBuf);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
p += pBlockCol->szOffset;
|
||||
|
||||
// value
|
||||
if (pBlockCol->szValue) {
|
||||
code = tsdbDecmprData(p, pBlockCol->szValue, pColData->type, cmprAlg, &pColData->pData, pColData->nData, ppBuf);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
p += pBlockCol->szValue;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
Loading…
Reference in New Issue