Merge pull request #26690 from taosdata/opti/TS-4593-3.0

opti:[TS-4593] transform offset from file to tdb in tmq
This commit is contained in:
dapan1121 2024-07-22 09:06:54 +08:00 committed by GitHub
commit 963b3d1b3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 31 additions and 21 deletions

View File

@ -103,31 +103,31 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;
TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn));
TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
return 0;
END:
tdbAbort(pTq->pMetaDB, txn);
return code;
}
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;
TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn));
TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn));
TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn));
TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
return 0;
}
static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) {
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq, path));
END:
tdbAbort(pTq->pMetaDB, txn);
return code;
}
@ -408,7 +408,8 @@ END:
}
int32_t tqMetaOpen(STQ* pTq) {
char* maindb = NULL;
char* maindb = NULL;
char* offsetNew = NULL;
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
if(!taosCheckExistFile(maindb)){
@ -416,12 +417,20 @@ int32_t tqMetaOpen(STQ* pTq) {
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
}else{
TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
taosRemoveFile(maindb);
(void)taosRemoveFile(maindb);
}
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offsetNew)){
TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
(void)taosRemoveFile(offsetNew);
}
TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));
END:
taosMemoryFree(maindb);
taosMemoryFree(offsetNew);
return code;
}
@ -445,14 +454,14 @@ int32_t tqMetaTransform(STQ* pTq) {
TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore));
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){
tqError("copy offset file error");
if(taosCheckExistFile(offset)) {
if (taosCopyFile(offset, offsetNew) < 0) {
tqError("copy offset file error");
} else {
(void)taosRemoveFile(offset);
}
}
TQ_ERR_GO_TO_END(tqMetaTransformOffsetInfo(pTq, offsetNew));
(void)taosRemoveFile(offset);
(void)taosRemoveFile(offsetNew);
END:
taosMemoryFree(offset);
taosMemoryFree(offsetNew);

View File

@ -73,6 +73,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
}
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size));
tqInfo("tq: offset restore from file to tdb, subkey:%s", offset.subKey);
taosMemoryFree(pMemBuf);
pMemBuf = NULL;
}