Merge remote-tracking branch 'origin/feat/tsdb_refact' into feat/tsdb_refact

This commit is contained in:
Haojun Liao 2022-06-30 23:38:56 +08:00
commit deb81301e8
5 changed files with 60 additions and 230 deletions

View File

@ -251,6 +251,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf);
// tsdbCache // tsdbCache
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache); void tsdbCloseCache(SLRUCache *pCache);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
@ -394,9 +395,9 @@ typedef struct {
int64_t nRow; int64_t nRow;
int8_t cmprAlg; int8_t cmprAlg;
int64_t offset; int64_t offset;
int64_t vsize; // VERSION size int64_t szVersion; // VERSION size
int64_t ksize; // TSKEY size int64_t szTSKEY; // TSKEY size
int64_t bsize; // total block size int64_t szBlock; // total block size
SMapData mBlockCol; // SMapData<SBlockCol> SMapData mBlockCol; // SMapData<SBlockCol>
} SSubBlock; } SSubBlock;

View File

@ -137,9 +137,12 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
if (row->ts >= cacheRow->ts) { if (row->ts >= cacheRow->ts) {
if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) {
tdRowCpy(cacheRow, row); tdRowCpy(cacheRow, row);
taosLRUCacheRelease(pCache, h, false);
} else { } else {
tsdbCacheDeleteLast(pCache, uid, TSKEY_MAX); taosLRUCacheRelease(pCache, h, true);
tsdbCacheInsertLastrow(pCache, uid, row); /* tsdbCacheDeleteLast(pCache, uid, TSKEY_MAX); */
tsdbCacheInsertLast(pCache, uid, row);
} }
} }
} else { } else {
@ -178,31 +181,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
return suid; return suid;
} }
/*
static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow
**ppRow) { int32_t code = 0;
if (mem) {
STbData *pMem = NULL;
STbDataIter* iter; // mem buffer skip list iterator
tsdbGetTbDataFromMemTable(mem, suid, uid, &pMem);
if (pMem != NULL) {
tsdbTbDataIterCreate(pMem, NULL, 1, &iter);
if (iter != NULL) {
TSDBROW *row = tsdbTbDataIterGet(iter);
tsdbTbDataIterDestroy(iter);
}
}
} else {
*ppRow = NULL;
}
return code;
}
*/
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) { static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
int32_t code = 0; int32_t code = 0;
@ -300,99 +279,6 @@ static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t
_err: _err:
return code; return code;
} }
#if 0
static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, SArray *pSkyline,
STsdb *pTsdb, STSRow **ppLastRow) {
int32_t code = 0;
TSDBROW *pMemRow = NULL;
TSDBROW *pIMemRow = NULL;
TSDBKEY memKey = TSDBKEY_MIN;
TSDBKEY imemKey = TSDBKEY_MIN;
if (iter != NULL) {
pMemRow = tsdbTbDataIterGet(iter);
if (pMemRow) {
memKey = tsdbRowKey(pMemRow);
}
}
if (iter != NULL) {
pIMemRow = tsdbTbDataIterGet(iiter);
if (pIMemRow) {
imemKey = tsdbRowKey(pIMemRow);
}
}
SDataFReader *pDataFReader;
code = tsdbDataFReaderOpen(&pDataFReader, pTsdb, pFileSet);
if (code) goto _err;
SMapData blockIdxMap;
tMapDataReset(&blockIdxMap);
code = tsdbReadBlockIdx(pDataFReader, &blockIdxMap, NULL);
if (code) goto _err;
SBlockIdx blockIdx = {0};
tBlockIdxReset(&blockIdx);
code = tMapDataSearch(&blockIdxMap, pBlockIdx, tGetBlockIdx, tCmprBlockIdx, &blockIdx);
if (code) goto _err;
SMapData blockMap = {0};
tMapDataReset(&blockMap);
code = tsdbReadBlock(pDataFReader, &blockIdx, &blockMap, NULL);
if (code) goto _err;
int32_t nBlock = blockMap.nItem;
for (int32_t iBlock = nBlock - 1; iBlock >= 0; --iBlock) {
SBlock block = {0};
SBlockData blockData = {0};
tBlockReset(&block);
tBlockDataReset(&blockData);
tMapDataGetItemByIdx(&blockMap, iBlock, &block, tGetBlock);
code = tsdbReadBlockData(pDataFReader, &blockIdx, &block, &blockData, NULL, NULL);
if (code) goto _err;
int32_t nRow = blockData.nRow;
for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) {
TSDBROW row = tsdbRowFromBlockData(&blockData, iRow);
TSDBKEY key = tsdbRowKey(&row);
if (pMemRow != NULL && pIMemRow != NULL) {
int32_t c = tsdbKeyCmprFn(memKey, imemKey);
if (c < 0) {
} else if (c > 0) {
} else {
}
} else if (pMemRow != NULL) {
pMemRow = tsdbTbDataIterGet(iter);
} else if (pIMemRow != NULL) {
} else {
if (!tsdbKeyDeleted(key, pSkyline)) {
code = buildTsrowFromTsdbrow(&row, ppLastRow);
goto _done;
} else {
continue;
}
}
// select current row if outside delete area
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
}
}
_done:
tsdbDataFReaderClose(&pDataFReader);
return code;
_err:
return code;
}
#endif
typedef enum SFSNEXTROWSTATES { typedef enum SFSNEXTROWSTATES {
SFSNEXTROW_FS, SFSNEXTROW_FS,
@ -1047,60 +933,6 @@ _err:
return code; return code;
} }
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "lr", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
code = mergeLastRow(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
return -1;
}
tsdbCacheInsertLastrow(pCache, uid, pRow);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
// taosLRUCacheRelease(pCache, h, true);
return code;
}
#if 0
int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
getTableCacheKey(uid, "l", key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
} else {
STSRow *pRow = NULL;
code = mergeLast(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
return -1;
}
tsdbCacheInsertLast(pCache, uid, pRow);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
*ppRow = (STSRow *)taosLRUCacheValue(pCache, h);
}
// taosLRUCacheRelease(pCache, h, true);
return code;
}
#endif
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) {
int32_t code = 0; int32_t code = 0;
char key[32] = {0}; char key[32] = {0};
@ -1125,7 +957,6 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
} }
*handle = h; *handle = h;
// taosLRUCacheRelease(pCache, h, true);
return code; return code;
} }
@ -1155,7 +986,6 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
} }
*handle = h; *handle = h;
// taosLRUCacheRelease(pCache, h, true);
return code; return code;
} }

View File

@ -117,6 +117,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
SLastrowReader* pr = pReader; SLastrowReader* pr = pReader;
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL; LRUHandle* h = NULL;
STSRow* pRow = NULL; STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList); size_t numOfTables = taosArrayGetSize(pr->pTableList);
@ -128,8 +129,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
/* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */ int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -138,7 +138,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
continue; continue;
} }
pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h); pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
if (pRow->ts > lastKey) { if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not. // appended or not.
@ -151,14 +151,13 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
lastKey = pRow->ts; lastKey = pRow->ts;
} }
tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h); tsdbCacheRelease(lruCache, h);
} }
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
/* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */ int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -168,10 +167,10 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
continue; continue;
} }
pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h); pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
saveOneRow(pRow, pResBlock, pr, slotIds); saveOneRow(pRow, pResBlock, pr, slotIds);
tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h); tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1; pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {

View File

@ -635,7 +635,7 @@ _err:
static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
int64_t size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
int64_t n; int64_t n;
if (!taosCheckChecksumWhole(pBuf, size)) { if (!taosCheckChecksumWhole(pBuf, size)) {
@ -649,15 +649,15 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl
if (code) goto _err; if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) { if (pSubBlock->cmprAlg == NO_COMPRESSION) {
ASSERT(pSubBlock->vsize == sizeof(int64_t) * pSubBlock->nRow); ASSERT(pSubBlock->szVersion == sizeof(int64_t) * pSubBlock->nRow);
ASSERT(pSubBlock->ksize == sizeof(TSKEY) * pSubBlock->nRow); ASSERT(pSubBlock->szTSKEY == sizeof(TSKEY) * pSubBlock->nRow);
// VERSION // VERSION
memcpy(pBlockData->aVersion, pBuf, pSubBlock->vsize); memcpy(pBlockData->aVersion, pBuf, pSubBlock->szVersion);
// TSKEY // TSKEY
pBuf = pBuf + pSubBlock->vsize; pBuf = pBuf + pSubBlock->szVersion;
memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->vsize, pSubBlock->ksize); memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY);
} else { } else {
size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES; size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
@ -666,7 +666,7 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl
} }
// VERSION // VERSION
n = tsDecompressBigint(pBuf, pSubBlock->vsize, pSubBlock->nRow, (char *)pBlockData->aVersion, n = tsDecompressBigint(pBuf, pSubBlock->szVersion, pSubBlock->nRow, (char *)pBlockData->aVersion,
sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size); sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
if (n < 0) { if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
@ -674,8 +674,8 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl
} }
// TSKEY // TSKEY
pBuf = pBuf + pSubBlock->vsize; pBuf = pBuf + pSubBlock->szVersion;
n = tsDecompressTimestamp(pBuf, pSubBlock->ksize, pSubBlock->nRow, (char *)pBlockData->aTSKEY, n = tsDecompressTimestamp(pBuf, pSubBlock->szTSKEY, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size); sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
if (n < 0) { if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
@ -768,7 +768,7 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx,
// TSDBKEY // TSDBKEY
offset = pSubBlock->offset + sizeof(SBlockDataHdr); offset = pSubBlock->offset + sizeof(SBlockDataHdr);
size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size); code = tsdbRealloc(ppBuf1, size);
if (code) goto _err; if (code) goto _err;
@ -809,8 +809,8 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx,
if (code) goto _err; if (code) goto _err;
} }
} else { } else {
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM) + offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->szVersion + pSubBlock->szTSKEY +
pBlockCol->offset; sizeof(TSCKSUM) + pBlockCol->offset;
size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size); code = tsdbRealloc(ppBuf1, size);
@ -909,7 +909,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
// realloc // realloc
code = tsdbRealloc(ppBuf1, pSubBlock->bsize); code = tsdbRealloc(ppBuf1, pSubBlock->szBlock);
if (code) goto _err; if (code) goto _err;
// seek // seek
@ -920,11 +920,11 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
} }
// read // read
n = taosReadFile(pFD, *ppBuf1, pSubBlock->bsize); n = taosReadFile(pFD, *ppBuf1, pSubBlock->szBlock);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} else if (n < pSubBlock->bsize) { } else if (n < pSubBlock->szBlock) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; goto _err;
} }
@ -935,7 +935,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2); code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2);
if (code) goto _err; if (code) goto _err;
p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); p = p + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
SColData *pColData; SColData *pColData;
@ -1411,7 +1411,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
} else { } else {
pSubBlock->offset = pWriter->wSet.fData.size; pSubBlock->offset = pWriter->wSet.fData.size;
} }
pSubBlock->bsize = 0; pSubBlock->szBlock = 0;
// HDR // HDR
n = taosWriteFile(pFileFD, &hdr, sizeof(hdr)); n = taosWriteFile(pFileFD, &hdr, sizeof(hdr));
@ -1419,29 +1419,29 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pSubBlock->bsize += n; pSubBlock->szBlock += n;
// TSDBKEY // TSDBKEY
if (cmprAlg == NO_COMPRESSION) { if (cmprAlg == NO_COMPRESSION) {
cksm = 0; cksm = 0;
// version // version
pSubBlock->vsize = sizeof(int64_t) * pBlockData->nRow; pSubBlock->szVersion = sizeof(int64_t) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aVersion, pSubBlock->vsize); n = taosWriteFile(pFileFD, pBlockData->aVersion, pSubBlock->szVersion);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, pSubBlock->vsize); cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, pSubBlock->szVersion);
// TSKEY // TSKEY
pSubBlock->ksize = sizeof(TSKEY) * pBlockData->nRow; pSubBlock->szTSKEY = sizeof(TSKEY) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aTSKEY, pSubBlock->ksize); n = taosWriteFile(pFileFD, pBlockData->aTSKEY, pSubBlock->szTSKEY);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aTSKEY, pSubBlock->ksize); cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aTSKEY, pSubBlock->szTSKEY);
// cksm // cksm
size = sizeof(cksm); size = sizeof(cksm);
@ -1470,19 +1470,19 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _err; goto _err;
} }
pSubBlock->vsize = n; pSubBlock->szVersion = n;
// TSKEY // TSKEY
n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow,
*ppBuf1 + pSubBlock->vsize, size - pSubBlock->vsize, cmprAlg, *ppBuf2, size); *ppBuf1 + pSubBlock->szVersion, size - pSubBlock->szVersion, cmprAlg, *ppBuf2, size);
if (n <= 0) { if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR; code = TSDB_CODE_COMPRESS_ERROR;
goto _err; goto _err;
} }
pSubBlock->ksize = n; pSubBlock->szTSKEY = n;
// cksm // cksm
n = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); n = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
ASSERT(n <= size); ASSERT(n <= size);
taosCalcChecksumAppend(0, *ppBuf1, n); taosCalcChecksumAppend(0, *ppBuf1, n);
@ -1493,7 +1493,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
goto _err; goto _err;
} }
} }
pSubBlock->bsize += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM)); pSubBlock->szBlock += (pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM));
// other columns // other columns
offset = 0; offset = 0;
@ -1580,7 +1580,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
// state // state
offset = offset + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); offset = offset + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
pSubBlock->bsize = pSubBlock->bsize + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); pSubBlock->szBlock = pSubBlock->szBlock + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
} }
code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol); code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol);
@ -1588,9 +1588,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
} }
if (pBlock->last) { if (pBlock->last) {
pWriter->wSet.fLast.size += pSubBlock->bsize; pWriter->wSet.fLast.size += pSubBlock->szBlock;
} else { } else {
pWriter->wSet.fData.size += pSubBlock->bsize; pWriter->wSet.fData.size += pSubBlock->szBlock;
} }
tsdbFree(pBuf1); tsdbFree(pBuf1);

View File

@ -388,9 +388,9 @@ void tBlockReset(SBlock *pBlock) {
pBlock->aSubBlock[iSubBlock].nRow = 0; pBlock->aSubBlock[iSubBlock].nRow = 0;
pBlock->aSubBlock[iSubBlock].cmprAlg = -1; pBlock->aSubBlock[iSubBlock].cmprAlg = -1;
pBlock->aSubBlock[iSubBlock].offset = -1; pBlock->aSubBlock[iSubBlock].offset = -1;
pBlock->aSubBlock[iSubBlock].vsize = -1; pBlock->aSubBlock[iSubBlock].szVersion = -1;
pBlock->aSubBlock[iSubBlock].ksize = -1; pBlock->aSubBlock[iSubBlock].szTSKEY = -1;
pBlock->aSubBlock[iSubBlock].bsize = -1; pBlock->aSubBlock[iSubBlock].szBlock = -1;
tMapDataReset(&pBlock->aSubBlock->mBlockCol); tMapDataReset(&pBlock->aSubBlock->mBlockCol);
} }
pBlock->nSubBlock = 0; pBlock->nSubBlock = 0;
@ -417,9 +417,9 @@ int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest) {
pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow; pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow;
pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg; pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg;
pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset; pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset;
pBlockDest->aSubBlock[iSubBlock].vsize = pBlockSrc->aSubBlock[iSubBlock].vsize; pBlockDest->aSubBlock[iSubBlock].szVersion = pBlockSrc->aSubBlock[iSubBlock].szVersion;
pBlockDest->aSubBlock[iSubBlock].ksize = pBlockSrc->aSubBlock[iSubBlock].ksize; pBlockDest->aSubBlock[iSubBlock].szTSKEY = pBlockSrc->aSubBlock[iSubBlock].szTSKEY;
pBlockDest->aSubBlock[iSubBlock].bsize = pBlockSrc->aSubBlock[iSubBlock].bsize; pBlockDest->aSubBlock[iSubBlock].szBlock = pBlockSrc->aSubBlock[iSubBlock].szBlock;
code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol); code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol);
if (code) goto _exit; if (code) goto _exit;
} }
@ -444,9 +444,9 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow);
n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg); n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].vsize); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szVersion);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock);
n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol); n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol);
} }
@ -469,9 +469,9 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow);
n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg); n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].vsize); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szVersion);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock);
n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol); n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol);
} }