From 081e7120c6dd247532fa96a540622fac462dae84 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 6 Aug 2020 14:04:55 +0800 Subject: [PATCH] make action atomic --- src/tsdb/inc/tsdbMain.h | 17 ++++++ src/tsdb/src/tsdbMain.c | 17 ++++++ src/tsdb/src/tsdbMemTable.c | 112 ++++++++++++++++++++---------------- src/tsdb/src/tsdbMeta.c | 31 ++++++---- 4 files changed, 114 insertions(+), 63 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index d3c4a3d515..2f90fa3921 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -123,6 +123,7 @@ typedef struct { int32_t maxTables; STableData** tData; SList* actList; + SList* extraBuffList; SList* bufBlockList; } SMemTable; @@ -392,6 +393,8 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { } // ------------------ tsdbBuffer.c +#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold + STsdbBufPool* tsdbNewBufPool(); void tsdbFreeBufPool(STsdbBufPool* pBufPool); int tsdbOpenBufPool(STsdbRepo* pRepo); @@ -425,6 +428,19 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { return dataRowKey(row); } +static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { + ASSERT(pRepo != NULL); + if (pRepo->mem == NULL) return NULL; + + SListNode* pNode = listTail(pRepo->mem->bufBlockList); + if (pNode == NULL) return NULL; + + STsdbBufBlock* pBufBlock = NULL; + tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock)); + + return pBufBlock; +} + // ------------------ tsdbFile.c #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) @@ -523,6 +539,7 @@ char* tsdbGetDataDirName(char* rootDir); int tsdbGetNextMaxTables(int tid); STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); +int tsdbCheckCommit(STsdbRepo* pRepo); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 1486adb65b..46801d0788 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -192,6 +192,8 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * } if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); + + if (tsdbCheckCommit(pRepo) < 0) return -1; return 0; } @@ -387,6 +389,21 @@ int tsdbGetNextMaxTables(int tid) { return maxTables + 1; } +int tsdbCheckCommit(STsdbRepo *pRepo) { + ASSERT(pRepo->mem != NULL); + STsdbCfg *pCfg = &(pRepo->config); + + STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); + ASSERT(pBufBlock != NULL); + if ((pRepo->mem->extraBuffList != NULL) || + ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { + // trigger commit + if (tsdbAsyncCommit(pRepo) < 0) return -1; + } + + return 0; +} + STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; } STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 33e91bd59c..990db76b7e 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -18,8 +18,6 @@ #define TSDB_DATA_SKIPLIST_LEVEL 5 -static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo); - static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes); static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); @@ -202,44 +200,59 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { STsdbCfg * pCfg = &pRepo->config; - STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); + STsdbBufBlock *pBufBlock = NULL; + void * ptr = NULL; - if (pBufBlock != NULL && pBufBlock->remain < bytes) { - if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) { // need to commit mem - if (tsdbAsyncCommit(pRepo) < 0) return NULL; - } else { + // Either allocate from buffer blocks or from SYSTEM memory pool + if (pRepo->mem == NULL) { + SMemTable *pMemTable = tsdbNewMemTable(pRepo); + if (pMemTable == NULL) return NULL; + pRepo->mem = pMemTable; + } + + ASSERT(pRepo->mem != NULL); + + pBufBlock = tsdbGetCurrBufBlock(pRepo); + if ((pRepo->mem->extraBuffList != NULL) || + ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < bytes))) { + // allocate from SYSTEM buffer pool + if (pRepo->mem->extraBuffList == NULL) { + pRepo->mem->extraBuffList = tdListNew(0); + if (pRepo->mem->extraBuffList == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + } + + ASSERT(pRepo->mem->extraBuffList != NULL); + SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes); + if (pNode == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + pNode->next = pNode->prev = NULL; + tdListAppend(pRepo->mem->extraBuffList, pNode); + ptr = (void *)(pNode->data); + tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes); + } else { // allocate from TSDB buffer pool + if (pBufBlock == NULL || pBufBlock->remain < bytes) { + ASSERT(listNEles(pRepo->mem->bufBlockList) < pCfg->totalBlocks / 3); if (tsdbLockRepo(pRepo) < 0) return NULL; SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo); tdListAppendNode(pRepo->mem->bufBlockList, pNode); if (tsdbUnlockRepo(pRepo) < 0) return NULL; - } - } - - if (pRepo->mem == NULL) { - SMemTable *pMemTable = tsdbNewMemTable(pRepo); - if (pMemTable == NULL) return NULL; - - if (tsdbLockRepo(pRepo) < 0) { - tsdbFreeMemTable(pMemTable); - return NULL; + pBufBlock = tsdbGetCurrBufBlock(pRepo); } - SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo); - tdListAppendNode(pMemTable->bufBlockList, pNode); - pRepo->mem = pMemTable; - - if (tsdbUnlockRepo(pRepo) < 0) return NULL; + ASSERT(pBufBlock->remain >= bytes); + ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset); + pBufBlock->offset += bytes; + pBufBlock->remain -= bytes; + tsdbTrace("vgId:%d allocate %d bytes from TSDB buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, + listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); } - pBufBlock = tsdbGetCurrBufBlock(pRepo); - ASSERT(pBufBlock->remain >= bytes); - void *ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset); - pBufBlock->offset += bytes; - pBufBlock->remain -= bytes; - - tsdbTrace("vgId:%d allocate %d bytes from buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, - listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); - return ptr; } @@ -340,27 +353,23 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey } // ---------------- LOCAL FUNCTIONS ---------------- -static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { - ASSERT(pRepo != NULL); - if (pRepo->mem == NULL) return NULL; - - SListNode *pNode = listTail(pRepo->mem->bufBlockList); - if (pNode == NULL) return NULL; - - STsdbBufBlock *pBufBlock = NULL; - tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock)); - - return pBufBlock; -} - static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { - STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); - ASSERT(pBufBlock != NULL); - pBufBlock->offset -= bytes; - pBufBlock->remain += bytes; - ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset)); - tsdbTrace("vgId:%d return %d bytes to buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, - listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); + ASSERT(pRepo->mem != NULL); + if (pRepo->mem->extraBuffList == NULL) { + STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); + ASSERT(pBufBlock != NULL); + pBufBlock->offset -= bytes; + pBufBlock->remain += bytes; + ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset)); + tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, + listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); + } else { + SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -sizeof(SListNode)); + ASSERT(listTail(pRepo->mem->extraBuffList) == pNode); + tdListPopNode(pRepo->mem->extraBuffList, pNode); + free(pNode); + tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes); + } } static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { @@ -409,6 +418,7 @@ static void tsdbFreeMemTable(SMemTable* pMemTable) { ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0)); ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0)); + tdListFree(pMemTable->extraBuffList); tdListFree(pMemTable->bufBlockList); tdListFree(pMemTable->actList); taosTFree(pMemTable->tData); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 8db6a4a32a..c0d58f6332 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -120,20 +120,23 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { tsdbUnlockRepoMeta(pRepo); // Write to memtable action - int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0; - int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table); - int tlen = tlen1 + tlen2; - void *buf = tsdbAllocBytes(pRepo, tlen); - if (buf == NULL) { - goto _err; - } - + // TODO: refactor duplicate codes + int tlen = 0; + void *pBuf = NULL; if (newSuper) { - void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, super); - ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1); - buf = pBuf; + tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, super); + pBuf = tsdbAllocBytes(pRepo, tlen); + if (pBuf == NULL) goto _err; + void *tBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, pBuf, super); + ASSERT(POINTER_DISTANCE(tBuf, pBuf) == tlen); } - tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, table); + tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table); + pBuf = tsdbAllocBytes(pRepo, tlen); + if (pBuf == NULL) goto _err; + void *tBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, pBuf, table); + ASSERT(POINTER_DISTANCE(tBuf, pBuf) == tlen); + + if (tsdbCheckCommit(pRepo) < 0) return -1; return 0; @@ -182,6 +185,8 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid); free(tbname); + if (tsdbCheckCommit(pRepo) < 0) goto _err; + return 0; _err: @@ -405,6 +410,8 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { } tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); + if (tsdbCheckCommit(pRepo) < 0) return -1; + return 0; }