diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7e89753241..60d794c85c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9297,12 +9297,12 @@ int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) { int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) { if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1; if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1; - int32_t sz; + int32_t sz = 0; if (tDecodeI32(pDecoder, &sz) < 0) return -1; pInfo->colIdList = taosArrayInit(sz, sizeof(int16_t)); if (pInfo->colIdList == NULL) return -1; for (int32_t i = 0; i < sz; i++) { - int16_t colId; + int16_t colId = 0; if (tDecodeI16(pDecoder, &colId) < 0) return -1; taosArrayPush(pInfo->colIdList, &colId); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index bcb38a3902..b8cb1d00d0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -725,19 +725,19 @@ static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicO continue; } - buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); + buf = taosMemoryCalloc(1, sizeof(SMsgHead) + sizeof(pTopic->ntbUid)); if (buf == NULL){ code = TSDB_CODE_OUT_OF_MEMORY; goto end; } void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); - memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); + *(int64_t*)abuf = pTopic->ntbUid; STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = buf; - action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; + action.contLen = sizeof(SMsgHead) + sizeof(pTopic->ntbUid); action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO; code = mndTransAppendRedoAction(pTrans, &action); if (code != 0) { diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index d776271131..400725b041 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -68,11 +68,8 @@ set( "src/tq/tqOffset.c" "src/tq/tqPush.c" "src/tq/tqSink.c" - "src/tq/tqCommit.c" "src/tq/tqStreamTask.c" - "src/tq/tqHandleSnapshot.c" - "src/tq/tqCheckInfoSnapshot.c" - "src/tq/tqOffsetSnapshot.c" + "src/tq/tqSnapshot.c" "src/tq/tqStreamStateSnap.c" "src/tq/tqStreamTaskSnap.c" diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 651fe2cda4..d97a953d94 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -41,8 +41,6 @@ extern "C" { #define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -typedef struct STqOffsetStore STqOffsetStore; - #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) // tqExec @@ -101,10 +99,11 @@ struct STQ { SHashObj* pPushMgr; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo - STqOffsetStore* pOffsetStore; + SHashObj* pOffset; // subKey -> STqOffsetVal TDB* pMetaDB; TTB* pExecStore; TTB* pCheckStore; + TTB* pOffsetStore; SStreamMeta* pStreamMeta; }; @@ -128,29 +127,22 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); int32_t tqMetaOpen(STQ* pTq); int32_t tqMetaClose(STQ* pTq); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); -int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); -int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); -int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); -int32_t tqMetaRestoreCheckInfo(STQ* pTq); -int32_t tqMetaGetHandle(STQ* pTq, const char* key); -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); - -STqOffsetStore* tqOffsetOpen(STQ* pTq); -int32_t tqMetaTransform(STQ* pTq); -void tqOffsetClose(STqOffsetStore*); -STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); -int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetCommitFile(STqOffsetStore* pStore); - +int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen); +int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen); +int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); +int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen); +int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle); +void* tqMetaGetCheckInfo(STQ* pTq, int64_t tbUid); +void* tqMetaGetOffset(STQ* pTq, const char* subkey); +int32_t tqMetaTransform(STQ* pTq); // tqSink int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr, bool newSubTableRule); void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data); // tqOffset -char* tqOffsetBuildFName(const char* path, int32_t fVer); -int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); +int32_t tqBuildFName(char** data, const char* path, char* name); +int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name); // tq util int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); @@ -166,6 +158,25 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule); +#define TQ_ERR_GO_TO_END(c) \ + do { \ + code = c; \ + if (code != TSDB_CODE_SUCCESS) { \ + goto END; \ + } \ + } while (0) + +#define TQ_ERR_RETURN(c) \ + do { \ + code = c; \ + if (code != TSDB_CODE_SUCCESS) { \ + return code; \ + } \ + } while (0) + +#define TQ_SUBSCRIBE_NAME "subscribe" +#define TQ_OFFSET_NAME "offset-ver0" + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 4a47e08730..a7f87300b5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -71,10 +71,6 @@ typedef struct STsdbSnapRAWReader STsdbSnapRAWReader; typedef struct STsdbSnapRAWWriter STsdbSnapRAWWriter; typedef struct STqSnapReader STqSnapReader; typedef struct STqSnapWriter STqSnapWriter; -typedef struct STqOffsetReader STqOffsetReader; -typedef struct STqOffsetWriter STqOffsetWriter; -typedef struct STqCheckInfoReader STqCheckInfoReader; -typedef struct STqCheckInfoWriter STqCheckInfoWriter; typedef struct SStreamTaskReader SStreamTaskReader; typedef struct SStreamTaskWriter SStreamTaskWriter; typedef struct SStreamStateReader SStreamStateReader; @@ -268,7 +264,6 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); -int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); // tq-mq @@ -351,29 +346,17 @@ int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* pWriter, SSnapDataHdr* pHdr); int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* pWriter); int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** ppWriter, int8_t rollback); // STqSnapshotReader == -int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader); +int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader); int32_t tqSnapReaderClose(STqSnapReader** ppReader); int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData); // STqSnapshotWriter ====================================== int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter); int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback); -int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData); -// STqCheckInfoshotReader == -int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader); -int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader); -int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData); -// STqCheckInfoshotWriter ====================================== -int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter); -int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback); -int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData); -// STqOffsetReader ======================================== -int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader); -int32_t tqOffsetReaderClose(STqOffsetReader** ppReader); -int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData); -// STqOffsetWriter ======================================== -int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter); -int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback); -int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData); + +int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData); + // SStreamTaskWriter ====================================== int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ac57a003c5..bb64a0ccf6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -76,9 +76,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosInitRWLatch(&pTq->lock); pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); - pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); + pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK); + int32_t code = tqInitialize(pTq); if (code != TSDB_CODE_SUCCESS) { tqClose(pTq); @@ -98,18 +100,10 @@ int32_t tqInitialize(STQ* pTq) { streamMetaLoadAllTasks(pTq->pStreamMeta); - if (tqMetaTransform(pTq) < 0) { + if (tqMetaOpen(pTq) < 0) { return -1; } - if (tqMetaRestoreCheckInfo(pTq) < 0) { - return -1; - } - - pTq->pOffsetStore = tqOffsetOpen(pTq); - if (pTq->pOffsetStore == NULL) { - return -1; - } return 0; } @@ -133,10 +127,10 @@ void tqClose(STQ* pTq) { pIter = taosHashIterate(pTq->pPushMgr, pIter); } - tqOffsetClose(pTq->pOffsetStore); taosHashCleanup(pTq->pHandle); taosHashCleanup(pTq->pPushMgr); taosHashCleanup(pTq->pCheckInfo); + taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); tqMetaClose(pTq); @@ -221,7 +215,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t goto end; } - STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); + STqOffset* pSavedOffset = (STqOffset*)tqMetaGetOffset(pTq, pOffset->subKey); if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) { tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); @@ -229,10 +223,14 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } // save the new offset value - code = tqOffsetWrite(pTq->pOffsetStore, pOffset); - if(code != 0) { - code = TSDB_CODE_INVALID_MSG; - goto end; + if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } return 0; @@ -284,25 +282,16 @@ end: } int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { - void* pIter = NULL; + STqCheckInfo* pCheck = tqMetaGetCheckInfo(pTq, tbUid); + if(pCheck == NULL) { + return 0; + } - while (1) { - pIter = taosHashIterate(pTq->pCheckInfo, pIter); - if (pIter == NULL) { - break; - } - - STqCheckInfo* pCheck = (STqCheckInfo*)pIter; - - if (pCheck->ntbUid == tbUid) { - int32_t sz = taosArrayGetSize(pCheck->colIdList); - for (int32_t i = 0; i < sz; i++) { - int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); - if (forbidColId == colId) { - taosHashCancelIterate(pTq->pCheckInfo, pIter); - return -1; - } - } + int32_t sz = taosArrayGetSize(pCheck->colIdList); + for (int32_t i = 0; i < sz; i++) { + int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); + if (forbidColId == colId) { + return -1; } } @@ -360,21 +349,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { while (1) { taosWLockLatch(&pTq->lock); // 1. find handle - pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle == NULL) { - do { - if (tqMetaGetHandle(pTq, req.subKey) == 0) { - pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle != NULL) { - break; - } - } - tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); - terrno = TSDB_CODE_INVALID_MSG; - taosWUnLockLatch(&pTq->lock); - code = -1; - goto END; - } while (0); + code = tqMetaGetHandle(pTq, req.subKey, &pHandle); + if (code != TDB_CODE_SUCCESS) { + tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_MSG; + taosWUnLockLatch(&pTq->lock); + return -1; } // 2. check rebalance status @@ -443,7 +423,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); - STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, vgOffset.offset.subKey); + STqOffset* pSavedOffset = (STqOffset*)tqMetaGetOffset(pTq, vgOffset.offset.subKey); if (pSavedOffset == NULL) { terrno = TSDB_CODE_TMQ_NO_COMMITTED; return terrno; @@ -523,7 +503,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { if (reqOffset.type == TMQ_OFFSET__LOG) { dataRsp.common.rspOffset.version = reqOffset.version; } else if (reqOffset.type < 0) { - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); + STqOffset* pOffset = (STqOffset*)(STqOffset*)tqMetaGetOffset(pTq, req.subKey); if (pOffset != NULL) { if (pOffset->val.type != TMQ_OFFSET__LOG) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey); @@ -590,12 +570,14 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosWLockLatch(&pTq->lock); - code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); - if (code != 0) { - tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey); + if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey) != 0)) { + tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey); + } + if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) { + tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey); } - if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { + if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) { tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey); } taosWUnLockLatch(&pTq->lock); @@ -603,20 +585,39 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg return 0; } +void mergeTwoArray(SArray* a, SArray* b){ + size_t bLen = taosArrayGetSize(b); + for(int i = 0; i < bLen; i++){ + void* dataB = taosArrayGet(b, i); + size_t aLen = taosArrayGetSize(a); + int j = 0; + for(; j < aLen; j++){ + void* dataA = taosArrayGet(a, i); + if(*(int64_t*)dataA == *(int64_t*)dataB){ + break; + } + } + if(j == aLen){ + taosArrayPush(a, dataB); + } + } +} + int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { STqCheckInfo info = {0}; - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + if(tqMetaDecodeCheckInfo(&info, msg, msgLen) != 0){ return -1; } - tDecoderClear(&decoder); - if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + STqCheckInfo *old = tqMetaGetCheckInfo(pTq, info.ntbUid); + if (old != NULL){ + mergeTwoArray(info.colIdList, old->colIdList); + } + if (taosHashPut(pTq->pCheckInfo, &info.ntbUid, sizeof(info.ntbUid), &info, sizeof(STqCheckInfo)) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tDeleteSTqCheckInfo(&info); return -1; } - if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) { + if (tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.ntbUid, sizeof(info.ntbUid), msg, msgLen) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -628,7 +629,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (tqMetaDeleteCheckInfo(pTq, msg) < 0) { + if (tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, msgLen) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -652,19 +653,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); + taosRLockLatch(&pTq->lock); STqHandle* pHandle = NULL; - while (1) { - pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle) { - break; - } - taosRLockLatch(&pTq->lock); - ret = tqMetaGetHandle(pTq, req.subKey); - taosRUnLockLatch(&pTq->lock); - if (ret < 0) { - break; - } - } + ret = tqMetaGetHandle(pTq, req.subKey, &pHandle); + taosRUnLockLatch(&pTq->lock); if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, @@ -675,7 +667,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg goto end; } STqHandle handle = {0}; - ret = tqCreateHandle(pTq, &req, &handle); + ret = tqMetaCreateHandle(pTq, &req, &handle); if (ret < 0) { tqDestroyTqHandle(&handle); goto end; diff --git a/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c b/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c deleted file mode 100644 index a3bd22eef0..0000000000 --- a/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "meta.h" -#include "tdbInt.h" -#include "tq.h" - -// STqCheckInfoReader ======================================== -struct STqCheckInfoReader { - STQ* pTq; - int64_t sver; - int64_t ever; - TBC* pCur; -}; - -int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader) { - int32_t code = 0; - STqCheckInfoReader* pReader = NULL; - - // alloc - pReader = (STqCheckInfoReader*)taosMemoryCalloc(1, sizeof(STqCheckInfoReader)); - if (pReader == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pReader->pTq = pTq; - pReader->sver = sver; - pReader->ever = ever; - - // impl - code = tdbTbcOpen(pTq->pCheckStore, &pReader->pCur, NULL); - if (code) { - taosMemoryFree(pReader); - goto _err; - } - - code = tdbTbcMoveToFirst(pReader->pCur); - if (code) { - taosMemoryFree(pReader); - goto _err; - } - - tqInfo("vgId:%d, vnode checkinfo tq reader opened", TD_VID(pTq->pVnode)); - - *ppReader = pReader; - return code; - -_err: - tqError("vgId:%d, vnode checkinfo tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppReader = NULL; - return code; -} - -int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader) { - int32_t code = 0; - - tdbTbcClose((*ppReader)->pCur); - taosMemoryFree(*ppReader); - *ppReader = NULL; - - return code; -} - -int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData) { - int32_t code = 0; - void* pKey = NULL; - void* pVal = NULL; - int32_t kLen = 0; - int32_t vLen = 0; - - if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { - goto _exit; - } - - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); - if (*ppData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = SNAP_DATA_TQ_CHECKINFO; - pHdr->size = vLen; - memcpy(pHdr->data, pVal, vLen); - -_exit: - tdbFree(pKey); - tdbFree(pVal); - - tqInfo("vgId:%d, vnode check info tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen); - return code; - -_err: - tdbFree(pKey); - tdbFree(pVal); - - tqError("vgId:%d, vnode check info tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); - return code; -} - -// STqCheckInfoWriter ======================================== -struct STqCheckInfoWriter { - STQ* pTq; - int64_t sver; - int64_t ever; - TXN* txn; -}; - -int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter) { - int32_t code = 0; - STqCheckInfoWriter* pWriter; - - // alloc - pWriter = (STqCheckInfoWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); - if (pWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pWriter->pTq = pTq; - pWriter->sver = sver; - pWriter->ever = ever; - - if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - code = -1; - taosMemoryFree(pWriter); - goto _err; - } - - *ppWriter = pWriter; - return code; - -_err: - tqError("vgId:%d, tq check info writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppWriter = NULL; - return code; -} - -int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback) { - int32_t code = 0; - STqCheckInfoWriter* pWriter = *ppWriter; - STQ* pTq = pWriter->pTq; - - if (rollback) { - tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); - } else { - code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); - if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); - if (code) goto _err; - } - - taosMemoryFree(pWriter); - *ppWriter = NULL; - - return code; - -_err: - tqError("vgId:%d, tq check info writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - return code; -} - -int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STQ* pTq = pWriter->pTq; - STqCheckInfo info = {0}; - SDecoder decoder; - SDecoder* pDecoder = &decoder; - - tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - code = tDecodeSTqCheckInfo(pDecoder, &info); - if (code) goto _err; - code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)); - if (code) goto _err; - code = tqMetaSaveCheckInfo(pTq, info.topic, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - if (code) goto _err; - tDecoderClear(pDecoder); - - return code; - -_err: - tDecoderClear(pDecoder); - tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - return code; -} diff --git a/source/dnode/vnode/src/tq/tqCommit.c b/source/dnode/vnode/src/tq/tqCommit.c deleted file mode 100644 index 0f5daa31ad..0000000000 --- a/source/dnode/vnode/src/tq/tqCommit.c +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tq.h" - -int tqCommit(STQ* pTq) { -#if 0 - // stream meta commit does not be aligned to the vnode commit - if (streamMetaCommit(pTq->pStreamMeta) < 0) { - tqError("vgId:%d, failed to commit stream meta since %s", TD_VID(pTq->pVnode), terrstr()); - return -1; - } -#endif - - return tqOffsetCommitFile(pTq->pOffsetStore); -} diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 02184f1d50..4b30e79d44 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -75,125 +75,116 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { return 0; } -int32_t tqMetaOpen(STQ* pTq) { - if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL) < 0) { - return -1; +int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){ + SDecoder decoder = {0}; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + int32_t code = tDecodeSTqCheckInfo(&decoder, info); + if (code != 0) { + tDeleteSTqCheckInfo(info); + return TSDB_CODE_OUT_OF_MEMORY; } - - if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) { - return -1; - } - - if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) { - return -1; - } - - return 0; -} - -int32_t tqMetaClose(STQ* pTq) { - if (pTq->pExecStore) { - tdbTbClose(pTq->pExecStore); - } - if (pTq->pCheckStore) { - tdbTbClose(pTq->pCheckStore); - } - tdbClose(pTq->pMetaDB); - return 0; -} - -int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) { - TXN* txn; - - if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - return -1; - } - - if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, txn) < 0) { - return -1; - } - - if (tdbCommit(pTq->pMetaDB, txn) < 0) { - return -1; - } - - if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - return -1; - } - - return 0; -} - -int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { - TXN* txn; - - if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - return -1; - } - - if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) { - tqWarn("vgId:%d, tq try delete checkinfo failed %s", pTq->pVnode->config.vgId, key); - } - - if (tdbCommit(pTq->pMetaDB, txn) < 0) { - return -1; - } - - if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - return -1; - } - - return 0; -} - -int32_t tqMetaRestoreCheckInfo(STQ* pTq) { - TBC* pCur = NULL; - if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) { - return -1; - } - - void* pKey = NULL; - int kLen = 0; - void* pVal = NULL; - int vLen = 0; - SDecoder decoder; - int32_t code = 0; - - tdbTbcMoveToFirst(pCur); - - while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - STqCheckInfo info; - tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - code = tDecodeSTqCheckInfo(&decoder, &info); - if (code != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto END; - } - tDecoderClear(&decoder); - code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)); - if (code != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto END; - } - } - -END: - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); + tDecoderClear(&decoder); return code; } +void* tqMetaGetCheckInfo(STQ* pTq, int64_t tbUid){ + void* data = taosHashGet(pTq->pCheckInfo, &tbUid, sizeof(tbUid)); + if (data == NULL) { + int vLen = 0; + if (tdbTbGet(pTq->pCheckStore, &tbUid, sizeof(tbUid), &data, &vLen) < 0) { + tdbFree(data); + return NULL; + } + STqCheckInfo info= {0}; + if(tqMetaDecodeCheckInfo(&info, data, vLen) != 0) { + tdbFree(data); + return NULL; + } + tdbFree(data); + + if(taosHashPut(pTq->pCheckInfo, &tbUid, sizeof(tbUid), &info, sizeof(STqCheckInfo)) != 0){ + tDeleteSTqCheckInfo(&info); + return NULL; + } + return taosHashGet(pTq->pCheckInfo, &tbUid, sizeof(tbUid)); + } else { + return data; + } +} + +int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) { + int32_t code = TDB_CODE_SUCCESS; + TXN* txn = NULL; + + TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); + TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); + TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); + + return 0; +} + +int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) { + int32_t code = TDB_CODE_SUCCESS; + TXN* txn = NULL; + + TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn)); + TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); + TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); + + return 0; +} + +static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) { + int32_t code = TDB_CODE_SUCCESS; + void* pIter = NULL; + + TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq->pOffset, path)); + while (1) { + pIter = taosHashIterate(pTq->pOffset, pIter); + if (pIter == NULL) { + break; + } + + STqOffset* offset = (STqOffset*)pIter; + TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset->subKey, strlen(offset->subKey), offset, sizeof(STqOffset))); + } + +END: + taosHashCancelIterate(pTq->pOffset, pIter); + return code; +} + +void* tqMetaGetOffset(STQ* pTq, const char* subkey){ + void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey)); + if (data == NULL) { + int vLen = 0; + if (tdbTbGet(pTq->pOffsetStore, subkey, strlen(subkey), &data, &vLen) < 0) { + tdbFree(data); + return NULL; + } + + if(taosHashPut(pTq->pOffset, subkey, strlen(subkey), data, sizeof(STqOffset)) != 0){ + tdbFree(data); + return NULL; + } + tdbFree(data); + + return taosHashGet(pTq->pOffset, subkey, strlen(subkey)); + } else { + return data; + } +} + int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { - int32_t code; + int32_t code = TDB_CODE_SUCCESS; int32_t vlen; void* buf = NULL; SEncoder encoder; tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); if (code < 0) { - goto end; + goto END; } tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey, @@ -202,71 +193,33 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { buf = taosMemoryCalloc(1, vlen); if (buf == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto end; + goto END; } - tEncoderInit(&encoder, buf, vlen); - code = tEncodeSTqHandle(&encoder, pHandle); if (code < 0) { - goto end; + goto END; } - TXN* txn = NULL; - code = tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - if (code < 0) { - goto end; - } + TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, (int)strlen(key), buf, vlen)); - code = tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn); - if (code < 0) { - goto end; - } - - code = tdbCommit(pTq->pMetaDB, txn); - if (code < 0) { - goto end; - } - - code = tdbPostCommit(pTq->pMetaDB, txn); -end: +END: tEncoderClear(&encoder); taosMemoryFree(buf); return code; } -int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { - TXN* txn; - - if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - return -1; - } - - if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) { - } - - if (tdbCommit(pTq->pMetaDB, txn) < 0) { - return -1; - } - - if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { - return -1; - } - - return 0; -} - -static int buildHandle(STQ* pTq, STqHandle* handle){ +static int tqMetaInitHandle(STQ* pTq, STqHandle* handle){ + int32_t code = TDB_CODE_SUCCESS; SVnode* pVnode = pTq->pVnode; int32_t vgId = TD_VID(pVnode); handle->pRef = walOpenRef(pVnode->pWal); if (handle->pRef == NULL) { - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } - walSetRefVer(handle->pRef, handle->snapshotVer); + TQ_ERR_RETURN(walSetRefVer(handle->pRef, handle->snapshotVer)); SReadHandle reader = { .vnode = pVnode, @@ -282,18 +235,18 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId); if (handle->execHandle.task == NULL) { tqError("cannot create exec task for %s", handle->subKey); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } void* scanner = NULL; - qExtractStreamScanner(handle->execHandle.task, &scanner); + (void)qExtractStreamScanner(handle->execHandle.task, &scanner); if (scanner == NULL) { tqError("cannot extract stream scanner for %s", handle->subKey); - return -1; + return TSDB_CODE_SCH_INTERNAL_ERROR; } handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); if (handle->execHandle.pTqReader == NULL) { tqError("cannot extract exec reader for %s", handle->subKey); - return -1; + return TSDB_CODE_SCH_INTERNAL_ERROR; } } else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) { handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0); @@ -308,7 +261,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) { tqError("nodesStringToNode error in sub stable, since %s", terrstr()); - return -1; + return TSDB_CODE_SCH_INTERNAL_ERROR; } } buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType, @@ -320,7 +273,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ if(ret != TDB_CODE_SUCCESS) { tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId); taosArrayDestroy(tbUidList); - return -1; + return TSDB_CODE_SCH_INTERNAL_ERROR; } tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid); handle->execHandle.pTqReader = tqReaderOpen(pVnode); @@ -330,24 +283,23 @@ static int buildHandle(STQ* pTq, STqHandle* handle){ return 0; } -static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ +static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ int32_t vgId = TD_VID(pTq->pVnode); - SDecoder decoder; - int32_t code = 0; + SDecoder decoder = {0}; + int32_t code = TDB_CODE_SUCCESS; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - code = tDecodeSTqHandle(&decoder, handle); - if (code) goto end; - code = buildHandle(pTq, handle); - if (code) goto end; - tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + TQ_ERR_GO_TO_END(tDecodeSTqHandle(&decoder, handle)); + TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle)); + tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId); code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); -end: +END: tDecoderClear(&decoder); return code; } -int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ +int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ int32_t vgId = TD_VID(pTq->pVnode); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -367,165 +319,186 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); - if(buildHandle(pTq, handle) < 0){ + if(tqMetaInitHandle(pTq, handle) < 0){ return -1; } - tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer); + tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } -static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){ - TBC* pCur = NULL; - if (tdbTbcOpen(pExecStoreOld, &pCur, NULL) < 0) { - return -1; - } - - TXN* txn; - if (tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - return -1; - } - +static int32_t tqMetaTransformStoreInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){ + TBC* pCur = NULL; void* pKey = NULL; int kLen = 0; void* pVal = NULL; int vLen = 0; + TXN* txn = NULL; - tdbTbcMoveToFirst(pCur); + int32_t code = TDB_CODE_SUCCESS; + + TQ_ERR_GO_TO_END(tdbTbcOpen(pExecStoreOld, &pCur, NULL)); + TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + + TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur)); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - if (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn) < 0) { - tqError("transform sub info error"); - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - return -1; - } + TQ_ERR_GO_TO_END (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn)); } + + TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn)); + TQ_ERR_GO_TO_END (tdbPostCommit(pMetaDB, txn)); + +END: tdbFree(pKey); tdbFree(pVal); - tdbTbcClose(pCur); + (void)tdbTbcClose(pCur); + return code; +} - if (tdbCommit(pMetaDB, txn) < 0) { - return -1; +static int32_t tqMetaTransformCheckInfo(TDB* pMetaDB, TTB* pCheckOld, TTB* pCheckNew){ + TBC* pCur = NULL; + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + TXN* txn = NULL; + + int32_t code = TDB_CODE_SUCCESS; + + TQ_ERR_GO_TO_END(tdbTbcOpen(pCheckOld, &pCur, NULL)); + TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + + TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur)); + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + STqCheckInfo info= {0}; + TQ_ERR_GO_TO_END(tqMetaDecodeCheckInfo(&info, pVal, vLen)); + int64_t uid = info.ntbUid; + tDeleteSTqCheckInfo(&info); + TQ_ERR_GO_TO_END (tdbTbUpsert(pCheckNew, &uid, sizeof(uid), pVal, vLen, txn)); } - if (tdbPostCommit(pMetaDB, txn) < 0) { - return -1; + TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn)); + TQ_ERR_GO_TO_END (tdbPostCommit(pMetaDB, txn)); +END: + tdbFree(pKey); + tdbFree(pVal); + (void)tdbTbcClose(pCur); + return code; +} + +int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) { + void* data = taosHashGet(pTq->pHandle, key, strlen(key)); + if(data == NULL){ + int vLen = 0; + if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) { + tdbFree(data); + return TSDB_CODE_OUT_OF_MEMORY; + } + STqHandle handle = {0}; + if (tqMetaRestoreHandle(pTq, data, vLen, &handle) != 0){ + tdbFree(data); + tqDestroyTqHandle(&handle); + return TSDB_CODE_OUT_OF_MEMORY; + } + tdbFree(data); + if(taosHashPut(pTq->pHandle, key, strlen(key), &handle, sizeof(STqHandle)) != 0){ + tqDestroyTqHandle(&handle); + return TSDB_CODE_OUT_OF_MEMORY; + } + *pHandle = taosHashGet(pTq->pCheckInfo, key, strlen(key)); + if(*pHandle == NULL){ + return TSDB_CODE_OUT_OF_MEMORY; + } + }else{ + *pHandle = data; } - return 0; + return TDB_CODE_SUCCESS; +} + +int32_t tqMetaOpenTdb(STQ* pTq) { + int32_t code = TDB_CODE_SUCCESS; + TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL)); + TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0)); + TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0)); + TQ_ERR_GO_TO_END(tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0)); + +END: + return code; +} + +static int32_t replaceTqPath(char** path){ + char* tpath = NULL; + int32_t code = TDB_CODE_SUCCESS; + TQ_ERR_RETURN(tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME)); + taosMemoryFree(*path); + *path = tpath; + return TDB_CODE_SUCCESS; +} + +int32_t tqMetaOpen(STQ* pTq) { + char* maindb = NULL; + int32_t code = TDB_CODE_SUCCESS; + TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); + if(!taosCheckExistFile(maindb)){ + TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path)); + TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); + }else{ + TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); + } + +END: + taosMemoryFree(maindb); + return code; } int32_t tqMetaTransform(STQ* pTq) { - int32_t len = strlen(pTq->path) + 64; - char* maindb = taosMemoryCalloc(1, len); - sprintf(maindb, "%s%s%s", pTq->path, TD_DIRSEP, TDB_MAINDB_NAME); + int32_t code = TDB_CODE_SUCCESS; + TDB* pMetaDB = NULL; + TTB* pExecStore = NULL; + TTB* pCheckStore = NULL; + char* offsetNew = NULL; + char* offset = NULL; + TQ_ERR_GO_TO_END(tqBuildFName(&offset, pTq->path, TQ_OFFSET_NAME)); - if(!taosCheckExistFile(maindb)){ - taosMemoryFree(maindb); - char* tpath = taosMemoryCalloc(1, len); - if(tpath == NULL){ - return -1; - } - sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe"); - taosMemoryFree(pTq->path); - pTq->path = tpath; - return tqMetaOpen(pTq); - } + TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL)); + TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0)); + TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0)); - int32_t code = 0; - TDB* pMetaDB = NULL; - TTB* pExecStore = NULL; - TTB* pCheckStore = NULL; - char* offsetNew = NULL; - char* offset = tqOffsetBuildFName(pTq->path, 0); - if(offset == NULL){ - code = -1; - goto END; - } + TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path)); + TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); + TQ_ERR_GO_TO_END(tqMetaTransformStoreInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore)); + TQ_ERR_GO_TO_END(tqMetaTransformCheckInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); - if (tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL) < 0) { - code = -1; - goto END; - } - - if (tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0) < 0) { - code = -1; - goto END; - } - - if (tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0) < 0) { - code = -1; - goto END; - } - - char* tpath = taosMemoryCalloc(1, len); - if(tpath == NULL){ - code = -1; - goto END; - } - sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe"); - taosMemoryFree(pTq->path); - pTq->path = tpath; - if (tqMetaOpen(pTq) < 0) { - code = -1; - goto END; - } - - if( tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore) < 0){ - code = -1; - goto END; - } - - if(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore) < 0){ - code = -1; - goto END; - } - - tdbTbClose(pExecStore); - pExecStore = NULL; - tdbTbClose(pCheckStore); - pCheckStore = NULL; - tdbClose(pMetaDB); - pMetaDB = NULL; - - offsetNew = tqOffsetBuildFName(pTq->path, 0); - if(offsetNew == NULL){ - code = -1; - goto END; - } + TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ tqError("copy offset file error"); - code = -1; - goto END; } - taosRemoveFile(maindb); - taosRemoveFile(offset); + TQ_ERR_GO_TO_END(tqMetaTransformOffsetInfo(pTq, offsetNew)); + (void)taosRemoveFile(offset); - END: - taosMemoryFree(maindb); +END: taosMemoryFree(offset); taosMemoryFree(offsetNew); - tdbTbClose(pExecStore); - tdbTbClose(pCheckStore); - tdbClose(pMetaDB); + //return 0 always, so ignore + (void)tdbTbClose(pExecStore); + (void)tdbTbClose(pCheckStore); + (void)tdbClose(pMetaDB); return code; } -int32_t tqMetaGetHandle(STQ* pTq, const char* key) { - void* pVal = NULL; - int vLen = 0; - - if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) { - return -1; +int32_t tqMetaClose(STQ* pTq) { + if (pTq->pExecStore) { + (void)tdbTbClose(pTq->pExecStore); } - STqHandle handle = {0}; - int code = restoreHandle(pTq, pVal, vLen, &handle); - if (code < 0){ - tqDestroyTqHandle(&handle); + if (pTq->pCheckStore) { + (void)tdbTbClose(pTq->pCheckStore); } - tdbFree(pVal); - return code; -} + if (pTq->pOffsetStore) { + (void)tdbTbClose(pTq->pOffsetStore); + } + (void)tdbClose(pTq->pMetaDB); + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 8b0e039ad5..8295ec4824 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -16,192 +16,76 @@ #include "tq.h" -struct STqOffsetStore { - STQ* pTq; - SHashObj* pHash; // SHashObj - int8_t needCommit; -}; - -char* tqOffsetBuildFName(const char* path, int32_t fVer) { - int32_t len = strlen(path); - char* fname = taosMemoryCalloc(1, len + 40); +int32_t tqBuildFName(char** data, const char* path, char* name) { + int32_t len = strlen(path) + strlen(name) + 2; + char* fname = taosMemoryCalloc(1, len); if(fname == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } - snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer); - return fname; + int32_t code = snprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name); + if (code < 0){ + code = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(fname); + return code; + } + *data = fname; + return TDB_CODE_SUCCESS; } -int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { - TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); +int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name) { + int32_t code = TDB_CODE_SUCCESS; + void* pMemBuf = NULL; + SDecoder decoder = {0}; + + TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ); if (pFile == NULL) { - return TSDB_CODE_SUCCESS; + code = TAOS_SYSTEM_ERROR(errno); + goto END; } - int32_t vgId = TD_VID(pStore->pTq->pVnode); - int64_t code = 0; + int64_t ret = 0; int32_t size = 0; while (1) { - if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) { - if (code == 0) { - break; - } else { - return -1; + if ((ret = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) { + if (ret != 0) { + code = TSDB_CODE_INVALID_MSG; } + goto END; } size = htonl(size); - void* pMemBuf = taosMemoryCalloc(1, size); + pMemBuf = taosMemoryCalloc(1, size); if (pMemBuf == NULL) { - tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } - if ((code = taosReadFile(pFile, pMemBuf, size)) != size) { - taosMemoryFree(pMemBuf); - return -1; + if (taosReadFile(pFile, pMemBuf, size) != size) { + terrno = TSDB_CODE_INVALID_MSG; + goto END; } - STqOffset offset = {0}; - SDecoder decoder; + STqOffset offset; tDecoderInit(&decoder, pMemBuf, size); if (tDecodeSTqOffset(&decoder, &offset) < 0) { - taosMemoryFree(pMemBuf); - tDecoderClear(&decoder); - return code; + code = TSDB_CODE_INVALID_MSG; + goto END; + } + + if (taosHashPut(pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } tDecoderClear(&decoder); - if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { - return -1; - } - - // todo remove this - if (offset.val.type == TMQ_OFFSET__LOG) { - STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); - if (pHandle) { - if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) { -// tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey, -// offset.val.version); - } - } - } - taosMemoryFree(pMemBuf); + pMemBuf = NULL; } +END: taosCloseFile(&pFile); - return TSDB_CODE_SUCCESS; -} - -STqOffsetStore* tqOffsetOpen(STQ* pTq) { - STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore)); - if (pStore == NULL) { - return NULL; - } - - pStore->pTq = pTq; - pStore->needCommit = 0; - pTq->pOffsetStore = pStore; - - pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - if (pStore->pHash == NULL) { - taosMemoryFree(pStore); - return NULL; - } - - taosHashSetFreeFp(pStore->pHash, tOffsetDestroy); - char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); - if (tqOffsetRestoreFromFile(pStore, fname) < 0) { - taosMemoryFree(fname); - taosMemoryFree(pStore); - return NULL; - } - - taosMemoryFree(fname); - return pStore; -} - -void tqOffsetClose(STqOffsetStore* pStore) { - if(pStore == NULL) return; - tqOffsetCommitFile(pStore); - taosHashCleanup(pStore->pHash); - taosMemoryFree(pStore); -} - -STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { - return (STqOffset*)taosHashGet(pStore->pHash, subscribeKey, strlen(subscribeKey)); -} - -int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { - pStore->needCommit = 1; - return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); -} - -int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) { - return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey)); -} - -int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { - if (!pStore->needCommit) { - return 0; - } - - // TODO file name should be with a newer version - char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); - TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - const char* err = strerror(errno); - tqError("vgId:%d, failed to open offset file %s, since %s", TD_VID(pStore->pTq->pVnode), fname, err); - taosMemoryFree(fname); - return -1; - } - - taosMemoryFree(fname); - - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pStore->pHash, pIter); - if (pIter == NULL) { - break; - } - - STqOffset* pOffset = (STqOffset*)pIter; - int32_t bodyLen; - int32_t code; - tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code); - - if (code < 0) { - taosHashCancelIterate(pStore->pHash, pIter); - return -1; - } - - int32_t totLen = INT_BYTES + bodyLen; - void* buf = taosMemoryCalloc(1, totLen); - void* abuf = POINTER_SHIFT(buf, INT_BYTES); - - *(int32_t*)buf = htonl(bodyLen); - SEncoder encoder; - tEncoderInit(&encoder, abuf, bodyLen); - tEncodeSTqOffset(&encoder, pOffset); - - // write file - int64_t writeLen; - if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) { - tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen); - taosHashCancelIterate(pStore->pHash, pIter); - taosMemoryFree(buf); - return -1; - } - - taosMemoryFree(buf); - } - - // close and rename file - taosCloseFile(&pFile); - pStore->needCommit = 0; - return 0; + taosMemoryFree(pMemBuf); + tDecoderClear(&decoder); + + return code; } diff --git a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c deleted file mode 100644 index 8a7f672e5d..0000000000 --- a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "meta.h" -#include "tdbInt.h" -#include "tq.h" - -// STqOffsetReader ======================================== -struct STqOffsetReader { - STQ* pTq; - int64_t sver; - int64_t ever; - int8_t readEnd; -}; - -int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader) { - STqOffsetReader* pReader = NULL; - - pReader = taosMemoryCalloc(1, sizeof(STqOffsetReader)); - if (pReader == NULL) { - *ppReader = NULL; - return -1; - } - pReader->pTq = pTq; - pReader->sver = sver; - pReader->ever = ever; - - tqInfo("vgId:%d, vnode snapshot tq offset reader opened", TD_VID(pTq->pVnode)); - - *ppReader = pReader; - return 0; -} - -int32_t tqOffsetReaderClose(STqOffsetReader** ppReader) { - taosMemoryFree(*ppReader); - *ppReader = NULL; - return 0; -} - -int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) { - if (pReader->readEnd != 0) return 0; - - char* fname = tqOffsetBuildFName(pReader->pTq->path, 0); - TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); - if (pFile == NULL) { - taosMemoryFree(fname); - return 0; - } - - int64_t sz = 0; - if (taosStatFile(fname, &sz, NULL, NULL) < 0) { - taosCloseFile(&pFile); - taosMemoryFree(fname); - return -1; - } - taosMemoryFree(fname); - - SSnapDataHdr* buf = taosMemoryCalloc(1, sz + sizeof(SSnapDataHdr)); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosCloseFile(&pFile); - return terrno; - } - void* abuf = POINTER_SHIFT(buf, sizeof(SSnapDataHdr)); - int64_t contLen = taosReadFile(pFile, abuf, sz); - if (contLen != sz) { - taosCloseFile(&pFile); - taosMemoryFree(buf); - return -1; - } - buf->size = sz; - buf->type = SNAP_DATA_TQ_OFFSET; - *ppData = (uint8_t*)buf; - - pReader->readEnd = 1; - taosCloseFile(&pFile); - return 0; -} - -// STqOffseWriter ======================================== -struct STqOffsetWriter { - STQ* pTq; - int64_t sver; - int64_t ever; - int32_t tmpFileVer; - char* fname; -}; - -int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter) { - int32_t code = 0; - STqOffsetWriter* pWriter; - - pWriter = (STqOffsetWriter*)taosMemoryCalloc(1, sizeof(STqOffsetWriter)); - if (pWriter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pWriter->pTq = pTq; - pWriter->sver = sver; - pWriter->ever = ever; - - *ppWriter = pWriter; - return code; - -_err: - tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppWriter = NULL; - return code; -} - -int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) { - STqOffsetWriter* pWriter = *ppWriter; - STQ* pTq = pWriter->pTq; - char* fname = tqOffsetBuildFName(pTq->path, 0); - - if (rollback) { - if (taosRemoveFile(pWriter->fname) < 0) { - taosMemoryFree(fname); - return -1; - } - } else { - if (taosRenameFile(pWriter->fname, fname) < 0) { - taosMemoryFree(fname); - return -1; - } - if (tqOffsetRestoreFromFile(pTq->pOffsetStore, fname) < 0) { - taosMemoryFree(fname); - return -1; - } - } - taosMemoryFree(fname); - taosMemoryFree(pWriter->fname); - taosMemoryFree(pWriter); - *ppWriter = NULL; - return 0; -} - -int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData) { - STQ* pTq = pWriter->pTq; - pWriter->tmpFileVer = 1; - pWriter->fname = tqOffsetBuildFName(pTq->path, pWriter->tmpFileVer); - TdFilePtr pFile = taosOpenFile(pWriter->fname, TD_FILE_CREATE | TD_FILE_WRITE); - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - int64_t size = pHdr->size; - if (pFile) { - int64_t contLen = taosWriteFile(pFile, pHdr->data, size); - if (contLen != size) { - taosCloseFile(&pFile); - return -1; - } - taosCloseFile(&pFile); - } else { - return -1; - } - return 0; -} diff --git a/source/dnode/vnode/src/tq/tqHandleSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c similarity index 71% rename from source/dnode/vnode/src/tq/tqHandleSnapshot.c rename to source/dnode/vnode/src/tq/tqSnapshot.c index 28fd315eb6..755274ff5e 100644 --- a/source/dnode/vnode/src/tq/tqHandleSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -23,9 +23,10 @@ struct STqSnapReader { int64_t sver; int64_t ever; TBC* pCur; + int8_t type; }; -int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) { +int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) { int32_t code = 0; STqSnapReader* pReader = NULL; @@ -38,9 +39,21 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** p pReader->pTq = pTq; pReader->sver = sver; pReader->ever = ever; + pReader->type = type; // impl - code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL); + TTB *pTb = NULL; + if (type == SNAP_DATA_TQ_CHECKINFO) { + pTb = pTq->pCheckStore; + } else if (type == SNAP_DATA_TQ_HANDLE) { + pTb = pTq->pExecStore; + } else if (type == SNAP_DATA_TQ_OFFSET) { + pTb = pTq->pOffsetStore; + } else { + code = TSDB_CODE_INVALID_MSG; + goto _err; + } + code = tdbTbcOpen(pTb, &pReader->pCur, NULL); if (code) { taosMemoryFree(pReader); goto _err; @@ -91,7 +104,7 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { } SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); - pHdr->type = SNAP_DATA_TQ_HANDLE; + pHdr->type = pReader->type; pHdr->size = vLen; memcpy(pHdr->data, pVal, vLen); @@ -169,7 +182,7 @@ _err: return code; } -int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { +int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STQ* pTq = pWriter->pTq; SDecoder decoder = {0}; @@ -180,7 +193,7 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { code = tDecodeSTqHandle(pDecoder, &handle); if (code) goto end; taosWLockLatch(&pTq->lock); - code = tqMetaSaveHandle(pTq, handle.subKey, &handle); + code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, (int)strlen(handle.subKey), pData, nData); taosWUnLockLatch(&pTq->lock); end: @@ -189,3 +202,36 @@ end: tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code); return code; } + +int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + STQ* pTq = pWriter->pTq; + STqCheckInfo info = {0}; + if(tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)) != 0){ + goto _err; + } + + code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.ntbUid, sizeof(info.ntbUid), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + if (code) goto _err; + + return code; + + _err: + tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + return code; +} + +int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + STQ* pTq = pWriter->pTq; + STqOffset *info = (STqOffset*)(pData + sizeof(SSnapDataHdr)); + + code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info->subKey, strlen(info->subKey), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); + if (code) goto _err; + + return code; + + _err: + tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + return code; +} diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5290c39d42..8c3417239b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -77,7 +77,7 @@ static int32_t tqInitTaosxRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) { static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) { uint64_t consumerId = pRequest->consumerId; - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey); + STqOffset* pOffset = (STqOffset*)tqMetaGetOffset(pTq, pRequest->subKey); int32_t vgId = TD_VID(pTq->pVnode); *pBlockReturned = false; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 5acaf2bce4..12b26a4b26 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -458,11 +458,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { TSDB_CHECK_CODE(code, lino, _exit); } - if (tqCommit(pVnode->pTq) < 0) { - code = TSDB_CODE_FAILED; - TSDB_CHECK_CODE(code, lino, _exit); - } - // commit info if (vnodeCommitInfo(dir) < 0) { code = terrno; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 611a603c63..a050734cb9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -59,9 +59,9 @@ struct SVSnapReader { int8_t tqHandleDone; STqSnapReader *pTqSnapReader; int8_t tqOffsetDone; - STqOffsetReader *pTqOffsetReader; + STqSnapReader *pTqOffsetReader; int8_t tqCheckInfoDone; - STqCheckInfoReader *pTqCheckInfoReader; + STqSnapReader *pTqCheckInfoReader; // stream int8_t streamTaskDone; SStreamTaskReader *pStreamTaskReader; @@ -233,11 +233,11 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { } if (pReader->pTqOffsetReader) { - tqOffsetReaderClose(&pReader->pTqOffsetReader); + tqSnapReaderClose(&pReader->pTqOffsetReader); } if (pReader->pTqCheckInfoReader) { - tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader); + tqSnapReaderClose(&pReader->pTqCheckInfoReader); } taosMemoryFree(pReader); @@ -365,7 +365,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) vInfo("vgId:%d tq transform start", vgId); if (!pReader->tqHandleDone) { if (pReader->pTqSnapReader == NULL) { - code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader); + code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE, &pReader->pTqSnapReader); if (code < 0) goto _err; } @@ -384,11 +384,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } if (!pReader->tqCheckInfoDone) { if (pReader->pTqCheckInfoReader == NULL) { - code = tqCheckInfoReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqCheckInfoReader); + code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO, &pReader->pTqCheckInfoReader); if (code < 0) goto _err; } - code = tqCheckInfoRead(pReader->pTqCheckInfoReader, ppData); + code = tqSnapRead(pReader->pTqCheckInfoReader, ppData); if (code) { goto _err; } else { @@ -396,18 +396,18 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) goto _exit; } else { pReader->tqCheckInfoDone = 1; - code = tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader); + code = tqSnapReaderClose(&pReader->pTqCheckInfoReader); if (code) goto _err; } } } if (!pReader->tqOffsetDone) { if (pReader->pTqOffsetReader == NULL) { - code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader); + code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET, &pReader->pTqOffsetReader); if (code < 0) goto _err; } - code = tqOffsetSnapRead(pReader->pTqOffsetReader, ppData); + code = tqSnapRead(pReader->pTqOffsetReader, ppData); if (code) { goto _err; } else { @@ -415,7 +415,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) goto _exit; } else { pReader->tqOffsetDone = 1; - code = tqOffsetReaderClose(&pReader->pTqOffsetReader); + code = tqSnapReaderClose(&pReader->pTqOffsetReader); if (code) goto _err; } } @@ -536,9 +536,9 @@ struct SVSnapWriter { // tsdb raw STsdbSnapRAWWriter *pTsdbSnapRAWWriter; // tq - STqSnapWriter *pTqSnapWriter; - STqOffsetWriter *pTqOffsetWriter; - STqCheckInfoWriter *pTqCheckInfoWriter; + STqSnapWriter *pTqSnapHandleWriter; + STqSnapWriter *pTqSnapOffsetWriter; + STqSnapWriter *pTqSnapCheckInfoWriter; // stream SStreamTaskWriter *pStreamTaskWriter; SStreamStateWriter *pStreamStateWriter; @@ -736,18 +736,18 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } - if (pWriter->pTqSnapWriter) { - code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback); + if (pWriter->pTqSnapHandleWriter) { + code = tqSnapWriterClose(&pWriter->pTqSnapHandleWriter, rollback); if (code) goto _exit; } - if (pWriter->pTqCheckInfoWriter) { - code = tqCheckInfoWriterClose(&pWriter->pTqCheckInfoWriter, rollback); + if (pWriter->pTqSnapCheckInfoWriter) { + code = tqSnapWriterClose(&pWriter->pTqSnapCheckInfoWriter, rollback); if (code) goto _exit; } - if (pWriter->pTqOffsetWriter) { - code = tqOffsetWriterClose(&pWriter->pTqOffsetWriter, rollback); + if (pWriter->pTqSnapOffsetWriter) { + code = tqSnapWriterClose(&pWriter->pTqSnapOffsetWriter, rollback); if (code) goto _exit; } @@ -872,32 +872,32 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { } break; case SNAP_DATA_TQ_HANDLE: { // tq handle - if (pWriter->pTqSnapWriter == NULL) { - code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapWriter); + if (pWriter->pTqSnapHandleWriter == NULL) { + code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter); if (code) goto _err; } - code = tqSnapWrite(pWriter->pTqSnapWriter, pData, nData); + code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData); if (code) goto _err; } break; case SNAP_DATA_TQ_CHECKINFO: { // tq checkinfo - if (pWriter->pTqCheckInfoWriter == NULL) { - code = tqCheckInfoWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqCheckInfoWriter); + if (pWriter->pTqSnapCheckInfoWriter == NULL) { + code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapCheckInfoWriter); if (code) goto _err; } - code = tqCheckInfoWrite(pWriter->pTqCheckInfoWriter, pData, nData); + code = tqSnapCheckInfoWrite(pWriter->pTqSnapCheckInfoWriter, pData, nData); if (code) goto _err; } break; case SNAP_DATA_TQ_OFFSET: { // tq offset - if (pWriter->pTqOffsetWriter == NULL) { - code = tqOffsetWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqOffsetWriter); + if (pWriter->pTqSnapOffsetWriter == NULL) { + code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter); if (code) goto _err; } - code = tqOffsetSnapWrite(pWriter->pTqOffsetWriter, pData, nData); + code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData); if (code) goto _err; } break; case SNAP_DATA_STREAM_TASK: diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index b7169dec53..7f47517257 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -52,15 +52,14 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { taosThreadMutexLock(&pWal->mutex); if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { taosThreadMutexUnlock(&pWal->mutex); - terrno = TSDB_CODE_WAL_INVALID_VER; - return -1; + return TSDB_CODE_WAL_INVALID_VER; } pRef->refVer = ver; taosThreadMutexUnlock(&pWal->mutex); } - return 0; + return TSDB_CODE_SUCCESS; } void walRefFirstVer(SWal *pWal, SWalRef *pRef) {