more bug fix

This commit is contained in:
Hongze Cheng 2024-03-08 17:04:26 +08:00
parent fbc52d2a9c
commit acbde220bf
9 changed files with 136 additions and 75 deletions

View File

@ -108,6 +108,7 @@ int32_t tValueColumnInit(SValueColumn *valCol);
int32_t tValueColumnDestroy(SValueColumn *valCol);
int32_t tValueColumnClear(SValueColumn *valCol);
int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value);
int32_t tValueColumnUpdate(SValueColumn *valCol, int32_t idx, const SValue *value);
int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value);
int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *info, SBuffer *output, SBuffer *assist);
int32_t tValueColumnDecompress(void *input, const SValueColumnCompressInfo *compressInfo, SValueColumn *valCol,

View File

@ -32,6 +32,7 @@ static int32_t tBufferDestroy(SBuffer *buffer);
static int32_t tBufferClear(SBuffer *buffer);
static int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capacity);
static int32_t tBufferPut(SBuffer *buffer, const void *data, uint32_t size);
static int32_t tBufferPutAt(SBuffer *buffer, uint32_t offset, const void *data, uint32_t size);
static int32_t tBufferPutI8(SBuffer *buffer, int8_t value);
static int32_t tBufferPutI16(SBuffer *buffer, int16_t value);
static int32_t tBufferPutI32(SBuffer *buffer, int32_t value);

View File

@ -74,6 +74,14 @@ static FORCE_INLINE int32_t tBufferPut(SBuffer *buffer, const void *data, uint32
return 0;
}
static int32_t tBufferPutAt(SBuffer *buffer, uint32_t offset, const void *data, uint32_t size) {
if (offset + size > buffer->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
memcpy((char *)buffer->data + offset, data, size);
return 0;
}
static FORCE_INLINE int32_t tBufferPutI8(SBuffer *buffer, int8_t value) {
return tBufferPut(buffer, &value, sizeof(value));
}

View File

@ -4039,6 +4039,37 @@ int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value) {
return 0;
}
int32_t tValueColumnUpdate(SValueColumn *valCol, int32_t idx, const SValue *value) {
int32_t code;
if (idx < 0 || idx >= valCol->numOfValues) {
return TSDB_CODE_OUT_OF_RANGE;
}
if (IS_VAR_DATA_TYPE(valCol->type)) {
int32_t *offsets = (int32_t *)tBufferGetData(&valCol->offsets);
int32_t nextOffset = (idx == valCol->numOfValues - 1) ? tBufferGetSize(&valCol->data) : offsets[idx + 1];
int32_t oldDataSize = nextOffset - offsets[idx];
int32_t bytesAdded = value->nData - oldDataSize;
if (bytesAdded != 0) {
if ((code = tBufferEnsureCapacity(&valCol->data, tBufferGetSize(&valCol->data) + bytesAdded))) return code;
memmove(tBufferGetDataAt(&valCol->data, nextOffset + bytesAdded), tBufferGetDataAt(&valCol->data, nextOffset),
tBufferGetSize(&valCol->data) - nextOffset);
valCol->data.size += bytesAdded;
for (int32_t i = idx + 1; i < valCol->numOfValues; i++) {
offsets[i] += bytesAdded;
}
}
return tBufferPutAt(&valCol->data, offsets[idx], value->pData, value->nData);
} else {
return tBufferPutAt(&valCol->data, idx * tDataTypes[valCol->type].bytes, &value->val,
tDataTypes[valCol->type].bytes);
}
return 0;
}
int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value) {
if (idx < 0 || idx >= valCol->numOfValues) {
return TSDB_CODE_OUT_OF_RANGE;

View File

@ -372,12 +372,17 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)), rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i);
} else {
while (i < rows && ((int64_t *)block.suids.data)[i] == suid) {
taosArrayPush(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, i * sizeof(int64_t)));
taosArrayPush(pBlockLoadInfo->info.pFirstKey,
tBufferGetDataAt(&block.firstKeyTimestamps, i * sizeof(int64_t)));
taosArrayPush(pBlockLoadInfo->info.pLastKey, tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)));
taosArrayPush(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)));
STbStatisRecord record;
while (i < rows) {
tStatisBlockGet(&block, i, &record);
if (record.suid != suid) {
break;
}
taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid);
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.ts);
taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.ts);
taosArrayPush(pBlockLoadInfo->info.pCount, &record.count);
i += 1;
}
}
@ -414,6 +419,7 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
return code;
}
#if 0
// load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file
TStatisBlkArray *pStatisBlkArray = NULL;
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray);
@ -428,6 +434,7 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr);
return code;
}
#endif
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);

View File

