diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index e270228a30..89350e761f 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -127,7 +127,6 @@ int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) { goto END; } - buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { code = terrno; @@ -152,7 +151,8 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; - TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_GO_TO_END( + tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn)); @@ -168,7 +168,8 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) { int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; - TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_GO_TO_END( + tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn)); TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn)); @@ -180,7 +181,7 @@ END: return code; } -int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset){ +int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset) { void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey)); if (data == NULL) { int vLen = 0; @@ -203,7 +204,7 @@ int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset){ tdbFree(data); *pOffset = taosHashGet(pTq->pOffset, subkey, strlen(subkey)); - if(*pOffset == NULL){ + if (*pOffset == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } } else { @@ -266,8 +267,8 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) { initStorageAPI(&reader.api); if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - handle->execHandle.task = - qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId); + handle->execHandle.task = qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, + &handle->execHandle.numOfCols, handle->consumerId); TQ_NULL_GO_TO_END(handle->execHandle.task); void* scanner = NULL; qExtractStreamScanner(handle->execHandle.task, &scanner); @@ -280,20 +281,21 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) { handle->execHandle.pTqReader = tqReaderOpen(pVnode); TQ_NULL_GO_TO_END(handle->execHandle.pTqReader); TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta, - (SSnapContext**)(&reader.sContext))); + (SSnapContext**)(&reader.sContext))); handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); TQ_NULL_GO_TO_END(handle->execHandle.task); } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); TQ_NULL_GO_TO_END(handle->pWalReader); - if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { + if (handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { tqError("nodesStringToNode error in sub stable, since %s", terrstr()); return TSDB_CODE_SCH_INTERNAL_ERROR; } } - TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType, - handle->fetchMeta, (SSnapContext**)(&reader.sContext))); + TQ_ERR_GO_TO_END(buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, + handle->execHandle.subType, handle->fetchMeta, + (SSnapContext**)(&reader.sContext))); handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId); TQ_NULL_GO_TO_END(handle->execHandle.task); SArray* tbUidList = NULL; @@ -341,7 +343,7 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) { handle->execHandle.subType = req->subType; handle->fetchMeta = req->withMeta; if (req->subType == TOPIC_SUB_TYPE__COLUMN) { - void *tmp = taosStrdup(req->qmsg); + void* tmp = taosStrdup(req->qmsg); if (tmp == NULL) { return terrno; } @@ -349,12 +351,12 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) { } else if (req->subType == TOPIC_SUB_TYPE__DB) { handle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if(handle->execHandle.execDb.pFilterOutTbUid == NULL){ + if (handle->execHandle.execDb.pFilterOutTbUid == NULL) { return terrno; } - }else if(req->subType == TOPIC_SUB_TYPE__TABLE){ + } else if (req->subType == TOPIC_SUB_TYPE__TABLE) { handle->execHandle.execTb.suid = req->suid; - void *tmp = taosStrdup(req->qmsg); + void* tmp = taosStrdup(req->qmsg); if (tmp == NULL) { return terrno; } @@ -364,7 +366,7 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) { handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); int32_t code = tqMetaInitHandle(pTq, handle); - if (code != 0){ + if (code != 0) { return code; } tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, @@ -437,10 +439,10 @@ END: return code; } -static int32_t replaceTqPath(char** path){ - char* tpath = NULL; +static int32_t replaceTqPath(char** path) { + char* tpath = NULL; int32_t code = tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME); - if (code != 0){ + if (code != 0) { return code; } taosMemoryFree(*path); @@ -475,7 +477,7 @@ END: } int32_t tqMetaOpen(STQ* pTq) { - char* maindb = NULL; + char* maindb = NULL; char* offsetNew = NULL; int32_t code = TDB_CODE_SUCCESS; TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); @@ -488,7 +490,7 @@ int32_t tqMetaOpen(STQ* pTq) { } TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); - if(taosCheckExistFile(offsetNew)){ + if (taosCheckExistFile(offsetNew)) { TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew)); TQ_ERR_GO_TO_END(taosRemoveFile(offsetNew)); } @@ -522,7 +524,7 @@ int32_t tqMetaTransform(STQ* pTq) { TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); - if(taosCheckExistFile(offset)) { + if (taosCheckExistFile(offset)) { if (taosCopyFile(offset, offsetNew) < 0) { tqError("copy offset file error"); } else { @@ -551,8 +553,5 @@ void tqMetaClose(STQ* pTq) { if (pTq->pOffsetStore) { tdbTbClose(pTq->pOffsetStore); } - ret = tdbClose(pTq->pMetaDB); - if (ret != 0) { - tqError("failed to close tdb, ret:%d", ret); - } + tdbClose(pTq->pMetaDB); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fbea455bd5..44c9e76906 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -483,10 +483,7 @@ _err: tdbTbClose(pMeta->pCheckpointDb); } if (pMeta->db) { - int32_t ret = tdbClose(pMeta->db); - if (ret) { - stError("vgId:%d tdb failed close meta db, code:%s", pMeta->vgId, tstrerror(ret)); - } + tdbClose(pMeta->db); } if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); @@ -592,18 +589,9 @@ void streamMetaCloseImpl(void* arg) { // already log the error, ignore here tdbAbort(pMeta->db, pMeta->txn); - code = tdbTbClose(pMeta->pTaskDb); - if (code) { - stError("vgId:%d failed to close taskDb, code:%s", vgId, tstrerror(code)); - } - code = tdbTbClose(pMeta->pCheckpointDb); - if (code) { - stError("vgId:%d failed to close checkpointDb, code:%s", vgId, tstrerror(code)); - } - code = tdbClose(pMeta->db); - if (code) { - stError("vgId:%d failed to close db, code:%s", vgId, tstrerror(code)); - } + tdbTbClose(pMeta->pTaskDb); + tdbTbClose(pMeta->pCheckpointDb); + tdbClose(pMeta->db); taosArrayDestroy(pMeta->pTaskList); taosArrayDestroy(pMeta->chkpSaved); diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 41a5545122..52ff749191 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -34,7 +34,7 @@ typedef struct STxn TXN; // TDB int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback, int32_t encryptAlgorithm, char *encryptKey); -int32_t tdbClose(TDB *pDb); +void tdbClose(TDB *pDb); int32_t tdbBegin(TDB *pDb, TXN **pTxn, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg, int flags); int32_t tdbCommit(TDB *pDb, TXN *pTxn); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index eb2603bdd8..02ab997f69 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -90,7 +90,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i return 0; } -int tdbClose(TDB *pDb) { +void tdbClose(TDB *pDb) { SPager *pPager; if (pDb) { @@ -109,7 +109,7 @@ int tdbClose(TDB *pDb) { tdbOsFree(pDb); } - return 0; + return; } int32_t tdbAlter(TDB *pDb, int pages) { return tdbPCacheAlter(pDb->pCache, pages); }