TD-353
This commit is contained in:
parent
209ebe5919
commit
7ca58999ed
|
@ -150,7 +150,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
uint32_t len;
|
||||
uint32_t offset;
|
||||
uint32_t padding;
|
||||
// uint32_t padding;
|
||||
uint32_t hasLast : 2;
|
||||
uint32_t numOfBlocks : 30;
|
||||
uint64_t uid;
|
||||
|
|
|
@ -498,36 +498,6 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) {
|
||||
buf = taosEncodeVariantU32(buf, pIdx->len);
|
||||
buf = taosEncodeVariantU32(buf, pIdx->offset);
|
||||
buf = taosEncodeFixedU8(buf, pIdx->hasLast);
|
||||
buf = taosEncodeVariantU32(buf, pIdx->numOfBlocks);
|
||||
buf = taosEncodeFixedU64(buf, pIdx->uid);
|
||||
buf = taosEncodeFixedU64(buf, pIdx->maxKey);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
|
||||
uint8_t hasLast = 0;
|
||||
uint32_t numOfBlocks = 0;
|
||||
uint64_t value = 0;
|
||||
|
||||
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
|
||||
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
|
||||
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
|
||||
pIdx->hasLast = hasLast;
|
||||
if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL;
|
||||
pIdx->numOfBlocks = numOfBlocks;
|
||||
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
|
||||
pIdx->uid = (int64_t)value;
|
||||
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
|
||||
pIdx->maxKey = (TSKEY)value;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
|
||||
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
|
||||
|
||||
|
@ -583,15 +553,18 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
|||
int64_t offset = 0;
|
||||
|
||||
offset = lseek(pFile->fd, 0, SEEK_END);
|
||||
if (offset < 0) goto _err;
|
||||
if (offset < 0) {
|
||||
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int nColsNotAllNull = 0;
|
||||
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
|
||||
SDataCol *pDataCol = pDataCols->cols + ncol;
|
||||
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;
|
||||
|
||||
if (isNEleNull(pDataCol, rowsToWrite)) {
|
||||
// all data to commit are NULL, just ignore it
|
||||
if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -630,7 +603,10 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
|||
if (pHelper->config.compress) {
|
||||
if (pHelper->config.compress == TWO_STAGE_COMP) {
|
||||
pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
|
||||
if (pHelper->compBuffer == NULL) goto _err;
|
||||
if (pHelper->compBuffer == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
|
||||
|
@ -657,7 +633,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
|||
taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);
|
||||
|
||||
// Write the whole block to file
|
||||
if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) goto _err;
|
||||
if (twrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
|
||||
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// Update pCompBlock membership vairables
|
||||
pCompBlock->last = isLast;
|
||||
|
@ -671,6 +651,11 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
|
|||
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
|
||||
pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
|
||||
|
||||
tsdbTrace("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
|
||||
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
|
||||
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, pCompBlock->offset,
|
||||
pCompBlock->numOfRows, pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, pCompBlock->keyLast);
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
|
@ -1235,3 +1220,33 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
|||
_err:
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) {
|
||||
buf = taosEncodeVariantU32(buf, pIdx->len);
|
||||
buf = taosEncodeVariantU32(buf, pIdx->offset);
|
||||
buf = taosEncodeFixedU8(buf, pIdx->hasLast);
|
||||
buf = taosEncodeVariantU32(buf, pIdx->numOfBlocks);
|
||||
buf = taosEncodeFixedU64(buf, pIdx->uid);
|
||||
buf = taosEncodeFixedU64(buf, pIdx->maxKey);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
|
||||
uint8_t hasLast = 0;
|
||||
uint32_t numOfBlocks = 0;
|
||||
uint64_t value = 0;
|
||||
|
||||
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
|
||||
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
|
||||
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
|
||||
pIdx->hasLast = hasLast;
|
||||
if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL;
|
||||
pIdx->numOfBlocks = numOfBlocks;
|
||||
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
|
||||
pIdx->uid = (int64_t)value;
|
||||
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
|
||||
pIdx->maxKey = (TSKEY)value;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue