more code

This commit is contained in:
Hongze Cheng 2022-09-27 17:36:02 +08:00
parent fe95393bb8
commit a4d16f1c00
5 changed files with 80 additions and 51 deletions

View File

@ -131,7 +131,7 @@ typedef struct SCompressor SCompressor;
int32_t tCompressorCreate(SCompressor **ppCmprsor); int32_t tCompressorCreate(SCompressor **ppCmprsor);
int32_t tCompressorDestroy(SCompressor *pCmprsor); int32_t tCompressorDestroy(SCompressor *pCmprsor);
int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin);
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData); int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -326,9 +326,9 @@ int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder); void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder);
int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg, int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg,
uint8_t calcSma); uint8_t calcSma);
int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId); int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder);
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData); int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId);
int32_t tDiskDataDestroy(SDiskData *pDiskData); int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData);
// structs ======================= // structs =======================
struct STsdbFS { struct STsdbFS {

View File

@ -665,7 +665,7 @@ static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilde
.nRow = pBuilder->nRow}; .nRow = pBuilder->nRow};
// gnrt // gnrt
code = tGnrtDiskData(pBuilder, &pBuilder->dd); // code = tGnrtDiskData(pBuilder, &pBuilder->dd);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// write // write
@ -1369,7 +1369,6 @@ static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id)
if (!pBuilder->suid && !pBuilder->uid) { if (!pBuilder->suid && !pBuilder->uid) {
ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.suid == id.suid);
ASSERT(pCommitter->skmTable.uid == id.uid); ASSERT(pCommitter->skmTable.uid == id.uid);
TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid};
code = code =
tDiskDataBuilderInit(pCommitter->dWriter.pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0); tDiskDataBuilderInit(pCommitter->dWriter.pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -1396,7 +1395,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
TSDBROW row = tsdbRowFromBlockData(pBData, iRow); TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
code = tDiskDataBuilderAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id); code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
@ -1435,7 +1434,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
pTSchema = pCommitter->skmRow.pTSchema; pTSchema = pCommitter->skmRow.pTSchema;
} }
code = tDiskDataBuilderAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id); code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbNextCommitRow(pCommitter); code = tsdbNextCommitRow(pCommitter);

View File

@ -31,9 +31,16 @@ struct SDiskColBuilder {
SColumnDataAgg sma; SColumnDataAgg sma;
uint8_t minSet; uint8_t minSet;
uint8_t maxSet; uint8_t maxSet;
uint8_t *aBuf[1]; uint8_t *aBuf[2];
}; };
// SDiskData ================================================
static int32_t tDiskDataDestroy(SDiskData *pDiskData) {
int32_t code = 0;
pDiskData->aDiskCol = taosArrayDestroy(pDiskData->aDiskCol);
return code;
}
// SDiskColBuilder ================================================ // SDiskColBuilder ================================================
#define tDiskColBuilderCreate() \ #define tDiskColBuilderCreate() \
(SDiskColBuilder) { 0 } (SDiskColBuilder) { 0 }
@ -91,7 +98,7 @@ static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) {
.type = pBuilder->type, .type = pBuilder->type,
.smaOn = pBuilder->calcSma, .smaOn = pBuilder->calcSma,
.flag = pBuilder->flag, .flag = pBuilder->flag,
.szOrigin = 0, // todo .szOrigin = 0,
.szBitmap = 0, .szBitmap = 0,
.szOffset = 0, .szOffset = 0,
.szValue = 0, .szValue = 0,
@ -109,20 +116,27 @@ static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) {
nBit = BIT1_SIZE(pBuilder->nVal); nBit = BIT1_SIZE(pBuilder->nVal);
} }
pDiskCol->bCol.szBitmap = tsCompressTinyint(pBuilder->pBitMap, nBit, nBit, pBuilder->aBuf[0], 0, pBuilder->cmprAlg, code = tRealloc(&pBuilder->aBuf[0], nBit + COMP_OVERFLOW_BYTES);
NULL, 0); // todo: alloc if (code) return code;
code = tRealloc(&pBuilder->aBuf[1], nBit + COMP_OVERFLOW_BYTES);
if (code) return code;
pDiskCol->bCol.szBitmap =
tsCompressTinyint(pBuilder->pBitMap, nBit, nBit, pBuilder->aBuf[0], nBit + COMP_OVERFLOW_BYTES,
pBuilder->cmprAlg, pBuilder->aBuf[1], nBit + COMP_OVERFLOW_BYTES);
pDiskCol->pBit = pBuilder->aBuf[0]; pDiskCol->pBit = pBuilder->aBuf[0];
} }
// OFFSET // OFFSET
if (IS_VAR_DATA_TYPE(pBuilder->type)) { if (IS_VAR_DATA_TYPE(pBuilder->type)) {
code = tCompressEnd(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset); code = tCompressEnd(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset, NULL);
if (code) return code; if (code) return code;
} }
// VALUE // VALUE
if (pBuilder->flag != (HAS_NULL | HAS_NONE)) { if (pBuilder->flag != (HAS_NULL | HAS_NONE)) {
code = tCompressEnd(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue); code = tCompressEnd(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue, &pDiskCol->bCol.szOrigin);
if (code) return code; if (code) return code;
} }
@ -425,8 +439,8 @@ static int32_t tDiskColAddVal(SDiskColBuilder *pBuilder, SColVal *pColVal) {
} }
} }
if (tDiskColAddValImpl[pBuilder->type][pColVal->type]) { if (tDiskColAddValImpl[pBuilder->flag][pColVal->flag]) {
code = tDiskColAddValImpl[pBuilder->type][pColVal->type](pBuilder, pColVal); code = tDiskColAddValImpl[pBuilder->flag][pColVal->flag](pBuilder, pColVal);
if (code) return code; if (code) return code;
} }
@ -465,6 +479,7 @@ void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) {
for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) {
tFree(pBuilder->aBuf[iBuf]); tFree(pBuilder->aBuf[iBuf]);
} }
tDiskDataDestroy(&pBuilder->dd);
taosMemoryFree(pBuilder); taosMemoryFree(pBuilder);
return NULL; return NULL;
@ -474,6 +489,8 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
uint8_t calcSma) { uint8_t calcSma) {
int32_t code = 0; int32_t code = 0;
ASSERT(pId->suid || pId->uid);
pBuilder->suid = pId->suid; pBuilder->suid = pId->suid;
pBuilder->uid = pId->uid; pBuilder->uid = pId->uid;
pBuilder->nRow = 0; pBuilder->nRow = 0;
@ -512,9 +529,9 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
} }
} }
SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder); SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder);
code = tDiskColBuilderInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg, code = tDiskColBuilderInit(pDCBuilder, pTColumn->colId, pTColumn->type, cmprAlg,
(calcSma && (pTColumn->flags & COL_SMA_ON))); (calcSma && (pTColumn->flags & COL_SMA_ON)));
if (code) return code; if (code) return code;
@ -524,14 +541,22 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
return code; return code;
} }
int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) { int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder) {
int32_t code = 0;
pBuilder->suid = 0;
pBuilder->uid = 0;
return code;
}
int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) {
int32_t code = 0; int32_t code = 0;
ASSERT(pBuilder->suid || pBuilder->uid);
ASSERT(pId->suid == pBuilder->suid); ASSERT(pId->suid == pBuilder->suid);
// uid // uid
if (pBuilder->uid && pBuilder->uid != pId->uid) { if (pBuilder->uid && pBuilder->uid != pId->uid) {
ASSERT(!pBuilder->calcSma); ASSERT(pBuilder->suid);
for (int32_t iRow = 0; iRow < pBuilder->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBuilder->nRow; iRow++) {
code = tCompress(pBuilder->pUidC, &pBuilder->uid, sizeof(int64_t)); code = tCompress(pBuilder->pUidC, &pBuilder->uid, sizeof(int64_t));
if (code) return code; if (code) return code;
@ -564,14 +589,13 @@ int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSche
pColVal = tRowIterNext(&iter); pColVal = tRowIterNext(&iter);
} }
if (pColVal == NULL || pColVal->cid > pDCBuilder->cid) { if (pColVal && pColVal->cid == pDCBuilder->cid) {
SColVal cv = COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type);
code = tDiskColAddVal(pDCBuilder, &cv);
if (code) return code;
} else {
code = tDiskColAddVal(pDCBuilder, pColVal); code = tDiskColAddVal(pDCBuilder, pColVal);
if (code) return code; if (code) return code;
pColVal = tRowIterNext(&iter); pColVal = tRowIterNext(&iter);
} else {
code = tDiskColAddVal(pDCBuilder, &COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type));
if (code) return code;
} }
} }
pBuilder->nRow++; pBuilder->nRow++;
@ -579,11 +603,14 @@ int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSche
return code; return code;
} }
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) { int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData) {
int32_t code = 0; int32_t code = 0;
ASSERT(pBuilder->nRow); ASSERT(pBuilder->nRow);
*ppDiskData = NULL;
SDiskData *pDiskData = &pBuilder->dd;
// reset SDiskData // reset SDiskData
pDiskData->hdr = (SDiskDataHdr){.delimiter = TSDB_FILE_DLMT, pDiskData->hdr = (SDiskDataHdr){.delimiter = TSDB_FILE_DLMT,
.fmtVer = 0, .fmtVer = 0,
@ -598,6 +625,22 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
pDiskData->pUid = NULL; pDiskData->pUid = NULL;
pDiskData->pVer = NULL; pDiskData->pVer = NULL;
pDiskData->pKey = NULL; pDiskData->pKey = NULL;
// UID
if (pBuilder->uid == 0) {
code = tCompressEnd(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid, NULL);
if (code) return code;
}
// VERSION
code = tCompressEnd(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer, NULL);
if (code) return code;
// TSKEY
code = tCompressEnd(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey, NULL);
if (code) return code;
// aDiskCol
if (pDiskData->aDiskCol) { if (pDiskData->aDiskCol) {
taosArrayClear(pDiskData->aDiskCol); taosArrayClear(pDiskData->aDiskCol);
} else { } else {
@ -608,20 +651,6 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
} }
} }
// UID
if (pBuilder->uid == 0) {
code = tCompressEnd(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid);
if (code) return code;
}
// VERSION
code = tCompressEnd(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer);
if (code) return code;
// TSKEY
code = tCompressEnd(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey);
if (code) return code;
int32_t offset = 0; int32_t offset = 0;
for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) {
SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder); SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder);
@ -644,12 +673,6 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
pDiskData->hdr.szBlkCol += tPutBlockCol(NULL, &dCol.bCol); pDiskData->hdr.szBlkCol += tPutBlockCol(NULL, &dCol.bCol);
} }
return code; *ppDiskData = pDiskData;
}
// SDiskData ================================================
int32_t tDiskDataDestroy(SDiskData *pDiskData) {
int32_t code = 0;
pDiskData->aDiskCol = taosArrayDestroy(pDiskData->aDiskCol);
return code; return code;
} }

View File

@ -2000,16 +2000,23 @@ int32_t tCompressStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
return code; return code;
} }
int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppOut, int32_t *nOut, int32_t *nOrigin) {
int32_t code = 0; int32_t code = 0;
*ppData = NULL; *ppOut = NULL;
*nData = 0; *nOut = 0;
if (nOrigin) {
if (DATA_TYPE_INFO[pCmprsor->type].isVarLen) {
*nOrigin = pCmprsor->nBuf - 1;
} else {
*nOrigin = pCmprsor->nVal * DATA_TYPE_INFO[pCmprsor->type].bytes;
}
}
if (pCmprsor->nVal == 0) return code; if (pCmprsor->nVal == 0) return code;
if (DATA_TYPE_INFO[pCmprsor->type].endFn) { if (DATA_TYPE_INFO[pCmprsor->type].endFn) {
return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppData, nData); return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppOut, nOut);
} }
return code; return code;