From 0584acf505a1c3562d8646a8c513acccf85ec3e3 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 22 May 2022 22:30:46 +0800 Subject: [PATCH 1/4] trigger CI From 55411d07c6d495b5beac7b3d8e935a55392e2c9a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 25 May 2022 15:45:13 +0800 Subject: [PATCH 2/4] feat: support delete function --- source/dnode/vnode/src/tsdb/tsdbDelete.c | 61 ++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 source/dnode/vnode/src/tsdb/tsdbDelete.c diff --git a/source/dnode/vnode/src/tsdb/tsdbDelete.c b/source/dnode/vnode/src/tsdb/tsdbDelete.c new file mode 100644 index 0000000000..ef18b8ff66 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbDelete.c @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" + +typedef struct STombStoneItem STombStoneItem; +typedef struct STombStones STombStones; + +struct STombStones { + TdThreadRwlock lock; + SHashObj* pTmStones; // key: uid, value: tombstone items sorted by version + T_REF_DECLARE() +}; + +struct STombStoneItem { + int64_t tbUid; // child/normal table + int64_t version; + int64_t gversion; // global unique version assigned by mnode + SArray* tsFilters; +}; + +// SBlock 中除了原有的 keyFirst/keyLast,应该不再需要记录 versionMin 和 versionMax +// 因为在查询时,新到的 query 的 version,一定大于原有写入的数据 version。 + +// mem [ts1:v200, ts1:v201, ts2:v300,ts2:v310,ts2:320, ts5:400,ts5:v500,ts6:v600, ts7:v700, ts8:v800] + +// imem [ts2:80,ts2:v90,ts2:v100] + +// file [blk1: ts1:v10,ts1:v11,ts2:v20,ts2:v21 + [blk2: ts2:v30,ts2:v31,ts2:v35 + [blk3: ts2:v40,ts2:v45,ts2:v50,ts3:v60,ts4:v70 + +// delete [ctb1, ts2-ts5:v315] +// [ctb1, ts5:v450] +// [ctb1, ts6:v550] +// 根据 delete version, 逆序过滤。 + +// sql: select * from ctb1 where ts < ts8 and ts > ts1; // query version = 460; + +int32_t tsdbLoadBlockDataMV(SFileBlockIter *pBlockIter, SMemIter *pMemIter, SDataCols *pTarget, SArray *pTmStoneArray, int64_t queryVersion){ + + // 首先,query executor 根据 查询的 ts 范围,找到最后一条记录的位置。 + // 1) 有可能只在 mem/imem + // 2) 有可能只在 某一个文件 + // 3) 有可能 mem/imem 和 文件中均包含。 + + + +} From 80d780eb03dd223428a15bc6e09b520137700b28 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 25 May 2022 22:47:56 +0800 Subject: [PATCH 3/4] fix: use schema version of row when commit --- source/dnode/vnode/src/inc/tsdb.h | 13 +++++++--- source/dnode/vnode/src/tsdb/tsdbCommit.c | 30 ++++++++++++---------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 25 ++++++++++-------- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 2 +- 4 files changed, 41 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 1195f9e2b3..6d3d23cc20 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -40,8 +40,8 @@ typedef struct STable STable; int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable); void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable); -int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, - TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); +int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, + SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); // tsdbCommit ================ @@ -179,7 +179,14 @@ struct STsdbFS { int tsdbLockRepo(STsdb *pTsdb); int tsdbUnlockRepo(STsdb *pTsdb); -static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { +static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STsdb *pTsdb, STable *pTable, bool lock, bool copy, + int32_t version) { + + if ((version != -1) && (schemaVersion(pTable->pSchema) != version)) { + taosMemoryFreeClear(pTable->pSchema); + pTable->pSchema = metaGetTbTSchema(REPO_META(pTsdb), pTable->uid, version); + } + return pTable->pSchema; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 93ec6028f8..d462b7e046 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -84,7 +84,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols static void tsdbResetCommitTable(SCommitH *pCommith); static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); -static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, +static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); @@ -301,7 +301,8 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { SCommitIter *pIter = pCommith->iters + i; if (pIter->pTable == NULL || pIter->pIter == NULL) continue; - tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL); + tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, + true, NULL); } } @@ -947,7 +948,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { } static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { - STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith),pTable, false, false, -1); pCommith->pTable = pTable; @@ -1254,8 +1255,8 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi SBlock block; while (true) { - tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, - pCfg->update, &mInfo); + tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, keyLimit, defaultRows, + pCommith->pDataCols, NULL, 0, pCfg->update, &mInfo); if (pCommith->pDataCols->numOfRows <= 0) break; @@ -1298,8 +1299,9 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { SSkipListIterator titer = *(pIter->pIter); if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1, false) < 0) return -1; - tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, - pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); + tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, + pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, + &mInfo); if (mInfo.nOperations == 0) { // no new data to insert (all updates denied) @@ -1313,9 +1315,9 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { *(pIter->pIter) = titer; } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { // Add a sub-block - tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, - pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, - &mInfo); + tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, + pCommith->pDataCols, pCommith->readh.pDCols[0]->cols[0].pData, + pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); if (pBlock->last) { pDFile = TSDB_COMMIT_LAST_FILE(pCommith); } else { @@ -1420,7 +1422,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols int biter = 0; while (true) { - tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, + tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, pCfg->update); if (pCommith->pDataCols->numOfRows == 0) break; @@ -1445,7 +1447,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols return 0; } -static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, +static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update) { TSKEY key1 = INT64_MAX; TSKEY key2 = INT64_MAX; @@ -1487,7 +1489,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ++(*iter); } else if (key1 > key2) { if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); + pSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row)); ASSERT(pSchema != NULL); } @@ -1527,7 +1529,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt if (TD_SUPPORT_UPDATE(update)) { // copy mem data(Multi-Version) if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); + pSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row)); ASSERT(pSchema != NULL); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d8426db127..0a77274a21 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -20,7 +20,8 @@ static void tsdbFreeTbData(STbData *pTbData); static char *tsdbGetTsTupleKey(const void *data); static int tsdbTbDataComp(const void *arg1, const void *arg2); static char *tsdbTbDataGetUid(const void *arg); -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge); +static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, + bool merge); int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { STsdbMemTable *pMemTable; @@ -88,8 +89,8 @@ void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) { * * The function tries to procceed AS MUCH AS POSSIBLE. */ -int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, - TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { +int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, + SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); if (pIter == NULL) return 0; STSchema *pSchema = NULL; @@ -222,12 +223,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey if (lastKey != TSKEY_INITIAL_VAL) { ++pCols->numOfRows; } - tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); } lastKey = rowKey; } else { if (keepDup) { - tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true); } else { // discard } @@ -249,7 +250,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; pMergeInfo->rowsDeleteSucceed++; pMergeInfo->nOperations++; - tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); } else { if (keepDup) { if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; @@ -262,11 +263,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey if (lastKey != TSKEY_INITIAL_VAL) { ++pCols->numOfRows; } - tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); } lastKey = rowKey; } else { - tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); + tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true); } } else { pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); @@ -321,7 +322,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo return -1; } strcat(pRsp->tblFName, mr.me.name); - + if (mr.me.type == TSDB_NORMAL_TABLE) { sverNew = mr.me.ntbEntry.schema.sver; } else { @@ -431,10 +432,12 @@ static char *tsdbTbDataGetUid(const void *arg) { STbData *pTbData = (STbData *)arg; return (char *)(&(pTbData->uid)); } -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, bool merge) { + +static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, + bool merge) { if (pCols) { if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { - *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, TD_ROW_SVER(row)); + *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row)); if (*ppSchema == NULL) { ASSERT(false); return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index f66037b16d..d51521c41c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -157,7 +157,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh) { } int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { - STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_READ_REPO(pReadh), pTable, false, false, -1); pReadh->pTable = pTable; From 55c010dd6cde2c178e5933fab0926c7f5d274aa6 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 25 May 2022 22:50:41 +0800 Subject: [PATCH 4/4] fix: code optimization --- source/dnode/vnode/src/tsdb/tsdbDelete.c | 61 ------------------------ 1 file changed, 61 deletions(-) delete mode 100644 source/dnode/vnode/src/tsdb/tsdbDelete.c diff --git a/source/dnode/vnode/src/tsdb/tsdbDelete.c b/source/dnode/vnode/src/tsdb/tsdbDelete.c deleted file mode 100644 index ef18b8ff66..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbDelete.c +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tsdb.h" - -typedef struct STombStoneItem STombStoneItem; -typedef struct STombStones STombStones; - -struct STombStones { - TdThreadRwlock lock; - SHashObj* pTmStones; // key: uid, value: tombstone items sorted by version - T_REF_DECLARE() -}; - -struct STombStoneItem { - int64_t tbUid; // child/normal table - int64_t version; - int64_t gversion; // global unique version assigned by mnode - SArray* tsFilters; -}; - -// SBlock 中除了原有的 keyFirst/keyLast,应该不再需要记录 versionMin 和 versionMax -// 因为在查询时,新到的 query 的 version,一定大于原有写入的数据 version。 - -// mem [ts1:v200, ts1:v201, ts2:v300,ts2:v310,ts2:320, ts5:400,ts5:v500,ts6:v600, ts7:v700, ts8:v800] - -// imem [ts2:80,ts2:v90,ts2:v100] - -// file [blk1: ts1:v10,ts1:v11,ts2:v20,ts2:v21 - [blk2: ts2:v30,ts2:v31,ts2:v35 - [blk3: ts2:v40,ts2:v45,ts2:v50,ts3:v60,ts4:v70 - -// delete [ctb1, ts2-ts5:v315] -// [ctb1, ts5:v450] -// [ctb1, ts6:v550] -// 根据 delete version, 逆序过滤。 - -// sql: select * from ctb1 where ts < ts8 and ts > ts1; // query version = 460; - -int32_t tsdbLoadBlockDataMV(SFileBlockIter *pBlockIter, SMemIter *pMemIter, SDataCols *pTarget, SArray *pTmStoneArray, int64_t queryVersion){ - - // 首先,query executor 根据 查询的 ts 范围,找到最后一条记录的位置。 - // 1) 有可能只在 mem/imem - // 2) 有可能只在 某一个文件 - // 3) 有可能 mem/imem 和 文件中均包含。 - - - -}