diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 4683b70f73..d2de7ca841 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -75,8 +75,8 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { return 0; } -int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){ - SDecoder decoder = {0}; +int32_t tqMetaDecodeCheckInfo(STqCheckInfo* info, void* pVal, int32_t vLen) { + SDecoder decoder = {0}; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); int32_t code = tDecodeSTqCheckInfo(&decoder, info); if (code != 0) { @@ -87,8 +87,8 @@ int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){ return code; } -int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen){ - SDecoder decoder = {0}; +int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, int32_t vLen) { + SDecoder decoder = {0}; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); int32_t code = tDecodeSTqOffset(&decoder, info); if (code != 0) { @@ -101,9 +101,10 @@ int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen){ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) { int32_t code = TDB_CODE_SUCCESS; - TXN* txn = NULL; + TXN* txn = NULL; - TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_RETURN( + tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); @@ -113,9 +114,10 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) { int32_t code = TDB_CODE_SUCCESS; - TXN* txn = NULL; + TXN* txn = NULL; - TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_RETURN( + tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn)); TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); @@ -131,10 +133,10 @@ END: return code; } -void* tqMetaGetOffset(STQ* pTq, const char* subkey){ +void* tqMetaGetOffset(STQ* pTq, const char* subkey) { void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey)); if (data == NULL) { - int vLen = 0; + int vLen = 0; if (tdbTbGet(pTq->pOffsetStore, subkey, strlen(subkey), &data, &vLen) < 0) { tdbFree(data); return NULL; @@ -146,7 +148,7 @@ void* tqMetaGetOffset(STQ* pTq, const char* subkey){ return NULL; } - if(taosHashPut(pTq->pOffset, subkey, strlen(subkey), &offset, sizeof(STqOffset)) != 0){ + if (taosHashPut(pTq->pOffset, subkey, strlen(subkey), &offset, sizeof(STqOffset)) != 0) { tDeleteSTqOffset(&offset); tdbFree(data); return NULL; @@ -160,9 +162,9 @@ void* tqMetaGetOffset(STQ* pTq, const char* subkey){ } int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { - int32_t code = TDB_CODE_SUCCESS; - int32_t vlen; - void* buf = NULL; + int32_t code = TDB_CODE_SUCCESS; + int32_t vlen; + void* buf = NULL; SEncoder encoder; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); if (code < 0) { @@ -192,8 +194,7 @@ END: return code; } - -static int tqMetaInitHandle(STQ* pTq, STqHandle* handle){ +static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) { int32_t code = TDB_CODE_SUCCESS; SVnode* pVnode = pTq->pVnode; @@ -201,7 +202,7 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle){ handle->pRef = walOpenRef(pVnode->pWal); if (handle->pRef == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } TQ_ERR_RETURN(walSetRefVer(handle->pRef, handle->snapshotVer)); @@ -269,10 +270,10 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle){ return 0; } -static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ +static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle) { int32_t vgId = TD_VID(pTq->pVnode); SDecoder decoder = {0}; - int32_t code = TDB_CODE_SUCCESS; + int32_t code = TDB_CODE_SUCCESS; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); TQ_ERR_GO_TO_END(tDecodeSTqHandle(&decoder, handle)); @@ -285,8 +286,8 @@ END: return code; } -int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ - int32_t vgId = TD_VID(pTq->pVnode); +int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) { + int32_t vgId = TD_VID(pTq->pVnode); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); handle->consumerId = req->newConsumerId; @@ -305,33 +306,35 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); - if(tqMetaInitHandle(pTq, handle) < 0){ + if (tqMetaInitHandle(pTq, handle) < 0) { return -1; } - tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer); + tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, + handle->consumerId, vgId, handle->snapshotVer); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } -static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew){ - TBC* pCur = NULL; - void* pKey = NULL; - int kLen = 0; - void* pVal = NULL; - int vLen = 0; - TXN* txn = NULL; +static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) { + TBC* pCur = NULL; + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + TXN* txn = NULL; - int32_t code = TDB_CODE_SUCCESS; + int32_t code = TDB_CODE_SUCCESS; TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL)); - TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_GO_TO_END( + tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur)); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - TQ_ERR_GO_TO_END (tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn)); + TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn)); } - TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn)); - TQ_ERR_GO_TO_END (tdbPostCommit(pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn)); END: tdbFree(pKey); @@ -342,24 +345,24 @@ END: int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) { void* data = taosHashGet(pTq->pHandle, key, strlen(key)); - if(data == NULL){ - int vLen = 0; + if (data == NULL) { + int vLen = 0; if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) { tdbFree(data); return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; } STqHandle handle = {0}; - if (tqMetaRestoreHandle(pTq, data, vLen, &handle) != 0){ + if (tqMetaRestoreHandle(pTq, data, vLen, &handle) != 0) { tdbFree(data); tqDestroyTqHandle(&handle); return TSDB_CODE_OUT_OF_MEMORY; } tdbFree(data); *pHandle = taosHashGet(pTq->pHandle, key, strlen(key)); - if(*pHandle == NULL){ + if (*pHandle == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - }else{ + } else { *pHandle = data; } return TDB_CODE_SUCCESS; @@ -376,8 +379,8 @@ END: return code; } -static int32_t replaceTqPath(char** path){ - char* tpath = NULL; +static int32_t replaceTqPath(char** path) { + char* tpath = NULL; int32_t code = TDB_CODE_SUCCESS; TQ_ERR_RETURN(tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME)); taosMemoryFree(*path); @@ -386,12 +389,12 @@ static int32_t replaceTqPath(char** path){ } static int32_t tqMetaRestoreCheckInfo(STQ* pTq) { - TBC* pCur = NULL; - void* pKey = NULL; - int kLen = 0; - void* pVal = NULL; - int vLen = 0; - int32_t code = 0; + TBC* pCur = NULL; + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + int32_t code = 0; STqCheckInfo info = {0}; TQ_ERR_GO_TO_END(tdbTbcOpen(pTq->pCheckStore, &pCur, NULL)); @@ -415,10 +418,10 @@ int32_t tqMetaOpen(STQ* pTq) { char* maindb = NULL; int32_t code = TDB_CODE_SUCCESS; TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); - if(!taosCheckExistFile(maindb)){ + if (!taosCheckExistFile(maindb)) { TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path)); TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); - }else{ + } else { TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); taosRemoveFile(maindb); } @@ -430,12 +433,12 @@ END: } int32_t tqMetaTransform(STQ* pTq) { - int32_t code = TDB_CODE_SUCCESS; - TDB* pMetaDB = NULL; - TTB* pExecStore = NULL; - TTB* pCheckStore = NULL; - char* offsetNew = NULL; - char* offset = NULL; + int32_t code = TDB_CODE_SUCCESS; + TDB* pMetaDB = NULL; + TTB* pExecStore = NULL; + TTB* pCheckStore = NULL; + char* offsetNew = NULL; + char* offset = NULL; TQ_ERR_GO_TO_END(tqBuildFName(&offset, pTq->path, TQ_OFFSET_NAME)); TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL)); @@ -449,7 +452,7 @@ int32_t tqMetaTransform(STQ* pTq) { TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); - if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ + if (taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0) { tqError("copy offset file error"); } @@ -461,7 +464,7 @@ END: taosMemoryFree(offset); taosMemoryFree(offsetNew); - //return 0 always, so ignore + // return 0 always, so ignore (void)tdbTbClose(pExecStore); (void)tdbTbClose(pCheckStore); (void)tdbClose(pMetaDB); @@ -481,4 +484,4 @@ int32_t tqMetaClose(STQ* pTq) { } (void)tdbClose(pTq->pMetaDB); return 0; -} \ No newline at end of file +} diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 26447f60d9..ecca876c0d 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -22,6 +22,7 @@ SWalRef *walOpenRef(SWal *pWal) { SWalRef *pRef = taosMemoryCalloc(1, sizeof(SWalRef)); if (pRef == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -29,6 +30,7 @@ SWalRef *walOpenRef(SWal *pWal) { if (taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *))) { taosMemoryFree(pRef); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; }