From a9fcebbdafd47d453448ff3b9d41ea61ef677fae Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Jun 2023 17:12:05 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 1147 +++++++++++--------- 1 file changed, 638 insertions(+), 509 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index b5ca716701..f45858ae8d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -14,6 +14,10 @@ */ #include "tsdb.h" +#include "tsdbDataFileRW.h" +#include "tsdbFS2.h" +#include "tsdbIter.h" +#include "tsdbSttFileRW.h" extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg); @@ -21,546 +25,483 @@ extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, // STsdbSnapReader ======================================== struct STsdbSnapReader { - STsdb* pTsdb; - int64_t sver; - int64_t ever; - int8_t type; + STsdb* tsdb; + int64_t sver; + int64_t ever; + int8_t type; + uint8_t* aBuf[5]; + SSkmInfo skmTb[1]; - STsdbFS fs; - TABLEID tbid; - SSkmInfo skmTable; + TFileSetArray* fsetArr; - // timeseries data - int8_t dataDone; - int32_t fid; + // context + struct { + int32_t fsetArrIdx; + STFileSet* fset; + bool isDataDone; + bool isTombDone; + } ctx[1]; - SDataFReader* pDataFReader; - STsdbDataIter2* iterList; - STsdbDataIter2* pIter; - SRBTree rbt; - SBlockData bData; + // reader + SDataFileReader* dataReader; + TSttFileReaderArray sttReaderArr[1]; - // tombstone data - int8_t delDone; - SDelFReader* pDelFReader; - STsdbDataIter2* pTIter; - SArray* aDelData; + // iter + TTsdbIterArray dataIterArr[1]; + SIterMerger* dataIterMerger; + TTsdbIterArray tombIterArr[1]; + SIterMerger* tombIterMerger; + + // data + SBlockData blockData[1]; + STombBlock tombBlock[1]; }; -static int32_t tsdbSnapReadFileDataStart(STsdbSnapReader* pReader) { +static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) { int32_t code = 0; int32_t lino = 0; - SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSetCmprFn, TD_GT); - if (pSet == NULL) { - pReader->fid = INT32_MAX; - goto _exit; - } + ASSERT(reader->dataReader == NULL); + ASSERT(TARRAY2_SIZE(reader->sttReaderArr) == 0); - pReader->fid = pSet->fid; - - tRBTreeCreate(&pReader->rbt, tsdbDataIterCmprFn); - - code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbOpenDataFileDataIter(pReader->pDataFReader, &pReader->pIter); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pReader->pIter) { - // iter to next with filter info (sver, ever) - code = tsdbDataIterNext2( - pReader->pIter, - &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE, // flag - .sver = pReader->sver, - .ever = pReader->ever}); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) { - // add to rbtree - tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn); - - // add to iterList - pReader->pIter->next = pReader->iterList; - pReader->iterList = pReader->pIter; - } else { - tsdbCloseDataIter2(pReader->pIter); + // data + SDataFileReaderConfig config = { + .tsdb = reader->tsdb, + .szPage = reader->tsdb->pVnode->config.tsdbPageSize, + .bufArr = reader->aBuf, + }; + bool hasDataFile = false; + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { + if (reader->ctx->fset->farr[ftype] != NULL) { + hasDataFile = true; + config.files[ftype].exist = true; + config.files[ftype].file = reader->ctx->fset->farr[ftype]->f[0]; } } - for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { - code = tsdbOpenSttFileDataIter(pReader->pDataFReader, iStt, &pReader->pIter); + if (hasDataFile) { + code = tsdbDataFileReaderOpen(NULL, &config, &reader->dataReader); TSDB_CHECK_CODE(code, lino, _exit); + } - if (pReader->pIter) { - // iter to valid row - code = tsdbDataIterNext2( - pReader->pIter, - &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE, // flag - .sver = pReader->sver, - .ever = pReader->ever}); + // stt + SSttLvl* lvl; + TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + SSttFileReader* sttReader; + SSttFileReaderConfig config = { + .tsdb = reader->tsdb, + .szPage = reader->tsdb->pVnode->config.tsdbPageSize, + .file = fobj->f[0], + .bufArr = reader->aBuf, + }; + + code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader); TSDB_CHECK_CODE(code, lino, _exit); - if (pReader->pIter->rowInfo.suid || pReader->pIter->rowInfo.uid) { - // add to rbtree - tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn); - - // add to iterList - pReader->pIter->next = pReader->iterList; - pReader->iterList = pReader->pIter; - } else { - tsdbCloseDataIter2(pReader->pIter); - } + code = TARRAY2_APPEND(reader->sttReaderArr, sttReader); + TSDB_CHECK_CODE(code, lino, _exit); } } - pReader->pIter = NULL; - _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pReader->pTsdb->pVnode), __func__, pReader->fid); + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); } return code; } -static void tsdbSnapReadFileDataEnd(STsdbSnapReader* pReader) { - while (pReader->iterList) { - STsdbDataIter2* pIter = pReader->iterList; - pReader->iterList = pIter->next; - tsdbCloseDataIter2(pIter); - } - - tsdbDataFReaderClose(&pReader->pDataFReader); -} - -static int32_t tsdbSnapReadNextRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) { +static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) { int32_t code = 0; int32_t lino = 0; - if (pReader->pIter) { - code = tsdbDataIterNext2(pReader->pIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | - TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE, // flag - .sver = pReader->sver, - .ever = pReader->ever}); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pReader->pIter->rowInfo.suid == 0 && pReader->pIter->rowInfo.uid == 0) { - pReader->pIter = NULL; - } else { - SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt); - if (pNode) { - int32_t c = tsdbDataIterCmprFn(&pReader->pIter->rbtn, pNode); - if (c > 0) { - tRBTreePut(&pReader->rbt, &pReader->pIter->rbtn); - pReader->pIter = NULL; - } else if (c == 0) { - ASSERT(0); - } - } - } - } - - if (pReader->pIter == NULL) { - SRBTreeNode* pNode = tRBTreeMin(&pReader->rbt); - if (pNode) { - tRBTreeDrop(&pReader->rbt, pNode); - pReader->pIter = TSDB_RBTN_TO_DATA_ITER(pNode); - } - } - - if (ppRowInfo) { - if (pReader->pIter) { - *ppRowInfo = &pReader->pIter->rowInfo; - } else { - *ppRowInfo = NULL; - } - } + TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose); + tsdbDataFileReaderClose(&reader->dataReader); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); } return code; } -static int32_t tsdbSnapReadGetRow(STsdbSnapReader* pReader, SRowInfo** ppRowInfo) { - if (pReader->pIter) { - *ppRowInfo = &pReader->pIter->rowInfo; - return 0; +static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(reader->dataIterMerger == NULL); + ASSERT(reader->tombIterMerger == NULL); + ASSERT(TARRAY2_SIZE(reader->dataIterArr) == 0); + ASSERT(TARRAY2_SIZE(reader->tombIterArr) == 0); + + STsdbIter* iter; + STsdbIterConfig config = { + .filterByVersion = true, + .verRange[0] = reader->sver, + .verRange[1] = reader->ever, + }; + + // data file + if (reader->dataReader) { + // data + config.type = TSDB_ITER_TYPE_DATA; + config.dataReader = reader->dataReader; + + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(reader->dataIterArr, iter); + TSDB_CHECK_CODE(code, lino, _exit); + + // tomb + config.type = TSDB_ITER_TYPE_DATA_TOMB; + config.dataReader = reader->dataReader; + + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(reader->tombIterArr, iter); + TSDB_CHECK_CODE(code, lino, _exit); } - return tsdbSnapReadNextRow(pReader, ppRowInfo); + // stt file + SSttFileReader* sttReader; + TARRAY2_FOREACH(reader->sttReaderArr, sttReader) { + // data + config.type = TSDB_ITER_TYPE_STT; + config.sttReader = sttReader; + + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(reader->dataIterArr, iter); + TSDB_CHECK_CODE(code, lino, _exit); + + // tomb + config.type = TSDB_ITER_TYPE_STT_TOMB; + config.sttReader = sttReader; + + code = tsdbIterOpen(&config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(reader->tombIterArr, iter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // merger + code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerOpen(reader->tombIterArr, &reader->dataIterMerger, true); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; } -static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) { - int32_t code = 0; +static int32_t tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) { + tsdbIterMergerClose(&reader->dataIterMerger); + tsdbIterMergerClose(&reader->tombIterMerger); + TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose); + TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose); + return 0; +} - ASSERT(pReader->bData.nRow); +static int32_t tsdbSnapReadFileSetBegin(STsdbSnapReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { + reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++); + reader->ctx->isDataDone = false; + reader->ctx->isTombDone = false; + + code = tsdbSnapReadFileSetOpenReader(reader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapReadFileSetOpenIter(reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapReadFileSetEnd(STsdbSnapReader* reader) { + tsdbSnapReadFileSetCloseIter(reader); + tsdbSnapReadFileSetCloseReader(reader); + reader->ctx->fset = NULL; + return 0; +} + +static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) { + int32_t code = 0; + int32_t lino = 0; int32_t aBufN[5] = {0}; - code = tCmprBlockData(&pReader->bData, NO_COMPRESSION, NULL, NULL, pReader->aBuf, aBufN); - if (code) goto _exit; - - int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); - if (*ppData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData; - pHdr->type = pReader->type; - pHdr->size = size; - - memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]); - memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]); - if (aBufN[1]) { - memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]); - } - if (aBufN[0]) { - memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]); - } - -_exit: - return code; -} - -static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* pReader, uint8_t** ppData) { - int32_t code = 0; - int32_t lino = 0; - - STsdb* pTsdb = pReader->pTsdb; - - tBlockDataReset(&pReader->bData); - - for (;;) { - // start a new file read if need - if (pReader->pDataFReader == NULL) { - code = tsdbSnapReadFileDataStart(pReader); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (pReader->pDataFReader == NULL) break; - - SRowInfo* pRowInfo; - code = tsdbSnapReadGetRow(pReader, &pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pRowInfo == NULL) { - tsdbSnapReadFileDataEnd(pReader); - continue; - } - - code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, pRowInfo->suid, pRowInfo->uid, &pReader->skmTable); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataInit(&pReader->bData, (TABLEID*)pRowInfo, pReader->skmTable.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - do { - if (!TABLE_SAME_SCHEMA(pReader->bData.suid, pReader->bData.uid, pRowInfo->suid, pRowInfo->uid)) break; - - if (pReader->bData.uid && pReader->bData.uid != pRowInfo->uid) { - code = tRealloc((uint8_t**)&pReader->bData.aUid, sizeof(int64_t) * (pReader->bData.nRow + 1)); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t iRow = 0; iRow < pReader->bData.nRow; ++iRow) { - pReader->bData.aUid[iRow] = pReader->bData.uid; - } - pReader->bData.uid = 0; - } - - code = tBlockDataAppendRow(&pReader->bData, &pRowInfo->row, NULL, pRowInfo->uid); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbSnapReadNextRow(pReader, &pRowInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pReader->bData.nRow >= 81920) break; - } while (pRowInfo); - - ASSERT(pReader->bData.nRow > 0); - - break; - } - - if (pReader->bData.nRow > 0) { - ASSERT(pReader->bData.suid || pReader->bData.uid); - - code = tsdbSnapCmprData(pReader, ppData); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbSnapCmprTombData(STsdbSnapReader* pReader, uint8_t** ppData) { - int32_t code = 0; - int32_t lino = 0; - - int64_t size = sizeof(TABLEID); - for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) { - size += tPutDelData(NULL, taosArrayGet(pReader->aDelData, iDelData)); - } - - uint8_t* pData = (uint8_t*)taosMemoryMalloc(sizeof(SSnapDataHdr) + size); - if (pData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - pHdr->type = SNAP_DATA_DEL; - pHdr->size = size; - - TABLEID* pId = (TABLEID*)(pData + sizeof(SSnapDataHdr)); - *pId = pReader->tbid; - - size = sizeof(SSnapDataHdr) + sizeof(TABLEID); - for (int32_t iDelData = 0; iDelData < taosArrayGetSize(pReader->aDelData); ++iDelData) { - size += tPutDelData(pData + size, taosArrayGet(pReader->aDelData, iDelData)); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - *ppData = pData; - return code; -} - -static void tsdbSnapReadGetTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) { - if (pReader->pTIter == NULL || (pReader->pTIter->delInfo.suid == 0 && pReader->pTIter->delInfo.uid == 0)) { - *ppDelInfo = NULL; - } else { - *ppDelInfo = &pReader->pTIter->delInfo; - } -} - -static int32_t tsdbSnapReadNextTombData(STsdbSnapReader* pReader, SDelInfo** ppDelInfo) { - int32_t code = 0; - int32_t lino = 0; - - code = tsdbDataIterNext2( - pReader->pTIter, &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_VERSION | TSDB_FILTER_FLAG_IGNORE_DROPPED_TABLE, - .sver = pReader->sver, - .ever = pReader->ever}); + code = tCmprBlockData(reader->blockData, NO_COMPRESSION, NULL, NULL, reader->aBuf, aBufN); TSDB_CHECK_CODE(code, lino, _exit); - if (ppDelInfo) { - tsdbSnapReadGetTombData(pReader, ppDelInfo); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbSnapReadTombData(STsdbSnapReader* pReader, uint8_t** ppData) { - int32_t code = 0; - int32_t lino = 0; - - STsdb* pTsdb = pReader->pTsdb; - - // open tombstone data iter if need - if (pReader->pDelFReader == NULL) { - if (pReader->fs.pDelFile == NULL) goto _exit; - - // open - code = tsdbDelFReaderOpen(&pReader->pDelFReader, pReader->fs.pDelFile, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbOpenTombFileDataIter(pReader->pDelFReader, &pReader->pTIter); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pReader->pTIter) { - code = tsdbSnapReadNextTombData(pReader, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // loop to get tombstone data - SDelInfo* pDelInfo; - tsdbSnapReadGetTombData(pReader, &pDelInfo); - - if (pDelInfo == NULL) goto _exit; - - pReader->tbid = *(TABLEID*)pDelInfo; - - if (pReader->aDelData) { - taosArrayClear(pReader->aDelData); - } else if ((pReader->aDelData = taosArrayInit(16, sizeof(SDelData))) == NULL) { + int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; + *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + if (*data == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - while (pDelInfo && pDelInfo->suid == pReader->tbid.suid && pDelInfo->uid == pReader->tbid.uid) { - if (taosArrayPush(pReader->aDelData, &pDelInfo->delData) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + SSnapDataHdr* pHdr = (SSnapDataHdr*)*data; + pHdr->type = reader->type; + pHdr->size = size; + + memcpy(pHdr->data, reader->aBuf[3], aBufN[3]); + memcpy(pHdr->data + aBufN[3], reader->aBuf[2], aBufN[2]); + if (aBufN[1]) { + memcpy(pHdr->data + aBufN[3] + aBufN[2], reader->aBuf[1], aBufN[1]); + } + if (aBufN[0]) { + memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], reader->aBuf[0], aBufN[0]); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) { + int32_t code = 0; + int32_t lino = 0; + + tBlockDataReset(reader->blockData); + + for (SRowInfo* row; (row = tsdbIterMergerGetData(reader->dataIterMerger));) { + if (reader->blockData->suid == 0 && reader->blockData->uid == 0) { + code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb); + TSDB_CHECK_CODE(code, lino, _exit); + + TABLEID tbid = { + .suid = row->suid, + .uid = row->suid ? 0 : row->uid, + }; + code = tBlockDataInit(reader->blockData, &tbid, reader->skmTb->pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbSnapReadNextTombData(pReader, &pDelInfo); + if (!TABLE_SAME_SCHEMA(reader->blockData->suid, reader->blockData->uid, row->suid, row->uid)) { + break; + } + + code = tBlockDataAppendRow(reader->blockData, &row->row, NULL, row->uid); TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(reader->dataIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + + if (reader->blockData->nRow >= 81920) { + break; + } } - // encode tombstone data - if (taosArrayGetSize(pReader->aDelData) > 0) { - code = tsdbSnapCmprTombData(pReader, ppData); + if (reader->blockData->nRow > 0) { + ASSERT(reader->blockData->suid || reader->blockData->uid); + code = tsdbSnapCmprData(reader, data); TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); } return code; } -int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader) { +static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) { + int32_t code = 0; + int32_t lino = 0; + + int64_t size = sizeof(SSnapDataHdr); + for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->dataArr); i++) { + size += TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i); + } + + data[0] = taosMemoryMalloc(size); + if (data[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + SSnapDataHdr* hdr = (SSnapDataHdr*)data[0]; + hdr->type = SNAP_DATA_DEL; + hdr->size = size; + + uint8_t* tdata = hdr->data; + for (int32_t i = 0; i < TARRAY_SIZE(reader->tombBlock->dataArr); i++) { + memcpy(tdata, TARRAY2_DATA(reader->tombBlock->dataArr + i), TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i)); + tdata += TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) { + int32_t code = 0; + int32_t lino = 0; + + tTombBlockClear(reader->tombBlock); + + for (STombRecord* record; (record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL;) { + code = tTombBlockPut(reader->tombBlock, record); + TSDB_CHECK_CODE(code, lino, _exit); + + if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) { + break; + } + } + + if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) { + code = tsdbSnapCmprTombData(reader, data); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** reader) { int32_t code = 0; int32_t lino = 0; // alloc - STsdbSnapReader* pReader = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*pReader)); - if (pReader == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - pReader->pTsdb = pTsdb; - pReader->sver = sver; - pReader->ever = ever; - pReader->type = type; + reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0])); + if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; - taosThreadRwlockRdlock(&pTsdb->rwLock); - code = tsdbFSRef(pTsdb, &pReader->fs); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - TSDB_CHECK_CODE(code, lino, _exit); - } - taosThreadRwlockUnlock(&pTsdb->rwLock); + reader[0]->tsdb = tsdb; + reader[0]->sver = sver; + reader[0]->ever = ever; + reader[0]->type = type; - // init - pReader->fid = INT32_MIN; - - code = tBlockDataCreate(&pReader->bData); + code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), + tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code), sver, ever, type); - if (pReader) { - tBlockDataDestroy(&pReader->bData); - tsdbFSUnref(pTsdb, &pReader->fs); - taosMemoryFree(pReader); - pReader = NULL; - } + tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + taosMemoryFree(reader[0]); + reader[0] = NULL; } else { - tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(pTsdb->pVnode), __func__, sver, ever, + tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, sver, ever, type); } - *ppReader = pReader; return code; } -int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { +int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) { + if (reader[0] == NULL) return 0; + int32_t code = 0; int32_t lino = 0; - STsdbSnapReader* pReader = *ppReader; - STsdb* pTsdb = pReader->pTsdb; + STsdb* tsdb = reader[0]->tsdb; - // tombstone - if (pReader->pTIter) { - tsdbCloseDataIter2(pReader->pTIter); - pReader->pTIter = NULL; - } - if (pReader->pDelFReader) { - tsdbDelFReaderClose(&pReader->pDelFReader); - } - taosArrayDestroy(pReader->aDelData); + tTombBlockDestroy(reader[0]->tombBlock); + tBlockDataDestroy(reader[0]->blockData); - // timeseries - while (pReader->iterList) { - STsdbDataIter2* pIter = pReader->iterList; - pReader->iterList = pIter->next; - tsdbCloseDataIter2(pIter); - } - if (pReader->pDataFReader) { - tsdbDataFReaderClose(&pReader->pDataFReader); - } - tBlockDataDestroy(&pReader->bData); + tsdbIterMergerClose(&reader[0]->dataIterMerger); + tsdbIterMergerClose(&reader[0]->tombIterMerger); + TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose); + TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose); + TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose); + tsdbDataFileReaderClose(&reader[0]->dataReader); - // other - tDestroyTSchema(pReader->skmTable.pTSchema); - tsdbFSUnref(pReader->pTsdb, &pReader->fs); - for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) { - tFree(pReader->aBuf[iBuf]); + tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + tDestroyTSchema(reader[0]->skmTb->pTSchema); + + for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->aBuf);) { + tFree(reader[0]->aBuf[i]); } - taosMemoryFree(pReader); + + taosMemoryFree(reader[0]); + reader[0] = NULL; _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } else { - tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); } - *ppReader = NULL; return code; } -int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData) { +int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) { int32_t code = 0; int32_t lino = 0; - *ppData = NULL; + data[0] = NULL; - // read data file - if (!pReader->dataDone) { - code = tsdbSnapReadTimeSeriesData(pReader, ppData); - TSDB_CHECK_CODE(code, lino, _exit); - if (*ppData) { - goto _exit; - } else { - pReader->dataDone = 1; - } - } + for (;;) { + if (reader->ctx->fset == NULL) { + code = tsdbSnapReadFileSetBegin(reader); + TSDB_CHECK_CODE(code, lino, _exit); - // read del file - if (!pReader->delDone) { - code = tsdbSnapReadTombData(pReader, ppData); - TSDB_CHECK_CODE(code, lino, _exit); - if (*ppData) { - goto _exit; - } else { - pReader->delDone = 1; + if (reader->ctx->fset == NULL) { + break; + } } + + if (!reader->ctx->isDataDone) { + code = tsdbSnapReadTimeSeriesData(reader, data); + TSDB_CHECK_CODE(code, lino, _exit); + if (data[0]) { + goto _exit; + } else { + reader->ctx->isDataDone = true; + } + } + + if (!reader->ctx->isTombDone) { + code = tsdbSnapReadTombData(reader, data); + TSDB_CHECK_CODE(code, lino, _exit); + if (data[0]) { + goto _exit; + } else { + reader->ctx->isTombDone = true; + } + } + + code = tsdbSnapReadFileSetEnd(reader); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); } else { - tsdbDebug("vgId:%d %s done", TD_VID(pReader->pTsdb->pVnode), __func__); + tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__); } return code; } // STsdbSnapWriter ======================================== struct STsdbSnapWriter { - STsdb* pTsdb; + STsdb* tsdb; int64_t sver; int64_t ever; int32_t minutes; @@ -571,41 +512,37 @@ struct STsdbSnapWriter { int64_t commitID; uint8_t* aBuf[5]; - STsdbFS fs; - TABLEID tbid; + TFileSetArray* fsetArr; + TFileOpArray fopArr[1]; - // time-series data - SBlockData inData; + struct { + bool fsetWriteBegin; - int32_t fid; - SSkmInfo skmTable; + int32_t fid; + STFileSet* fset; - /* reader */ - SDataFReader* pDataFReader; - STsdbDataIter2* iterList; - STsdbDataIter2* pDIter; - STsdbDataIter2* pSIter; - SRBTree rbt; // SRBTree + bool hasData; + bool hasTomb; - /* writer */ - SDataFWriter* pDataFWriter; - SArray* aBlockIdx; - SMapData mDataBlk; // SMapData - SArray* aSttBlk; // SArray - SBlockData bData; - SBlockData sData; + // reader + SDataFileReader* dataReader; + TSttFileReaderArray sttReaderArr[1]; - // tombstone data - /* reader */ - SDelFReader* pDelFReader; - STsdbDataIter2* pTIter; + // iter/merger + TTsdbIterArray dataIterArr[1]; + SIterMerger* dataIterMerger; + TTsdbIterArray tombIterArr[1]; + SIterMerger* tombIterMerger; + } ctx[1]; - /* writer */ - SDelFWriter* pDelFWriter; - SArray* aDelIdx; - SArray* aDelData; + SDataFileWriter* dataWriter; + SSttFileWriter* sttWriter; + + SBlockData blockData[1]; + STombBlock tombBlock[1]; }; +#if 0 // SNAP_DATA_TSDB static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { int32_t code = 0; @@ -666,7 +603,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI } if (pId) { - code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); + code = tsdbUpdateTableSchema(pWriter->tsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); TSDB_CHECK_CODE(code, lino, _exit); tMapDataReset(&pWriter->mDataBlk); @@ -690,9 +627,9 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, + tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->tsdb->pVnode), __func__, pWriter->tbid.suid, pWriter->tbid.uid); } return code; @@ -712,7 +649,7 @@ static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -782,7 +719,7 @@ _write_row: _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -832,7 +769,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -843,7 +780,7 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid); - STsdb* pTsdb = pWriter->pTsdb; + STsdb* pTsdb = pWriter->tsdb; pWriter->fid = fid; pWriter->tbid = (TABLEID){0}; @@ -955,7 +892,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -1002,7 +939,7 @@ static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowIn _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -1021,7 +958,7 @@ static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInf _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -1076,9 +1013,9 @@ static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) { _exit: if (code) { - tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); + tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->tsdb->pVnode), __func__, tstrerror(code)); } else { - tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->pTsdb->pVnode), __func__); + tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->tsdb->pVnode), __func__); } return code; } @@ -1139,9 +1076,9 @@ static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHd _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->pTsdb->pVnode), __func__, + tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->tsdb->pVnode), __func__, pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow); } return code; @@ -1196,9 +1133,9 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID* _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, + tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->tsdb->pVnode), __func__, pWriter->tbid.suid, pWriter->tbid.uid); } return code; @@ -1224,9 +1161,9 @@ static int32_t tsdbSnapWriteDelTableDataEnd(STsdbSnapWriter* pWriter) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbTrace("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); + tsdbTrace("vgId:%d %s done", TD_VID(pWriter->tsdb->pVnode), __func__); } return code; } @@ -1261,7 +1198,7 @@ static int32_t tsdbSnapWriteDelTableData(STsdbSnapWriter* pWriter, TABLEID* pId, _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->tsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } @@ -1270,7 +1207,7 @@ static int32_t tsdbSnapWriteDelDataStart(STsdbSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; - STsdb* pTsdb = pWriter->pTsdb; + STsdb* pTsdb = pWriter->tsdb; SDelFile* pDelFile = pWriter->fs.pDelFile; pWriter->tbid = (TABLEID){0}; @@ -1310,7 +1247,7 @@ static int32_t tsdbSnapWriteDelDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; - STsdb* pTsdb = pWriter->pTsdb; + STsdb* pTsdb = pWriter->tsdb; // end remaining table with NULL data code = tsdbSnapWriteDelTableData(pWriter, NULL, NULL, 0); @@ -1352,7 +1289,7 @@ static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr int32_t code = 0; int32_t lino = 0; - STsdb* pTsdb = pWriter->pTsdb; + STsdb* pTsdb = pWriter->tsdb; // start to write del data if need if (pWriter->pDelFWriter == NULL) { @@ -1373,19 +1310,21 @@ _exit: } return code; } +#endif // APIs int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t code = 0; int32_t lino = 0; +#if 0 // alloc STsdbSnapWriter* pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - pWriter->pTsdb = pTsdb; + pWriter->tsdb = pTsdb; pWriter->sver = sver; pWriter->ever = ever; pWriter->minutes = pTsdb->keepCfg.days; @@ -1411,29 +1350,31 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr TSDB_CHECK_CODE(code, lino, _exit); // SNAP_DATA_DEL +#endif _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - if (pWriter) { - tBlockDataDestroy(&pWriter->sData); - tBlockDataDestroy(&pWriter->bData); - tBlockDataDestroy(&pWriter->inData); - tsdbFSDestroy(&pWriter->fs); - taosMemoryFree(pWriter); - pWriter = NULL; - } + // if (pWriter) { + // tBlockDataDestroy(&pWriter->sData); + // tBlockDataDestroy(&pWriter->bData); + // tBlockDataDestroy(&pWriter->inData); + // tsdbFSDestroy(&pWriter->fs); + // taosMemoryFree(pWriter); + // pWriter = NULL; + // } } else { tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever); } - *ppWriter = pWriter; + // *ppWriter = pWriter; return code; } -int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) { +int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer) { int32_t code = 0; int32_t lino = 0; +#if 0 if (pWriter->pDataFWriter) { code = tsdbSnapWriteFileDataEnd(pWriter); TSDB_CHECK_CODE(code, lino, _exit); @@ -1444,32 +1385,34 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) { TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs); + code = tsdbFSPrepareCommit(pWriter->tsdb, &pWriter->fs); TSDB_CHECK_CODE(code, lino, _exit); +#endif _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbDebug("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); + tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__); } return code; } -int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { +int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) { int32_t code = 0; int32_t lino = 0; - STsdbSnapWriter* pWriter = *ppWriter; - STsdb* pTsdb = pWriter->pTsdb; +#if 0 + STsdbSnapWriter* pWriter = *writer; + STsdb* pTsdb = pWriter->tsdb; if (rollback) { - tsdbRollbackCommit(pWriter->pTsdb); + tsdbRollbackCommit(pWriter->tsdb); } else { // lock taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFSCommit(pWriter->pTsdb); + code = tsdbFSCommit(pWriter->tsdb); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); TSDB_CHECK_CODE(code, lino, _exit); @@ -1497,43 +1440,229 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { } tsdbFSDestroy(&pWriter->fs); taosMemoryFree(pWriter); - *ppWriter = NULL; + *writer = NULL; +#endif _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + // tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + // tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; } -int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) { +static int32_t tsdbSnapWriteDoWriteTimeSeriesRow(STsdbSnapWriter* writer, const SRowInfo* row) { int32_t code = 0; int32_t lino = 0; - if (pHdr->type == SNAP_DATA_TSDB) { - code = tsdbSnapWriteTimeSeriesData(pWriter, pHdr); - TSDB_CHECK_CODE(code, lino, _exit); + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, const SRowInfo* row) { + int32_t code = 0; + int32_t lino = 0; + + while (writer->ctx->hasData) { + SRowInfo* row1 = tsdbIterMergerGetData(writer->ctx->dataIterMerger); + if (row1 == NULL) { + writer->ctx->hasData = false; + break; + } + + int32_t c = tRowInfoCmprFn(row1, row); + if (c <= 0) { + code = tsdbSnapWriteDoWriteTimeSeriesRow(writer, row1); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(writer->ctx->dataIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + break; + } + } + + if (row->suid == INT64_MAX) { + ASSERT(writer->ctx->hasData == false); goto _exit; - } else if (pWriter->pDataFWriter) { - code = tsdbSnapWriteFileDataEnd(pWriter); + } + + code = tsdbSnapWriteDoWriteTimeSeriesRow(writer, row); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) { + if (!writer->ctx->fsetWriteBegin) return 0; + + int32_t code = 0; + int32_t lino = 0; + + // TODO + SRowInfo row = { + .suid = INT64_MAX, + .uid = INT64_MAX, + }; + + code = tsdbSnapWriteTimeSeriesRow(writer, &row); + TSDB_CHECK_CODE(code, lino, _exit); + + STombRecord record = { + .suid = INT64_MAX, + .uid = INT64_MAX, + }; + + code = tsdbSnapWriteTombRecord(writer, &record); + TSDB_CHECK_CODE(code, lino, _exit); + + // close write + code = tsdbSttFileWriterClose(&writer->sttWriter, 0, writer->fopArr); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDataFileWriterClose(&writer->dataWriter, 0, writer->fopArr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) { + int32_t code = 0; + int32_t lino = 0; + + SBlockData blockData[1] = {0}; + + code = tDecmprBlockData(hdr->data, hdr->size, blockData, writer->aBuf); + TSDB_CHECK_CODE(code, lino, _exit); + + int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision); + if (fid != writer->ctx->fid) { + code = tsdbSnapWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapWriteFileSetBegin(writer, fid); TSDB_CHECK_CODE(code, lino, _exit); } - if (pHdr->type == SNAP_DATA_DEL) { - code = tsdbSnapWriteDelData(pWriter, pHdr); + for (int32_t i = 0; i < blockData->nRow; ++i) { + SRowInfo rowInfo = { + .suid = blockData->suid, + .uid = blockData->uid ? blockData->uid : blockData->aUid[i], + .row = tsdbRowFromBlockData(blockData, i), + }; + + code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo); TSDB_CHECK_CODE(code, lino, _exit); - goto _exit; + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } else { + tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__, + blockData->suid, blockData->uid, blockData->nRow); + } + tBlockDataDestroy(blockData); + return code; +} + +static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) { + int32_t code = 0; + int32_t lino = 0; + + // TODO + +_exit: + return code; +} + +static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) { + int32_t code = 0; + int32_t lino = 0; + + STombBlock tombBlock[1] = {0}; + + code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) { + STombRecord record; + tTombBlockGet(tombBlock, i, &record); + + code = tsdbSnapWriteTombRecord(writer, &record); + TSDB_CHECK_CODE(code, lino, _exit); + } + + tTombBlockDestroy(tombBlock); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) { + int32_t code = 0; + int32_t lino = 0; + + if (hdr->type == SNAP_DATA_TSDB) { + code = tsdbSnapWriteTimeSeriesData(writer, hdr); + TSDB_CHECK_CODE(code, lino, _exit); + } else if (hdr->type == SNAP_DATA_DEL) { + code = tsdbSnapWriteTombData(writer, hdr); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + ASSERT(0); } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64, - TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code), pHdr->type, pHdr->index, pHdr->size); + TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size); } else { - tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, - pHdr->type, pHdr->index, pHdr->size); + tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__, + hdr->type, hdr->index, hdr->size); } return code; }