set terrno with taos_return

This commit is contained in:
Minglei Jin 2024-07-22 08:51:30 +08:00
parent 383e90e43f
commit b0b9dd7d5f
2 changed files with 64 additions and 59 deletions

View File

@ -75,7 +75,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
return 0; return 0;
} }
int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){ int32_t tqMetaDecodeCheckInfo(STqCheckInfo* info, void* pVal, int32_t vLen) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
int32_t code = tDecodeSTqCheckInfo(&decoder, info); int32_t code = tDecodeSTqCheckInfo(&decoder, info);
@ -87,7 +87,7 @@ int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){
return code; return code;
} }
int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen){ int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, int32_t vLen) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
int32_t code = tDecodeSTqOffset(&decoder, info); int32_t code = tDecodeSTqOffset(&decoder, info);
@ -103,7 +103,8 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL; TXN* txn = NULL;
TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_RETURN(
tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn));
@ -115,7 +116,8 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL; TXN* txn = NULL;
TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_RETURN(
tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn)); TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn));
TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn));
@ -131,7 +133,7 @@ END:
return code; return code;
} }
void* tqMetaGetOffset(STQ* pTq, const char* subkey){ void* tqMetaGetOffset(STQ* pTq, const char* subkey) {
void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey)); void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
if (data == NULL) { if (data == NULL) {
int vLen = 0; int vLen = 0;
@ -146,7 +148,7 @@ void* tqMetaGetOffset(STQ* pTq, const char* subkey){
return NULL; return NULL;
} }
if(taosHashPut(pTq->pOffset, subkey, strlen(subkey), &offset, sizeof(STqOffset)) != 0){ if (taosHashPut(pTq->pOffset, subkey, strlen(subkey), &offset, sizeof(STqOffset)) != 0) {
tDeleteSTqOffset(&offset); tDeleteSTqOffset(&offset);
tdbFree(data); tdbFree(data);
return NULL; return NULL;
@ -192,8 +194,7 @@ END:
return code; return code;
} }
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle) {
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle){
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
SVnode* pVnode = pTq->pVnode; SVnode* pVnode = pTq->pVnode;
@ -201,7 +202,7 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle){
handle->pRef = walOpenRef(pVnode->pWal); handle->pRef = walOpenRef(pVnode->pWal);
if (handle->pRef == NULL) { if (handle->pRef == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
} }
TQ_ERR_RETURN(walSetRefVer(handle->pRef, handle->snapshotVer)); TQ_ERR_RETURN(walSetRefVer(handle->pRef, handle->snapshotVer));
@ -269,7 +270,7 @@ static int tqMetaInitHandle(STQ* pTq, STqHandle* handle){
return 0; return 0;
} }
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SDecoder decoder = {0}; SDecoder decoder = {0};
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
@ -285,7 +286,7 @@ END:
return code; return code;
} }
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
@ -305,14 +306,15 @@ int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
if(tqMetaInitHandle(pTq, handle) < 0){ if (tqMetaInitHandle(pTq, handle) < 0) {
return -1; return -1;
} }
tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer); tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey,
handle->consumerId, vgId, handle->snapshotVer);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
} }
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew){ static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew) {
TBC* pCur = NULL; TBC* pCur = NULL;
void* pKey = NULL; void* pKey = NULL;
int kLen = 0; int kLen = 0;
@ -323,15 +325,16 @@ static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pOld, TTB* pNew){
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL)); TQ_ERR_GO_TO_END(tdbTbcOpen(pOld, &pCur, NULL));
TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); TQ_ERR_GO_TO_END(
tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur)); TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
TQ_ERR_GO_TO_END (tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn)); TQ_ERR_GO_TO_END(tdbTbUpsert(pNew, pKey, kLen, pVal, vLen, txn));
} }
TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbCommit(pMetaDB, txn));
TQ_ERR_GO_TO_END (tdbPostCommit(pMetaDB, txn)); TQ_ERR_GO_TO_END(tdbPostCommit(pMetaDB, txn));
END: END:
tdbFree(pKey); tdbFree(pKey);
@ -342,24 +345,24 @@ END:
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) { int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
void* data = taosHashGet(pTq->pHandle, key, strlen(key)); void* data = taosHashGet(pTq->pHandle, key, strlen(key));
if(data == NULL){ if (data == NULL) {
int vLen = 0; int vLen = 0;
if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) { if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) {
tdbFree(data); tdbFree(data);
return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
} }
STqHandle handle = {0}; STqHandle handle = {0};
if (tqMetaRestoreHandle(pTq, data, vLen, &handle) != 0){ if (tqMetaRestoreHandle(pTq, data, vLen, &handle) != 0) {
tdbFree(data); tdbFree(data);
tqDestroyTqHandle(&handle); tqDestroyTqHandle(&handle);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
tdbFree(data); tdbFree(data);
*pHandle = taosHashGet(pTq->pHandle, key, strlen(key)); *pHandle = taosHashGet(pTq->pHandle, key, strlen(key));
if(*pHandle == NULL){ if (*pHandle == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
}else{ } else {
*pHandle = data; *pHandle = data;
} }
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
@ -376,7 +379,7 @@ END:
return code; return code;
} }
static int32_t replaceTqPath(char** path){ static int32_t replaceTqPath(char** path) {
char* tpath = NULL; char* tpath = NULL;
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_RETURN(tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME)); TQ_ERR_RETURN(tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME));
@ -415,10 +418,10 @@ int32_t tqMetaOpen(STQ* pTq) {
char* maindb = NULL; char* maindb = NULL;
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
if(!taosCheckExistFile(maindb)){ if (!taosCheckExistFile(maindb)) {
TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path)); TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
}else{ } else {
TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
taosRemoveFile(maindb); taosRemoveFile(maindb);
} }
@ -449,7 +452,7 @@ int32_t tqMetaTransform(STQ* pTq) {
TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore));
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ if (taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0) {
tqError("copy offset file error"); tqError("copy offset file error");
} }
@ -461,7 +464,7 @@ END:
taosMemoryFree(offset); taosMemoryFree(offset);
taosMemoryFree(offsetNew); taosMemoryFree(offsetNew);
//return 0 always, so ignore // return 0 always, so ignore
(void)tdbTbClose(pExecStore); (void)tdbTbClose(pExecStore);
(void)tdbTbClose(pCheckStore); (void)tdbTbClose(pCheckStore);
(void)tdbClose(pMetaDB); (void)tdbClose(pMetaDB);

View File

@ -22,6 +22,7 @@
SWalRef *walOpenRef(SWal *pWal) { SWalRef *walOpenRef(SWal *pWal) {
SWalRef *pRef = taosMemoryCalloc(1, sizeof(SWalRef)); SWalRef *pRef = taosMemoryCalloc(1, sizeof(SWalRef));
if (pRef == NULL) { if (pRef == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -29,6 +30,7 @@ SWalRef *walOpenRef(SWal *pWal) {
if (taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *))) { if (taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *))) {
taosMemoryFree(pRef); taosMemoryFree(pRef);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }