Merge branch 'main' of https://github.com/taosdata/TDengine into main

This commit is contained in:
jiajingbin 2022-12-20 08:39:12 +08:00
commit 0bdaf3bc0f
10 changed files with 77 additions and 19 deletions

View File

@ -739,6 +739,7 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog
SArray* pArray = NULL;
SSubmitRsp* pRsp = (SSubmitRsp*)res;
if (pRsp->nBlocks <= 0) {
taosMemoryFreeClear(pRsp->pBlocks);
return TSDB_CODE_SUCCESS;
}

View File

@ -174,6 +174,7 @@ typedef struct {
void* param;
char opername[TSDB_TRANS_OPER_LEN];
SArray* pRpcArray;
SRWLatch lockRpcArray;
} STrans;
typedef struct {

View File

@ -628,6 +628,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
taosInitRWLatch(&pTrans->lockRpcArray);
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
pTrans->pRpcArray == NULL) {
@ -737,12 +738,14 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
if (pTrans->oper == oper) {
if (strcasecmp(dbname, pTrans->dbname) == 0) {
mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
taosWLockLatch(&pTrans->lockRpcArray);
if (pTrans->pRpcArray == NULL) {
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
pTrans->pRpcArray = taosArrayInit(4, sizeof(SRpcHandleInfo));
}
if (pTrans->pRpcArray != NULL && taosArrayPush(pTrans->pRpcArray, &pMsg->info) != NULL) {
code = 0;
}
taosWUnLockLatch(&pTrans->lockRpcArray);
sdbRelease(pMnode->pSdb, pTrans);
break;
@ -944,8 +947,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
pTrans->failedTimes, code);
}
taosWLockLatch(&pTrans->lockRpcArray);
int32_t size = taosArrayGetSize(pTrans->pRpcArray);
if (size <= 0) return;
if (size <= 0) {
taosWUnLockLatch(&pTrans->lockRpcArray);
return;
}
for (int32_t i = 0; i < size; ++i) {
SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
@ -997,6 +1004,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
}
taosArrayClear(pTrans->pRpcArray);
taosWUnLockLatch(&pTrans->lockRpcArray);
}
int32_t mndTransProcessRsp(SRpcMsg *pRsp) {

View File

@ -161,7 +161,10 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
SMetaSnapWriter* pWriter = *ppWriter;
if (rollback) {
metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
code = metaAbort(pWriter->pMeta);
metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
code);
if (code) goto _err;
} else {
code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);

View File

@ -710,6 +710,9 @@ int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
return 0;
}
metaDebug("vgId:%d, start to save ctime:%" PRId64 " uid:%" PRId64 " ct:%" PRId64, TD_VID(pMeta->pVnode), pME->version,
pME->uid, ctimeKey.ctime);
return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn);
}

View File

@ -192,6 +192,28 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
return pPage;
}
void tdbPCacheMarkFree(SPCache *pCache, SPage *pPage) {
tdbPCacheLock(pCache);
tdbPCacheRemovePageFromHash(pCache, pPage);
pPage->isFree = 1;
tdbPCacheUnlock(pCache);
}
static void tdbPCacheFreePage(SPCache *pCache, SPage *pPage) {
if (pPage->id < pCache->nPages) {
pPage->pFreeNext = pCache->pFree;
pCache->pFree = pPage;
pPage->isFree = 0;
++pCache->nFree;
tdbTrace("pcache/free page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
} else {
tdbTrace("pcache destroy page: %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
tdbPCacheRemovePageFromHash(pCache, pPage);
tdbPageDestroy(pPage, tdbDefaultFree, NULL);
}
}
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
i32 nRef;
@ -209,7 +231,11 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
// nRef = tdbGetPageRef(pPage);
// if (nRef == 0) {
if (pPage->isLocal) {
tdbPCacheUnpinPage(pCache, pPage);
if (!pPage->isFree) {
tdbPCacheUnpinPage(pCache, pPage);
} else {
tdbPCacheFreePage(pCache, pPage);
}
} else {
if (TDB_TXN_IS_WRITE(pTxn)) {
// remove from hash

View File

@ -111,6 +111,9 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt)) {
tdbTrace("page/init: %p %" PRIu8 " %p", pPage, szAmHdr, xCellSize);
pPage->pPageHdr = pPage->pData + szAmHdr;
if (TDB_PAGE_NCELLS(pPage) == 0) {
return tdbPageZero(pPage, szAmHdr, xCellSize);
}
pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage);
pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage);
pPage->pFreeEnd = pPage->pData + TDB_PAGE_CCELLS(pPage);

View File

@ -466,11 +466,19 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
return -1;
}
if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(errno), pPager->dbFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
if (pageBuf == NULL) {
return -1;
}
tdbDebug("tdb/abort: pager:%p,", pPager);
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
// read pgno & the page from journal
SPgno pgno;
@ -481,6 +489,8 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
return -1;
}
tdbTrace("tdb/abort: pgno:%d,", pgno);
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
if (ret < 0) {
tdbOsFree(pageBuf);
@ -524,6 +534,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
tdbPCacheMarkFree(pPager->pCache, pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
@ -577,12 +588,12 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
return -1;
}
tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
tdbTrace("tdb/flush:%p, pgno:%d, %d/%d/%d", pPager, pgno, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
pPager->dbOrigSize = maxPgno;
pPage->isDirty = 0;
tdbTrace("pager/flush drop page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
tdbTrace("pager/flush drop page: %p, pgno:%d, from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
@ -829,7 +840,7 @@ static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage) {
return 0;
}
static int tdbPagerRestore(SPager *pPager, SBTree *pBt, const char *jFileName) {
static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
int ret = 0;
SPgno journalSize = 0;
u8 *pageBuf = NULL;
@ -907,7 +918,7 @@ static int tdbPagerRestore(SPager *pPager, SBTree *pBt, const char *jFileName) {
return 0;
}
int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt) {
int tdbPagerRestoreJournals(SPager *pPager) {
tdbDirEntryPtr pDirEntry;
tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName);
if (pDir == NULL) {
@ -918,7 +929,7 @@ int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt) {
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
if (tdbPagerRestore(pPager, pBt, name) < 0) {
if (tdbPagerRestore(pPager, name) < 0) {
tdbCloseDir(&pDir);
tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name);

View File

@ -107,6 +107,16 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
ASSERT(pPager != NULL);
if (rollback) {
tdbPagerRollback(pPager);
} else {
ret = tdbPagerRestoreJournals(pPager);
if (ret < 0) {
tdbOsFree(pTb);
return -1;
}
}
// pTb->pBt
ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, pEnv, &(pTb->pBt));
if (ret < 0) {
@ -114,16 +124,6 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
return -1;
}
if (rollback) {
tdbPagerRollback(pPager);
} else {
ret = tdbPagerRestoreJournals(pPager, pTb->pBt);
if (ret < 0) {
tdbOsFree(pTb);
return -1;
}
}
*ppTb = pTb;
return 0;
}

View File

@ -197,7 +197,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initP
TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt);
int tdbPagerRestoreJournals(SPager *pPager);
int tdbPagerRollback(SPager *pPager);
// tdbPCache.c ====================================
@ -205,6 +205,7 @@ int tdbPagerRollback(SPager *pPager);
u8 isAnchor; \
u8 isLocal; \
u8 isDirty; \
u8 isFree; \
volatile i32 nRef; \
i32 id; \
SPage *pFreeNext; \
@ -222,6 +223,7 @@ int tdbPCacheClose(SPCache *pCache);
int tdbPCacheAlter(SPCache *pCache, int32_t nPage);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn);
void tdbPCacheMarkFree(SPCache *pCache, SPage *pPage);
int tdbPCacheGetPageSize(SPCache *pCache);
// tdbPage.c ====================================