From d1c7e3a531cffe060c49eee955d9da4e75c6a456 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 4 Aug 2022 05:42:15 +0000 Subject: [PATCH] more last refact --- source/dnode/vnode/src/inc/tsdb.h | 29 +++-- source/dnode/vnode/src/tsdb/tsdbCommit.c | 25 +++-- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 4 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 104 +++++++++--------- 4 files changed, 90 insertions(+), 72 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4c31b3e07d..29626ed67d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -134,14 +134,15 @@ int32_t tGetColData(uint8_t *p, SColData *pColData); #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) #define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA)) #define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA)) -int32_t tBlockDataInit(SBlockData *pBlockData); -void tBlockDataReset(SBlockData *pBlockData); -int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema); +int32_t tBlockDataInit(SBlockData *pBlockData); +void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear); +void tBlockDataReset(SBlockData *pBlockData); +int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid); +int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); +int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); + int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom); void tBlockDataClearData(SBlockData *pBlockData); -void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear); -int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); -int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); @@ -468,13 +469,17 @@ struct SColData { uint8_t *pData; }; +// (SBlockData){.suid = 0, .uid = 0}: block data not initialized +// (SBlockData){.suid = suid, .uid = uid}: block data for ONE child table int .data file +// (SBlockData){.suid = suid, .uid = 0}: block data for N child tables int .last file +// (SBlockData){.suid = 0, .uid = uid}: block data for 1 normal table int .last/.data file struct SBlockData { - int64_t suid; // 0 means normal table data block - int64_t uid; // 0 means block data in .last file, others in .data file - int32_t nRow; - int64_t *aUid; - int64_t *aVersion; - TSKEY *aTSKEY; + int64_t suid; // 0 means normal table block data, otherwise child table block data + int64_t uid; // 0 means block data in .last file, otherwise in .data file + int32_t nRow; // number of rows + int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0) + int64_t *aVersion; // versions of each row + TSKEY *aTSKEY; // timestamp of each row SArray *aIdx; // SArray SArray *aColData; // SArray }; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 63aa5f3ec8..3f38b664c2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -296,22 +296,28 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) { ASSERT(pCommitter->dReader.pReader); ASSERT(pCommitter->dReader.pRowInfo); + SBlockData *pBlockDatal = &pCommitter->dReader.bDatal; pCommitter->dReader.iRow++; - if (pCommitter->dReader.iRow < pCommitter->dReader.bDatal.nRow) { - pCommitter->dReader.pRowInfo->uid = pCommitter->dReader.bData.aUid[pCommitter->dReader.iRow]; - pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); + if (pCommitter->dReader.iRow < pBlockDatal->nRow) { + if (pBlockDatal->uid == 0) { + pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[pCommitter->dReader.iRow]; + } + pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); } else { pCommitter->dReader.iBlockL++; if (pCommitter->dReader.iBlockL < taosArrayGetSize(pCommitter->dReader.aBlockL)) { pCommitter->dReader.pBlockL = (SBlockL *)taosArrayGet(pCommitter->dReader.aBlockL, pCommitter->dReader.iBlockL); - code = tsdbReadLastBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockL, &pCommitter->dReader.bDatal, - NULL, NULL); + code = tsdbReadLastBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockL, pBlockDatal, NULL, NULL); if (code) goto _exit; pCommitter->dReader.iRow = 0; - pCommitter->dReader.pRowInfo->suid = pCommitter->dReader.pBlockL->suid; - pCommitter->dReader.pRowInfo->uid = pCommitter->dReader.bData.aUid[pCommitter->dReader.iRow]; - pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(&pCommitter->dReader.bDatal, pCommitter->dReader.iRow); + pCommitter->dReader.pRowInfo->suid = pBlockDatal->suid; + if (pBlockDatal->uid) { + pCommitter->dReader.pRowInfo->uid = pBlockDatal->uid; + } else { + pCommitter->dReader.pRowInfo->uid = pBlockDatal->aUid[0]; + } + pCommitter->dReader.pRowInfo->row = tsdbRowFromBlockData(pBlockDatal, pCommitter->dReader.iRow); } else { pCommitter->dReader.pRowInfo = NULL; } @@ -354,15 +360,16 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { } else { pCommitter->dReader.pBlockIdx = NULL; } + tBlockDataReset(&pCommitter->dReader.bData); // last code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL); if (code) goto _err; pCommitter->dReader.iBlockL = -1; - pCommitter->dReader.bDatal.nRow = 0; pCommitter->dReader.iRow = -1; pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo; + tBlockDataReset(&pCommitter->dReader.bDatal); code = tsdbCommitterNextLastRow(pCommitter); if (code) goto _err; } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 8417b0d1b8..73bac77e6d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -591,8 +591,8 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB ASSERT(n + sizeof(TSCKSUM) == size); -_exit: tFree(pBuf); +_exit: return code; _err: @@ -658,8 +658,8 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) ASSERT(n + sizeof(TSCKSUM) == size); -_exit: tFree(pBuf); +_exit: return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 205d8b29dc..7beac23912 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1121,6 +1121,8 @@ static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) { int32_t tBlockDataInit(SBlockData *pBlockData) { int32_t code = 0; + pBlockData->suid = 0; + pBlockData->uid = 0; pBlockData->nRow = 0; pBlockData->aUid = NULL; pBlockData->aVersion = NULL; @@ -1141,36 +1143,45 @@ _exit: return code; } -void tBlockDataReset(SBlockData *pBlockData) { - pBlockData->nRow = 0; - taosArrayClear(pBlockData->aIdx); -} - void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) { tFree((uint8_t *)pBlockData->aUid); tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aTSKEY); taosArrayDestroy(pBlockData->aIdx); taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL); - pBlockData->aColData = NULL; - pBlockData->aIdx = NULL; - pBlockData->aTSKEY = NULL; + pBlockData->aUid = NULL; pBlockData->aVersion = NULL; + pBlockData->aTSKEY = NULL; + pBlockData->aIdx = NULL; + pBlockData->aColData = NULL; } -int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema) { - int32_t code = 0; - SColData *pColData; - STColumn *pTColumn; +void tBlockDataReset(SBlockData *pBlockData) { + pBlockData->suid = 0; + pBlockData->uid = 0; + pBlockData->nRow = 0; + taosArrayClear(pBlockData->aIdx); +} + +int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid) { + int32_t code = 0; + + ASSERT(suid || uid); tBlockDataReset(pBlockData); - for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { - pTColumn = &pTSchema->columns[iColumn]; + pBlockData->suid = suid; + pBlockData->uid = uid; - code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); - if (code) goto _exit; + if (pTSchema) { + for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { + STColumn *pTColumn = &pTSchema->columns[iColumn]; + SColData *pColData; - tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) != 0); + code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); + if (code) goto _exit; + + tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) != 0); + } } _exit: @@ -1211,52 +1222,47 @@ _err: return code; } -int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { +int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { int32_t code = 0; - // TSDBKEY + ASSERT(pBlockData->suid || pBlockData->uid); + + // uid + if (pBlockData->uid == 0) { + ASSERT(uid); + code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1)); + if (code) goto _err; + pBlockData->aUid[pBlockData->nRow] = uid; + } + // version code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1)); if (code) goto _err; + pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow); + // timestamp code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1)); if (code) goto _err; - pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow); pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); // OTHER - int32_t iColData = 0; - int32_t nColData = taosArrayGetSize(pBlockData->aIdx); - SRowIter iter = {0}; - SRowIter *pIter = &iter; - SColData *pColData; - SColVal *pColVal; + SRowIter rIter = {0}; + SColVal *pColVal; - if (nColData == 0) goto _exit; + tRowIterInit(&rIter, pRow, pTSchema); + pColVal = tRowIterNext(&rIter); + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); - tRowIterInit(pIter, pRow, pTSchema); - pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); - pColVal = tRowIterNext(pIter); + while (pColVal && pColVal->cid < pColData->cid) { + pColVal = tRowIterNext(&rIter); + } - while (pColData) { - if (pColVal) { - if (pColData->cid == pColVal->cid) { - code = tColDataAppendValue(pColData, pColVal); - if (code) goto _err; - - pColVal = tRowIterNext(pIter); - pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL; - } else if (pColData->cid < pColVal->cid) { - code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); - if (code) goto _err; - - pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL; - } else { - pColVal = tRowIterNext(pIter); - } - } else { + if (pColVal == NULL || pColVal->cid > pColData->cid) { code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); if (code) goto _err; - - pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL; + } else { + code = tColDataAppendValue(pColData, pColVal); + if (code) goto _err; + pColVal = tRowIterNext(&rIter); } }