more code

This commit is contained in:
Hongze Cheng 2024-03-06 15:37:57 +08:00
parent b06e1b8236
commit dfd725fbdb
5 changed files with 64 additions and 62 deletions

View File

@ -163,15 +163,16 @@ typedef struct {
typedef void *(*xMallocFn)(void *, int32_t);
void tColDataDestroy(void *ph);
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t cflag);
void tColDataClear(SColData *pColData);
void tColDataDeepClear(SColData *pColData);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward);
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg);
void tColDataDestroy(void *ph);
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t cflag);
void tColDataClear(SColData *pColData);
void tColDataDeepClear(SColData *pColData);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward);
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg);
extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull);
int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer *output, SBuffer *assist);
@ -335,14 +336,13 @@ struct SValueColumn {
};
typedef struct {
int8_t dataType; // fill by caller
int8_t cmprAlg; // fill by caller
int32_t originalSize;
int8_t dataType; // filled by caller
int8_t cmprAlg; // filled by caller
int32_t originalSize; // filled by caller
int32_t compressedSize;
} SCompressInfo;
int32_t tCompressData(void *input, // input
int32_t inputSize, // input size
SCompressInfo *info, // compress info
void *output, // output
int32_t outputSize, // output size
@ -355,8 +355,8 @@ int32_t tDecompressData(void *input, // input
int32_t outputSize, // output size
SBuffer *buffer // assistant buffer provided by caller, can be NULL
);
int32_t tCompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist);
int32_t tDecompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist);
int32_t tCompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist);
int32_t tDecompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist);
#endif

View File

