From 58135a1aeb3cb11d62ec35b516d9e306fd726efe Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Jul 2022 06:19:01 +0000 Subject: [PATCH 1/4] tsdb r/w concurrency --- source/dnode/vnode/src/inc/tsdb.h | 46 ++++++++--------- source/dnode/vnode/src/tsdb/tsdbCache.c | 8 +-- source/dnode/vnode/src/tsdb/tsdbCommit.c | 44 ++++++++++++---- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 56 +++++++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbOpen.c | 32 ++---------- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +-- source/dnode/vnode/src/tsdb/tsdbRetention.c | 10 ++-- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 18 +++---- 8 files changed, 137 insertions(+), 83 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4e2c762cd3..eb5f66d443 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -174,6 +174,10 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); +void tsdbRefMemTable(SMemTable *pMemTable); +void tsdbUnrefMemTable(SMemTable *pMemTable); +int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem); +void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); void *tsdbTbDataIterDestroy(STbDataIter *pIter); @@ -273,16 +277,14 @@ typedef struct { } SRtn; struct STsdb { - char *path; - SVnode *pVnode; - TdThreadMutex mutex; - bool repoLocked; - STsdbKeepCfg keepCfg; - SMemTable *mem; - SMemTable *imem; - SRtn rtn; - STsdbFS *fs; - SLRUCache *lruCache; + char *path; + SVnode *pVnode; + TdThreadRwlock rwLock; + STsdbKeepCfg keepCfg; + SMemTable *mem; + SMemTable *imem; + STsdbFS *pFS; + SLRUCache *lruCache; }; struct STable { @@ -330,21 +332,19 @@ struct STbData { }; struct SMemTable { - SRWLatch latch; - STsdb *pTsdb; - int32_t nRef; - TSKEY minKey; - TSKEY maxKey; - int64_t minVersion; - int64_t maxVersion; - int64_t nRow; - int64_t nDel; - SArray *aTbData; // SArray + SRWLatch latch; + STsdb *pTsdb; + SVBufPool *pPool; + volatile int32_t nRef; + TSKEY minKey; + TSKEY maxKey; + int64_t minVersion; + int64_t maxVersion; + int64_t nRow; + int64_t nDel; + SArray *aTbData; // SArray }; -int tsdbLockRepo(STsdb *pTsdb); -int tsdbUnlockRepo(STsdb *pTsdb); - struct TSDBROW { int8_t type; // 0 for row from tsRow, 1 for row from block data union { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 723caca5d7..2ca52ef5b3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -464,7 +464,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { switch (state->state) { case SFSNEXTROW_FS: - state->aDFileSet = state->pTsdb->fs->cState->aDFileSet; + state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet; state->nFileSet = taosArrayGetSize(state->aDFileSet); state->iFileSet = state->nFileSet; @@ -814,7 +814,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs SDelIdx delIdx; - SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); if (pDelFile) { SDelFReader *pDelFReader; @@ -1189,7 +1189,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo SDelIdx delIdx; - SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); if (pDelFile) { SDelFReader *pDelFReader; @@ -1377,7 +1377,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { SDelIdx delIdx; - SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); if (pDelFile) { SDelFReader *pDelFReader; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 470bff1eae..04fe5520fb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -64,9 +64,26 @@ int32_t tsdbBegin(STsdb *pTsdb) { if (!pTsdb) return code; - code = tsdbMemTableCreate(pTsdb, &pTsdb->mem); + SMemTable *pMemTable; + code = tsdbMemTableCreate(pTsdb, &pMemTable); if (code) goto _err; + // lock + code = taosThreadRwlockWrlock(&pTsdb->rwLock); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _err; + } + + pTsdb->mem = pMemTable; + + // unlock + code = taosThreadRwlockUnlock(&pTsdb->rwLock); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _err; + } + return code; _err: @@ -83,9 +100,11 @@ int32_t tsdbCommit(STsdb *pTsdb) { // check if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { - // TODO: lock? + taosThreadRwlockWrlock(&pTsdb->rwLock); pTsdb->mem = NULL; - tsdbMemTableDestroy(pMemTable); + taosThreadRwlockUnlock(&pTsdb->rwLock); + + tsdbUnrefMemTable(pMemTable); goto _exit; } @@ -139,7 +158,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { goto _err; } - SDelFile *pDelFileR = pTsdb->fs->nState->pDelFile; + SDelFile *pDelFileR = pTsdb->pFS->nState->pDelFile; if (pDelFileR) { code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); if (code) goto _err; @@ -228,7 +247,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter); if (code) goto _err; - code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel); + code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pCommitter->pDelFWriter->fDel); if (code) goto _err; code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1); @@ -263,7 +282,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { taosArrayClear(pCommitter->aBlockIdx); tMapDataReset(&pCommitter->oBlockMap); tBlockDataReset(&pCommitter->oBlockData); - pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid, TD_EQ); + pRSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, pCommitter->commitFid, TD_EQ); if (pRSet) { code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); if (code) goto _err; @@ -836,7 +855,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { if (code) goto _err; // upsert SDFileSet - code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter)); + code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter)); if (code) goto _err; // close and sync @@ -954,7 +973,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; - code = tsdbFSBegin(pTsdb->fs); + code = tsdbFSBegin(pTsdb->pFS); if (code) goto _err; return code; @@ -1135,13 +1154,16 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { SMemTable *pMemTable = pTsdb->imem; if (eno == 0) { - code = tsdbFSCommit(pTsdb->fs); + code = tsdbFSCommit(pTsdb->pFS); } else { - code = tsdbFSRollback(pTsdb->fs); + code = tsdbFSRollback(pTsdb->pFS); } - tsdbMemTableDestroy(pMemTable); + taosThreadRwlockWrlock(&pTsdb->rwLock); pTsdb->imem = NULL; + taosThreadRwlockUnlock(&pTsdb->rwLock); + + tsdbUnrefMemTable(pMemTable); tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 82de931872..d0a719b5cd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -41,6 +41,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { } taosInitRWLatch(&pMemTable->latch); pMemTable->pTsdb = pTsdb; + pMemTable->pPool = pTsdb->pVnode->inUse; pMemTable->nRef = 1; pMemTable->minKey = TSKEY_MAX; pMemTable->maxKey = TSKEY_MIN; @@ -590,3 +591,58 @@ _err: } int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } + +void tsdbRefMemTable(SMemTable *pMemTable) { + int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); + ASSERT(nRef > 0); +} + +void tsdbUnrefMemTable(SMemTable *pMemTable) { + int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); + if (nRef == 0) { + tsdbMemTableDestroy(pMemTable); + } +} + +int32_t tsdbTakeMemSnapshot(STsdb *pTsdb, SMemTable **ppMem, SMemTable **ppIMem) { + int32_t code = 0; + + // lock + code = taosThreadRwlockRdlock(&pTsdb->rwLock); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _exit; + } + + // take snapshot + *ppMem = pTsdb->mem; + *ppIMem = pTsdb->imem; + + if (*ppMem) { + tsdbRefMemTable(*ppMem); + } + + if (*ppIMem) { + tsdbRefMemTable(*ppIMem); + } + + // unlock + code = taosThreadRwlockUnlock(&pTsdb->rwLock); + if (code) { + code = TAOS_SYSTEM_ERROR(code); + goto _exit; + } + +_exit: + return code; +} + +void tsdbUntakeMemSnapshot(STsdb *pTsdb, SMemTable *pMem, SMemTable *pIMem) { + if (pMem) { + tsdbUnrefMemTable(pMem); + } + + if (pIMem) { + tsdbUnrefMemTable(pIMem); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 1fa582fae5..064c7adf4b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -54,8 +54,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee sprintf(pTsdb->path, "%s%s%s", pVnode->path, TD_DIRSEP, dir); taosRealPath(pTsdb->path, NULL, slen); pTsdb->pVnode = pVnode; - pTsdb->repoLocked = false; - taosThreadMutexInit(&pTsdb->mutex, NULL); + taosThreadRwlockInit(&pTsdb->rwLock, NULL); if (!pKeepCfg) { tsdbSetKeepCfg(&pTsdb->keepCfg, &pVnode->config.tsdbCfg); } else { @@ -67,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee tfsMkdir(pVnode->pTfs, pTsdb->path); // open tsdb - if (tsdbFSOpen(pTsdb, &pTsdb->fs) < 0) { + if (tsdbFSOpen(pTsdb, &pTsdb->pFS) < 0) { goto _err; } @@ -88,33 +87,10 @@ _err: int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { - taosThreadMutexDestroy(&(*pTsdb)->mutex); - tsdbFSClose((*pTsdb)->fs); + taosThreadRwlockDestroy(&(*pTsdb)->rwLock); + tsdbFSClose((*pTsdb)->pFS); tsdbCloseCache((*pTsdb)->lruCache); taosMemoryFreeClear(*pTsdb); } return 0; } - -int tsdbLockRepo(STsdb *pTsdb) { - int code = taosThreadMutexLock(&pTsdb->mutex); - if (code != 0) { - tsdbError("vgId:%d, failed to lock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - pTsdb->repoLocked = true; - return 0; -} - -int tsdbUnlockRepo(STsdb *pTsdb) { - // ASSERT(IS_REPO_LOCKED(pTsdb)); - pTsdb->repoLocked = false; - int code = taosThreadMutexUnlock(&pTsdb->mutex); - if (code != 0) { - tsdbError("vgId:%d, failed to unlock tsdb since %s", TD_VID(pTsdb->pVnode), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - return 0; -} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0b05bb7224..a09bc32c60 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1905,7 +1905,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* SArray* pDelData = taosArrayInit(4, sizeof(SDelData)); - SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->cState); if (pDelFile) { SDelFReader* pDelFReader = NULL; code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); @@ -2795,7 +2795,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl SDataBlockIter* pBlockIter = &pReader->status.blockIter; - STsdbFSState* pFState = pReader->pTsdb->fs->cState; + STsdbFSState* pFState = pReader->pTsdb->pFS->cState; initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); @@ -3042,7 +3042,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbDataFReaderClose(&pReader->pFileReader); - STsdbFSState* pFState = pReader->pTsdb->fs->cState; + STsdbFSState* pFState = pReader->pTsdb->pFS->cState; initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); resetDataBlockScanInfo(pReader->status.pTableMap); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 2ae646a571..137ef9a4a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -20,10 +20,10 @@ static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t STsdbFSState *pState; if (try) { - pState = pTsdb->fs->cState; + pState = pTsdb->pFS->cState; *canDo = 0; } else { - pState = pTsdb->fs->nState; + pState = pTsdb->pFS->nState; } for (int32_t iSet = 0; iSet < taosArrayGetSize(pState->aDFileSet); iSet++) { @@ -83,7 +83,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { if (!canDo) goto _exit; // begin - code = tsdbFSBegin(pTsdb->fs); + code = tsdbFSBegin(pTsdb->pFS); if (code) goto _err; // do retention @@ -91,7 +91,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { if (code) goto _err; // commit - code = tsdbFSCommit(pTsdb->fs); + code = tsdbFSCommit(pTsdb->pFS); if (code) goto _err; _exit: @@ -99,6 +99,6 @@ _exit: _err: tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - tsdbFSRollback(pTsdb->fs); + tsdbFSRollback(pTsdb->pFS); return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 4886d874a9..fea0254045 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -45,7 +45,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { while (true) { if (pReader->pDataFReader == NULL) { - SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->cState, pReader->fid, TD_GT); + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->cState, pReader->fid, TD_GT); if (pSet == NULL) goto _exit; @@ -159,7 +159,7 @@ _err: static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; - SDelFile* pDelFile = pTsdb->fs->cState->pDelFile; + SDelFile* pDelFile = pTsdb->pFS->cState->pDelFile; if (pReader->pDelFReader == NULL) { if (pDelFile == NULL) { @@ -798,7 +798,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL); if (code) goto _err; - code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter)); + code = tsdbFSStateUpsertDFileSet(pTsdb->pFS->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter)); if (code) goto _err; code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); @@ -843,7 +843,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 pWriter->fid = fid; // read - SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->pFS->nState, fid, TD_EQ); if (pSet) { code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); if (code) goto _err; @@ -907,7 +907,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 STsdb* pTsdb = pWriter->pTsdb; if (pWriter->pDelFWriter == NULL) { - SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->nState); + SDelFile* pDelFile = tsdbFSStateGetDelFile(pTsdb->pFS->nState); // reader if (pDelFile) { @@ -1017,7 +1017,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); if (code) goto _err; - code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pWriter->pDelFWriter->fDel); + code = tsdbFSStateUpsertDelFile(pTsdb->pFS->nState, &pWriter->pDelFWriter->fDel); if (code) goto _err; code = tsdbDelFWriterClose(&pWriter->pDelFWriter, 1); @@ -1096,7 +1096,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr goto _err; } - code = tsdbFSBegin(pTsdb->fs); + code = tsdbFSBegin(pTsdb->pFS); if (code) goto _err; *ppWriter = pWriter; @@ -1113,7 +1113,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { STsdbSnapWriter* pWriter = *ppWriter; if (rollback) { - code = tsdbFSRollback(pWriter->pTsdb->fs); + code = tsdbFSRollback(pWriter->pTsdb->pFS); if (code) goto _err; } else { code = tsdbSnapWriteDataEnd(pWriter); @@ -1122,7 +1122,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { code = tsdbSnapWriteDelEnd(pWriter); if (code) goto _err; - code = tsdbFSCommit(pWriter->pTsdb->fs); + code = tsdbFSCommit(pWriter->pTsdb->pFS); if (code) goto _err; } From 6b454a4dd43844fb447436428f54af3c9698bb0b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Jul 2022 06:26:09 +0000 Subject: [PATCH 2/4] more --- source/dnode/vnode/src/inc/tsdb.h | 9 +-------- source/dnode/vnode/src/tsdb/tsdbCommit.c | 4 ++-- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index eb5f66d443..30a6188db0 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -279,21 +279,14 @@ typedef struct { struct STsdb { char *path; SVnode *pVnode; - TdThreadRwlock rwLock; STsdbKeepCfg keepCfg; + TdThreadRwlock rwLock; SMemTable *mem; SMemTable *imem; STsdbFS *pFS; SLRUCache *lruCache; }; -struct STable { - uint64_t suid; - uint64_t uid; - STSchema *pSchema; // latest schema - STSchema *pCacheSchema; // cached cache -}; - struct TSDBKEY { int64_t version; TSKEY ts; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 04fe5520fb..13f310ae27 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -960,10 +960,10 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { memset(pCommitter, 0, sizeof(*pCommitter)); ASSERT(pTsdb->mem && pTsdb->imem == NULL); - // lock(); + taosThreadRwlockWrlock(&pTsdb->rwLock); pTsdb->imem = pTsdb->mem; pTsdb->mem = NULL; - // unlock(); + taosThreadRwlockUnlock(&pTsdb->rwLock); pCommitter->pTsdb = pTsdb; pCommitter->commitID = pTsdb->pVnode->state.commitID; From 657c2dcaa4f10f97e3a9ba67bbbe8db6cf7eb31d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Jul 2022 07:21:15 +0000 Subject: [PATCH 3/4] more r/w concurrency --- source/dnode/vnode/src/tsdb/tsdbCache.c | 17 +++++++++++++---- source/dnode/vnode/src/tsdb/tsdbRead.c | 14 ++++++++++---- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 2ca52ef5b3..484020e6e1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -793,6 +793,9 @@ typedef struct { TSDBROW memRow, imemRow, fsRow; TsdbNextRowState input[3]; + SMemTable *pMemTable; + SMemTable *pIMemTable; + STsdb *pTsdb; } CacheNextRowIter; static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb) { @@ -800,16 +803,20 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs tb_uid_t suid = getTableSuidByUid(uid, pTsdb); + tsdbTakeMemSnapshot(pTsdb, &pIter->pMemTable, &pIter->pIMemTable); + STbData *pMem = NULL; - if (pTsdb->mem) { - tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); + if (pIter->pMemTable) { + tsdbGetTbDataFromMemTable(pIter->pMemTable, suid, uid, &pMem); } STbData *pIMem = NULL; - if (pTsdb->imem) { - tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); + if (pIter->pIMemTable) { + tsdbGetTbDataFromMemTable(pIter->pIMemTable, suid, uid, &pIMem); } + pIter->pTsdb = pTsdb; + pIter->pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); SDelIdx delIdx; @@ -878,6 +885,8 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { taosArrayDestroy(pIter->pSkyline); } + tsdbUntakeMemSnapshot(pIter->pTsdb, pIter->pMemTable, pIter->pIMemTable); + return code; _err: return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a09bc32c60..d605f564a5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -118,6 +118,8 @@ struct STsdbReader { char* idStr; // query info handle, for debug purpose int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; + SMemTable* pMem; + SMemTable* pIMem; SIOCostSummary cost; STSchema* pSchema; @@ -1847,8 +1849,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea int32_t backward = (!ASCENDING_TRAVERSE(pReader->order)); STbData* d = NULL; - if (pReader->pTsdb->mem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); + if (pReader->pMem != NULL) { + tsdbGetTbDataFromMemTable(pReader->pMem, pReader->suid, pBlockScanInfo->uid, &d); if (d != NULL) { code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); if (code == TSDB_CODE_SUCCESS) { @@ -1868,8 +1870,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea } STbData* di = NULL; - if (pReader->pTsdb->imem != NULL) { - tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); + if (pReader->pIMem != NULL) { + tsdbGetTbDataFromMemTable(pReader->pIMem, pReader->suid, pBlockScanInfo->uid, &di); if (di != NULL) { code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter); if (code == TSDB_CODE_SUCCESS) { @@ -2809,6 +2811,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } } + tsdbTakeMemSnapshot(pReader->pTsdb, &pReader->pMem, &pReader->pIMem); + tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; @@ -2824,6 +2828,8 @@ void tsdbReaderClose(STsdbReader* pReader) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + tsdbUntakeMemSnapshot(pReader->pTsdb, pReader->pMem, pReader->pIMem); + taosMemoryFreeClear(pSupInfo->plist); taosMemoryFree(pSupInfo->colIds); From 706b081f700cc1f06ef03bb79f24e9a967e3cc28 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Jul 2022 08:30:49 +0000 Subject: [PATCH 4/4] fix: r/w concurrency --- source/dnode/vnode/src/inc/vnd.h | 15 ++++---- source/dnode/vnode/src/inc/vnodeInt.h | 42 +++++++++++----------- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 2 ++ source/dnode/vnode/src/vnd/vnodeBufPool.c | 29 +++++++++++++-- source/dnode/vnode/src/vnd/vnodeCommit.c | 16 ++++----- source/dnode/vnode/src/vnd/vnodeOpen.c | 4 +++ 6 files changed, 68 insertions(+), 40 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index cb25e93cde..984b34814d 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -62,12 +62,13 @@ struct SVBufPoolNode { }; struct SVBufPool { - SVBufPool* next; - int64_t nRef; - int64_t size; - uint8_t* ptr; - SVBufPoolNode* pTail; - SVBufPoolNode node; + SVBufPool* next; + SVnode* pVnode; + volatile int32_t nRef; + int64_t size; + uint8_t* ptr; + SVBufPoolNode* pTail; + SVBufPoolNode node; }; int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size); @@ -78,7 +79,7 @@ void vnodeBufPoolReset(SVBufPool* pPool); int32_t vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); -int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg); +int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index fb403f79a7..d724bcd3be 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -77,6 +77,8 @@ typedef struct SSnapDataHdr SSnapDataHdr; // vnd.h void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void vnodeBufPoolFree(SVBufPool* pPool, void* p); +void vnodeBufPoolRef(SVBufPool* pPool); +void vnodeBufPoolUnRef(SVBufPool* pPool); // meta typedef struct SMCtbCursor SMCtbCursor; @@ -247,26 +249,26 @@ struct STsdbKeepCfg { }; struct SVnode { - char* path; - SVnodeCfg config; - SVState state; - STfs* pTfs; - SMsgCb msgCb; - SVBufPool* pPool; - SVBufPool* inUse; - SVBufPool* onCommit; - SVBufPool* onRecycle; - SMeta* pMeta; - SSma* pSma; - STsdb* pTsdb; - SWal* pWal; - STQ* pTq; - SSink* pSink; - tsem_t canCommit; - int64_t sync; - int32_t blockCount; - tsem_t syncSem; - SQHandle* pQuery; + char* path; + SVnodeCfg config; + SVState state; + STfs* pTfs; + SMsgCb msgCb; + TdThreadMutex mutex; + TdThreadCond poolNotEmpty; + SVBufPool* pPool; + SVBufPool* inUse; + SMeta* pMeta; + SSma* pSma; + STsdb* pTsdb; + SWal* pWal; + STQ* pTq; + SSink* pSink; + tsem_t canCommit; + int64_t sync; + int32_t blockCount; + tsem_t syncSem; + SQHandle* pQuery; }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d0a719b5cd..ee8a23e76e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -55,6 +55,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { taosMemoryFree(pMemTable); goto _err; } + vnodeBufPoolRef(pMemTable->pPool); *ppMemTable = pMemTable; return code; @@ -66,6 +67,7 @@ _err: void tsdbMemTableDestroy(SMemTable *pMemTable) { if (pMemTable) { + vnodeBufPoolUnRef(pMemTable->pPool); taosArrayDestroy(pMemTable->aTbData); taosMemoryFree(pMemTable); } diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 9ca4dd6efb..d1584f701e 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -17,7 +17,7 @@ /* ------------------------ STRUCTURES ------------------------ */ -static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool); +static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool); static int vnodeBufPoolDestroy(SVBufPool *pPool); int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { @@ -28,7 +28,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { for (int i = 0; i < 3; i++) { // create pool - ret = vnodeBufPoolCreate(size, &pPool); + ret = vnodeBufPoolCreate(pVnode, size, &pPool); if (ret < 0) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); @@ -120,7 +120,7 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) { } // STATIC METHODS ------------------- -static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) { +static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { SVBufPool *pPool; pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); @@ -130,6 +130,7 @@ static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) { } pPool->next = NULL; + pPool->pVnode = pVnode; pPool->nRef = 0; pPool->size = 0; pPool->ptr = pPool->node.data; @@ -146,4 +147,26 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { vnodeBufPoolReset(pPool); taosMemoryFree(pPool); return 0; +} + +void vnodeBufPoolRef(SVBufPool *pPool) { + int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1); + ASSERT(nRef > 0); +} + +void vnodeBufPoolUnRef(SVBufPool *pPool) { + int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); + if (nRef == 0) { + SVnode *pVnode = pPool->pVnode; + + vnodeBufPoolReset(pPool); + + taosThreadMutexLock(&pVnode->mutex); + + pPool->next = pVnode->pPool; + pVnode->pPool = pPool; + taosThreadCondSignal(&pVnode->poolNotEmpty); + + taosThreadMutexUnlock(&pVnode->mutex); + } } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index c969861241..0c8a8a5fd1 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -15,7 +15,7 @@ #include "vnd.h" -#define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); @@ -27,18 +27,18 @@ static void vnodeWaitCommit(SVnode *pVnode); int vnodeBegin(SVnode *pVnode) { // alloc buffer pool - /* pthread_mutex_lock(); */ + taosThreadMutexLock(&pVnode->mutex); while (pVnode->pPool == NULL) { - /* pthread_cond_wait(); */ + taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex); } pVnode->inUse = pVnode->pPool; + pVnode->inUse->nRef = 1; pVnode->pPool = pVnode->inUse->next; pVnode->inUse->next = NULL; - /* ref pVnode->inUse buffer pool */ - /* pthread_mutex_unlock(); */ + taosThreadMutexUnlock(&pVnode->mutex); pVnode->state.commitID++; // begin meta @@ -217,7 +217,7 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); - pVnode->onCommit = pVnode->inUse; + vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; // save info @@ -284,10 +284,6 @@ int vnodeCommit(SVnode *pVnode) { // apply the commit (TODO) walEndSnapshot(pVnode->pWal); - vnodeBufPoolReset(pVnode->onCommit); - pVnode->onCommit->next = pVnode->pPool; - pVnode->pPool = pVnode->onCommit; - pVnode->onCommit = NULL; vInfo("vgId:%d, commit over", TD_VID(pVnode)); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 0914827950..fb2ac722a6 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -89,6 +89,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); + taosThreadMutexInit(&pVnode->mutex, NULL); + taosThreadCondInit(&pVnode->poolNotEmpty, NULL); // open buffer pool if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) { @@ -195,6 +197,8 @@ void vnodeClose(SVnode *pVnode) { // destroy handle tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&pVnode->syncSem); + taosThreadCondDestroy(&pVnode->poolNotEmpty); + taosThreadMutexDestroy(&pVnode->mutex); taosMemoryFree(pVnode); } }