diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 5614df786c..0005e9027e 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -108,6 +108,7 @@ int32_t tValueColumnInit(SValueColumn *valCol); int32_t tValueColumnDestroy(SValueColumn *valCol); int32_t tValueColumnClear(SValueColumn *valCol); int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value); +int32_t tValueColumnUpdate(SValueColumn *valCol, int32_t idx, const SValue *value); int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value); int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *info, SBuffer *output, SBuffer *assist); int32_t tValueColumnDecompress(void *input, const SValueColumnCompressInfo *compressInfo, SValueColumn *valCol, diff --git a/include/util/tbuffer.h b/include/util/tbuffer.h index 83ba3a6cfa..094d0e37ba 100644 --- a/include/util/tbuffer.h +++ b/include/util/tbuffer.h @@ -32,6 +32,7 @@ static int32_t tBufferDestroy(SBuffer *buffer); static int32_t tBufferClear(SBuffer *buffer); static int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capacity); static int32_t tBufferPut(SBuffer *buffer, const void *data, uint32_t size); +static int32_t tBufferPutAt(SBuffer *buffer, uint32_t offset, const void *data, uint32_t size); static int32_t tBufferPutI8(SBuffer *buffer, int8_t value); static int32_t tBufferPutI16(SBuffer *buffer, int16_t value); static int32_t tBufferPutI32(SBuffer *buffer, int32_t value); diff --git a/include/util/tbuffer.inc b/include/util/tbuffer.inc index e0a7f57be8..542c68ec8e 100644 --- a/include/util/tbuffer.inc +++ b/include/util/tbuffer.inc @@ -74,6 +74,14 @@ static FORCE_INLINE int32_t tBufferPut(SBuffer *buffer, const void *data, uint32 return 0; } +static int32_t tBufferPutAt(SBuffer *buffer, uint32_t offset, const void *data, uint32_t size) { + if (offset + size > buffer->size) { + return TSDB_CODE_OUT_OF_RANGE; + } + memcpy((char *)buffer->data + offset, data, size); + return 0; +} + static FORCE_INLINE int32_t tBufferPutI8(SBuffer *buffer, int8_t value) { return tBufferPut(buffer, &value, sizeof(value)); } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 0faf67fc49..4710bfb688 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -4039,6 +4039,37 @@ int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value) { return 0; } +int32_t tValueColumnUpdate(SValueColumn *valCol, int32_t idx, const SValue *value) { + int32_t code; + + if (idx < 0 || idx >= valCol->numOfValues) { + return TSDB_CODE_OUT_OF_RANGE; + } + + if (IS_VAR_DATA_TYPE(valCol->type)) { + int32_t *offsets = (int32_t *)tBufferGetData(&valCol->offsets); + int32_t nextOffset = (idx == valCol->numOfValues - 1) ? tBufferGetSize(&valCol->data) : offsets[idx + 1]; + int32_t oldDataSize = nextOffset - offsets[idx]; + int32_t bytesAdded = value->nData - oldDataSize; + + if (bytesAdded != 0) { + if ((code = tBufferEnsureCapacity(&valCol->data, tBufferGetSize(&valCol->data) + bytesAdded))) return code; + memmove(tBufferGetDataAt(&valCol->data, nextOffset + bytesAdded), tBufferGetDataAt(&valCol->data, nextOffset), + tBufferGetSize(&valCol->data) - nextOffset); + valCol->data.size += bytesAdded; + + for (int32_t i = idx + 1; i < valCol->numOfValues; i++) { + offsets[i] += bytesAdded; + } + } + return tBufferPutAt(&valCol->data, offsets[idx], value->pData, value->nData); + } else { + return tBufferPutAt(&valCol->data, idx * tDataTypes[valCol->type].bytes, &value->val, + tDataTypes[valCol->type].bytes); + } + return 0; +} + int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value) { if (idx < 0 || idx >= valCol->numOfValues) { return TSDB_CODE_OUT_OF_RANGE; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index f4567547ec..ee6f0a8c84 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -372,12 +372,17 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)), rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i); } else { - while (i < rows && ((int64_t *)block.suids.data)[i] == suid) { - taosArrayPush(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, i * sizeof(int64_t))); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, - tBufferGetDataAt(&block.firstKeyTimestamps, i * sizeof(int64_t))); - taosArrayPush(pBlockLoadInfo->info.pLastKey, tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t))); - taosArrayPush(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t))); + STbStatisRecord record; + while (i < rows) { + tStatisBlockGet(&block, i, &record); + if (record.suid != suid) { + break; + } + + taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid); + taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.ts); + taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.ts); + taosArrayPush(pBlockLoadInfo->info.pCount, &record.count); i += 1; } } @@ -414,6 +419,7 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter * return code; } +#if 0 // load stt statistics block for all stt-blocks, to decide if the data of queried table exists in current stt file TStatisBlkArray *pStatisBlkArray = NULL; code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pStatisBlkArray); @@ -428,6 +434,7 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter * tsdbError("failed to load stt statistics block data, code:%s, %s", tstrerror(code), idStr); return code; } +#endif code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index ffe32904a2..8bc3177e63 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2080,7 +2080,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan initMemDataIterator(pScanInfo, pReader); initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); - if (conf.rspRows) { + if (0 /*conf.rspRows*/) { pScanInfo->cleanSttBlocks = isCleanSttBlock(info.pTimeWindowList, &pReader->info.window, pScanInfo, pReader->info.order); diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 4c30951329..f3d271f1a8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -871,47 +871,24 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) { TSDB_CHECK_CODE(code, lino, _exit); } - STsdbRowKey key; - tsdbRowGetKey(&row->row, &key); - if (writer->ctx->tbid->uid != row->uid) { writer->ctx->tbid->suid = row->suid; writer->ctx->tbid->uid = row->uid; + } - if (STATIS_BLOCK_SIZE(writer->staticBlock) >= writer->config->maxRow) { + STsdbRowKey key; + tsdbRowGetKey(&row->row, &key); + + for (;;) { + code = tStatisBlockPut(writer->staticBlock, row, writer->config->maxRow); + if (code == TSDB_CODE_INVALID_PARA) { code = tsdbSttFileDoWriteStatisBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); + continue; + } else { + TSDB_CHECK_CODE(code, lino, _exit); } - - STbStatisRecord record = { - .suid = row->suid, - .uid = row->uid, - .firstKey = key.key, - .lastKey = key.key, - .count = 1, - }; - for (;;) { - code = tStatisBlockPut(writer->staticBlock, &record); - if (code == TSDB_CODE_INVALID_PARA) { - code = tsdbSttFileDoWriteStatisBlock(writer); - TSDB_CHECK_CODE(code, lino, _exit); - continue; - } else { - TSDB_CHECK_CODE(code, lino, _exit); - } - break; - } - } else { - // update last key and count - STbStatisRecord record; - - tStatisBlockGet(writer->staticBlock, writer->staticBlock->numOfRecords, &record); - - if (tRowKeyCompare(&key.key, &record.lastKey) > 0) { - // TODO: update count and last key - // TARRAY2_LAST(writer->staticBlock->count)++; - // TARRAY2_LAST(writer->staticBlock->lastKey) = key->ts; - } + break; } if (row->row.type == TSDBROW_ROW_FMT) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index 73b821bb30..063f4e406a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -111,52 +111,88 @@ int32_t tStatisBlockClear(STbStatisBlock *statisBlock) { return 0; } -int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *record) { - int32_t code; +static int32_t tStatisBlockAppend(STbStatisBlock *block, SRowInfo *row) { + int32_t code; + STsdbRowKey key; - ASSERT(record->firstKey.numOfPKs == record->lastKey.numOfPKs); - - if (statisBlock->numOfRecords == 0) { - statisBlock->numOfPKs = record->firstKey.numOfPKs; - } else if (statisBlock->numOfPKs != record->firstKey.numOfPKs) { + tsdbRowGetKey(&row->row, &key); + if (block->numOfRecords == 0) { + block->numOfPKs = key.key.numOfPKs; + } else if (block->numOfPKs != key.key.numOfPKs) { return TSDB_CODE_INVALID_PARA; } else { - for (int i = 0; i < statisBlock->numOfPKs; i++) { - if (record->firstKey.pks[i].type != statisBlock->firstKeyPKs[i].type) { + for (int i = 0; i < block->numOfPKs; i++) { + if (key.key.pks[i].type != block->firstKeyPKs[i].type) { return TSDB_CODE_INVALID_PARA; } } } - ASSERT(statisBlock->numOfPKs == record->firstKey.numOfPKs); - ASSERT(statisBlock->numOfPKs == record->lastKey.numOfPKs); - - code = tBufferPutI64(&statisBlock->suids, record->suid); - if (code) return code; - - code = tBufferPutI64(&statisBlock->uids, record->uid); - if (code) return code; - - code = tBufferPutI64(&statisBlock->firstKeyTimestamps, record->firstKey.ts); - if (code) return code; - - code = tBufferPutI64(&statisBlock->lastKeyTimestamps, record->lastKey.ts); - if (code) return code; - - code = tBufferPutI64(&statisBlock->counts, record->count); - if (code) return code; - - for (int32_t i = 0; i < statisBlock->numOfPKs; ++i) { - code = tValueColumnAppend(&statisBlock->firstKeyPKs[i], &record->firstKey.pks[i]); - if (code) return code; - code = tValueColumnAppend(&statisBlock->lastKeyPKs[i], &record->lastKey.pks[i]); - if (code) return code; + if ((code = tBufferPutI64(&block->suids, row->suid))) return code; + if ((code = tBufferPutI64(&block->uids, row->uid))) return code; + if ((code = tBufferPutI64(&block->firstKeyTimestamps, key.key.ts))) return code; + if ((code = tBufferPutI64(&block->lastKeyTimestamps, key.key.ts))) return code; + if ((code = tBufferPutI64(&block->counts, 1))) return code; + for (int32_t i = 0; i < block->numOfPKs; ++i) { + if ((code = tValueColumnAppend(block->firstKeyPKs + i, key.key.pks + i))) return code; + if ((code = tValueColumnAppend(block->lastKeyPKs + i, key.key.pks + i))) return code; } - statisBlock->numOfRecords++; + block->numOfRecords++; return 0; } +static int32_t tStatisBlockUpdate(STbStatisBlock *block, SRowInfo *row) { + STbStatisRecord record; + STsdbRowKey key; + int32_t c; + int32_t code; + + tStatisBlockGet(block, block->numOfRecords - 1, &record); + tsdbRowGetKey(&row->row, &key); + + c = tRowKeyCompare(&record.lastKey, &key.key); + if (c == 0) { + return 0; + } else if (c < 0) { + // last ts + code = tBufferPutAt(&block->lastKeyTimestamps, (block->numOfRecords - 1) * sizeof(record.lastKey.ts), &key.key.ts, + sizeof(key.key.ts)); + if (code) return code; + + // last primary keys + for (int i = 0; i < block->numOfPKs; i++) { + code = tValueColumnUpdate(&block->lastKeyPKs[i], block->numOfRecords - 1, &key.key.pks[i]); + if (code) return code; + } + + // count + record.count++; + code = tBufferPutAt(&block->counts, (block->numOfRecords - 1) * sizeof(record.count), &record.count, + sizeof(record.count)); + if (code) return code; + } else { + ASSERT(0); + } + + return 0; +} + +int32_t tStatisBlockPut(STbStatisBlock *block, SRowInfo *row, int32_t maxRecords) { + if (block->numOfRecords > 0) { + int64_t lastUid; + SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * (block->numOfRecords - 1), &block->uids); + tBufferGetI64(&br, &lastUid); + + if (lastUid == row->uid) { + return tStatisBlockUpdate(block, row); + } else if (block->numOfRecords >= maxRecords) { + return TSDB_CODE_INVALID_PARA; + } + } + return tStatisBlockAppend(block, row); +} + int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) { int32_t code; SBufferReader reader; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.h b/source/dnode/vnode/src/tsdb/tsdbUtil2.h index fb7270fcc8..71f47a5f8e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.h +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.h @@ -114,7 +114,7 @@ typedef struct { int32_t tStatisBlockInit(STbStatisBlock *statisBlock); int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock); int32_t tStatisBlockClear(STbStatisBlock *statisBlock); -int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *record); +int32_t tStatisBlockPut(STbStatisBlock *statisBlock, SRowInfo *row, int32_t maxRecords); int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record); // SBrinRecord ----------