@ -2080,7 +2080,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
if (conf.rspRows) {
if (0 /*conf.rspRows*/) {
pScanInfo->cleanSttBlocks =
isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order);

View File

@ -871,27 +871,16 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) {
TSDB_CHECK_CODE(code, lino, _exit);
}
STsdbRowKey key;
tsdbRowGetKey(&row->row, &key);
if (writer->ctx->tbid->uid != row->uid) {
writer->ctx->tbid->suid = row->suid;
writer->ctx->tbid->uid = row->uid;
if (STATIS_BLOCK_SIZE(writer->staticBlock) >= writer->config->maxRow) {
code = tsdbSttFileDoWriteStatisBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
STbStatisRecord record = {
.suid = row->suid,
.uid = row->uid,
.firstKey = key.key,
.lastKey = key.key,
.count = 1,
};
STsdbRowKey key;
tsdbRowGetKey(&row->row, &key);
for (;;) {
code = tStatisBlockPut(writer->staticBlock, &record);
code = tStatisBlockPut(writer->staticBlock, row, writer->config->maxRow);
if (code == TSDB_CODE_INVALID_PARA) {
code = tsdbSttFileDoWriteStatisBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
@ -901,18 +890,6 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) {
}
break;
}
} else {
// update last key and count
STbStatisRecord record;
tStatisBlockGet(writer->staticBlock, writer->staticBlock->numOfRecords, &record);
if (tRowKeyCompare(&key.key, &record.lastKey) > 0) {
// TODO: update count and last key
// TARRAY2_LAST(writer->staticBlock->count)++;
// TARRAY2_LAST(writer->staticBlock->lastKey) = key->ts;
}
}
if (row->row.type == TSDBROW_ROW_FMT) {
code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, //

View File

@ -111,52 +111,88 @@ int32_t tStatisBlockClear(STbStatisBlock *statisBlock) {
return 0;
}
int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *record) {
static int32_t tStatisBlockAppend(STbStatisBlock *block, SRowInfo *row) {
int32_t code;
STsdbRowKey key;
ASSERT(record->firstKey.numOfPKs == record->lastKey.numOfPKs);
if (statisBlock->numOfRecords == 0) {
statisBlock->numOfPKs = record->firstKey.numOfPKs;
} else if (statisBlock->numOfPKs != record->firstKey.numOfPKs) {
tsdbRowGetKey(&row->row, &key);
if (block->numOfRecords == 0) {
block->numOfPKs = key.key.numOfPKs;
} else if (block->numOfPKs != key.key.numOfPKs) {
return TSDB_CODE_INVALID_PARA;
} else {
for (int i = 0; i < statisBlock->numOfPKs; i++) {
if (record->firstKey.pks[i].type != statisBlock->firstKeyPKs[i].type) {
for (int i = 0; i < block->numOfPKs; i++) {
if (key.key.pks[i].type != block->firstKeyPKs[i].type) {
return TSDB_CODE_INVALID_PARA;
}
}
}
ASSERT(statisBlock->numOfPKs == record->firstKey.numOfPKs);
ASSERT(statisBlock->numOfPKs == record->lastKey.numOfPKs);
if ((code = tBufferPutI64(&block->suids, row->suid))) return code;
if ((code = tBufferPutI64(&block->uids, row->uid))) return code;
if ((code = tBufferPutI64(&block->firstKeyTimestamps, key.key.ts))) return code;
if ((code = tBufferPutI64(&block->lastKeyTimestamps, key.key.ts))) return code;
if ((code = tBufferPutI64(&block->counts, 1))) return code;
for (int32_t i = 0; i < block->numOfPKs; ++i) {
if ((code = tValueColumnAppend(block->firstKeyPKs + i, key.key.pks + i))) return code;
if ((code = tValueColumnAppend(block->lastKeyPKs + i, key.key.pks + i))) return code;
}
code = tBufferPutI64(&statisBlock->suids, record->suid);
block->numOfRecords++;
return 0;
}
static int32_t tStatisBlockUpdate(STbStatisBlock *block, SRowInfo *row) {
STbStatisRecord record;
STsdbRowKey key;
int32_t c;
int32_t code;
tStatisBlockGet(block, block->numOfRecords - 1, &record);
tsdbRowGetKey(&row->row, &key);
c = tRowKeyCompare(&record.lastKey, &key.key);
if (c == 0) {
return 0;
} else if (c < 0) {
// last ts
code = tBufferPutAt(&block->lastKeyTimestamps, (block->numOfRecords - 1) * sizeof(record.lastKey.ts), &key.key.ts,
sizeof(key.key.ts));
if (code) return code;
code = tBufferPutI64(&statisBlock->uids, record->uid);
if (code) return code;
code = tBufferPutI64(&statisBlock->firstKeyTimestamps, record->firstKey.ts);
if (code) return code;
code = tBufferPutI64(&statisBlock->lastKeyTimestamps, record->lastKey.ts);
if (code) return code;
code = tBufferPutI64(&statisBlock->counts, record->count);
if (code) return code;
for (int32_t i = 0; i < statisBlock->numOfPKs; ++i) {
code = tValueColumnAppend(&statisBlock->firstKeyPKs[i], &record->firstKey.pks[i]);
if (code) return code;
code = tValueColumnAppend(&statisBlock->lastKeyPKs[i], &record->lastKey.pks[i]);
// last primary keys
for (int i = 0; i < block->numOfPKs; i++) {
code = tValueColumnUpdate(&block->lastKeyPKs[i], block->numOfRecords - 1, &key.key.pks[i]);
if (code) return code;
}
statisBlock->numOfRecords++;
// count
record.count++;
code = tBufferPutAt(&block->counts, (block->numOfRecords - 1) * sizeof(record.count), &record.count,
sizeof(record.count));
if (code) return code;
} else {
ASSERT(0);
}
return 0;
}
int32_t tStatisBlockPut(STbStatisBlock *block, SRowInfo *row, int32_t maxRecords) {
if (block->numOfRecords > 0) {
int64_t lastUid;
SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * (block->numOfRecords - 1), &block->uids);
tBufferGetI64(&br, &lastUid);
if (lastUid == row->uid) {
return tStatisBlockUpdate(block, row);
} else if (block->numOfRecords >= maxRecords) {
return TSDB_CODE_INVALID_PARA;
}
}
return tStatisBlockAppend(block, row);
}
int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) {
int32_t code;
SBufferReader reader;

View File

@ -114,7 +114,7 @@ typedef struct {
int32_t tStatisBlockInit(STbStatisBlock *statisBlock);
int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock);
int32_t tStatisBlockClear(STbStatisBlock *statisBlock);
int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *record);
int32_t tStatisBlockPut(STbStatisBlock *statisBlock, SRowInfo *row, int32_t maxRecords);
int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record);
// SBrinRecord ----------