enh: handle void

This commit is contained in:
Hongze Cheng 2024-09-23 15:33:28 +08:00
parent 5639fd0baf
commit 00ef87dd3f
13 changed files with 168 additions and 87 deletions

View File

@ -91,12 +91,7 @@ int metaAbort(SMeta *pMeta) {
return 0;
}
int code = tdbAbort(pMeta->pEnv, pMeta->txn);
if (code) {
metaError("vgId:%d, failed to abort meta since %s", TD_VID(pMeta->pVnode), tstrerror(terrno));
} else {
pMeta->txn = NULL;
}
return code;
tdbAbort(pMeta->pEnv, pMeta->txn);
pMeta->txn = NULL;
return 0;
}

View File

@ -42,7 +42,7 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqS
pReader->type = type;
// impl
TTB *pTb = NULL;
TTB* pTb = NULL;
if (type == SNAP_DATA_TQ_CHECKINFO) {
pTb = pTq->pCheckStore;
} else if (type == SNAP_DATA_TQ_HANDLE) {
@ -132,7 +132,8 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
// alloc
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);;
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
;
goto _err;
}
pWriter->pTq = pTq;
@ -160,7 +161,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
STQ* pTq = pWriter->pTq;
if (rollback) {
(void)tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
} else {
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
@ -189,7 +190,8 @@ int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData
code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto end;
taosWLockLatch(&pTq->lock);
code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, (int)strlen(handle.subKey), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, (int)strlen(handle.subKey), pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr));
taosWUnLockLatch(&pTq->lock);
end:
@ -200,15 +202,16 @@ end:
}
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
int32_t code = 0;
STQ* pTq = pWriter->pTq;
STqCheckInfo info = {0};
code = tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if(code != 0){
if (code != 0) {
goto _err;
}
code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr));
tDeleteSTqCheckInfo(&info);
if (code) goto _err;
@ -220,22 +223,23 @@ _err:
}
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
int32_t code = 0;
STQ* pTq = pWriter->pTq;
STqOffset info = {0};
code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if(code != 0){
if (code != 0) {
goto _err;
}
code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr));
tDeleteSTqOffset(&info);
if (code) goto _err;
return code;
_err:
_err:
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}

View File

@ -210,7 +210,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback, i
streamMetaWLock(pTq->pStreamMeta);
tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
if (rollback) {
(void)tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
} else {
code = tdbCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
if (code) goto _err;

View File

@ -597,10 +597,7 @@ void streamMetaCloseImpl(void* arg) {
streamMetaWUnLock(pMeta);
// already log the error, ignore here
code = tdbAbort(pMeta->db, pMeta->txn);
if (code) {
stError("vgId:%d failed to jump of trans for tdb, code:%s", vgId, tstrerror(code));
}
tdbAbort(pMeta->db, pMeta->txn);
code = tdbTbClose(pMeta->pTaskDb);
if (code) {
stError("vgId:%d failed to close taskDb, code:%s", vgId, tstrerror(code));
@ -895,7 +892,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
}
int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap);
int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
if (sizeInList != size) {
stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
@ -1077,7 +1074,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
tFreeStreamTask(pTask);
STaskId id = streamTaskGetTaskId(pTask);
void* px = taosArrayPush(pRecycleList, &id);
void* px = taosArrayPush(pRecycleList, &id);
if (px == NULL) {
stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
}

View File

@ -40,7 +40,7 @@ int32_t tdbBegin(TDB *pDb, TXN **pTxn, void *(*xMalloc)(void *, size_t), void (*
int32_t tdbCommit(TDB *pDb, TXN *pTxn);
int32_t tdbPostCommit(TDB *pDb, TXN *pTxn);
int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn);
int32_t tdbAbort(TDB *pDb, TXN *pTxn);
void tdbAbort(TDB *pDb, TXN *pTxn);
int32_t tdbAlter(TDB *pDb, int pages);
// TTB
@ -79,11 +79,11 @@ int32_t tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, i
int32_t tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *),
void *xArg, int flags);
int32_t tdbTxnCloseImpl(TXN *pTxn);
#define tdbTxnClose(pTxn) \
do { \
(void)tdbTxnCloseImpl(pTxn); \
(pTxn) = NULL; \
void tdbTxnCloseImpl(TXN *pTxn);
#define tdbTxnClose(pTxn) \
do { \
tdbTxnCloseImpl(pTxn); \
(pTxn) = NULL; \
} while (0)
// other

View File

@ -118,7 +118,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
zArg.pBt = pBt;
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, txn);
if (ret < 0) {
(void)tdbAbort(pEnv, txn);
tdbAbort(pEnv, txn);
tdbOsFree(pBt);
return ret;
}
@ -126,7 +126,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
(void)tdbAbort(pEnv, txn);
tdbAbort(pEnv, txn);
tdbOsFree(pBt);
return ret;
}
@ -139,7 +139,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pBt->info, sizeof(pBt->info), txn);
if (ret < 0) {
(void)tdbAbort(pEnv, txn);
tdbAbort(pEnv, txn);
tdbOsFree(pBt);
return ret;
}
@ -513,7 +513,10 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN
}
// Copy the root page content to the child page
(void)tdbPageCopy(pRoot, pChild, 0);
ret = tdbPageCopy(pRoot, pChild, 0);
if (ret < 0) {
return ret;
}
// Reinitialize the root page
zArg.flags = TDB_BTREE_ROOT;
@ -633,14 +636,22 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
}
(void)tdbPageDropCell(pParent, sIdx, pTxn, pBt);
ret = tdbPageDropCell(pParent, sIdx, pTxn, pBt);
if (ret < 0) {
tdbError("tdb/btree-balance: drop cell failed with ret: %d.", ret);
return TSDB_CODE_FAILED;
}
if (!childNotLeaf) {
SArray *ofps = pParent->pPager->ofps;
if (ofps) {
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
(void)tdbPagerInsertFreePage(pParent->pPager, ofp, pTxn);
ret = tdbPagerInsertFreePage(pParent->pPager, ofp, pTxn);
if (ret < 0) {
tdbError("tdb/btree-balance: insert free page failed with ret: %d.", ret);
return TSDB_CODE_FAILED;
}
}
if (destroyOfps) {
@ -853,7 +864,11 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
if (iNew == nNews - 1 && pIntHdr->pgno == 0) {
pIntHdr->pgno = TDB_PAGE_PGNO(pNews[iNew]);
} else {
(void)tdbBtreeDecodeCell(pPage, pCell, &cd, pTxn, pBt);
ret = tdbBtreeDecodeCell(pPage, pCell, &cd, pTxn, pBt);
if (ret < 0) {
tdbError("tdb/btree-balance: decode cell failed with ret: %d.", ret);
return TSDB_CODE_FAILED;
}
// TODO: pCell here may be inserted as an overflow cell, handle it
SCell *pNewCell = tdbOsMalloc(cd.kLen + 9);
@ -863,8 +878,12 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
int szNewCell;
SPgno pgno;
pgno = TDB_PAGE_PGNO(pNews[iNew]);
(void)tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell,
ret = tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell,
pTxn, pBt);
if (ret < 0) {
tdbError("tdb/btree-balance: encode cell failed with ret: %d.", ret);
return TSDB_CODE_FAILED;
}
ret = tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
if (ret) {
tdbError("tdb/btree-balance: insert cell failed with ret: %d.", ret);
@ -979,7 +998,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
for (pageIdx = 0; pageIdx < nOlds; ++pageIdx) {
if (pageIdx >= nNews) {
(void)tdbPagerInsertFreePage(pBt->pPager, pOlds[pageIdx], pTxn);
ret = tdbPagerInsertFreePage(pBt->pPager, pOlds[pageIdx], pTxn);
if (ret < 0) {
return ret;
}
}
tdbPagerReturnPage(pBt->pPager, pOlds[pageIdx], pTxn);
}
@ -2189,7 +2211,11 @@ int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int
}
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
(void)tdbBtreeDecodeCell(pBtc->pPage, pCell, &pBtc->coder, pBtc->pTxn, pBtc->pBt);
int32_t ret = tdbBtreeDecodeCell(pBtc->pPage, pCell, &pBtc->coder, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbError("tdb/btc-get: decode cell failed with ret: %d.", ret);
return ret;
}
if (ppKey) {
*ppKey = (void *)pBtc->coder.pKey;
@ -2238,13 +2264,19 @@ int tdbBtcDelete(SBTC *pBtc) {
destroyOfps = true;
}
(void)tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt);
ret = tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbError("tdb/btc-delete: page drop cell failed with ret: %d.", ret);
}
SArray *ofps = pBtc->pPage->pPager->ofps;
if (ofps) {
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
(void)tdbPagerInsertFreePage(pBtc->pPage->pPager, ofp, pBtc->pTxn);
ret = tdbPagerInsertFreePage(pBtc->pPage->pPager, ofp, pBtc->pTxn);
if (ret < 0) {
tdbError("tdb/btc-delete: insert free page failed with ret: %d.", ret);
}
}
if (destroyOfps) {
@ -2282,7 +2314,10 @@ int tdbBtcDelete(SBTC *pBtc) {
tdbError("tdb/btc-delete: malloc failed.");
return terrno;
}
(void)tdbBtreeEncodeCell(pPage, pKey, nKey, &pgno, sizeof(pgno), pCell, &szCell, pBtc->pTxn, pBtc->pBt);
ret = tdbBtreeEncodeCell(pPage, pKey, nKey, &pgno, sizeof(pgno), pCell, &szCell, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbError("tdb/btc-delete: btree encode cell failed with ret: %d.", ret);
}
ret = tdbPageUpdateCell(pPage, idx, pCell, szCell, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {

View File

@ -101,10 +101,10 @@ int tdbClose(TDB *pDb) {
for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) {
pDb->pgrList = pPager->pNext;
(void)tdbPagerClose(pPager);
tdbPagerClose(pPager);
}
(void)tdbPCacheClose(pDb->pCache);
tdbPCacheClose(pDb->pCache);
tdbOsFree(pDb->pgrHash);
tdbOsFree(pDb);
}
@ -199,7 +199,7 @@ int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn) {
return 0;
}
int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
void tdbAbort(TDB *pDb, TXN *pTxn) {
SPager *pPager;
int ret;
@ -208,13 +208,12 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
if (ret < 0) {
tdbError("failed to abort pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
pTxn->txnId);
return ret;
}
}
tdbTxnClose(pTxn);
return 0;
return;
}
SPager *tdbEnvGetPager(TDB *pDb, const char *fname) {

View File

@ -42,12 +42,31 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage);
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage);
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage);
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage);
static int tdbPCacheCloseImpl(SPCache *pCache);
static void tdbPCacheCloseImpl(SPCache *pCache);
static void tdbPCacheInitLock(SPCache *pCache) { (void)tdbMutexInit(&(pCache->mutex), NULL); }
static void tdbPCacheDestroyLock(SPCache *pCache) { (void)tdbMutexDestroy(&(pCache->mutex)); }
static void tdbPCacheLock(SPCache *pCache) { (void)tdbMutexLock(&(pCache->mutex)); }
static void tdbPCacheUnlock(SPCache *pCache) { (void)tdbMutexUnlock(&(pCache->mutex)); }
static void tdbPCacheInitLock(SPCache *pCache) {
if (tdbMutexInit(&(pCache->mutex), NULL) != 0) {
tdbError("tdb/pcache: mutex init failed.");
}
}
static void tdbPCacheDestroyLock(SPCache *pCache) {
if (tdbMutexDestroy(&(pCache->mutex)) != 0) {
tdbError("tdb/pcache: mutex destroy failed.");
}
}
static void tdbPCacheLock(SPCache *pCache) {
if (tdbMutexLock(&(pCache->mutex)) != 0) {
tdbError("tdb/pcache: mutex lock failed.");
}
}
static void tdbPCacheUnlock(SPCache *pCache) {
if (tdbMutexUnlock(&(pCache->mutex)) != 0) {
tdbError("tdb/pcache: mutex unlock failed.");
}
}
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
int32_t code = 0;
@ -74,7 +93,7 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
_exit:
if (code) {
tdbError("%s failed at %s:%d since %s", __func__, __FILE__, __LINE__, tstrerror(code));
(void)tdbPCacheClose(pCache);
tdbPCacheClose(pCache);
*ppCache = NULL;
} else {
*ppCache = pCache;
@ -82,13 +101,13 @@ _exit:
return code;
}
int tdbPCacheClose(SPCache *pCache) {
void tdbPCacheClose(SPCache *pCache) {
if (pCache) {
(void)tdbPCacheCloseImpl(pCache);
tdbPCacheCloseImpl(pCache);
tdbOsFree(pCache->aPage);
tdbOsFree(pCache);
}
return 0;
return;
}
// TODO:
@ -514,7 +533,7 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
return 0;
}
static int tdbPCacheCloseImpl(SPCache *pCache) {
static void tdbPCacheCloseImpl(SPCache *pCache) {
// free free page
for (SPage *pPage = pCache->pFree; pPage;) {
SPage *pPageT = pPage->pFreeNext;
@ -532,5 +551,5 @@ static int tdbPCacheCloseImpl(SPCache *pCache) {
tdbOsFree(pCache->pgHash);
tdbPCacheDestroyLock(pCache);
return 0;
return ;
}

View File

@ -64,7 +64,10 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
memset(ptr, 0, size);
pPage = (SPage *)(ptr + pageSize);
(void)TDB_INIT_PAGE_LOCK(pPage);
int32_t code = TDB_INIT_PAGE_LOCK(pPage);
if (code) {
tdbError("tdb/page-create: init page lock failed.");
}
pPage->pageSize = pageSize;
pPage->pData = ptr;
if (pageSize < 65536) {

View File

@ -95,7 +95,7 @@ static int hashset_add(hashset_t set, void *item) {
set->nitems = 0;
for (size_t i = 0; i < old_capacity; ++i) {
(void)hashset_add_member(set, (void *)old_items[i]);
int nt = hashset_add_member(set, (void *)old_items[i]);
}
tdbOsFree(old_items);
}
@ -209,12 +209,15 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
return 0;
}
int tdbPagerClose(SPager *pPager) {
void tdbPagerClose(SPager *pPager) {
if (pPager) {
(void)tdbOsClose(pPager->fd);
int32_t code = tdbOsClose(pPager->fd);
if (code) {
tdbWarn("failed to close file since %s", tstrerror(code));
}
tdbOsFree(pPager);
}
return 0;
return;
}
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
@ -224,14 +227,14 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
if (pPage->isDirty) return 0;
// ref page one more time so the page will not be release
(void)tdbRefPage(pPage);
tdbTrace("pager/mdirty page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
int32_t nRef = tdbRefPage(pPage);
tdbTrace("pager/mdirty page %p/%d/%d, ref:%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef);
// Set page as dirty
pPage->isDirty = 1;
tdbTrace("tdb/pager-write: put page: %p %d to dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
(void)tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);
SRBTreeNode *tnode = tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);
// Write page to journal if neccessary
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize &&
@ -244,7 +247,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
}
if (pPager->pActiveTxn->jPageSet) {
(void)hashset_add(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
int32_t nt = hashset_add(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
}
}
@ -340,7 +343,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
if (pTxn->jPageSet) {
(void)hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
int32_t nt = hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
}
tdbTrace("tdb/pager-commit: remove page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
@ -577,7 +580,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
(void)hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
int32_t nt = hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
tdbPCacheMarkFree(pPager->pCache, pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
@ -699,7 +702,11 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = pgno;
while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn)) == NULL) {
(void)tdbPagerFlushPage(pPager, pTxn);
int32_t code = tdbPagerFlushPage(pPager, pTxn);
if (code) {
tdbError("tdb/pager: %p, pPage: %p, flush page failed.", pPager, pPage);
return code;
}
}
tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
@ -879,7 +886,9 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
lcode = TDB_TRY_LOCK_PAGE(pPage);
if (lcode == P_LOCK_SUCC) {
if (TDB_PAGE_INITIALIZED(pPage)) {
(void)TDB_UNLOCK_PAGE(pPage);
if (TDB_UNLOCK_PAGE(pPage) != 0) {
tdbError("tdb/pager:%p, pgno:%d, unlock page failed.", pPager, pgno);
}
return 0;
}
@ -893,7 +902,10 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
tdbTrace("tdb/pager:%p, pgno:%d, nRead:%" PRId64, pPager, pgno, nRead);
if (nRead < pPage->pageSize) {
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32, pPager, pgno, nRead, pPage->pageSize);
(void)TDB_UNLOCK_PAGE(pPage);
if (TDB_UNLOCK_PAGE(pPage) < 0) {
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " unlock page failed.", pPager, pgno,
nRead, pPage->pageSize);
}
return TAOS_SYSTEM_ERROR(errno);
}
@ -939,7 +951,10 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
if (ret < 0) {
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " init page failed.", pPager, pgno, nRead,
pPage->pageSize);
(void)TDB_UNLOCK_PAGE(pPage);
if (TDB_UNLOCK_PAGE(pPage) != 0) {
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " unlock page failed.", pPager, pgno, nRead,
pPage->pageSize);
}
return ret;
}
@ -947,7 +962,10 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
pPage->pPager = pPager;
(void)TDB_UNLOCK_PAGE(pPage);
if (TDB_UNLOCK_PAGE(pPage) != 0) {
tdbError("tdb/pager:%p, pgno:%d, nRead:%" PRId64 "pgSize:%" PRId32 " unlock page failed.", pPager, pgno, nRead,
pPage->pageSize);
}
} else if (lcode == P_LOCK_BUSY) {
nLoops = 0;
for (;;) {

View File

@ -112,7 +112,11 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
return ret;
}
} else {
(void)tdbPagerRollback(pPager);
ret = tdbPagerRollback(pPager);
if (ret < 0) {
tdbOsFree(pTb);
return ret;
}
}
// pTb->pBt
@ -202,7 +206,7 @@ int tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int va
int tdbTbDelete(TTB *pTb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pTb->pBt, pKey, kLen, pTxn); }
int tdbTbUpsert(TTB *pTb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
(void)tdbTbDelete(pTb, pKey, kLen, pTxn);
TAOS_UNUSED(tdbTbDelete(pTb, pKey, kLen, pTxn));
return tdbTbInsert(pTb, pKey, kLen, pVal, vLen, pTxn);
}
@ -241,7 +245,11 @@ int32_t tdbTbTraversal(TTB *pTb, void *data,
return ret;
}
(void)tdbTbcMoveToFirst(pCur);
ret = tdbTbcMoveToFirst(pCur);
if (ret < 0) {
tdbTbcClose(pCur);
return ret;
}
void *pKey = NULL;
int kLen = 0;

View File

@ -31,7 +31,7 @@ int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void
return 0;
}
int tdbTxnCloseImpl(TXN *pTxn) {
void tdbTxnCloseImpl(TXN *pTxn) {
if (pTxn) {
if (pTxn->jPageSet) {
hashset_destroy(pTxn->jPageSet);
@ -39,11 +39,14 @@ int tdbTxnCloseImpl(TXN *pTxn) {
}
if (pTxn->jfd) {
TAOS_UNUSED(tdbOsClose(pTxn->jfd));
int32_t code = tdbOsClose(pTxn->jfd);
if (code) {
tdbError("tdb/txn: close journal file failed, code:%d", code);
}
}
tdbOsFree(pTxn);
}
return 0;
return;
}

View File

@ -179,7 +179,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
// tdbPager.c ====================================
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager);
void tdbPagerClose(SPager *pPager);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt);
int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager, TXN *pTxn);
@ -214,7 +214,7 @@ int tdbPagerRollback(SPager *pPager);
// For page ref
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
int tdbPCacheClose(SPCache *pCache);
void tdbPCacheClose(SPCache *pCache);
int tdbPCacheAlter(SPCache *pCache, int32_t nPage);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn);