more work
This commit is contained in:
parent
94069ec20a
commit
3698016ad9
|
@ -603,6 +603,7 @@ struct SDataFWriter {
|
||||||
uint8_t *pBuf1;
|
uint8_t *pBuf1;
|
||||||
uint8_t *pBuf2;
|
uint8_t *pBuf2;
|
||||||
uint8_t *pBuf3;
|
uint8_t *pBuf3;
|
||||||
|
uint8_t *pBuf4;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STsdbReadSnap {
|
struct STsdbReadSnap {
|
||||||
|
|
|
@ -1596,6 +1596,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
|
||||||
tFree((*ppWriter)->pBuf1);
|
tFree((*ppWriter)->pBuf1);
|
||||||
tFree((*ppWriter)->pBuf2);
|
tFree((*ppWriter)->pBuf2);
|
||||||
tFree((*ppWriter)->pBuf3);
|
tFree((*ppWriter)->pBuf3);
|
||||||
|
tFree((*ppWriter)->pBuf4);
|
||||||
taosMemoryFree(*ppWriter);
|
taosMemoryFree(*ppWriter);
|
||||||
_exit:
|
_exit:
|
||||||
*ppWriter = NULL;
|
*ppWriter = NULL;
|
||||||
|
@ -1919,14 +1920,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
.nRow = pBlockData->nRow,
|
.nRow = pBlockData->nRow,
|
||||||
.cmprAlg = cmprAlg};
|
.cmprAlg = cmprAlg};
|
||||||
|
|
||||||
SArray *aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol));
|
|
||||||
if (aBlockCol == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// encode =================
|
// encode =================
|
||||||
// columns
|
// columns AND SBlockCol
|
||||||
int32_t nBuf1 = 0;
|
int32_t nBuf1 = 0;
|
||||||
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
|
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
|
||||||
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
||||||
|
@ -1949,89 +1944,83 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM);
|
nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayPush(aBlockCol, &blockCol) == NULL) {
|
code = tRealloc(&pWriter->pBuf2, hdr.szBlkCol + tPutBlockCol(NULL, &blockCol));
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
if (code) goto _err;
|
||||||
goto _err;
|
hdr.szBlkCol += tPutBlockCol(pWriter->pBuf2 + hdr.szBlkCol, &blockCol);
|
||||||
}
|
|
||||||
|
|
||||||
hdr.szBlkCol += tPutBlockCol(NULL, &blockCol);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// uid
|
int32_t nBuf2 = 0;
|
||||||
|
if (hdr.szBlkCol > 0) {
|
||||||
|
nBuf2 = hdr.szBlkCol + sizeof(TSCKSUM);
|
||||||
|
|
||||||
|
code = tRealloc(&pWriter->pBuf2, nBuf2);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
taosCalcChecksumAppend(0, pWriter->pBuf2, nBuf2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// uid + version + tskey
|
||||||
|
int32_t nBuf3 = 0;
|
||||||
if (pBlockData->uid == 0) {
|
if (pBlockData->uid == 0) {
|
||||||
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg,
|
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg,
|
||||||
&pWriter->pBuf2, 0, &hdr.szUid, &pWriter->pBuf3);
|
&pWriter->pBuf3, nBuf3, &hdr.szUid, &pWriter->pBuf4);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
nBuf3 += hdr.szUid;
|
||||||
|
|
||||||
// version
|
|
||||||
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
|
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
|
||||||
cmprAlg, &pWriter->pBuf2, hdr.szUid, &hdr.szVer, &pWriter->pBuf3);
|
cmprAlg, &pWriter->pBuf3, nBuf3, &hdr.szVer, &pWriter->pBuf4);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
nBuf3 += hdr.szVer;
|
||||||
|
|
||||||
// tskey
|
|
||||||
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
|
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
|
||||||
cmprAlg, &pWriter->pBuf2, hdr.szUid + hdr.szVer, &hdr.szKey, &pWriter->pBuf3);
|
cmprAlg, &pWriter->pBuf3, nBuf3, &hdr.szKey, &pWriter->pBuf4);
|
||||||
|
if (code) goto _err;
|
||||||
|
nBuf3 += hdr.szKey;
|
||||||
|
|
||||||
|
nBuf3 += sizeof(TSCKSUM);
|
||||||
|
code = tRealloc(&pWriter->pBuf3, nBuf3);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// hdr
|
// hdr
|
||||||
pBlkInfo->szKey = tPutDiskDataHdr(NULL, &hdr);
|
int32_t nBuf4 = tPutDiskDataHdr(NULL, &hdr);
|
||||||
code = tRealloc(&pWriter->pBuf3, pBlkInfo->szKey);
|
code = tRealloc(&pWriter->pBuf4, nBuf4);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
tPutDiskDataHdr(pWriter->pBuf3, &hdr);
|
tPutDiskDataHdr(pWriter->pBuf4, &hdr);
|
||||||
TSCKSUM cksm = taosCalcChecksum(0, pWriter->pBuf3, pBlkInfo->szKey);
|
taosCalcChecksumAppend(taosCalcChecksum(0, pWriter->pBuf4, nBuf4), pWriter->pBuf3, nBuf3);
|
||||||
|
|
||||||
code = tRealloc(&pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
|
|
||||||
if (code) goto _err;
|
|
||||||
taosCalcChecksumAppend(cksm, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
|
|
||||||
|
|
||||||
// write =================
|
// write =================
|
||||||
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
|
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
|
||||||
|
|
||||||
// hdr
|
pBlkInfo->szKey = nBuf4 + nBuf3;
|
||||||
int64_t n = taosWriteFile(pFD, pWriter->pBuf3, pBlkInfo->szKey);
|
pBlkInfo->szBlock = nBuf1 + nBuf2 + nBuf3 + nBuf4;
|
||||||
|
|
||||||
|
int64_t n = taosWriteFile(pFD, pWriter->pBuf4, nBuf4);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// uid + version + tskey + (CKSM)
|
n = taosWriteFile(pFD, pWriter->pBuf3, nBuf3);
|
||||||
n = taosWriteFile(pFD, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
|
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pBlkInfo->szKey = pBlkInfo->szKey + hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM);
|
|
||||||
pBlkInfo->szBlock += pBlkInfo->szKey;
|
|
||||||
|
|
||||||
// aBlockCol
|
if (nBuf2) {
|
||||||
if (hdr.szBlkCol > 0) {
|
n = taosWriteFile(pFD, pWriter->pBuf2, nBuf2);
|
||||||
code = tRealloc(&pWriter->pBuf2, hdr.szBlkCol + sizeof(TSCKSUM));
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
n = 0;
|
|
||||||
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) {
|
|
||||||
n += tPutBlockCol(pWriter->pBuf2 + n, taosArrayGet(aBlockCol, iBlockCol));
|
|
||||||
}
|
|
||||||
ASSERT(n == hdr.szBlkCol);
|
|
||||||
taosCalcChecksumAppend(0, pWriter->pBuf2, hdr.szBlkCol + sizeof(TSCKSUM));
|
|
||||||
|
|
||||||
n = taosWriteFile(pFD, pWriter->pBuf2, hdr.szBlkCol + sizeof(TSCKSUM));
|
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// colmns
|
if (nBuf1) {
|
||||||
if (nBuf1 > 0) {
|
|
||||||
n = taosWriteFile(pFD, pWriter->pBuf1, nBuf1);
|
n = taosWriteFile(pFD, pWriter->pBuf1, nBuf1);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlkInfo->szBlock += nBuf1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// update info
|
// update info
|
||||||
|
@ -2048,12 +2037,10 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
taosArrayDestroy(aBlockCol);
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||||
taosArrayDestroy(aBlockCol);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue