From fe27ee265428b7e03d24b8e75424fe6e4cd57fe8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 5 Jun 2022 03:47:41 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 48 +++---- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 144 +++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbFS.c | 4 +- source/dnode/vnode/src/tsdb/tsdbMemTable2.c | 3 + 4 files changed, 158 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 08cf652d20..02b9e4fcff 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -310,33 +310,13 @@ typedef struct { SBlock blocks[]; } SBlockInfo; -#ifdef TD_REFACTOR_3 -typedef struct { - int16_t colId; - uint16_t bitmap : 1; // 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows - uint16_t reserve : 15; - int32_t len; - uint32_t type : 8; - uint32_t offset : 24; - int64_t sum; - int64_t max; - int64_t min; - int16_t maxIndex; - int16_t minIndex; - int16_t numOfNull; - uint8_t offsetH; - char padding[1]; -} SBlockCol; -#else typedef struct { int16_t colId; uint16_t type : 6; uint16_t blen : 10; // 0 no bitmap if all rows are NORM, > 0 bitmap length uint32_t len; // data length + bitmap length uint32_t offset; -} SBlockColV0; - -#define SBlockCol SBlockColV0 // latest SBlockCol definition +} SBlockCol; typedef struct { int16_t colId; @@ -346,11 +326,7 @@ typedef struct { int64_t sum; int64_t max; int64_t min; -} SAggrBlkColV0; - -#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition - -#endif +} SAggrBlkCol; // Code here just for back-ward compatibility static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) { @@ -411,8 +387,7 @@ struct SReadH { #define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) #define TSDB_READ_EXBUF(rh) ((rh)->pExBuf) -#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \ - (sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM)) +#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) { switch (blkVer) { @@ -422,7 +397,7 @@ static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) { } } -#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM)) +#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM)) static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { switch (blkVer) { @@ -520,11 +495,11 @@ static FORCE_INLINE void *taosTZfree(void *ptr) { void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); -static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { +static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) { if (key < 0) { - return (int)((key + 1) / tsTickPerMin[precision] / days - 1); + return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1); } else { - return (int)((key / tsTickPerMin[precision] / days)); + return (int)((key / tsTickPerMin[precision] / minutes)); } } @@ -877,12 +852,21 @@ struct SDelOp { SDelOp *pNext; }; +typedef struct { + tb_uid_t suid; + tb_uid_t uid; + int64_t version; + TSKEY sKey; + TSKEY eKey; +} SDelInfo; + struct SMemTable { STsdb *pTsdb; int32_t nRef; TSDBKEY minKey; TSDBKEY maxKey; int64_t nRows; + int64_t nDelOp; SArray *aSkmInfo; SArray *aMemData; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index afd5cd72ee..4948d8a0b1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -17,13 +17,22 @@ typedef struct { SMemTable *pMemTable; + int32_t minutes; + int8_t precision; + int32_t sfid; + int32_t efid; SReadH readh; + SDFileSet wSet; + SArray *aDelInfo; SArray *aBlkIdx; + SArray *aSupBlk; + SArray *aSubBlk; } SCommitH; static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb); static int32_t tsdbEndCommit(SCommitH *pCHandle); static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid); +static int32_t tsdbCommitDelete(SCommitH *pCHandle); int32_t tsdbBegin2(STsdb *pTsdb) { int32_t code = 0; @@ -50,15 +59,18 @@ int32_t tsdbCommit2(STsdb *pTsdb) { } // commit - int32_t sfid; // todo - int32_t efid; // todo - for (int32_t fid = sfid; fid <= efid; fid++) { + for (int32_t fid = ch.sfid; fid <= ch.efid; fid++) { code = tsdbCommitToFile(&ch, fid); if (code) { goto _err; } } + code = tsdbCommitDelete(&ch); + if (code) { + goto _err; + } + // end commit code = tsdbEndCommit(&ch); if (code) { @@ -74,18 +86,77 @@ _err: } static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) { - int32_t code = 0; + int32_t code = 0; + SMemTable *pMemTable = (SMemTable *)pTsdb->mem; + tsdbInfo("vgId:%d start to commit", TD_VID(pTsdb->pVnode)); + + // switch to commit ASSERT(pTsdb->imem == NULL && pTsdb->mem); pTsdb->imem = pTsdb->mem; pTsdb->mem = NULL; - // TODO + + // open handle + pCHandle->pMemTable = pMemTable; + pCHandle->minutes = pTsdb->keepCfg.days; + pCHandle->precision = pTsdb->keepCfg.precision; + pCHandle->sfid = TSDB_KEY_FID(pMemTable->minKey.ts, pCHandle->minutes, pCHandle->precision); + pCHandle->efid = TSDB_KEY_FID(pMemTable->maxKey.ts, pCHandle->minutes, pCHandle->precision); + + code = tsdbInitReadH(&pCHandle->readh, pTsdb); + if (code) { + goto _err; + } + pCHandle->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + if (pCHandle->aBlkIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pCHandle->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); + if (pCHandle->aSupBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pCHandle->aSubBlk = taosArrayInit(1024, sizeof(SBlock)); + if (pCHandle->aSubBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + // start FS transaction + tsdbStartFSTxn(pTsdb, 0, 0); + + return code; + +_err: return code; } static int32_t tsdbEndCommit(SCommitH *pCHandle) { - int32_t code = 0; - // TODO + int32_t code = 0; + STsdb *pTsdb = pCHandle->pMemTable->pTsdb; + SMemTable *pMemTable = (SMemTable *)pTsdb->imem; + + // end transaction + code = tsdbEndFSTxn(pTsdb); + if (code) { + goto _err; + } + + // close handle + taosArrayClear(pCHandle->aSubBlk); + taosArrayClear(pCHandle->aSupBlk); + taosArrayClear(pCHandle->aBlkIdx); + tsdbDestroyReadH(&pCHandle->readh); + + // destroy memtable (todo: unref it) + pTsdb->imem = NULL; + tsdbMemTableDestroy2(pMemTable); + + tsdbInfo("vgId:%d commit over", TD_VID(pTsdb->pVnode)); + return code; + +_err: return code; } @@ -193,6 +264,65 @@ static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) { return code; +_err: + return code; +} + +static int32_t delInfoCmprFn(const void *p1, const void *p2) { + SDelInfo *pDelInfo1 = (SDelInfo *)p1; + SDelInfo *pDelInfo2 = (SDelInfo *)p2; + + if (pDelInfo1->suid < pDelInfo2->suid) { + return -1; + } else if (pDelInfo1->suid > pDelInfo2->suid) { + return 1; + } + + if (pDelInfo1->uid < pDelInfo2->uid) { + return -1; + } else if (pDelInfo1->uid > pDelInfo2->uid) { + return 1; + } + + if (pDelInfo1->version < pDelInfo2->version) { + return -1; + } else if (pDelInfo1->version > pDelInfo2->version) { + return 1; + } + + return 0; +} +static int32_t tsdbCommitDelete(SCommitH *pCHandle) { + int32_t code = 0; + SDelInfo delInfo; + SMemData *pMemData; + + if (pCHandle->pMemTable->nDelOp == 0) goto _exit; + + // load del array (todo) + + // loop to append SDelInfo + for (int32_t iMemData = 0; iMemData < taosArrayGetSize(pCHandle->pMemTable->aMemData); iMemData++) { + pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData); + + for (SDelOp *pDelOp = pMemData->delOpHead; pDelOp; pDelOp = pDelOp->pNext) { + delInfo = (SDelInfo){.suid = pMemData->suid, + .uid = pMemData->uid, + .version = pDelOp->version, + .sKey = pDelOp->sKey, + .eKey = pDelOp->eKey}; + if (taosArrayPush(pCHandle->aDelInfo, &delInfo) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + } + + taosArraySort(pCHandle->aDelInfo, delInfoCmprFn); + +_exit: + return code; + _err: return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index c0ca2f9594..584a1ec49c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -37,11 +37,11 @@ static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired); // static int tsdbProcessExpiredFS(STsdb *pRepo); // static int tsdbCreateMeta(STsdb *pRepo); -static void tsdbGetRootDir(int repoid, const char* dir, char dirName[]) { +static void tsdbGetRootDir(int repoid, const char *dir, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", repoid, dir); } -static void tsdbGetDataDir(int repoid, const char* dir, char dirName[]) { +static void tsdbGetDataDir(int repoid, const char *dir, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data", repoid, dir); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index e581e9fe3d..cf8557bba3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -59,6 +59,7 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX}; pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN}; pMemTable->nRows = 0; + pMemTable->nDelOp = 0; pMemTable->aMemData = taosArrayInit(512, sizeof(SMemData *)); if (pMemTable->aMemData == NULL) { taosMemoryFree(pMemTable); @@ -149,6 +150,8 @@ int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_ui // update the state of pMemTable, pMemData, last and lastrow (todo) } + pMemTable->nDelOp++; + tsdbDebug("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64 " since %s", TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));