@ -101,9 +101,13 @@ typedef struct {
int32_t kvRowSize;
} SRowBuildScanInfo;
static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo) { sinfo->numOfNone++; }
static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
sinfo->numOfNone++;
}
static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
sinfo->numOfNull++;
sinfo->kvMaxOffset = sinfo->kvPayloadSize;
sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId);
@ -152,12 +156,9 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
// loop scan
for (int32_t i = 1; i < schema->numOfCols; i++) {
bool isPK = ((schema->columns[i].flags & COL_IS_KEY) != 0);
for (;;) {
if (colValIndex >= numOfColVals) {
ASSERTS(!isPK, "Primary key should not be NONE or NULL");
tRowBuildScanAddNone(sinfo);
tRowBuildScanAddNone(sinfo, schema->columns + i);
break;
}
@ -167,18 +168,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) {
tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i);
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) {
ASSERTS(!isPK, "Primary key should not be NULL or NONE");
tRowBuildScanAddNull(sinfo, schema->columns + i);
} else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) {
ASSERTS(!isPK, "Primary key should not be NULL or NONE");
tRowBuildScanAddNone(sinfo);
tRowBuildScanAddNone(sinfo, schema->columns + i);
}
colValIndex++;
break;
} else if (colValArray[colValIndex].cid > schema->columns[i].colId) {
ASSERTS(!isPK, "Primary key should not be NONE or NULL");
tRowBuildScanAddNone(sinfo);
tRowBuildScanAddNone(sinfo, schema->columns + i);
break;
} else { // skip useless value
colValIndex++;
@ -259,7 +257,7 @@ static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo
uint8_t *primaryKeys = (*ppRow)->data;
uint8_t *bitmap = primaryKeys + sinfo->tuplePKSize;
uint8_t *fixed = bitmap + sinfo->tupleBitmapSize;
uint8_t *varlen = fixed + schema->flen;
uint8_t *varlen = fixed + sinfo->tupleFixedSize;
// primary keys
for (int32_t i = 0; i < sinfo->numOfPKs; i++) {
@ -1139,10 +1137,9 @@ int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, in
ASSERT(pRow->sver == pTSchema->version);
ASSERT(nColData > 0);
uint8_t tflag = pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE);
if (tflag == HAS_NONE) {
if (pRow->flag == HAS_NONE) {
return tRowNoneUpsertColData(aColData, nColData, flag);
} else if (tflag == HAS_NULL) {
} else if (pRow->flag == HAS_NULL) {
return tRowNullUpsertColData(aColData, nColData, pTSchema, flag);
} else if (pRow->flag >> 4) { // KV row
return tRowKVUpsertColData(pRow, pTSchema, aColData, nColData, flag);
@ -2622,9 +2619,10 @@ int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer
SCompressInfo cinfo = {
.dataType = TSDB_DATA_TYPE_TINYINT,
.cmprAlg = info->cmprAlg,
.originalSize = info->bitmapOriginalSize,
};
code = tCompressDataToBuffer(colData->pBitMap, info->bitmapOriginalSize, &cinfo, output, assist);
code = tCompressDataToBuffer(colData->pBitMap, &cinfo, output, assist);
if (code) {
tBufferDestroy(&local);
return code;
@ -2645,9 +2643,10 @@ int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer
SCompressInfo cinfo = {
.dataType = TSDB_DATA_TYPE_INT,
.cmprAlg = info->cmprAlg,
.originalSize = info->offsetOriginalSize,
};
code = tCompressDataToBuffer(colData->aOffset, info->offsetOriginalSize, &cinfo, output, assist);
code = tCompressDataToBuffer(colData->aOffset, &cinfo, output, assist);
if (code) {
tBufferDestroy(&local);
return code;
@ -2663,9 +2662,10 @@ int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer
SCompressInfo cinfo = {
.dataType = colData->type,
.cmprAlg = info->cmprAlg,
.originalSize = info->dataOriginalSize,
};
code = tCompressDataToBuffer(colData->pData, info->dataOriginalSize, &cinfo, output, assist);
code = tCompressDataToBuffer(colData->pData, &cinfo, output, assist);
if (code) {
tBufferDestroy(&local);
return code;
@ -4046,9 +4046,10 @@ int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *inf
SCompressInfo cinfo = {
.cmprAlg = info->cmprAlg,
.dataType = TSDB_DATA_TYPE_INT,
.originalSize = valCol->offsets.size,
};
code = tCompressDataToBuffer(valCol->offsets.data, valCol->offsets.size, &cinfo, output, assist);
code = tCompressDataToBuffer(valCol->offsets.data, &cinfo, output, assist);
if (code) return code;
info->offsetOriginalSize = cinfo.originalSize;
@ -4059,9 +4060,10 @@ int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *inf
SCompressInfo cinfo = {
.cmprAlg = info->cmprAlg,
.dataType = valCol->type,
.originalSize = valCol->data.size,
};
code = tCompressDataToBuffer(valCol->data.data, valCol->data.size, &cinfo, output, assist);
code = tCompressDataToBuffer(valCol->data.data, &cinfo, output, assist);
if (code) return code;
info->dataOriginalSize = cinfo.originalSize;
@ -4182,7 +4184,6 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre
}
int32_t tCompressData(void *input, // input
int32_t inputSize, // input size
SCompressInfo *info, // compress info
void *output, // output
int32_t outputSize, // output size
@ -4191,13 +4192,12 @@ int32_t tCompressData(void *input, // input
int32_t extraSizeNeeded;
int32_t code;
extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? inputSize : inputSize + COMP_OVERFLOW_BYTES;
extraSizeNeeded = (info->cmprAlg == NO_COMPRESSION) ? info->originalSize : info->originalSize + COMP_OVERFLOW_BYTES;
ASSERT(outputSize >= extraSizeNeeded);
info->originalSize = inputSize;
if (info->cmprAlg == NO_COMPRESSION) {
memcpy(output, input, inputSize);
info->compressedSize = inputSize;
memcpy(output, input, info->originalSize);
info->compressedSize = info->originalSize;
} else {
SBuffer local;
@ -4216,8 +4216,8 @@ int32_t tCompressData(void *input, // input
info->compressedSize = tDataTypes[info->dataType].compFunc( //
input, // input
inputSize, // input size
inputSize / tDataTypes[info->dataType].bytes, // number of elements
info->originalSize, // input size
info->originalSize / tDataTypes[info->dataType].bytes, // number of elements
output, // output
outputSize, // output size
info->cmprAlg, // compression algorithm
@ -4287,26 +4287,27 @@ int32_t tDecompressData(void *input, // input
return 0;
}
int32_t tCompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist) {
int32_t tCompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist) {
int32_t code;
code = tBufferEnsureCapacity(output, output->size + inputSize + COMP_OVERFLOW_BYTES);
code = tBufferEnsureCapacity(output, output->size + info->originalSize + COMP_OVERFLOW_BYTES);
if (code) return code;
code = tCompressData(input, inputSize, info, tBufferGetDataEnd(output), output->capacity - output->size, assist);
code = tCompressData(input, info, tBufferGetDataEnd(output), output->capacity - output->size, assist);
if (code) return code;
output->size += info->compressedSize;
return 0;
}
int32_t tDecompressDataToBuffer(void *input, int32_t inputSize, SCompressInfo *info, SBuffer *output, SBuffer *assist) {
int32_t tDecompressDataToBuffer(void *input, SCompressInfo *info, SBuffer *output, SBuffer *assist) {
int32_t code;
code = tBufferEnsureCapacity(output, output->size + info->originalSize);
if (code) return code;
code = tDecompressData(input, inputSize, info, tBufferGetDataEnd(output), output->capacity - output->size, assist);
code = tDecompressData(input, info->compressedSize, info, tBufferGetDataEnd(output), output->capacity - output->size,
assist);
if (code) return code;
output->size += info->originalSize;

View File

@ -207,8 +207,8 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
.compressedSize = brinBlk->size[i],
.originalSize = brinBlk->numRec * sizeof(int64_t),
};
code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), brinBlk->size[i], &cinfo,
brinBlock->buffers + i, reader->buffers + 1);
code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), &cinfo, brinBlock->buffers + i,
reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
br.offset += brinBlk->size[i];
}
@ -220,8 +220,8 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
.compressedSize = brinBlk->size[i],
.originalSize = brinBlk->numRec * sizeof(int32_t),
};
code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), brinBlk->size[i], &cinfo,
brinBlock->buffers + i, reader->buffers + 1);
code = tDecompressDataToBuffer(tBufferGetDataAt(br.buffer, br.offset), &cinfo, brinBlock->buffers + i,
reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
br.offset += brinBlk->size[i];
}
@ -490,7 +490,7 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB
.originalSize = tombBlk->numRec * sizeof(int64_t),
.compressedSize = tombBlk->size[i],
};
code = tDecompressDataToBuffer(BR_PTR(&br), cinfo.compressedSize, &cinfo, tData->buffers + i, reader->buffers + 1);
code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
br.offset += tombBlk->size[i];
}
@ -747,6 +747,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
SBuffer *bf = &brinBlock->buffers[i];
SCompressInfo info = {
.cmprAlg = cmprAlg,
.originalSize = bf->size,
};
if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) {
@ -758,7 +759,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
}
tBufferClear(&buffers[0]);
code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffers[0].data, &buffers[1]);
code = tCompressDataToBuffer(tBufferGetData(bf), &info, buffers[0].data, &buffers[1]);
if (code) return code;
code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size);
@ -1250,7 +1251,7 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
.dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = tombBlock->buffers[i].size,
};
code = tCompressDataToBuffer(tombBlock->buffers[i].data, cinfo.originalSize, &cinfo, &buffers[0], &buffers[1]);
code = tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, &buffers[0], &buffers[1]);
if (code) return code;
code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size);

