more work
This commit is contained in:
parent
2a6be2bc8a
commit
ed69d8ee9f
|
@ -738,98 +738,135 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
|
||||||
|
int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1,
|
||||||
|
uint8_t **ppBuf2) {
|
||||||
|
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
|
||||||
|
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
|
||||||
|
int32_t code = 0;
|
||||||
|
int64_t offset;
|
||||||
|
int64_t size;
|
||||||
|
int64_t n;
|
||||||
|
|
||||||
|
tBlockDataReset(pBlockData);
|
||||||
|
pBlockData->nRow = pSubBlock->nRow;
|
||||||
|
|
||||||
|
// TSDBKEY
|
||||||
|
offset = pSubBlock->offset + sizeof(SBlockDataHdr);
|
||||||
|
size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
|
||||||
|
code = tsdbRealloc(ppBuf1, size);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
||||||
|
if (n < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
n = taosReadFile(pFD, *ppBuf1, size);
|
||||||
|
if (n < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
} else if (n < size) {
|
||||||
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// OTHER
|
||||||
|
SBlockCol blockCol;
|
||||||
|
SBlockCol *pBlockCol = &blockCol;
|
||||||
|
SColData *pColData;
|
||||||
|
for (int32_t iCol = 0; iCol < nCol; iCol++) {
|
||||||
|
int16_t cid = aColId[iCol];
|
||||||
|
|
||||||
|
if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) ==
|
||||||
|
0) {
|
||||||
|
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
|
||||||
|
if (pBlockCol->flag == HAS_NULL) {
|
||||||
|
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
|
||||||
|
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset;
|
||||||
|
size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
|
||||||
|
|
||||||
|
code = tsdbRealloc(ppBuf1, size);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
// seek
|
||||||
|
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
||||||
|
if (n < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// read
|
||||||
|
n = taosReadFile(pFD, *ppBuf1, size);
|
||||||
|
if (n < 0) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
} else if (n < size) {
|
||||||
|
code = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
|
||||||
|
if (code) goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
|
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
|
||||||
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) {
|
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
|
uint8_t *pBuf1 = NULL;
|
||||||
uint8_t *pBuf1 = NULL;
|
uint8_t *pBuf2 = NULL;
|
||||||
uint8_t *pBuf2 = NULL;
|
|
||||||
|
|
||||||
ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID);
|
ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||||
|
|
||||||
if (!ppBuf1) ppBuf1 = &pBuf1;
|
if (!ppBuf1) ppBuf1 = &pBuf1;
|
||||||
if (!ppBuf2) ppBuf2 = &pBuf2;
|
if (!ppBuf2) ppBuf2 = &pBuf2;
|
||||||
|
|
||||||
for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
|
code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2);
|
||||||
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
|
if (code) goto _err;
|
||||||
int64_t offset;
|
|
||||||
int64_t size;
|
|
||||||
int64_t n;
|
|
||||||
|
|
||||||
tBlockDataReset(pBlockData);
|
if (pBlock->nSubBlock > 1) {
|
||||||
pBlockData->nRow = pSubBlock->nRow;
|
SBlockData *pBlockData1 = &(SBlockData){0};
|
||||||
|
SBlockData *pBlockData2 = &(SBlockData){0};
|
||||||
|
|
||||||
// TSDBKEY
|
for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
|
||||||
offset = pSubBlock->offset + sizeof(SBlockDataHdr);
|
code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2);
|
||||||
size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
|
if (code) goto _err;
|
||||||
code = tsdbRealloc(ppBuf1, size);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
code = tBlockDataCopy(pBlockData, pBlockData2);
|
||||||
if (n < 0) {
|
if (code) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
tBlockDataClear(pBlockData1);
|
||||||
goto _err;
|
tBlockDataClear(pBlockData2);
|
||||||
}
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
n = taosReadFile(pFD, *ppBuf1, size);
|
code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData);
|
||||||
if (n < 0) {
|
if (code) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
tBlockDataClear(pBlockData1);
|
||||||
goto _err;
|
tBlockDataClear(pBlockData2);
|
||||||
} else if (n < size) {
|
goto _err;
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// OTHER
|
|
||||||
SBlockCol blockCol;
|
|
||||||
SBlockCol *pBlockCol = &blockCol;
|
|
||||||
SColData *pColData;
|
|
||||||
for (int32_t iCol = 0; iCol < nCol; iCol++) {
|
|
||||||
int16_t cid = aColId[iCol];
|
|
||||||
|
|
||||||
if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) ==
|
|
||||||
0) {
|
|
||||||
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
|
|
||||||
if (pBlockCol->flag == HAS_NULL) {
|
|
||||||
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
|
|
||||||
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset;
|
|
||||||
size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
|
|
||||||
|
|
||||||
code = tsdbRealloc(ppBuf1, size);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// seek
|
|
||||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// read
|
|
||||||
n = taosReadFile(pFD, *ppBuf1, size);
|
|
||||||
if (n < 0) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
} else if (n < size) {
|
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tBlockDataClear(pBlockData1);
|
||||||
|
tBlockDataClear(pBlockData2);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbFree(pBuf1);
|
tsdbFree(pBuf1);
|
||||||
|
@ -876,34 +913,6 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// // check
|
|
||||||
// p = *ppBuf1;
|
|
||||||
// SBlockDataHdr *pHdr = (SBlockDataHdr *)p;
|
|
||||||
// ASSERT(pHdr->delimiter == TSDB_FILE_DLMT);
|
|
||||||
// ASSERT(pHdr->suid == pBlockIdx->suid);
|
|
||||||
// ASSERT(pHdr->uid == pBlockIdx->uid);
|
|
||||||
// p += sizeof(*pHdr);
|
|
||||||
|
|
||||||
// if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) {
|
|
||||||
// code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
// goto _err;
|
|
||||||
// }
|
|
||||||
// p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM));
|
|
||||||
|
|
||||||
// for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
|
|
||||||
// tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
|
|
||||||
|
|
||||||
// ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
|
|
||||||
|
|
||||||
// if (pBlockCol->flag == HAS_NULL) continue;
|
|
||||||
|
|
||||||
// if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) {
|
|
||||||
// code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
// goto _err;
|
|
||||||
// }
|
|
||||||
// p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// recover
|
// recover
|
||||||
pBlockData->nRow = pSubBlock->nRow;
|
pBlockData->nRow = pSubBlock->nRow;
|
||||||
p = *ppBuf1 + sizeof(SBlockDataHdr);
|
p = *ppBuf1 + sizeof(SBlockDataHdr);
|
||||||
|
@ -964,7 +973,7 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
|
||||||
SBlockData *pBlockData2 = &(SBlockData){0};
|
SBlockData *pBlockData2 = &(SBlockData){0};
|
||||||
|
|
||||||
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
|
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
|
||||||
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2);
|
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
|
||||||
if (code) {
|
if (code) {
|
||||||
tBlockDataClear(pBlockData1);
|
tBlockDataClear(pBlockData1);
|
||||||
tBlockDataClear(pBlockData2);
|
tBlockDataClear(pBlockData2);
|
||||||
|
|
Loading…
Reference in New Issue