tdb/api: migrate txn related api changes of meta, sma, tq, and stream meta/state

This commit is contained in:
Minglei Jin 2022-11-17 11:54:07 +08:00
parent 1441684c03
commit 78335f83d2
20 changed files with 274 additions and 268 deletions

View File

@ -35,7 +35,7 @@ typedef struct {
TTB* pFuncStateDb;
TTB* pFillStateDb; // todo refactor
TTB* pSessionStateDb;
TXN txn;
TXN* txn;
int32_t number;
} SStreamState;

View File

@ -562,7 +562,7 @@ typedef struct SStreamMeta {
SHashObj* pTasks;
SHashObj* pRecoverStatus;
void* ahandle;
TXN txn;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;

View File

@ -70,7 +70,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta);
void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t delta);
struct SMeta {
TdThreadRwlock lock;
@ -78,7 +78,7 @@ struct SMeta {
char* path;
SVnode* pVnode;
TDB* pEnv;
TXN txn;
TXN* txn;
TTB* pTbDb;
TTB* pSkmDb;
TTB* pUidIdx;

View File

@ -20,12 +20,19 @@ static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVB
// begin a meta txn
int metaBegin(SMeta *pMeta, int8_t fromSys) {
void *(*xMalloc)(void *, size_t);
void (*xFree)(void *, void *);
void *xArg = NULL;
if (fromSys) {
tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
xMalloc = tdbDefaultMalloc;
xFree = tdbDefaultFree;
} else {
tdbTxnOpen(&pMeta->txn, 0, metaMalloc, metaFree, pMeta->pVnode->inUse, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
xMalloc = metaMalloc;
xFree = metaFree;
xArg = pMeta->pVnode->inUse;
}
if (tdbBegin(pMeta->pEnv, &pMeta->txn) < 0) {
if (tdbBegin(pMeta->pEnv, &pMeta->txn, xMalloc, xFree, xArg, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
@ -33,9 +40,9 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) {
}
// commit the meta txn
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); }
int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); }
int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, &pMeta->txn); }
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, pMeta->txn); }
int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, pMeta->txn); }
int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn); }
// abort the meta txn
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); }
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, pMeta->txn); }

View File

@ -117,7 +117,7 @@ static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderClear(&coder);
// write to table.db
if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, pMeta->txn) < 0) {
goto _err;
}
@ -131,17 +131,17 @@ _err:
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
SUidIdxVal uidIdxVal = {.suid = pME->smaEntry.tsma->indexUid, .version = pME->version, .skmVer = 0};
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn);
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn);
}
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid};
return tdbTbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
return tdbTbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, pMeta->txn);
}
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {

View File

@ -261,7 +261,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
// drop all child tables
TBC *pCtbIdxc = NULL;
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) {
tdbTbcClose(pCtbIdxc);
@ -295,10 +295,10 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
_drop_super_table:
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = ((SUidIdxVal *)pData)[0].version, .uid = pReq->suid},
sizeof(STbDbKey), &pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
sizeof(STbDbKey), pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn);
tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn);
metaULock(pMeta);
@ -321,7 +321,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int32_t ret;
int32_t c = -2;
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL);
ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c) {
tdbTbcClose(pUidIdxc);
@ -340,7 +340,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
oversion = ((SUidIdxVal *)pData)[0].version;
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL);
ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(ret == 0 && c == 0);
@ -589,7 +589,7 @@ static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
STtlIdxKey ttlKey = {0};
metaBuildTtlIdxKey(&ttlKey, pME);
if (ttlKey.dtime == 0) return 0;
return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), &pMeta->txn);
return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), pMeta->txn);
}
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
@ -651,7 +651,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
if (metaCreateTagIdxKey(e.ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, uid,
&pTagIdxKey, &nTagIdxKey) == 0) {
tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, &pMeta->txn);
tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
}
metaDestroyTagIdxKey(pTagIdxKey);
}
@ -661,9 +661,9 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
}
}
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, &pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), &pMeta->txn);
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), pMeta->txn);
if (e.type == TSDB_CHILD_TABLE || e.type == TSDB_NORMAL_TABLE) metaDeleteCtimeIdx(pMeta, &e);
if (e.type == TSDB_NORMAL_TABLE) metaDeleteNcolIdx(pMeta, &e);
@ -671,7 +671,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
if (e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e);
if (e.type == TSDB_CHILD_TABLE) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn);
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn);
--pMeta->pVnode->config.vndStats.numOfCTables;
@ -682,7 +682,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
--pMeta->pVnode->config.vndStats.numOfNTables;
pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1;
} else if (e.type == TSDB_SUPER_TABLE) {
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), &pMeta->txn);
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn);
// drop schema.db (todo)
metaStatsCacheDrop(pMeta, uid);
@ -702,7 +702,7 @@ int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
return 0;
}
return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, &pMeta->txn);
return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn);
}
int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
@ -710,14 +710,14 @@ int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
return 0;
}
return tdbTbDelete(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), &pMeta->txn);
return tdbTbDelete(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), pMeta->txn);
}
int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
SNcolIdxKey ncolKey = {0};
if (metaBuildNColIdxKey(&ncolKey, pME) < 0) {
return 0;
}
return tdbTbInsert(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), NULL, 0, &pMeta->txn);
return tdbTbInsert(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), NULL, 0, pMeta->txn);
}
int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
@ -725,7 +725,7 @@ int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
if (metaBuildNColIdxKey(&ncolKey, pME) < 0) {
return 0;
}
return tdbTbDelete(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), &pMeta->txn);
return tdbTbDelete(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), pMeta->txn);
}
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) {
@ -760,7 +760,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
// search uid index
TBC *pUidIdxc = NULL;
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL);
tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
ASSERT(c == 0);
@ -770,7 +770,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
// search table.db
TBC *pTbDbc = NULL;
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL);
tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(c == 0);
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
@ -951,7 +951,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
// search uid index
TBC *pUidIdxc = NULL;
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL);
tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
ASSERT(c == 0);
@ -964,7 +964,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
SDecoder dc2 = {0};
/* get ctbEntry */
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL);
tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(c == 0);
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
@ -1062,7 +1062,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
ASSERT(ctbEntry.ctbEntry.pTags);
SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags,
((STag *)(ctbEntry.ctbEntry.pTags))->len, &pMeta->txn);
((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn);
metaULock(pMeta);
@ -1110,7 +1110,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// search uid index
TBC *pUidIdxc = NULL;
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL);
tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
ASSERT(c == 0);
@ -1120,7 +1120,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// search table.db
TBC *pTbDbc = NULL;
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL);
tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(c == 0);
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
@ -1227,7 +1227,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderClear(&coder);
// write to table.db
if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, pMeta->txn) < 0) {
goto _err;
}
@ -1250,29 +1250,29 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
SUidIdxVal uidIdxVal = {.suid = info.suid, .version = info.version, .skmVer = info.skmVer};
return tdbTbUpsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn);
return tdbTbUpsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), pMeta->txn);
}
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pSuidIdx, &pME->uid, sizeof(tb_uid_t), NULL, 0, &pMeta->txn);
return tdbTbInsert(pMeta->pSuidIdx, &pME->uid, sizeof(tb_uid_t), NULL, 0, pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn);
}
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
STtlIdxKey ttlKey = {0};
metaBuildTtlIdxKey(&ttlKey, pME);
if (ttlKey.dtime == 0) return 0;
return tdbTbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
return tdbTbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, pMeta->txn);
}
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), pME->ctbEntry.pTags,
((STag *)(pME->ctbEntry.pTags))->len, &pMeta->txn);
((STag *)(pME->ctbEntry.pTags))->len, pMeta->txn);
}
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
@ -1364,7 +1364,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
ret = -1;
goto end;
}
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
}
end:
metaDestroyTagIdxKey(pTagIdxKey);
@ -1411,7 +1411,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderInit(&coder, pVal, vLen);
tEncodeSSchemaWrapper(&coder, pSW);
if (tdbTbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
if (tdbTbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, pMeta->txn) < 0) {
rcode = -1;
goto _exit;
}

View File

@ -108,24 +108,22 @@ int32_t tqMetaClose(STQ* pTq) {
}
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
TXN txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
TXN* txn;
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, txn) < 0) {
return -1;
}
if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, &txn) < 0) {
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
return -1;
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
return -1;
}
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
return -1;
}
@ -133,25 +131,22 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
}
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
TXN txn;
TXN* txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) {
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) {
/*ASSERT(0);*/
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
ASSERT(0);
}
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
ASSERT(0);
}
@ -219,25 +214,22 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
ASSERT(0);
}
TXN txn;
TXN* txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) {
ASSERT(0);
}
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) {
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
ASSERT(0);
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
ASSERT(0);
}
@ -247,25 +239,22 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
}
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
TXN txn;
TXN* txn;
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) {
if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) {
/*ASSERT(0);*/
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
ASSERT(0);
}
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
ASSERT(0);
}

View File

@ -129,7 +129,7 @@ struct STqSnapWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN txn;
TXN* txn;
};
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
@ -146,8 +146,10 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
pWriter->sver = sver;
pWriter->ever = ever;
if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;
taosMemoryFree(pWriter);
goto _err;
}
*ppWriter = pWriter;
@ -165,11 +167,11 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
STQ* pTq = pWriter->pTq;
if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn);
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
} else {
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
}

View File

@ -129,7 +129,7 @@ struct STqSnapWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN txn;
TXN* txn;
};
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
@ -146,8 +146,10 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
pWriter->sver = sver;
pWriter->ever = ever;
if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;
taosMemoryFree(pWriter);
goto _err;
}
*ppWriter = pWriter;
@ -165,11 +167,12 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
STQ* pTq = pWriter->pTq;
if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
ASSERT(0);
} else {
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
}

View File

@ -129,7 +129,7 @@ struct STqSnapWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN txn;
TXN* txn;
};
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
@ -146,8 +146,10 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
pWriter->sver = sver;
pWriter->ever = ever;
if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0);
if (tdbBegin(pTq->pMetaStore, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;
taosMemoryFree(pWriter);
goto _err;
}
*ppWriter = pWriter;
@ -165,11 +167,12 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
STQ* pTq = pWriter->pTq;
if (rollback) {
tdbAbort(pWriter->pTq->pMetaStore, pWriter->txn);
ASSERT(0);
} else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
code = tdbCommit(pWriter->pTq->pMetaStore, pWriter->txn);
if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
code = tdbPostCommit(pWriter->pTq->pMetaStore, pWriter->txn);
if (code) goto _err;
}

View File

@ -69,8 +69,8 @@ _err:
}
void streamMetaClose(SStreamMeta* pMeta) {
tdbCommit(pMeta->db, &pMeta->txn);
tdbPostCommit(pMeta->db, &pMeta->txn);
tdbCommit(pMeta->db, pMeta->txn);
tdbPostCommit(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db);
@ -115,7 +115,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg,
goto FAIL;
}
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t));
ASSERT(0);
goto FAIL;
@ -152,7 +152,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
tEncodeSStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) {
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
ASSERT(0);
return -1;
}
@ -226,7 +226,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn) < 0) {
/*return -1;*/
}
@ -249,42 +249,35 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
}
int32_t streamMetaBegin(SStreamMeta* pMeta) {
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
return 0;
}
int32_t streamMetaCommit(SStreamMeta* pMeta) {
if (tdbCommit(pMeta->db, &pMeta->txn) < 0) {
if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
return -1;
}
memset(&pMeta->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
return -1;
}
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
return 0;
}
int32_t streamMetaAbort(SStreamMeta* pMeta) {
if (tdbAbort(pMeta->db, &pMeta->txn) < 0) {
if (tdbAbort(pMeta->db, pMeta->txn) < 0) {
return -1;
}
memset(&pMeta->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pMeta->db, &pMeta->txn) < 0) {
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
return 0;

View File

@ -163,8 +163,8 @@ _err:
}
void streamStateClose(SStreamState* pState) {
tdbCommit(pState->db, &pState->txn);
tdbPostCommit(pState->db, &pState->txn);
tdbCommit(pState->db, pState->txn);
tdbPostCommit(pState->db, pState->txn);
tdbTbClose(pState->pStateDb);
tdbTbClose(pState->pFuncStateDb);
tdbTbClose(pState->pFillStateDb);
@ -175,71 +175,61 @@ void streamStateClose(SStreamState* pState) {
}
int32_t streamStateBegin(SStreamState* pState) {
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
tdbTxnClose(&pState->txn);
if (tdbBegin(pState->db, &pState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
tdbAbort(pState->db, pState->txn);
return -1;
}
return 0;
}
int32_t streamStateCommit(SStreamState* pState) {
if (tdbCommit(pState->db, &pState->txn) < 0) {
if (tdbCommit(pState->db, pState->txn) < 0) {
return -1;
}
if (tdbPostCommit(pState->db, &pState->txn) < 0) {
if (tdbPostCommit(pState->db, pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
if (tdbBegin(pState->db, &pState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
return 0;
}
int32_t streamStateAbort(SStreamState* pState) {
if (tdbAbort(pState->db, &pState->txn) < 0) {
if (tdbAbort(pState->db, pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
if (tdbBegin(pState->db, &pState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
return 0;
}
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->txn);
return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->txn);
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
}
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn);
return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), pState->txn);
}
// todo refactor
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->txn);
return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->txn);
}
// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn);
return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->txn);
}
// todo refactor
@ -256,7 +246,7 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal
// todo refactor
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), &pState->txn);
return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), pState->txn);
}
int32_t streamStateClear(SStreamState* pState) {
@ -280,7 +270,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number
// todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn);
return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), pState->txn);
}
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
@ -512,7 +502,7 @@ void streamFreeVal(void* val) { tdbFree(val); }
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn);
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, pState->txn);
}
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
@ -535,7 +525,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn);
return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->txn);
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {

View File

@ -33,7 +33,8 @@ typedef struct STxn TXN;
// TDB
int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback);
int32_t tdbClose(TDB *pDb);
int32_t tdbBegin(TDB *pDb, TXN *pTxn);
int32_t tdbBegin(TDB *pDb, TXN **pTxn, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
int flags);
int32_t tdbCommit(TDB *pDb, TXN *pTxn);
int32_t tdbPostCommit(TDB *pDb, TXN *pTxn);
int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn);

View File

@ -69,7 +69,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
static int tdbBtcMoveDownward(SBTC *pBtc);
static int tdbBtcMoveUpward(SBTC *pBtc);
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr,
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr, TDB *pEnv,
SBTree **ppBt) {
SBTree *pBt;
int ret;
@ -106,22 +106,26 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
if (pgno == 0) {
// fetch page & insert into main db
SPage *pPage;
TXN txn;
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
TXN *txn;
pPager->inTran = 1;
ret = tdbBegin(pEnv, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (ret < 0) {
return -1;
}
SBtreeInitPageArg zArg;
zArg.flags = 0x1 | 0x2; // root leaf node;
zArg.pBt = pBt;
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn);
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, txn);
if (ret < 0) {
tdbAbort(pEnv, txn);
return -1;
}
ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
tdbAbort(pEnv, txn);
return -1;
}
@ -130,18 +134,18 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
pBt->info.nLevel = 1;
pBt->info.nData = 0;
pBt->tbname = (char *)tbname;
// ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pgno, sizeof(SPgno), &txn);
ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pBt->info, sizeof(pBt->info), &txn);
ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pBt->info, sizeof(pBt->info), txn);
if (ret < 0) {
tdbAbort(pEnv, txn);
return -1;
}
}
// tdbUnrefPage(pPage);
tdbPCacheRelease(pPager->pCache, pPage, &txn);
tdbCommit(pPager->pEnv, &txn);
tdbPostCommit(pPager->pEnv, &txn);
tdbTxnClose(&txn);
tdbPCacheRelease(pPager->pCache, pPage, txn);
tdbCommit(pPager->pEnv, txn);
tdbPostCommit(pPager->pEnv, txn);
}
ASSERT(pgno != 0);
@ -1535,10 +1539,21 @@ int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn) {
memset(&pBtc->coder, 0, sizeof(SCellDecoder));
if (pTxn == NULL) {
pBtc->pTxn = &pBtc->txn;
tdbTxnOpen(pBtc->pTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
TXN *pTxn = tdbOsCalloc(1, sizeof(*pTxn));
if (!pTxn) {
return -1;
}
if (tdbTxnOpen(pTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
tdbOsFree(pTxn);
return -1;
}
pBtc->pTxn = pTxn;
pBtc->freeTxn = 1;
} else {
pBtc->pTxn = pTxn;
pBtc->freeTxn = 0;
}
return 0;
@ -2232,6 +2247,10 @@ int tdbBtcClose(SBTC *pBtc) {
tdbFree(pBtc->coder.pVal);
}
if (pBtc->freeTxn) {
tdbTxnClose(pBtc->pTxn);
}
return 0;
}

View File

@ -99,19 +99,34 @@ int tdbClose(TDB *pDb) {
int32_t tdbAlter(TDB *pDb, int pages) { return tdbPCacheAlter(pDb->pCache, pages); }
int32_t tdbBegin(TDB *pDb, TXN *pTxn) {
int32_t tdbBegin(TDB *pDb, TXN **ppTxn, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
int flags) {
SPager *pPager;
int ret;
int64_t txnId = 1;
TXN *pTxn = tdbOsCalloc(1, sizeof(*pTxn));
if (!pTxn) {
return -1;
}
if (tdbTxnOpen(pTxn, txnId, xMalloc, xFree, xArg, flags) < 0) {
tdbOsFree(pTxn);
return -1;
}
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerBegin(pPager, pTxn);
if (ret < 0) {
tdbError("failed to begin pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
pTxn->txnId);
tdbTxnClose(pTxn);
return -1;
}
}
*ppTxn = pTxn;
return 0;
}
@ -144,6 +159,8 @@ int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) {
}
}
tdbTxnClose(pTxn);
return 0;
}
@ -176,6 +193,8 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
}
}
tdbTxnClose(pTxn);
return 0;
}

View File

@ -108,7 +108,7 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
ASSERT(pPager != NULL);
// pTb->pBt
ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, &(pTb->pBt));
ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, pEnv, &(pTb->pBt));
if (ret < 0) {
tdbOsFree(pTb);
return -1;

View File

@ -28,4 +28,10 @@ int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void
return 0;
}
int tdbTxnClose(TXN *pTxn) { return 0; }
int tdbTxnClose(TXN *pTxn) {
if (pTxn) {
tdbOsFree(pTxn);
}
return 0;
}

View File

@ -147,11 +147,11 @@ struct SBTC {
SPage *pgStack[BTREE_MAX_DEPTH + 1];
SCellDecoder coder;
TXN *pTxn;
TXN txn;
i8 freeTxn;
};
// SBTree
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr,
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr, TDB *pEnv,
SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt);
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
@ -396,12 +396,12 @@ struct SPager {
SPCache *pCache;
SPgno dbFileSize;
SPgno dbOrigSize;
//SPage *pDirty;
// SPage *pDirty;
hashset_t jPageSet;
SRBTree rbt;
u8 inTran;
SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB
SRBTree rbt;
u8 inTran;
SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB
#ifdef USE_MAINDB
TDB *pEnv;
#endif

View File

@ -170,11 +170,9 @@ static void insertOfp(void) {
SPoolMem *pPool = openPool();
// start a transaction
TXN txn;
int64_t txnid = 0;
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
TXN *txn = NULL;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
@ -185,12 +183,12 @@ static void insertOfp(void) {
// insert the generated big data
// char const *key = "key1";
char const *key = "key123456789";
ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, &txn);
ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
}
// TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) {
@ -258,11 +256,9 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) {
SPoolMem *pPool = openPool();
// start a transaction
TXN txn;
int64_t txnid = 0;
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
TXN *txn;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
@ -271,7 +267,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) {
generateBigVal(val, valLen);
{ // insert the generated big data
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn);
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -297,7 +293,7 @@ tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_REA
tdbBegin(pEnv, &txn);
*/
{ // upsert the data
ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), &txn);
ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -316,7 +312,7 @@ tdbBegin(pEnv, &txn);
}
{ // delete the data
ret = tdbTbDelete(pDb, "key1", strlen("key1"), &txn);
ret = tdbTbDelete(pDb, "key1", strlen("key1"), txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -335,8 +331,8 @@ tdbBegin(pEnv, &txn);
}
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
}
// TEST(tdb_test, DISABLED_simple_insert1) {
@ -346,7 +342,7 @@ TEST(tdb_test, simple_insert1) {
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1;
TXN txn;
TXN *txn;
int const pageSize = 4096;
taosRemoveDir("tdb");
@ -365,16 +361,13 @@ TEST(tdb_test, simple_insert1) {
// char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
int64_t txnid = 0;
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key0");
@ -393,26 +386,25 @@ TEST(tdb_test, simple_insert1) {
val[i] = c;
}
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn);
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
{ // Query the data
void *pVal = NULL;

View File

@ -125,7 +125,7 @@ TEST(tdb_test, DISABLED_simple_insert1) {
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1000000;
TXN txn;
TXN *txn;
taosRemoveDir("tdb");
@ -142,40 +142,35 @@ TEST(tdb_test, DISABLED_simple_insert1) {
char key[64];
char val[64];
int64_t poolLimit = 4096; // 1M pool limit
int64_t txnid = 0;
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
{ // Query the data
void *pVal = NULL;
@ -245,7 +240,7 @@ TEST(tdb_test, DISABLED_simple_insert2) {
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1000000;
TXN txn;
TXN *txn;
taosRemoveDir("tdb");
@ -261,21 +256,18 @@ TEST(tdb_test, DISABLED_simple_insert2) {
{
char key[64];
char val[64];
int64_t txnid = 0;
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -312,8 +304,8 @@ TEST(tdb_test, DISABLED_simple_insert2) {
}
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
ret = tdbTbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
@ -331,7 +323,7 @@ TEST(tdb_test, DISABLED_simple_delete1) {
TTB *pDb;
char key[128];
char data[128];
TXN txn;
TXN *txn;
TDB *pEnv;
SPoolMem *pPool;
void *pKey = NULL;
@ -353,14 +345,13 @@ TEST(tdb_test, DISABLED_simple_delete1) {
ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// loop to insert batch data
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -378,7 +369,7 @@ TEST(tdb_test, DISABLED_simple_delete1) {
for (int iData = nKV - 1; iData > 30; iData--) {
sprintf(key, "key%d", iData);
ret = tdbTbDelete(pDb, key, strlen(key), &txn);
ret = tdbTbDelete(pDb, key, strlen(key), txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -413,7 +404,8 @@ TEST(tdb_test, DISABLED_simple_delete1) {
tdbTbcClose(pDbc);
tdbCommit(pEnv, &txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
@ -430,7 +422,7 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
char data[64];
void *pData = NULL;
SPoolMem *pPool;
TXN txn;
TXN *txn;
taosRemoveDir("tdb");
@ -444,13 +436,12 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
pPool = openPool();
// insert some data
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -467,11 +458,12 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbTbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbTbUpsert(pDb, key, strlen(key), data, strlen(data), txn);
GTEST_ASSERT_EQ(ret, 0);
}
tdbCommit(pEnv, &txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// query the data
for (int iData = 0; iData < nData; iData++) {
@ -492,7 +484,7 @@ TEST(tdb_test, multi_thread_query) {
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1000000;
TXN txn;
TXN *txn;
taosRemoveDir("tdb");
@ -508,26 +500,18 @@ TEST(tdb_test, multi_thread_query) {
char key[64];
char val[64];
int64_t poolLimit = 4096 * 20; // 1M pool limit
int64_t txnid = 0;
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
txnid++;
txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;
txn.txnId = -1;
txn.xMalloc = poolMalloc;
txn.xFree = poolFree;
txn.xArg = pPool;
// tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, );
tdbBegin(pEnv, &txn);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -578,7 +562,7 @@ TEST(tdb_test, multi_thread_query) {
std::vector<std::thread> threads;
for (int i = 0; i < nThreads; i++) {
if (i == 0) {
threads.push_back(std::thread(tdbCommit, pEnv, &txn));
threads.push_back(std::thread(tdbCommit, pEnv, txn));
} else {
threads.push_back(std::thread(f, pDb, nData));
}
@ -589,8 +573,8 @@ TEST(tdb_test, multi_thread_query) {
}
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// Close a database
tdbTbClose(pDb);
@ -621,17 +605,12 @@ TEST(tdb_test, DISABLED_multi_thread1) {
GTEST_ASSERT_EQ(ret, 0);
auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) {
TXN txn = {0};
TXN *txn = NULL;
char key[128];
char val[128];
SPoolMem *pPool = openPool();
txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;
txn.txnId = -1;
txn.xMalloc = poolMalloc;
txn.xFree = poolFree;
txn.xArg = pPool;
tdbBegin(pDb, &txn);
tdbBegin(pDb, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
@ -644,14 +623,17 @@ TEST(tdb_test, DISABLED_multi_thread1) {
}
if (pPool->size > 1024 * 1024) {
tdbCommit(pDb, &txn);
tdbCommit(pDb, txn);
tdbPostCommit(pDb, txn);
clearPool(pPool);
tdbBegin(pDb, &txn);
tdbBegin(pDb, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
tdbCommit(pDb, &txn);
tdbCommit(pDb, txn);
tdbPostCommit(pDb, txn);
closePool(pPool);
*stop = 1;