View File

@ -310,8 +310,7 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk
.originalSize = tombBlk->numRec * sizeof(int64_t),
.compressedSize = tombBlk->size[i],
};
code =
tDecompressDataToBuffer(BR_PTR(&br), cinfo.compressedSize, &cinfo, tombBlock->buffers + i, reader->buffers + 1);
code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tombBlock->buffers + i, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
br.offset += tombBlk->size[i];
}
@ -348,8 +347,8 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
.originalSize = statisBlk->numRec * sizeof(int64_t),
};
code = tDecompressDataToBuffer(tBufferGetDataAt(&reader->buffers[0], size), statisBlk->size[i], &info,
&statisBlock->buffers[i], &reader->buffers[1]);
code = tDecompressDataToBuffer(tBufferGetDataAt(&reader->buffers[0], size), &info, &statisBlock->buffers[i],
&reader->buffers[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += statisBlk->size[i];
}
@ -519,11 +518,12 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
SCompressInfo info = {
.dataType = TSDB_DATA_TYPE_BIGINT,
.cmprAlg = statisBlk.cmprAlg,
.originalSize = statisBlock->buffers[i].size,
};
tBufferClear(&writer->buffers[0]);
code = tCompressDataToBuffer(tBufferGetData(&statisBlock->buffers[i]), tBufferGetSize(&statisBlock->buffers[i]),
&info, &writer->buffers[0], &writer->buffers[1]);
code = tCompressDataToBuffer(tBufferGetData(&statisBlock->buffers[i]), &info, &writer->buffers[0],
&writer->buffers[1]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize);

View File

@ -1390,7 +1390,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S
.dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = sizeof(int64_t) * bData->nRow,
};
code = tCompressDataToBuffer(bData->aUid, cinfo.originalSize, &cinfo, buffer, assist);
code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist);
TSDB_CHECK_CODE(code, lino, _exit);
hdr->szUid = cinfo.compressedSize;
}
@ -1401,7 +1401,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S
.dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = sizeof(int64_t) * bData->nRow,
};
code = tCompressDataToBuffer((uint8_t *)bData->aVersion, cinfo.originalSize, &cinfo, buffer, assist);
code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist);
TSDB_CHECK_CODE(code, lino, _exit);
hdr->szVer = cinfo.compressedSize;
@ -1411,7 +1411,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S
.dataType = TSDB_DATA_TYPE_TIMESTAMP,
.originalSize = sizeof(TSKEY) * bData->nRow,
};
code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, cinfo.originalSize, &cinfo, buffer, assist);
code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist);
TSDB_CHECK_CODE(code, lino, _exit);
hdr->szKey = cinfo.compressedSize;