From a8b919918f79097b7aeb3a7a9990e7e3485a8385 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 21 Sep 2022 17:58:44 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 23 +++++ source/dnode/vnode/src/tsdb/tsdbDiskData.c | 25 ++---- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 87 +++++++++++++++++++ 3 files changed, 116 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c2a31679d0..e67fa2ee2d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -32,6 +32,12 @@ extern "C" { #define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \ + if (CODE) { \ + LINO = __LINE__; \ + goto LABEL; \ + } + typedef struct TSDBROW TSDBROW; typedef struct TABLEID TABLEID; typedef struct TSDBKEY TSDBKEY; @@ -65,6 +71,8 @@ typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; +typedef struct SDiskCol SDiskCol; +typedef struct SDiskData SDiskData; #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_MAX_SUBBLOCKS 8 @@ -652,6 +660,21 @@ typedef struct { STSchema *pTSchema; } SSkmInfo; +struct SDiskCol { + SBlockCol bCol; + const uint8_t *pBit; + const uint8_t *pOff; + const uint8_t *pVal; +}; + +struct SDiskData { + SDiskDataHdr hdr; + const uint8_t *pUid; + const uint8_t *pVer; + const uint8_t *pKey; + SArray *aDiskCol; // SArray +}; + int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index 25c1ea20d6..9b5934a04f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -17,8 +17,6 @@ typedef struct SDiskDataBuilder SDiskDataBuilder; typedef struct SDiskColBuilder SDiskColBuilder; -typedef struct SDiskCol SDiskCol; -typedef struct SDiskData SDiskData; struct SDiskColBuilder { int16_t cid; @@ -45,21 +43,6 @@ struct SDiskDataBuilder { uint8_t *aBuf[2]; }; -struct SDiskCol { - SBlockCol bCol; - const uint8_t *pBit; - const uint8_t *pOff; - const uint8_t *pVal; -}; - -struct SDiskData { - SDiskDataHdr hdr; - const uint8_t *pUid; - const uint8_t *pVer; - const uint8_t *pKey; - SArray *aDiskCol; // SArray -}; - // SDiskColBuilder ================================================ static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) { int32_t code = 0; @@ -147,9 +130,13 @@ static int32_t tDiskColAddValue(SDiskColBuilder *pBuilder, SColVal *pColVal) { code = tCompress(pBuilder->pOffC, &pBuilder->offset, sizeof(int32_t)); if (code) goto _exit; pBuilder->offset += pColVal->value.nData; + + code = tCompress(pBuilder->pValC, pColVal->value.pData, pColVal->value.nData); + if (code) goto _exit; + } else { + code = tCompress(pBuilder->pValC, &pColVal->value.val, tDataTypes[pColVal->type].bytes); + if (code) goto _exit; } - code = tCompress(pBuilder->pValC, pColVal->value.pData, pColVal->value.nData /*TODO*/); - if (code) goto _exit; _exit: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 5fe0b408b1..8674ae571e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -607,6 +607,93 @@ _err: return code; } +int32_t tsdbWriteDiskData(SDataFWriter *pWriter, SDiskData *pDiskData, SBlockInfo *pBlkInfo) { + int32_t code = 0; + int32_t lino = 0; + + STsdbFD *pFD = pWriter->pDataFD; // todo + int64_t offset = pWriter->fData.size; + + // hdr + int32_t n = tPutDiskDataHdr(NULL, &pDiskData->hdr); + code = tRealloc(&pWriter->aBuf[0], n); + TSDB_CHECK_CODE(code, lino, _exit); + + tPutDiskDataHdr(pWriter->aBuf[0], &pDiskData->hdr); + + code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], n); + TSDB_CHECK_CODE(code, lino, _exit); + offset += n; + + // uid + ver + key + if (pDiskData->hdr.szUid) { + code = tsdbWriteFile(pFD, offset, pDiskData->pUid, pDiskData->hdr.szUid); + TSDB_CHECK_CODE(code, lino, _exit); + offset += pDiskData->hdr.szUid; + } + + code = tsdbWriteFile(pFD, offset, pDiskData->pVer, pDiskData->hdr.szVer); + TSDB_CHECK_CODE(code, lino, _exit); + offset += pDiskData->hdr.szVer; + + code = tsdbWriteFile(pFD, offset, pDiskData->pKey, pDiskData->hdr.szKey); + TSDB_CHECK_CODE(code, lino, _exit); + offset += pDiskData->hdr.szKey; + + // SBlockCol + if (pDiskData->hdr.szBlkCol) { + code = tRealloc(&pWriter->aBuf[0], pDiskData->hdr.szBlkCol); + TSDB_CHECK_CODE(code, lino, _exit); + + n = 0; + for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + + n += tPutBlockCol(pWriter->aBuf[0] + n, &pDiskCol->bCol); + } + + ASSERT(n == pDiskData->hdr.szBlkCol); + + code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], pDiskData->hdr.szBlkCol); + TSDB_CHECK_CODE(code, lino, _exit); + + offset += pDiskData->hdr.szBlkCol; + } + + // pData + for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + + if (pDiskCol->bCol.flag == HAS_NULL) continue; + + if (pDiskCol->bCol.szBitmap) { + code = tsdbWriteFile(pFD, offset, pDiskCol->pBit, pDiskCol->bCol.szBitmap); + TSDB_CHECK_CODE(code, lino, _exit); + offset += pDiskCol->bCol.szBitmap; + } + + if (pDiskCol->bCol.szOffset) { + code = tsdbWriteFile(pFD, offset, pDiskCol->pOff, pDiskCol->bCol.szOffset); + TSDB_CHECK_CODE(code, lino, _exit); + offset += pDiskCol->bCol.szOffset; + } + + if (pDiskCol->bCol.szValue) { + code = tsdbWriteFile(pFD, offset, pDiskCol->pVal, pDiskCol->bCol.szValue); + TSDB_CHECK_CODE(code, lino, _exit); + offset += pDiskCol->bCol.szValue; + } + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s", TD_VID(pWriter->pTsdb->pVnode), __func__); + } + return code; +} + int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { int32_t code = 0; int64_t n;