more code

This commit is contained in:
Hongze Cheng 2024-02-23 17:27:03 +08:00
parent 0b8b64feb4
commit 78da82784c
7 changed files with 302 additions and 145 deletions

View File

@ -115,6 +115,7 @@ int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColum
SValueColumn *valCol, SBuffer *helperBuffer); SValueColumn *valCol, SBuffer *helperBuffer);
int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer); int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer);
int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo); int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo);
int32_t tValueCompare(const SValue *tv1, const SValue *tv2);
// SRow ================================ // SRow ================================
int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow); int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow);

View File

@ -1250,7 +1250,7 @@ void tRowGetKey(SRow *pRow, SRowKey *key) {
} \ } \
} while (0) } while (0)
static int32_t tValueCompare(const SValue *tv1, const SValue *tv2) { int32_t tValueCompare(const SValue *tv1, const SValue *tv2) {
ASSERT(tv1->type == tv2->type); ASSERT(tv1->type == tv2->type);
switch (tv1->type) { switch (tv1->type) {

View File

@ -15,10 +15,10 @@
#include "tsdb.h" #include "tsdb.h"
#include "tsdbFSet2.h" #include "tsdbFSet2.h"
#include "tsdbUtil2.h"
#include "tsdbMerge.h" #include "tsdbMerge.h"
#include "tsdbReadUtil.h" #include "tsdbReadUtil.h"
#include "tsdbSttFileRW.h" #include "tsdbSttFileRW.h"
#include "tsdbUtil2.h"
static void tLDataIterClose2(SLDataIter *pIter); static void tLDataIterClose2(SLDataIter *pIter);
@ -60,7 +60,7 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
pLoadInfo->currentLoadBlockIndex = 1; pLoadInfo->currentLoadBlockIndex = 1;
SBlockDataInfo* pInfo = &pLoadInfo->blockData[0]; SBlockDataInfo *pInfo = &pLoadInfo->blockData[0];
tBlockDataDestroy(&pInfo->data); tBlockDataDestroy(&pInfo->data);
pInfo->sttBlockIndex = -1; pInfo->sttBlockIndex = -1;
pInfo->pin = false; pInfo->pin = false;
@ -88,7 +88,7 @@ void destroyLDataIter(SLDataIter *pIter) {
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoadCost) { void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) {
if (pLDataIterArray == NULL) { if (pLDataIterArray == NULL) {
return NULL; return NULL;
} }
@ -115,7 +115,7 @@ void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoa
} }
// choose the unpinned slot to load next data block // choose the unpinned slot to load next data block
static void updateBlockLoadSlot(SSttBlockLoadInfo* pLoadInfo) { static void updateBlockLoadSlot(SSttBlockLoadInfo *pLoadInfo) {
int32_t nextSlotIndex = pLoadInfo->currentLoadBlockIndex ^ 1; int32_t nextSlotIndex = pLoadInfo->currentLoadBlockIndex ^ 1;
if (pLoadInfo->blockData[nextSlotIndex].pin) { if (pLoadInfo->blockData[nextSlotIndex].pin) {
nextSlotIndex = nextSlotIndex ^ 1; nextSlotIndex = nextSlotIndex ^ 1;
@ -180,7 +180,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr); pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr);
return &pInfo->blockData[pInfo->currentLoadBlockIndex].data; return &pInfo->blockData[pInfo->currentLoadBlockIndex].data;
_exit: _exit:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
} }
@ -325,7 +325,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
} }
int32_t startIndex = 0; int32_t startIndex = 0;
while((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) { while ((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) {
++startIndex; ++startIndex;
} }
@ -334,7 +334,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
} }
int32_t endIndex = startIndex; int32_t endIndex = startIndex;
while(endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) { while (endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) {
++endIndex; ++endIndex;
} }
@ -346,12 +346,12 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
for(int32_t k = startIndex; k < endIndex; ++k) { for (int32_t k = startIndex; k < endIndex; ++k) {
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block); tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
int32_t i = 0; int32_t i = 0;
int32_t rows = TARRAY2_SIZE(block.suid); int32_t rows = block.numOfRecords;
while (i < rows && block.suid->data[i] != suid) { while (i < rows && ((int64_t *)block.suids.data)[i] != suid) {
++i; ++i;
} }
@ -365,16 +365,19 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
} }
// When the block's max suid equals the target suid, every row from i to the end belongs to it,
// so the uid / first-key / last-key / count columns are copied in one batch; otherwise rows are
// copied one at a time while the suid still matches.
// Fix (new side): pUid is filled from the uids column. It previously read block.suids, which
// stored super-table ids in the uid list and disagreed with the per-row branch.
if (pStatisBlkArray->data[k].maxTbid.suid == suid) { if (pStatisBlkArray->data[k].maxTbid.suid == suid) {
taosArrayAddBatch(pBlockLoadInfo->info.pUid, &block.uid->data[i], rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, i * sizeof(int64_t)), rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i], rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey,
taosArrayAddBatch(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i], rows - i); tBufferGetDataAt(&block.firstKeyTimestamps, i * sizeof(int64_t)), rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pCount, &block.count->data[i], rows - i); taosArrayAddBatch(pBlockLoadInfo->info.pLastKey,
tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)), rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i);
} else { } else {
while (i < rows && block.suid->data[i] == suid) { while (i < rows && ((int64_t *)block.suids.data)[i] == suid) {
taosArrayPush(pBlockLoadInfo->info.pUid, &block.uid->data[i]); taosArrayPush(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, i * sizeof(int64_t)));
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i]); taosArrayPush(pBlockLoadInfo->info.pFirstKey,
taosArrayPush(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i]); tBufferGetDataAt(&block.firstKeyTimestamps, i * sizeof(int64_t)));
taosArrayPush(pBlockLoadInfo->info.pCount, &block.count->data[i]); taosArrayPush(pBlockLoadInfo->info.pLastKey, tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)));
taosArrayPush(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)));
i += 1; i += 1;
} }
} }
@ -433,14 +436,14 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
return code; return code;
} }
// Three-way comparator over two values read through uint64_t pointers.
// Returns 0 when they are equal, -1 when *p1 is smaller, 1 otherwise.
// The change on this row pair is formatting only (pointer-star placement and ternary spacing).
static int32_t uidComparFn(const void* p1, const void* p2) { static int32_t uidComparFn(const void *p1, const void *p2) {
const uint64_t *pFirst = p1; const uint64_t *pFirst = p1;
const uint64_t *pVal = p2; const uint64_t *pVal = p2;
if (*pFirst == *pVal) { if (*pFirst == *pVal) {
return 0; return 0;
} else { } else {
return *pFirst < *pVal? -1:1; return *pFirst < *pVal ? -1 : 1;
} }
} }
@ -455,7 +458,7 @@ static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid
pTimeWindow->skey = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstKey, index); pTimeWindow->skey = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstKey, index);
pTimeWindow->ekey = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastKey, index); pTimeWindow->ekey = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastKey, index);
*numOfRows += *(int64_t*) taosArrayGet(pLoadInfo->info.pCount, index); *numOfRows += *(int64_t *)taosArrayGet(pLoadInfo->info.pCount, index);
} }
} }
@ -709,7 +712,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter->rInfo.uid = pBlockData->uid; pIter->rInfo.uid = pBlockData->uid;
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
_exit: _exit:
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
} }
@ -740,7 +743,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
return -1 * tLDataIterCmprFn(p1, p2); return -1 * tLDataIterCmprFn(p1, p2);
} }
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pSttDataInfo) { int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pSttDataInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pMTree->pIter = NULL; pMTree->pIter = NULL;
@ -765,12 +768,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
for (int32_t j = 0; j < numOfLevels; ++j) { for (int32_t j = 0; j < numOfLevels; ++j) {
SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j]; SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
SArray * pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file
SLDataIter *pIter = taosArrayGetP(pList, i); SLDataIter *pIter = taosArrayGetP(pList, i);
SSttFileReader * pSttFileReader = pIter->pReader; SSttFileReader *pSttFileReader = pIter->pReader;
SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo; SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo;
// open stt file reader if not opened yet // open stt file reader if not opened yet
@ -796,7 +799,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
int64_t numOfRows = 0; int64_t numOfRows = 0;
int64_t cid = pSttLevel->fobjArr->data[i]->f->cid; int64_t cid = pSttLevel->fobjArr->data[i]->f->cid;
code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows, pMTree->idStr); code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows,
pMTree->idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _end; goto _end;
} }
@ -820,7 +824,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
return code; return code;
_end: _end:
tMergeTreeClose(pMTree); tMergeTreeClose(pMTree);
return code; return code;
} }
@ -829,8 +833,8 @@ void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTr
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; } bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) { static void tLDataIterPinSttBlock(SLDataIter *pIter, const char *id) {
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo; SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) { if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[0].pin = true; pInfo->blockData[0].pin = true;
@ -842,15 +846,15 @@ static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) {
if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) { if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[1].pin = true; pInfo->blockData[1].pin = true;
ASSERT(!pInfo->blockData[0].pin); ASSERT(!pInfo->blockData[0].pin);
tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id); tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
return; return;
} }
tsdbError("failed to pin any stt block, sttBlock:%d stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id); tsdbError("failed to pin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
} }
static void tLDataIterUnpinSttBlock(SLDataIter* pIter, const char* id) { static void tLDataIterUnpinSttBlock(SLDataIter *pIter, const char *id) {
SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo; SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
if (pInfo->blockData[0].pin) { if (pInfo->blockData[0].pin) {
ASSERT(!pInfo->blockData[1].pin); ASSERT(!pInfo->blockData[1].pin);
pInfo->blockData[0].pin = false; pInfo->blockData[0].pin = false;
@ -883,7 +887,7 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) {
return; return;
} }
SLDataIter* pIter = pMTree->pPinnedBlockIter; SLDataIter *pIter = pMTree->pPinnedBlockIter;
pMTree->pPinnedBlockIter = NULL; pMTree->pPinnedBlockIter = NULL;
tLDataIterUnpinSttBlock(pIter, pMTree->idStr); tLDataIterUnpinSttBlock(pIter, pMTree->idStr);
} }

View File

@ -356,7 +356,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
} }
static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record){ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) {
pBlockInfo->uid = record->uid; pBlockInfo->uid = record->uid;
pBlockInfo->firstKey = record->firstKey.key.ts; pBlockInfo->firstKey = record->firstKey.key.ts;
pBlockInfo->lastKey = record->lastKey.key.ts; pBlockInfo->lastKey = record->lastKey.key.ts;
@ -429,7 +429,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
} }
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SFileDataBlockInfo blockInfo = {.tbBlockIdx = i}; SFileDataBlockInfo blockInfo = {.tbBlockIdx = i};
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
recordToBlockInfo(&blockInfo, record); recordToBlockInfo(&blockInfo, record);
taosArrayPush(pBlockIter->blockList, &blockInfo); taosArrayPush(pBlockIter->blockList, &blockInfo);
@ -464,7 +464,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
int32_t index = sup.indexPerTable[pos]++; int32_t index = sup.indexPerTable[pos]++;
SFileDataBlockInfo blockInfo = {.tbBlockIdx = index}; SFileDataBlockInfo blockInfo = {.tbBlockIdx = index};
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
recordToBlockInfo(&blockInfo, record); recordToBlockInfo(&blockInfo, record);
taosArrayPush(pBlockIter->blockList, &blockInfo); taosArrayPush(pBlockIter->blockList, &blockInfo);
@ -722,11 +722,11 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
pBlockLoadInfo->cost.statisElapsedTime += el; pBlockLoadInfo->cost.statisElapsedTime += el;
int32_t index = 0; int32_t index = 0;
while (index < TARRAY2_SIZE(pStatisBlock->suid) && pStatisBlock->suid->data[index] < suid) { while (index < pStatisBlock->numOfRecords && ((int64_t*)pStatisBlock->suids.data)[index] < suid) {
++index; ++index;
} }
if (index >= TARRAY2_SIZE(pStatisBlock->suid)) { if (index >= pStatisBlock->numOfRecords) {
tStatisBlockDestroy(pStatisBlock); tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock); taosMemoryFreeClear(pStatisBlock);
return num; return num;
@ -744,14 +744,14 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
uint64_t uid = pUidList[uidIndex]; uint64_t uid = pUidList[uidIndex];
if (pStatisBlock->uid->data[j] == uid) { if (((int64_t*)pStatisBlock->uids.data)[j] == uid) {
num += pStatisBlock->count->data[j]; num += ((int64_t*)pStatisBlock->counts.data)[j];
uidIndex += 1; uidIndex += 1;
j += 1; j += 1;
loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j); loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
} else if (pStatisBlock->uid->data[j] < uid) { } else if (((int64_t*)pStatisBlock->uids.data)[j] < uid) {
j += 1; j += 1;
loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j); loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
} else { } else {
uidIndex += 1; uidIndex += 1;
} }

View File

@ -29,7 +29,8 @@ struct SSttFileReader {
TSttBlkArray sttBlkArray[1]; TSttBlkArray sttBlkArray[1];
TStatisBlkArray statisBlkArray[1]; TStatisBlkArray statisBlkArray[1];
TTombBlkArray tombBlkArray[1]; TTombBlkArray tombBlkArray[1];
uint8_t *bufArr[5]; uint8_t *bufArr[5]; // TODO: remove here
SBuffer *buffers;
}; };
// SSttFileReader // SSttFileReader
@ -471,27 +472,70 @@ _exit:
int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) { int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
code = tRealloc(&reader->config->bufArr[0], statisBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->config->bufArr[0], statisBlk->dp->size, 0);
TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0; int64_t size = 0;
// load data from file
code = tBufferEnsureCapacity(&reader->buffers[0], statisBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->buffers[0].data, statisBlk->dp->size, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decode data
tStatisBlockClear(statisBlock); tStatisBlockClear(statisBlock);
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) { statisBlock->numOfPKs = statisBlk->numOfPKs;
code = statisBlock->numOfRecords = statisBlk->numRec;
tsdbDecmprData(reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg,
&reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec, &reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_BATCH(statisBlock->dataArr + i, reader->config->bufArr[1], statisBlk->numRec); for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
TSDB_CHECK_CODE(code, lino, _exit); SCompressInfo info = {
.dataType = TSDB_DATA_TYPE_BIGINT,
.cmprAlg = statisBlk->cmprAlg,
.compressedSize = statisBlk->size[i],
.originalSize = statisBlk->numRec * sizeof(int64_t),
};
code = tDecompressData(tBufferGetDataAt(&reader->buffers[0], size), statisBlk->size[i], &info,
&statisBlock->buffers[i], &reader->buffers[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += statisBlk->size[i]; size += statisBlk->size[i];
} }
if (statisBlk->numOfPKs > 0) {
SValueColumnCompressInfo firstKeyInfos[TD_MAX_PRIMARY_KEY_COL];
SValueColumnCompressInfo lastKeyInfos[TD_MAX_PRIMARY_KEY_COL];
SBufferReader bfReader;
tBufferReaderInit(&bfReader, true, size, &reader->buffers[0]);
// decode compress info
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
code = tValueColumnCompressInfoDecode(&bfReader, &firstKeyInfos[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
code = tValueColumnCompressInfoDecode(&bfReader, &lastKeyInfos[i]);
TSDB_CHECK_CODE(code, lino, _exit);
}
size = bfReader.offset;
// decode value columns
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
code = tValueColumnDecompress(tBufferGetDataAt(&reader->buffers[0], size), firstKeyInfos[i].compressedDataSize,
&firstKeyInfos[i], &statisBlock->firstKeyPKs[i], &reader->buffers[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += firstKeyInfos[i].compressedDataSize;
}
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
code = tValueColumnDecompress(tBufferGetDataAt(&reader->buffers[0], size), lastKeyInfos[i].compressedDataSize,
&lastKeyInfos[i], &statisBlock->lastKeyPKs[i], &reader->buffers[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += lastKeyInfos[i].compressedDataSize;
}
}
ASSERT(size == statisBlk->dp->size); ASSERT(size == statisBlk->dp->size);
_exit: _exit:
@ -524,7 +568,8 @@ struct SSttFileWriter {
// helper data // helper data
SSkmInfo skmTb[1]; SSkmInfo skmTb[1];
SSkmInfo skmRow[1]; SSkmInfo skmRow[1];
uint8_t *bufArr[5]; uint8_t *bufArr[5]; // TODO: remove this
SBuffer *buffers;
}; };
static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize, static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
@ -595,45 +640,88 @@ _exit:
} }
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
if (STATIS_BLOCK_SIZE(writer->staticBlock) == 0) return 0; int32_t code = 0;
int32_t lino = 0;
STbStatisRecord record;
STbStatisBlock *statisBlock = writer->staticBlock;
SStatisBlk statisBlk = {0};
int32_t code = 0; if (statisBlock->numOfRecords == 0) {
int32_t lino = 0; return 0;
SStatisBlk statisBlk[1] = {{
.dp[0] =
{
.offset = writer->file->size,
.size = 0,
},
.minTbid =
{
.suid = TARRAY2_FIRST(writer->staticBlock->suid),
.uid = TARRAY2_FIRST(writer->staticBlock->uid),
},
.maxTbid =
{
.suid = TARRAY2_LAST(writer->staticBlock->suid),
.uid = TARRAY2_LAST(writer->staticBlock->uid),
},
.numRec = STATIS_BLOCK_SIZE(writer->staticBlock),
.cmprAlg = writer->config->cmprAlg,
}};
for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->staticBlock->dataArr + i),
TARRAY2_DATA_LEN(&writer->staticBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg,
&writer->config->bufArr[0], 0, &statisBlk->size[i], &writer->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], statisBlk->size[i]);
TSDB_CHECK_CODE(code, lino, _exit);
statisBlk->dp->size += statisBlk->size[i];
writer->file->size += statisBlk->size[i];
} }
code = TARRAY2_APPEND_PTR(writer->statisBlkArray, statisBlk); statisBlk.dp->offset = writer->file->size;
statisBlk.dp->size = 0;
statisBlk.numRec = statisBlock->numOfRecords;
statisBlk.cmprAlg = writer->config->cmprAlg;
statisBlk.numOfPKs = statisBlock->numOfPKs;
tStatisBlockGet(statisBlock, 0, &record);
statisBlk.minTbid.suid = record.suid;
statisBlk.minTbid.uid = record.uid;
tStatisBlockGet(statisBlock, statisBlock->numOfRecords - 1, &record);
statisBlk.maxTbid.suid = record.suid;
statisBlk.maxTbid.uid = record.uid;
// compress each column
for (int32_t i = 0; i < ARRAY_SIZE(statisBlk.size); i++) {
SCompressInfo info = {
.dataType = TSDB_DATA_TYPE_BIGINT,
.cmprAlg = statisBlk.cmprAlg,
};
tBufferClear(&writer->buffers[0]);
code = tCompressData(tBufferGetData(&statisBlock->buffers[i]), tBufferGetSize(&statisBlock->buffers[i]), &info,
&writer->buffers[0], &writer->buffers[1]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize);
TSDB_CHECK_CODE(code, lino, _exit);
statisBlk.size[i] = info.compressedSize;
statisBlk.dp->size += info.compressedSize;
writer->file->size += info.compressedSize;
}
// compress primary keys
if (statisBlk.numOfPKs > 0) {
SBufferWriter bfWriter;
SValueColumnCompressInfo compressInfo = {.cmprAlg = statisBlk.cmprAlg};
tBufferClear(&writer->buffers[0]);
tBufferClear(&writer->buffers[1]);
tBufferWriterInit(&bfWriter, true, 0, &writer->buffers[0]);
for (int32_t i = 0; i < statisBlk.numOfPKs; i++) {
code =
tValueColumnCompress(&statisBlock->firstKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tValueColumnCompressInfoEncode(&compressInfo, &bfWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < statisBlk.numOfPKs; i++) {
code = tValueColumnCompress(&statisBlock->lastKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tValueColumnCompressInfoEncode(&compressInfo, &bfWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbWriteFile(writer->fd, writer->file->size, writer->buffers[0].data, writer->buffers[0].size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += writer->buffers[0].size;
statisBlk.dp->size += writer->buffers[0].size;
code = tsdbWriteFile(writer->fd, writer->file->size, writer->buffers[1].data, writer->buffers[1].size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += writer->buffers[1].size;
statisBlk.dp->size += writer->buffers[1].size;
}
code = TARRAY2_APPEND_PTR(writer->statisBlkArray, &statisBlk);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tStatisBlockClear(writer->staticBlock); tStatisBlockClear(writer->staticBlock);
@ -918,14 +1006,8 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
TSDBKEY key[1]; STsdbRowKey key;
if (row->row.type == TSDBROW_ROW_FMT) { tsdbRowGetKey(&row->row, &key);
key->ts = row->row.pTSRow->ts;
key->version = row->row.version;
} else {
key->ts = row->row.pBlockData->aTSKEY[row->row.iRow];
key->version = row->row.pBlockData->aVersion[row->row.iRow];
}
if (writer->ctx->tbid->uid != row->uid) { if (writer->ctx->tbid->uid != row->uid) {
writer->ctx->tbid->suid = row->suid; writer->ctx->tbid->suid = row->suid;
@ -939,18 +1021,22 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) {
STbStatisRecord record = { STbStatisRecord record = {
.suid = row->suid, .suid = row->suid,
.uid = row->uid, .uid = row->uid,
.firstKey = key->ts, .firstKey = key.key,
.lastKey = key->ts, .lastKey = key.key,
.count = 1, .count = 1,
}; };
code = tStatisBlockPut(writer->staticBlock, &record); code = tStatisBlockPut(writer->staticBlock, &record);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
ASSERT(key->ts >= TARRAY2_LAST(writer->staticBlock->lastKey)); // update last key and count
STbStatisRecord record;
// Fix (new side): fetch the most recent record. tStatisBlockGet accepts indices in
// [0, numOfRecords), so the last record is at numOfRecords - 1; passing numOfRecords is rejected
// and leaves `record` unfilled. The return code is now propagated so an unfilled record is never
// compared below.
if (key->ts > TARRAY2_LAST(writer->staticBlock->lastKey)) { code = tStatisBlockGet(writer->staticBlock, writer->staticBlock->numOfRecords - 1, &record);
TARRAY2_LAST(writer->staticBlock->count)++; TSDB_CHECK_CODE(code, lino, _exit);
TARRAY2_LAST(writer->staticBlock->lastKey) = key->ts; if (tRowKeyCmpr(&key.key, &record.lastKey) > 0) {
// TODO: update count and last key
// TARRAY2_LAST(writer->staticBlock->count)++;
// TARRAY2_LAST(writer->staticBlock->lastKey) = key->ts;
} }
} }
@ -961,7 +1047,7 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) {
} }
// row to col conversion // row to col conversion
if (key->version <= writer->config->compactVersion // if (key.version <= writer->config->compactVersion //
&& writer->blockData->nRow > 0 // && writer->blockData->nRow > 0 //
&& (writer->blockData->uid // && (writer->blockData->uid //
? writer->blockData->uid // ? writer->blockData->uid //

View File

@ -66,40 +66,103 @@ int32_t tTombRecordCompare(const STombRecord *r1, const STombRecord *r2) {
// STbStatisBlock ---------- // STbStatisBlock ----------
// Initializes a statistics block. New side: zeroes numOfPKs and numOfRecords, then calls
// tBufferInit on every entry of buffers[] and tValueColumnInit on every first/last primary-key
// column slot (TD_MAX_PRIMARY_KEY_COL of each). Always returns 0.
int32_t tStatisBlockInit(STbStatisBlock *statisBlock) { int32_t tStatisBlockInit(STbStatisBlock *statisBlock) {
for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) { statisBlock->numOfPKs = 0;
TARRAY2_INIT(&statisBlock->dataArr[i]); statisBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
tBufferInit(&statisBlock->buffers[i]);
}
for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) {
tValueColumnInit(&statisBlock->firstKeyPKs[i]);
tValueColumnInit(&statisBlock->lastKeyPKs[i]);
} }
return 0; return 0;
} }
// Tears down a statistics block. New side: zeroes numOfPKs and numOfRecords, then calls
// tBufferDestroy on every entry of buffers[] and tValueColumnDestroy on every first/last
// primary-key column slot. Always returns 0.
int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock) { int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock) {
for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) { statisBlock->numOfPKs = 0;
TARRAY2_DESTROY(&statisBlock->dataArr[i], NULL); statisBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
tBufferDestroy(&statisBlock->buffers[i]);
}
for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) {
tValueColumnDestroy(&statisBlock->firstKeyPKs[i]);
tValueColumnDestroy(&statisBlock->lastKeyPKs[i]);
} }
return 0; return 0;
} }
// Empties a statistics block for reuse. New side: zeroes numOfPKs and numOfRecords, then calls
// tBufferClear on every entry of buffers[] and tValueColumnClear on every first/last primary-key
// column slot. Always returns 0.
// NOTE(review): presumably Clear keeps allocated storage while Destroy releases it; confirm
// against the tBuffer / tValueColumn implementations.
int32_t tStatisBlockClear(STbStatisBlock *statisBlock) { int32_t tStatisBlockClear(STbStatisBlock *statisBlock) {
for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) { statisBlock->numOfPKs = 0;
TARRAY2_CLEAR(&statisBlock->dataArr[i], NULL); statisBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
tBufferClear(&statisBlock->buffers[i]);
}
for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) {
tValueColumnClear(&statisBlock->firstKeyPKs[i]);
tValueColumnClear(&statisBlock->lastKeyPKs[i]);
} }
return 0; return 0;
} }
// Appends one record to the block, column by column (new side).
// The first record put into an empty block fixes the block's numOfPKs; every record must then
// carry that same number of primary keys in both firstKey and lastKey (checked with ASSERT).
// suid, uid, first/last key timestamps and count go to their fixed-width buffers; each primary-key
// value goes to the matching firstKeyPKs[i] / lastKeyPKs[i] column.
// Returns 0 on success, or the first non-zero code from tBufferAppend / tValueColumnAppend.
// NOTE(review): numOfRecords is only incremented after all appends succeed, so an early error
// return leaves the columns appended so far one entry ahead of numOfRecords.
int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *record) { int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *record) {
int32_t code; int32_t code;
for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) {
code = TARRAY2_APPEND(&statisBlock->dataArr[i], record->dataArr[i]); if (statisBlock->numOfRecords == 0) {
statisBlock->numOfPKs = record->firstKey.numOfPKs;
}
ASSERT(statisBlock->numOfPKs == record->firstKey.numOfPKs);
ASSERT(statisBlock->numOfPKs == record->lastKey.numOfPKs);
code = tBufferAppend(&statisBlock->suids, &record->suid, sizeof(record->suid));
if (code) return code;
code = tBufferAppend(&statisBlock->uids, &record->uid, sizeof(record->uid));
if (code) return code;
code = tBufferAppend(&statisBlock->firstKeyTimestamps, &record->firstKey.ts, sizeof(record->firstKey.ts));
if (code) return code;
code = tBufferAppend(&statisBlock->lastKeyTimestamps, &record->lastKey.ts, sizeof(record->lastKey.ts));
if (code) return code;
code = tBufferAppend(&statisBlock->counts, &record->count, sizeof(record->count));
if (code) return code;
for (int32_t i = 0; i < statisBlock->numOfPKs; ++i) {
code = tValueColumnAppend(&statisBlock->firstKeyPKs[i], &record->firstKey.pks[i]);
if (code) return code;
code = tValueColumnAppend(&statisBlock->lastKeyPKs[i], &record->lastKey.pks[i]);
if (code) return code; if (code) return code;
} }
statisBlock->numOfRecords++;
return 0; return 0;
} }
// Reads the record at position idx out of the block's columns into *record (new side).
// Returns TSDB_CODE_OUT_OF_RANGE when idx is negative or >= numOfRecords, the first non-zero code
// from tBufferGet / tValueColumnGet on failure, and 0 on success. Both keys in the output get
// numOfPKs copied from the block before the primary-key values are fetched.
// NOTE(review): tBufferGet receives the record index `idx` together with the element size, while
// other call sites in this change pass byte offsets (i * sizeof(int64_t)) to tBufferGetDataAt.
// Confirm tBufferGet treats its second argument as an element index, not a byte offset.
int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) { int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) {
if (idx >= STATIS_BLOCK_SIZE(statisBlock)) return TSDB_CODE_OUT_OF_RANGE; int32_t code;
for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) {
record->dataArr[i] = TARRAY2_GET(&statisBlock->dataArr[i], idx); if (idx < 0 || idx >= statisBlock->numOfRecords) {
return TSDB_CODE_OUT_OF_RANGE;
} }
code = tBufferGet(&statisBlock->suids, idx, sizeof(record->suid), &record->suid);
if (code) return code;
code = tBufferGet(&statisBlock->uids, idx, sizeof(record->uid), &record->uid);
if (code) return code;
code = tBufferGet(&statisBlock->firstKeyTimestamps, idx, sizeof(record->firstKey.ts), &record->firstKey.ts);
if (code) return code;
code = tBufferGet(&statisBlock->lastKeyTimestamps, idx, sizeof(record->lastKey.ts), &record->lastKey.ts);
if (code) return code;
code = tBufferGet(&statisBlock->counts, idx, sizeof(record->count), &record->count);
if (code) return code;
record->firstKey.numOfPKs = statisBlock->numOfPKs;
record->lastKey.numOfPKs = statisBlock->numOfPKs;
for (int32_t i = 0; i < statisBlock->numOfPKs; ++i) {
code = tValueColumnGet(&statisBlock->firstKeyPKs[i], idx, &record->firstKey.pks[i]);
if (code) return code;
code = tValueColumnGet(&statisBlock->lastKeyPKs[i], idx, &record->lastKey.pks[i]);
if (code) return code;
}
return 0; return 0;
} }

View File

@ -70,27 +70,29 @@ int32_t tTombBlockGet(STombBlock *tombBlock, int32_t idx, STombRecord *record);
int32_t tTombRecordCompare(const STombRecord *record1, const STombRecord *record2); int32_t tTombRecordCompare(const STombRecord *record1, const STombRecord *record2);
// STbStatisRecord ---------- // STbStatisRecord ----------
// One per-table statistics record.
// Old side: a union of five int64_t values (suid, uid, firstKey, lastKey, count), also reachable
// as dataArr[STATIS_RECORD_NUM_ELEM].
// New side: a plain struct in which firstKey and lastKey are SRowKey values; the dataArr view and
// the STATIS_RECORD_NUM_ELEM macro are removed.
#define STATIS_RECORD_NUM_ELEM 5 typedef struct {
typedef union { int64_t suid;
int64_t dataArr[STATIS_RECORD_NUM_ELEM]; int64_t uid;
struct { SRowKey firstKey;
int64_t suid; SRowKey lastKey;
int64_t uid; int64_t count;
int64_t firstKey;
int64_t lastKey;
int64_t count;
};
} STbStatisRecord; } STbStatisRecord;
// Column-oriented container of statistics records.
// Old side: a union of five TARRAY2(int64_t) columns, also reachable as dataArr[].
// New side: a struct holding numOfPKs and numOfRecords, five SBuffer columns (reachable by name or
// as buffers[5] through the anonymous union), and one SValueColumn per primary-key slot for the
// first and last keys.
// NOTE(review): numOfPKs is declared int8_t; presumably it never exceeds TD_MAX_PRIMARY_KEY_COL,
// the size of the two SValueColumn arrays. Confirm at the call sites that set it.
typedef union { typedef struct {
TARRAY2(int64_t) dataArr[STATIS_RECORD_NUM_ELEM]; int8_t numOfPKs;
struct { int32_t numOfRecords;
TARRAY2(int64_t) suid[1]; union {
TARRAY2(int64_t) uid[1]; SBuffer buffers[5];
TARRAY2(int64_t) firstKey[1]; struct {
TARRAY2(int64_t) lastKey[1]; SBuffer suids;
TARRAY2(int64_t) count[1]; SBuffer uids;
SBuffer firstKeyTimestamps;
SBuffer lastKeyTimestamps;
SBuffer counts;
};
}; };
SValueColumn firstKeyPKs[TD_MAX_PRIMARY_KEY_COL];
SValueColumn lastKeyPKs[TD_MAX_PRIMARY_KEY_COL];
} STbStatisBlock; } STbStatisBlock;
typedef struct { typedef struct {
@ -98,12 +100,13 @@ typedef struct {
TABLEID minTbid; TABLEID minTbid;
TABLEID maxTbid; TABLEID maxTbid;
int32_t numRec; int32_t numRec;
int32_t size[STATIS_RECORD_NUM_ELEM]; int32_t size[5];
int8_t cmprAlg; int8_t cmprAlg;
int8_t rsvd[7]; int8_t numOfPKs; // number of primary keys
int8_t rsvd[6];
} SStatisBlk; } SStatisBlk;
#define STATIS_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) #define STATIS_BLOCK_SIZE(db) ((db)->numOfRecords)
int32_t tStatisBlockInit(STbStatisBlock *statisBlock); int32_t tStatisBlockInit(STbStatisBlock *statisBlock);
int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock); int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock);