This commit is contained in:
Hongze Cheng 2024-09-24 08:37:15 +08:00
parent 22b58d03de
commit 370a4e9c63
4 changed files with 32 additions and 45 deletions

View File

@ -127,7 +127,6 @@ int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
goto END;
}
buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) {
code = terrno;
@ -152,7 +151,8 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;
TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(
tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
@ -168,7 +168,8 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;
TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(
tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn));
TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn));
@ -180,7 +181,7 @@ END:
return code;
}
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset){
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) {
void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
if (data == NULL) {
int vLen = 0;
@ -203,7 +204,7 @@ int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset){
tdbFree(data);
*pOffset = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
if(*pOffset == NULL){
if (*pOffset == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
@ -266,8 +267,8 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
initStorageAPI(&reader.api);
if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle->execHandle.task =
qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId);
handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId,
&handle->execHandle.numOfCols, handle->consumerId);
TQ_NULL_GO_TO_END(handle->execHandle.task);
void* scanner = NULL;
qExtractStreamScanner(handle->execHandle.task, &scanner);
@ -280,20 +281,21 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
handle->execHandle.pTqReader = tqReaderOpen(pVnode);
TQ_NULL_GO_TO_END(handle->execHandle.pTqReader);
TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
(SSnapContext**)(&reader.sContext)));
(SSnapContext**)(&reader.sContext)));
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
TQ_NULL_GO_TO_END(handle->execHandle.task);
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
TQ_NULL_GO_TO_END(handle->pWalReader);
if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
return TSDB_CODE_SCH_INTERNAL_ERROR;
}
}
TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType,
handle->fetchMeta, (SSnapContext**)(&reader.sContext)));
TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid,
handle->execHandle.subType, handle->fetchMeta,
(SSnapContext**)(&reader.sContext)));
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
TQ_NULL_GO_TO_END(handle->execHandle.task);
SArray* tbUidList = NULL;
@ -341,7 +343,7 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
handle->execHandle.subType = req->subType;
handle->fetchMeta = req->withMeta;
if (req->subType == TOPIC_SUB_TYPE__COLUMN) {
void *tmp = taosStrdup(req->qmsg);
void* tmp = taosStrdup(req->qmsg);
if (tmp == NULL) {
return terrno;
}
@ -349,12 +351,12 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
} else if (req->subType == TOPIC_SUB_TYPE__DB) {
handle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if(handle->execHandle.execDb.pFilterOutTbUid == NULL){
if (handle->execHandle.execDb.pFilterOutTbUid == NULL) {
return terrno;
}
}else if(req->subType == TOPIC_SUB_TYPE__TABLE){
} else if (req->subType == TOPIC_SUB_TYPE__TABLE) {
handle->execHandle.execTb.suid = req->suid;
void *tmp = taosStrdup(req->qmsg);
void* tmp = taosStrdup(req->qmsg);
if (tmp == NULL) {
return terrno;
}
@ -364,7 +366,7 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
int32_t code = tqMetaInitHandle(pTq, handle);
if (code != 0){
if (code != 0) {
return code;
}
tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
@ -437,10 +439,10 @@ END:
return code;
}
static int32_t replaceTqPath(char** path){
char* tpath = NULL;
static int32_t replaceTqPath(char** path) {
char* tpath = NULL;
int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME);
if (code != 0){
if (code != 0) {
return code;
}
taosMemoryFree(*path);
@ -475,7 +477,7 @@ END:
}
int32_t tqMetaOpen(STQ* pTq) {
char* maindb = NULL;
char* maindb = NULL;
char* offsetNew = NULL;
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
@ -488,7 +490,7 @@ int32_t tqMetaOpen(STQ* pTq) {
}
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offsetNew)){
if (taosCheckExistFile(offsetNew)) {
TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew));
TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew));
}
@ -522,7 +524,7 @@ int32_t tqMetaTransform(STQ* pTq) {
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offset)) {
if (taosCheckExistFile(offset)) {
if (taosCopyFile(offset, offsetNew) < 0) {
tqError("copy offset file error");
} else {
@ -551,8 +553,5 @@ void tqMetaClose(STQ* pTq) {
if (pTq->pOffsetStore) {
tdbTbClose(pTq->pOffsetStore);
}
ret = tdbClose(pTq->pMetaDB);
if (ret != 0) {
tqError("failed to close tdb, ret:%d", ret);
}
tdbClose(pTq->pMetaDB);
}

View File

@ -483,10 +483,7 @@ _err:
tdbTbClose(pMeta->pCheckpointDb);
}
if (pMeta->db) {
int32_t ret = tdbClose(pMeta->db);
if (ret) {
stError("vgId:%d tdb failed close meta db, code:%s", pMeta->vgId, tstrerror(ret));
}
tdbClose(pMeta->db);
}
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
@ -592,18 +589,9 @@ void streamMetaCloseImpl(void* arg) {
// already log the error, ignore here
tdbAbort(pMeta->db, pMeta->txn);
code = tdbTbClose(pMeta->pTaskDb);
if (code) {
stError("vgId:%d failed to close taskDb, code:%s", vgId, tstrerror(code));
}
code = tdbTbClose(pMeta->pCheckpointDb);
if (code) {
stError("vgId:%d failed to close checkpointDb, code:%s", vgId, tstrerror(code));
}
code = tdbClose(pMeta->db);
if (code) {
stError("vgId:%d failed to close db, code:%s", vgId, tstrerror(code));
}
tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db);
taosArrayDestroy(pMeta->pTaskList);
taosArrayDestroy(pMeta->chkpSaved);

View File

@ -34,7 +34,7 @@ typedef struct STxn TXN;
// TDB
int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback, int32_t encryptAlgorithm,
char *encryptKey);
int32_t tdbClose(TDB *pDb);
void tdbClose(TDB *pDb);
int32_t tdbBegin(TDB *pDb, TXN **pTxn, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg,
int flags);
int32_t tdbCommit(TDB *pDb, TXN *pTxn);

View File

@ -90,7 +90,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
return 0;
}
int tdbClose(TDB *pDb) {
void tdbClose(TDB *pDb) {
SPager *pPager;
if (pDb) {
@ -109,7 +109,7 @@ int tdbClose(TDB *pDb) {
tdbOsFree(pDb);
}
return 0;
return;
}
int32_t tdbAlter(TDB *pDb, int pages) { return tdbPCacheAlter(pDb->pCache, pages); }