diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index a985b804be..ca536c5bad 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -103,31 +103,31 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; - TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); - TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); - TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); - TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); + TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn)); return 0; + +END: + tdbAbort(pTq->pMetaDB, txn); + return code; } int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) { int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; - TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); - TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn)); - TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); - TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn)); + TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn)); return 0; -} - -static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) { - int32_t code = TDB_CODE_SUCCESS; - TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq, path)); END: + tdbAbort(pTq->pMetaDB, txn); return code; } @@ -408,7 +408,8 @@ END: } int32_t tqMetaOpen(STQ* pTq) { - char* maindb = NULL; + char* maindb = NULL; + char* offsetNew = NULL; int32_t code = TDB_CODE_SUCCESS; TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); if(!taosCheckExistFile(maindb)){ @@ -416,12 +417,20 @@ int32_t tqMetaOpen(STQ* pTq) { TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); }else{ TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); - taosRemoveFile(maindb); + (void)taosRemoveFile(maindb); } + + TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); + if(taosCheckExistFile(offsetNew)){ + TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew)); + (void)taosRemoveFile(offsetNew); + } + TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq)); END: taosMemoryFree(maindb); + taosMemoryFree(offsetNew); return code; } @@ -445,14 +454,14 @@ int32_t tqMetaTransform(STQ* pTq) { TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); - if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ - tqError("copy offset file error"); + if(taosCheckExistFile(offset)) { + if (taosCopyFile(offset, offsetNew) < 0) { + tqError("copy offset file error"); + } else { + (void)taosRemoveFile(offset); + } } - TQ_ERR_GO_TO_END(tqMetaTransformOffsetInfo(pTq, offsetNew)); - (void)taosRemoveFile(offset); - (void)taosRemoveFile(offsetNew); - END: taosMemoryFree(offset); taosMemoryFree(offsetNew); diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index f9faf611e1..63257d7fab 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -73,6 +73,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { } TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size)); + tqInfo("tq: offset restore from file to tdb, subkey:%s", offset.subKey); taosMemoryFree(pMemBuf); pMemBuf = NULL; }