more work
This commit is contained in:
parent
43dec55fd7
commit
7c32e099d0
|
@ -253,12 +253,12 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
|
||||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
|
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
|
||||||
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
|
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
|
||||||
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL);
|
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL);
|
||||||
|
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
|
||||||
|
|
||||||
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
|
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
|
||||||
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
|
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
|
||||||
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
|
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
|
||||||
uint8_t **ppBuf1, uint8_t **ppBuf2);
|
uint8_t **ppBuf1, uint8_t **ppBuf2);
|
||||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf);
|
|
||||||
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1,
|
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1,
|
||||||
uint8_t **ppBuf2);
|
uint8_t **ppBuf2);
|
||||||
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
|
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
|
||||||
|
|
|
@ -2806,7 +2806,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
||||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||||
|
|
||||||
if (tBlockHasSma(pBlock)) {
|
if (tBlockHasSma(pBlock)) {
|
||||||
code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg, NULL);
|
code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pSup->pColAgg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
|
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code),
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
|
|
|
@ -829,6 +829,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SDiskDataHdr *pHdr, SArray *aBlockCol) {
|
static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SDiskDataHdr *pHdr, SArray *aBlockCol) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
|
@ -862,6 +863,7 @@ static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SDiskDataHdr
|
||||||
_err:
|
_err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static int32_t tsdbReadDataArray(uint8_t *pInput, int32_t szInput, int32_t nEle, int8_t type, int8_t cmprAlg,
|
static int32_t tsdbReadDataArray(uint8_t *pInput, int32_t szInput, int32_t nEle, int8_t type, int8_t cmprAlg,
|
||||||
uint8_t **ppOut, uint8_t **ppBuf) {
|
uint8_t **ppOut, uint8_t **ppBuf) {
|
||||||
|
@ -1349,30 +1351,21 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf) {
|
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
#if 0
|
SSmaInfo *pSmaInfo = &pBlock->smaInfo;
|
||||||
TdFilePtr pFD = pReader->pSmaFD;
|
|
||||||
int64_t offset = pBlock->aSubBlock[0].sOffset;
|
|
||||||
int64_t size = pBlock->aSubBlock[0].nSma * sizeof(SColumnDataAgg) + sizeof(TSCKSUM);
|
|
||||||
uint8_t *pBuf = NULL;
|
|
||||||
int64_t n;
|
|
||||||
|
|
||||||
ASSERT(tBlockHasSma(pBlock));
|
ASSERT(pSmaInfo->size > 0);
|
||||||
|
|
||||||
if (!ppBuf) ppBuf = &pBuf;
|
taosArrayClear(aColumnDataAgg);
|
||||||
code = tRealloc(ppBuf, size);
|
|
||||||
|
// alloc
|
||||||
|
int32_t size = pSmaInfo->size + sizeof(TSCKSUM);
|
||||||
|
code = tRealloc(&pReader->pBuf1, size);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// lseek
|
|
||||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// read
|
// read
|
||||||
n = taosReadFile(pFD, *ppBuf, size);
|
int64_t n = taosReadFile(pReader->pSmaFD, pReader->pBuf1, size);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -1382,27 +1375,27 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD
|
||||||
}
|
}
|
||||||
|
|
||||||
// check
|
// check
|
||||||
if (!taosCheckChecksumWhole(*ppBuf, size)) {
|
if (!taosCheckChecksumWhole(pReader->pBuf1, size)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
taosArrayClear(aColumnDataAgg);
|
n = 0;
|
||||||
for (int32_t iSma = 0; iSma < pBlock->aSubBlock[0].nSma; iSma++) {
|
while (n < pSmaInfo->size) {
|
||||||
if (taosArrayPush(aColumnDataAgg, &((SColumnDataAgg *)(*ppBuf))[iSma]) == NULL) {
|
SColumnDataAgg sma;
|
||||||
|
|
||||||
|
n += tGetColumnDataAgg(pReader->pBuf1 + n, &sma);
|
||||||
|
if (taosArrayPush(aColumnDataAgg, &sma) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tFree(pBuf);
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d, read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||||
tFree(pBuf);
|
|
||||||
#endif
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1914,8 +1907,10 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
int8_t cmprAlg, int8_t toLast) {
|
int8_t cmprAlg, int8_t toLast) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
|
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
|
||||||
SDiskData *pDiskData = &pWriter->dData;
|
|
||||||
|
|
||||||
|
ASSERT(pBlockData->nRow > 0);
|
||||||
|
|
||||||
|
// ================= DATA ====================
|
||||||
#if 0
|
#if 0
|
||||||
// convert
|
// convert
|
||||||
code = tBlockToDiskData(pBlockData, pDiskData, cmprAlg);
|
code = tBlockToDiskData(pBlockData, pDiskData, cmprAlg);
|
||||||
|
@ -1993,6 +1988,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue