diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 846ca44b83..c65738f31b 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2509,7 +2509,7 @@ int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) { if (!(pBind->num == 1 && pBind->is_null && *pBind->is_null)) { ASSERT(pColData->type == pBind->buffer_type); } - + if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type for (int32_t i = 0; i < pBind->num; ++i) { if (pBind->is_null && pBind->is_null[i]) { diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbMerge.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbMerge.h new file mode 100644 index 0000000000..313df5f223 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbMerge.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbFS.h" +#include "tsdbSttFReaderWriter.h" + +#ifndef _TD_TSDB_MERGE_H_ +#define _TD_TSDB_MERGE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +/* Exposed Handle */ + +/* Exposed APIs */ + +/* Exposed Structs */ + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TSDB_MERGE_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index b81b8ebea5..e9778b6022 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -442,7 +442,9 @@ int32_t tsdbCommitCommit(STsdb *pTsdb) { taosThreadRwlockUnlock(&pTsdb->rwLock); tsdbUnrefMemTable(pMemTable, NULL, true); - // check merge (TODO: remove here) + // TODO: make this call async + code = tsdbMerge(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 72b11bb967..b0be2bb528 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -13,103 +13,127 @@ * along with this program. If not, see . */ -#include "dev.h" +#include "inc/tsdbMerge.h" typedef struct { - STsdb *pTsdb; - STFileSet *pSet; + bool launched; +} SMergeCtx; - SBlockData bData; - - // reader - SSttFileReader *pSttFReader; - int32_t nSttFSegReader; - SSttFSegReader *aSttFSegReader; - - // writer - SSttFileWriter *pSttFWriter; - SDataFileWriter *pDataFWriter; - - SArray *aFileOp; // SArray +typedef struct { + STsdb *tsdb; + SMergeCtx ctx; + TFileOpArray fopArr; } SMerger; -static int32_t tsdbFileSystemShouldMerge(STsdb *pTsdb) { - ASSERTS(0, "TODO: not implemented yet"); - // TODO +static int32_t tsdbMergerOpen(SMerger *merger) { + merger->ctx.launched = true; + TARRAY2_INIT(&merger->fopArr); return 0; } -static int32_t tsdbFileSetShouldMerge(struct STFileSet *pSet) { - ASSERTS(0, "TODO: not implemented yet"); +static int32_t tsdbMergerClose(SMerger *merger) { // TODO + ASSERT(0); return 0; } -static int32_t tsdbFileSetMerge(struct STFileSet *pFileSet) { - ASSERTS(0, "TODO: not implemented yet"); - // TODO - return 0; -} - -static int32_t tsdbOpenMerger(STsdb *pTsdb, SMerger *pMerger) { - pMerger->pTsdb = pTsdb; - // TODO - return 0; -} - -static int32_t tsdbDestroyMerger(SMerger *pMerger) { +static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { int32_t code = 0; - // TODO - return code; -} + int32_t lino = 0; -static int32_t tsdbCloseMerger(SMerger *pMerger) { - int32_t code = 0; - int32_t lino; + if (merger->ctx.launched == false) { + code = tsdbMergerOpen(merger); + TSDB_CHECK_CODE(code, lino, _exit); + } - STsdb *pTsdb = pMerger->pTsdb; + { // prepare the merger file set + SSttLvl *lvl; + STFileObj *fobj; + TARRAY2_FOREACH(&fset->lvlArr, lvl) { + TARRAY2_FOREACH(&lvl->farr, fobj) { + if (fobj->f.stt.nseg >= merger->tsdb->pVnode->config.sttTrigger) { + STFileOp op = { + .fid = fset->fid, + .optype = TSDB_FOP_REMOVE, + .of = fobj->f, + }; - // code = tsdbFSEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FEDIT_MERGE); - // TSDB_CHECK_CODE(code, lino, _exit) - -_exit: - if (code) { - tsdbFSEditAbort(pTsdb->pFS); - } else { - tsdbFSEditCommit(pTsdb->pFS); - } - tsdbDestroyMerger(pMerger); - return code; -} - -int32_t tsdbMerge(STsdb *pTsdb) { - int32_t code = 0; - int32_t lino; - - if (!tsdbFileSystemShouldMerge(pTsdb)) { - goto _exit; - } - - SMerger pMerger = {0}; - code = tsdbOpenMerger(pTsdb, &pMerger); - TSDB_CHECK_CODE(code, lino, _exit); - - // for (int32_t i = 0; i < taosArrayGetSize(pTsdb->pFS->cstate); i++) { - // struct STFileSet *pFileSet = taosArrayGet(pTsdb->pFS->cstate, i); - // if (!tsdbFileSetShouldMerge(pFileSet)) { - // continue; - // } - - // code = tsdbFileSetMerge(pFileSet); - // TSDB_CHECK_CODE(code, lino, _exit); - // } - - code = tsdbCloseMerger(&pMerger); - TSDB_CHECK_CODE(code, lino, _exit); + code = TARRAY2_APPEND(&merger->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + if (lvl->level == 0) { + continue; + } else { + // TODO + } + } + } + } + } + + { + // do merge the file set + } + + { // end merge the file set + } _exit: if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code)); } else { + tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid); + } + return 0; +} + +int32_t tsdbMerge(STsdb *tsdb) { + int32_t code = 0; + int32_t lino; + + SVnode *vnode = tsdb->pVnode; + int32_t vid = TD_VID(vnode); + STFileSystem *fs = tsdb->pFS; + STFileSet *fset; + STFileObj *fobj; + int32_t sttTrigger = vnode->config.sttTrigger; + + SMerger merger = { + .tsdb = tsdb, + .ctx = + { + .launched = false, + }, + }; + + // loop to merge each file set + TARRAY2_FOREACH(&fs->cstate, fset) { + SSttLvl *lvl0 = tsdbTFileSetGetLvl(fset, 0); + if (lvl0 == NULL) { + continue; + } + + ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0); + + fobj = TARRAY2_GET(&lvl0->farr, 0); + + if (fobj->f.stt.nseg >= sttTrigger) { + code = tsdbMergeFileSet(&merger, fset); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + // end the merge + if (merger.ctx.launched) { + code = tsdbMergerClose(&merger); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s done, do merge: %d", vid, __func__, merger.ctx.launched); } return 0; }