diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b41b0ceb0f..1d6f37dabc 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -163,6 +163,7 @@ void tBlockDataDestroy(SBlockData *pBlockData); int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); +int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); void tBlockDataClear(SBlockData *pBlockData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[], diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 0a27abdbf9..81b42f43e8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -55,6 +55,7 @@ typedef struct STsdbDataIter { typedef struct { STsdb *pTsdb; int64_t cid; + int8_t cmprAlg; int32_t maxRows; int32_t minRows; STsdbFS fs; @@ -72,8 +73,9 @@ typedef struct { // Writer SDataFWriter *pWriter; SArray *aBlockIdx; // SArray - SMapData mDataBlk; // SMapData - SArray *aSttBlk; // SArray + TABLEID tableId; + SMapData mDataBlk; // SMapData + SArray *aSttBlk; // SArray } STsdbCompactor; #define TSDB_FLG_DEEP_COMPACT 0x1 @@ -210,8 +212,23 @@ _exit: } static void tsdbDataIterClose(STsdbDataIter *pIter) { - // TODO - ASSERT(0); + if (pIter == NULL) return; + + if (pIter->flag & TSDB_ITER_TYPE_MEM) { + ASSERT(0); + } else if (pIter->flag & TSDB_ITER_TYPE_DAT) { + ASSERT(0); + } else if (pIter->flag & TSDB_ITER_TYPE_STT) { + SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; + + tBlockDataDestroy(&pSttDIter->bData); + taosArrayDestroy(pSttDIter->aSttBlk); + tsdbDataFReaderClose(&pSttDIter->pReader); + } else { + ASSERT(0); + } + + taosMemoryFree(pIter); } static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) { @@ -285,7 +302,8 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { int32_t lino = 0; pCompactor->pTsdb = pTsdb; - // pCompactor->cid = 0; (TODO) + pCompactor->cid = 0; // TODO + pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows; pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows; @@ -536,6 +554,7 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { pCompactor->fid = pCompactor->pDFileSet->fid; + // reader code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); TSDB_CHECK_CODE(code, lino, _exit); @@ -567,9 +586,15 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { pCompactor->pIter = NULL; tBlockDataReset(&pCompactor->bData); - // open writers - SDFileSet fSet = {0}; // TODO - code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, NULL); + // writer + SDFileSet wSet = {.diskId = (SDiskID){0}, // TODO + .fid = pCompactor->pDFileSet->fid, + .pHeadF = &(SHeadFile){.commitID = pCompactor->cid}, + .pDataF = &(SDataFile){.commitID = pCompactor->cid}, + .pSmaF = &(SSmaFile){.commitID = pCompactor->cid}, + .nSttF = 1, + .aSttF = {&(SSttFile){.commitID = pCompactor->cid}}}; + code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, &wSet); TSDB_CHECK_CODE(code, lino, _exit); if (pCompactor->aBlockIdx == NULL) { @@ -604,19 +629,77 @@ _exit: } static void tsdbCloseCompactor(STsdbCompactor *pCompactor) { - STsdb *pTsdb = pCompactor->pTsdb; - for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) { STsdbDataIter *pIterNext = pIter->next; tsdbDataIterClose(pIter); pIter = pIterNext; } - // TODO - ASSERT(0); + tDestroyTSchema(pCompactor->tbSkm.pTSchema); + pCompactor->tbSkm.pTSchema = NULL; + + tsdbDataFReaderClose(&pCompactor->pReader); +} + +extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg); +extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg); +static int32_t tsdbCompactWriteBlockData(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + SBlockData *pBData = &pCompactor->bData; + + if (pBData->nRow == 0) goto _exit; + + if (pBData->uid && pBData->nRow >= pCompactor->minRows) { // write to .data file + code = tsdbWriteDataBlock(pCompactor->pWriter, pBData, &pCompactor->mDataBlk, pCompactor->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + + pCompactor->tableId.suid = pBData->suid; + pCompactor->tableId.uid = pBData->uid; + } else { // write to .stt file + code = tsdbWriteSttBlock(pCompactor->pWriter, pBData, pCompactor->aSttBlk, pCompactor->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + + tBlockDataClear(&pCompactor->bData); _exit: - tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } + return code; +} + +static int32_t tsdbCompactWriteDataBlk(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + if (pCompactor->mDataBlk.nItem == 0) goto _exit; + + SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1); + if (pBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pBlockIdx->suid = pCompactor->tableId.suid; + pBlockIdx->uid = pCompactor->tableId.uid; + + code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); + + tMapDataReset(&pCompactor->mDataBlk); + pCompactor->tableId.suid = 0; + pCompactor->tableId.uid = 0; + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code)); + } + return code; } int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { @@ -654,31 +737,29 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } else { - if (pCompactor->bData.suid != pRowInfo->suid) { // not same super table - if (pCompactor->bData.nRow < pCompactor->minRows) { - // TODO: write block data to .stt file, need to check if nRow is 0 - tBlockDataClear(&pCompactor->bData); - } else { - // TODO: write block data to .data file, need to check if nRow is 0 - tBlockDataClear(&pCompactor->bData); - } + if (pCompactor->bData.suid != pRowInfo->suid) { // different super table + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCompactWriteDataBlk(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); - } else if (pCompactor->bData.uid != pRowInfo->uid) { - if (pRowInfo->suid) { // different child table - if (pCompactor->bData.nRow > pCompactor->minRows) { - // TODO - } - } else { // different normal table - if (pCompactor->bData.nRow < pCompactor->minRows) { - // TODO: write data to .stt file, need to check if nRow is 0 - tBlockDataClear(&pCompactor->bData); - } else { - // TODO: write data to .data file, need to check if nRow is 0 - tBlockDataClear(&pCompactor->bData); + } else if (pCompactor->bData.uid != pRowInfo->uid) { // different table + if (pRowInfo->suid) { + if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) { + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); } + } else { + // different normal table + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCompactWriteDataBlk(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema, NULL, 0); @@ -688,12 +769,13 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { } // append row to block data - code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); + code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); TSDB_CHECK_CODE(code, lino, _exit); // check if block data is full if (pCompactor->bData.nRow >= pCompactor->maxRows) { - tBlockDataClear(&pCompactor->bData); + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); } // iterate to next row @@ -701,25 +783,25 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { TSDB_CHECK_CODE(code, lino, _exit); } - if (pCompactor->bData.nRow > 0) { - // write again - } + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCompactWriteDataBlk(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); - if (pCompactor->mDataBlk.nItem > 0) { - SBlockIdx *pBlockIdx = taosArrayReserve(pCompactor->aBlockIdx, 1); - if (pBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbUpdateDFileSetHeader(pCompactor->pWriter); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); + code = tsdbDataFWriterClose(&pCompactor->pWriter, 1); TSDB_CHECK_CODE(code, lino, _exit); tsdbCloseCompactor(pCompactor); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 1c92e922b0..b47d49dfb7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -637,7 +637,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { } } else { ASSERT(0); - return NULL; // suppress error report by compiler + return NULL; // suppress error report by compiler } } @@ -1070,34 +1070,55 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS if (pBlockData->uid == 0) { ASSERT(uid); code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1)); - if (code) goto _err; + if (code) goto _exit; pBlockData->aUid[pBlockData->nRow] = uid; } // version code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1)); - if (code) goto _err; + if (code) goto _exit; pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow); // timestamp code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1)); - if (code) goto _err; + if (code) goto _exit; pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); SColVal cv = {0}; if (pRow->type == TSDBROW_ROW_FMT) { code = tRowAppendToColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData); - if (code) goto _err; + if (code) goto _exit; } else if (pRow->type == TSDBROW_COL_FMT) { code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow); - if (code) goto _err; + if (code) goto _exit; } else { ASSERT(0); } pBlockData->nRow++; +_exit: return code; +} -_err: - return code; +int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { + int32_t code = 0; + + ASSERT(pBlockData->suid || pBlockData->uid); + + if (pBlockData->nRow == 0) { + pBlockData->uid = uid; + } else if (pBlockData->uid && pBlockData->uid != uid) { + ASSERT(pBlockData->suid); + + code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1)); + if (code) return code; + + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + pBlockData->aUid[iRow] = pBlockData->uid; + } + + pBlockData->uid = 0; + } + + return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); } void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) {