From f51c86430a381a9bae6ec1c80726b3a0c09feb22 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 21 Sep 2022 14:50:14 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbDiskData.c | 198 +++++++++++++-------- 1 file changed, 124 insertions(+), 74 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index a5104dcfbd..a0c1bb8934 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -15,8 +15,9 @@ #include "tsdb.h" -typedef struct SDiskData SDiskData; -typedef struct SDiskCol SDiskCol; +typedef struct SDiskDataBuilder SDiskDataBuilder; +typedef struct SDiskCol SDiskCol; +typedef struct SDiskData SDiskData; struct SDiskCol { int16_t cid; @@ -70,6 +71,34 @@ static int32_t tDiskColClear(SDiskCol *pDiskCol) { return code; } +static int32_t tDiskColToBinary(SDiskCol *pDiskCol, const uint8_t **ppData, int32_t *nData) { + int32_t code = 0; + + ASSERT(pDiskCol->flag && pDiskCol->flag != HAS_NONE); + + if (pDiskCol->flag == HAS_NULL) { + return code; + } + + // bitmap (todo) + if (pDiskCol->flag != HAS_VALUE) { + } + + // offset (todo) + if (IS_VAR_DATA_TYPE(pDiskCol->type)) { + code = tCompGen(pDiskCol->pOffC, NULL /* todo */, NULL /* todo */); + if (code) return code; + } + + // value (todo) + if (pDiskCol->flag != (HAS_NULL | HAS_NONE)) { + code = tCompGen(pDiskCol->pValC, NULL /* todo */, NULL /* todo */); + if (code) return code; + } + + return code; +} + static int32_t tDiskColAddValue(SDiskCol *pDiskCol, SColVal *pColVal) { int32_t code = 0; @@ -338,8 +367,8 @@ static int32_t (*tDiskColAddValImpl[])(SDiskCol *pDiskCol, SColVal *pColVal) = { tDiskColAddVal7, // HAS_VALUE|HAS_NULL|HAS_NONE }; -// SDiskData ================================================ -struct SDiskData { +// SDiskDataBuilder ================================================ +struct SDiskDataBuilder { int64_t suid; int64_t uid; int32_t nRow; @@ -352,121 +381,121 @@ struct SDiskData { uint8_t *aBuf[2]; }; -int32_t tDiskDataInit(SDiskData *pDiskData, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) { +int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) { int32_t code = 0; - pDiskData->suid = pId->suid; - pDiskData->uid = pId->uid; - pDiskData->nRow = 0; - pDiskData->cmprAlg = cmprAlg; + pBuilder->suid = pId->suid; + pBuilder->uid = pId->uid; + pBuilder->nRow = 0; + pBuilder->cmprAlg = cmprAlg; - if (pDiskData->pUidC == NULL) { - code = tCompressorCreate(&pDiskData->pUidC); + if (pBuilder->pUidC == NULL) { + code = tCompressorCreate(&pBuilder->pUidC); if (code) return code; } - code = tCompressorReset(pDiskData->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); + code = tCompressorReset(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); if (code) return code; - if (pDiskData->pVerC == NULL) { - code = tCompressorCreate(&pDiskData->pVerC); + if (pBuilder->pVerC == NULL) { + code = tCompressorCreate(&pBuilder->pVerC); if (code) return code; } - code = tCompressorReset(pDiskData->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg); + code = tCompressorReset(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg); if (code) return code; - if (pDiskData->pKeyC == NULL) { - code = tCompressorCreate(&pDiskData->pKeyC); + if (pBuilder->pKeyC == NULL) { + code = tCompressorCreate(&pBuilder->pKeyC); if (code) return code; } - code = tCompressorReset(pDiskData->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); + code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); if (code) return code; - if (pDiskData->aDiskCol == NULL) { - pDiskData->aDiskCol = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskCol)); - if (pDiskData->aDiskCol == NULL) { + if (pBuilder->aDiskCol == NULL) { + pBuilder->aDiskCol = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskCol)); + if (pBuilder->aDiskCol == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; } } - pDiskData->nDiskCol = 0; + pBuilder->nDiskCol = 0; for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { STColumn *pTColumn = &pTSchema->columns[iCol]; - if (pDiskData->nDiskCol >= taosArrayGetSize(pDiskData->aDiskCol)) { + if (pBuilder->nDiskCol >= taosArrayGetSize(pBuilder->aDiskCol)) { SDiskCol dc = (SDiskCol){0}; - if (taosArrayPush(pDiskData->aDiskCol, &dc) == NULL) { + if (taosArrayPush(pBuilder->aDiskCol, &dc) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; } } - SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, pDiskData->nDiskCol); + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, pBuilder->nDiskCol); code = tDiskColInit(pDiskCol, pTColumn->colId, pTColumn->type, cmprAlg); if (code) return code; - pDiskData->nDiskCol++; + pBuilder->nDiskCol++; } return code; } -int32_t tDiskDataDestroy(SDiskData *pDiskData) { +int32_t tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) { int32_t code = 0; - if (pDiskData->pUidC) tCompressorDestroy(pDiskData->pUidC); - if (pDiskData->pVerC) tCompressorDestroy(pDiskData->pVerC); - if (pDiskData->pKeyC) tCompressorDestroy(pDiskData->pKeyC); + if (pBuilder->pUidC) tCompressorDestroy(pBuilder->pUidC); + if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC); + if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC); - if (pDiskData->aDiskCol) { - for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) { - SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + if (pBuilder->aDiskCol) { + for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pBuilder->aDiskCol); iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); tDiskColClear(pDiskCol); } - taosArrayDestroy(pDiskData->aDiskCol); + taosArrayDestroy(pBuilder->aDiskCol); } - for (int32_t iBuf = 0; iBuf < sizeof(pDiskData->aBuf) / sizeof(pDiskData->aBuf[0]); iBuf++) { - tFree(pDiskData->aBuf[iBuf]); + for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { + tFree(pBuilder->aBuf[iBuf]); } return code; } -int32_t tDiskDataAddRow(SDiskData *pDiskData, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) { +int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) { int32_t code = 0; - ASSERT(pId->suid == pDiskData->suid); + ASSERT(pId->suid == pBuilder->suid); // uid - if (pDiskData->uid && pDiskData->uid != pId->uid) { - for (int32_t iRow = 0; iRow < pDiskData->nRow; iRow++) { - code = tCompress(pDiskData->pUidC, &pDiskData->uid, sizeof(int64_t)); + if (pBuilder->uid && pBuilder->uid != pId->uid) { + for (int32_t iRow = 0; iRow < pBuilder->nRow; iRow++) { + code = tCompress(pBuilder->pUidC, &pBuilder->uid, sizeof(int64_t)); if (code) goto _exit; } - pDiskData->uid = 0; + pBuilder->uid = 0; } - if (pDiskData->uid == 0) { - code = tCompress(pDiskData->pUidC, &pId->uid, sizeof(int64_t)); + if (pBuilder->uid == 0) { + code = tCompress(pBuilder->pUidC, &pId->uid, sizeof(int64_t)); if (code) goto _exit; } // version int64_t version = TSDBROW_VERSION(pRow); - code = tCompress(pDiskData->pVerC, &version, sizeof(int64_t)); + code = tCompress(pBuilder->pVerC, &version, sizeof(int64_t)); if (code) goto _exit; // TSKEY TSKEY ts = TSDBROW_TS(pRow); - code = tCompress(pDiskData->pKeyC, &ts, sizeof(int64_t)); + code = tCompress(pBuilder->pKeyC, &ts, sizeof(int64_t)); if (code) goto _exit; SRowIter iter = {0}; tRowIterInit(&iter, pRow, pTSchema); SColVal *pColVal = tRowIterNext(&iter); - for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) { - SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); while (pColVal && pColVal->cid < pDiskCol->cid) { pColVal = tRowIterNext(&iter); @@ -482,53 +511,53 @@ int32_t tDiskDataAddRow(SDiskData *pDiskData, TSDBROW *pRow, STSchema *pTSchema, pColVal = tRowIterNext(&iter); } } - pDiskData->nRow++; + pBuilder->nRow++; _exit: return code; } -int32_t tDiskDataToBinary(SDiskData *pDiskData, const uint8_t **ppData, int32_t *nData) { +int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) { int32_t code = 0; - ASSERT(pDiskData->nRow); + ASSERT(pBuilder->nRow); SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .fmtVer = 0, - .suid = pDiskData->suid, - .uid = pDiskData->uid, + .suid = pBuilder->suid, + .uid = pBuilder->uid, .szUid = 0, .szVer = 0, .szKey = 0, .szBlkCol = 0, - .nRow = pDiskData->nRow, - .cmprAlg = pDiskData->cmprAlg}; + .nRow = pBuilder->nRow, + .cmprAlg = pBuilder->cmprAlg}; // UID const uint8_t *pUid = NULL; - if (pDiskData->uid == 0) { - code = tCompGen(pDiskData->pUidC, &pUid, &hdr.szUid); + if (pBuilder->uid == 0) { + code = tCompGen(pBuilder->pUidC, &pUid, &hdr.szUid); if (code) return code; } // VERSION const uint8_t *pVer = NULL; - code = tCompGen(pDiskData->pVerC, &pVer, &hdr.szVer); + code = tCompGen(pBuilder->pVerC, &pVer, &hdr.szVer); if (code) return code; // TSKEY const uint8_t *pKey = NULL; - code = tCompGen(pDiskData->pKeyC, &pKey, &hdr.szKey); + code = tCompGen(pBuilder->pKeyC, &pKey, &hdr.szKey); if (code) return code; int32_t offset = 0; - for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) { - SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); if (pDiskCol->flag == HAS_NONE) continue; - // code = tDiskColToBinary(pDiskCol, ); - // if (code) return code; + code = tDiskColToBinary(pDiskCol, NULL, NULL); + if (code) return code; SBlockCol bCol = {.cid = pDiskCol->cid, .type = pDiskCol->type, @@ -544,30 +573,51 @@ int32_t tDiskDataToBinary(SDiskData *pDiskData, const uint8_t **ppData, int32_t offset = offset + bCol.szBitmap + bCol.szOffset + bCol.szValue; } +#if 0 *nData = tPutDiskDataHdr(NULL, &hdr) + hdr.szUid + hdr.szVer + hdr.szKey + hdr.szBlkCol + offset; - code = tRealloc(&pDiskData->aBuf[0], *nData); + code = tRealloc(&pBuilder->aBuf[0], *nData); if (code) return code; - *ppData = pDiskData->aBuf[0]; + *ppData = pBuilder->aBuf[0]; int32_t n = 0; - n += tPutDiskDataHdr(pDiskData->aBuf[0] + n, &hdr); + n += tPutDiskDataHdr(pBuilder->aBuf[0] + n, &hdr); if (hdr.szUid) { - memcpy(pDiskData->aBuf[0] + n, pUid, hdr.szUid); + memcpy(pBuilder->aBuf[0] + n, pUid, hdr.szUid); n += hdr.szUid; } - memcpy(pDiskData->aBuf[0] + n, pVer, hdr.szVer); + memcpy(pBuilder->aBuf[0] + n, pVer, hdr.szVer); n += hdr.szVer; - memcpy(pDiskData->aBuf[0] + n, pKey, hdr.szKey); + memcpy(pBuilder->aBuf[0] + n, pKey, hdr.szKey); n += hdr.szKey; - for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) { - SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); - n += tPutBlockCol(pDiskData->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */); + for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); + n += tPutBlockCol(pBuilder->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */); } - for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) { - SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); // memcpy(pDiskData->aBuf[0] + n, NULL, ); // n += 0; } +#endif + + return code; +} + +// SDiskData ================================================ +struct SDiskData { + SDiskDataHdr hdr; + const uint8_t *pUid; + const uint8_t *pVer; + const uint8_t *pKey; + SArray *aBlockCol; + SArray *aColData; +}; + +int32_t tDiskDataDestroy(SDiskData *pDiskData) { + int32_t code = 0; + + if (pDiskData->aBlockCol) taosArrayDestroy(pDiskData->aBlockCol); + if (pDiskData->aColData) taosArrayDestroy(pDiskData->aColData); return code; }