opti:[TS-4593] transform offset from file to tdb in tmq

This commit is contained in:
wangmm0220 2024-07-19 16:19:12 +08:00
parent 61b46b2e98
commit 21c30fe6ff
2 changed files with 18 additions and 16 deletions

View File

@ -123,14 +123,6 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
return 0; return 0;
} }
static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) {
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq, path));
END:
return code;
}
void* tqMetaGetOffset(STQ* pTq, const char* subkey){ void* tqMetaGetOffset(STQ* pTq, const char* subkey){
void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey)); void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
if (data == NULL) { if (data == NULL) {
@ -409,6 +401,7 @@ END:
int32_t tqMetaOpen(STQ* pTq) { int32_t tqMetaOpen(STQ* pTq) {
char* maindb = NULL; char* maindb = NULL;
char* offsetNew = NULL;
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
if(!taosCheckExistFile(maindb)){ if(!taosCheckExistFile(maindb)){
@ -416,12 +409,20 @@ int32_t tqMetaOpen(STQ* pTq) {
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
}else{ }else{
TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
taosRemoveFile(maindb); (void)taosRemoveFile(maindb);
} }
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offsetNew)){
TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
(void)taosRemoveFile(offsetNew);
}
TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq)); TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq));
END: END:
taosMemoryFree(maindb); taosMemoryFree(maindb);
taosMemoryFree(offsetNew);
return code; return code;
} }
@ -445,13 +446,13 @@ int32_t tqMetaTransform(STQ* pTq) {
TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore));
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ if(taosCheckExistFile(offset)) {
if (taosCopyFile(offset, offsetNew) < 0) {
tqError("copy offset file error"); tqError("copy offset file error");
} } else {
TQ_ERR_GO_TO_END(tqMetaTransformOffsetInfo(pTq, offsetNew));
(void)taosRemoveFile(offset); (void)taosRemoveFile(offset);
(void)taosRemoveFile(offsetNew); }
}
END: END:
taosMemoryFree(offset); taosMemoryFree(offset);

View File

@ -73,6 +73,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
} }
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size)); TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size));
tqInfo("tq: offset restore from file to tdb, subkey:%s", offset.subKey);
taosMemoryFree(pMemBuf); taosMemoryFree(pMemBuf);
pMemBuf = NULL; pMemBuf = NULL;
} }