From 930b8500354b0839ceeaff132da7ca444f4e3893 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 23 May 2022 19:51:40 +0800 Subject: [PATCH] refacotr: remove tq meta store --- source/dnode/vnode/CMakeLists.txt | 1 - source/dnode/vnode/src/inc/tq.h | 163 ------- source/dnode/vnode/src/tq/tq.c | 130 +---- source/dnode/vnode/src/tq/tqMetaStore.c | 622 ------------------------ source/dnode/vnode/test/tqMetaTest.cpp | 279 ----------- 5 files changed, 1 insertion(+), 1194 deletions(-) delete mode 100644 source/dnode/vnode/src/tq/tqMetaStore.c delete mode 100644 source/dnode/vnode/test/tqMetaTest.cpp diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index b10bee4732..4141485d28 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -48,7 +48,6 @@ target_sources( # tq "src/tq/tq.c" "src/tq/tqCommit.c" - "src/tq/tqMetaStore.c" "src/tq/tqOffset.c" "src/tq/tqPush.c" "src/tq/tqRead.c" diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 3017523aa9..f89df4a96f 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -41,45 +41,6 @@ extern "C" { #define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -#define TQ_BUFFER_SIZE 4 - -#define TQ_BUCKET_MASK 0xFF -#define TQ_BUCKET_SIZE 256 - -#define TQ_PAGE_SIZE 4096 -// key + offset + size -#define TQ_IDX_SIZE 24 -// 4096 / 24 -#define TQ_MAX_IDX_ONE_PAGE 170 -// 24 * 170 -#define TQ_IDX_PAGE_BODY_SIZE 4080 -// 4096 - 4080 -#define TQ_IDX_PAGE_HEAD_SIZE 16 - -#define TQ_ACTION_CONST 0 -#define TQ_ACTION_INUSE 1 -#define TQ_ACTION_INUSE_CONT 2 -#define TQ_ACTION_INTXN 3 - -#define TQ_SVER 0 - -// TODO: inplace mode is not implemented -#define TQ_UPDATE_INPLACE 0 -#define TQ_UPDATE_APPEND 1 - -#define TQ_DUP_INTXN_REWRITE 0 -#define TQ_DUP_INTXN_REJECT 2 - -static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } - -static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } - -static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; - -#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE - -typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; - typedef struct STqOffsetCfg STqOffsetCfg; typedef struct STqOffsetStore STqOffsetStore; @@ -98,53 +59,6 @@ struct STqReadHandle { STSchema* pSchema; }; -typedef struct { - int16_t ver; - int16_t action; - int32_t checksum; - int64_t ssize; - char content[]; -} STqSerializedHead; - -typedef int32_t (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead); -typedef int32_t (*FTqDeserialize)(void* self, const STqSerializedHead* pHead, void** ppObj); -typedef void (*FTqDelete)(void*); - -typedef struct { - int64_t key; - int64_t offset; - int64_t serializedSize; - void* valueInUse; - void* valueInTxn; -} STqMetaHandle; - -typedef struct STqMetaList { - STqMetaHandle handle; - struct STqMetaList* next; - // struct STqMetaList* inTxnPrev; - // struct STqMetaList* inTxnNext; - struct STqMetaList* unpersistPrev; - struct STqMetaList* unpersistNext; -} STqMetaList; - -typedef struct { - STQ* pTq; - STqMetaList* bucket[TQ_BUCKET_SIZE]; - // a table head - STqMetaList* unpersistHead; - // topics that are not connectted - STqMetaList* unconnectTopic; - - TdFilePtr pFile; - TdFilePtr pIdxFile; - - char* dirPath; - int32_t tqConfigFlag; - FTqSerialize pSerializer; - FTqDeserialize pDeserializer; - FTqDelete pDeleter; -} STqMetaStore; - typedef struct { int64_t consumerId; int32_t epoch; @@ -189,87 +103,10 @@ typedef struct { static STqMgmt tqMgmt; -typedef struct { - int8_t status; - int64_t offset; - qTaskInfo_t task; - STqReadHandle* pReadHandle; -} STqTaskItem; - -// new version -typedef struct { - int64_t firstOffset; - int64_t lastOffset; - STqTaskItem output[TQ_BUFFER_SIZE]; -} STqBuffer; - -typedef struct { - char topicName[TSDB_TOPIC_FNAME_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - char* qmsg; - STqBuffer buffer; - SWalReadHandle* pReadhandle; -} STqTopic; - -typedef struct { - int64_t consumerId; - int32_t epoch; - char cgroup[TSDB_TOPIC_FNAME_LEN]; - SArray* topics; // SArray -} STqConsumer; - -typedef struct { - int8_t type; - int8_t nodeType; - int8_t reserved[6]; - int64_t streamId; - qTaskInfo_t task; - // TODO sync function -} STqStreamPusher; - -typedef struct { - int8_t inited; - tmr_h timer; -} STqPushMgmt; - -static STqPushMgmt tqPushMgmt; - // init once int tqInit(); void tqCleanUp(); -// open in each vnode -// required by vnode - -int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); -int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); - -static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } - -// tqMetaStore.h -STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer, - FTqDelete pDeleter, int32_t tqConfigFlag); -int32_t tqStoreClose(STqMetaStore*); -// int32_t tqStoreDelete(TqMetaStore*); -// int32_t tqStoreCommitAll(TqMetaStore*); -int32_t tqStorePersist(STqMetaStore*); -// clean deleted idx and data from persistent file -int32_t tqStoreCompact(STqMetaStore*); - -void* tqHandleGet(STqMetaStore*, int64_t key); -// make it unpersist -void* tqHandleTouchGet(STqMetaStore*, int64_t key); -int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value); -int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize); -// delete committed kv pair -// notice that a delete action still needs to be committed -int32_t tqHandleDel(STqMetaStore*, int64_t key); -int32_t tqHandlePurge(STqMetaStore*, int64_t key); -int32_t tqHandleCommit(STqMetaStore*, int64_t key); -int32_t tqHandleAbort(STqMetaStore*, int64_t key); - // tqOffset STqOffsetStore* STqOffsetOpen(STqOffsetCfg*); void STqOffsetClose(STqOffsetStore*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bc9a053a43..6ca523b580 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -214,7 +214,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) { - // TODO error handle + // TODO handle sma error } void* data = taosMemoryMalloc(msgLen); if (data == NULL) { @@ -230,134 +230,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) int tqCommit(STQ* pTq) { // do nothing - /*return tqStorePersist(pTq->tqMeta);*/ - return 0; -} - -int32_t tqGetTopicHandleSize(const STqTopic* pTopic) { - return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) + - sizeof(int64_t) * 3; -} - -int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) { - int num = taosArrayGetSize(pConsumer->topics); - int32_t sz = 0; - for (int i = 0; i < num; i++) { - STqTopic* pTopic = taosArrayGet(pConsumer->topics, i); - sz += tqGetTopicHandleSize(pTopic); - } - return sz; -} - -static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) { - int32_t tlen = 0; - tlen += taosEncodeString(buf, pTopic->topicName); - /*tlen += taosEncodeString(buf, pTopic->sql);*/ - /*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/ - tlen += taosEncodeString(buf, pTopic->qmsg); - /*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/ - /*tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);*/ - /*tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);*/ - return tlen; -} - -static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) { - buf = taosDecodeStringTo(buf, pTopic->topicName); - /*buf = taosDecodeString(buf, &pTopic->sql);*/ - /*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/ - buf = taosDecodeString(buf, &pTopic->qmsg); - /*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/ - /*buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);*/ - /*buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);*/ - return buf; -} - -static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pConsumer) { - int32_t sz; - - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); - tlen += taosEncodeFixedI32(buf, pConsumer->epoch); - tlen += taosEncodeString(buf, pConsumer->cgroup); - sz = taosArrayGetSize(pConsumer->topics); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - STqTopic* pTopic = taosArrayGet(pConsumer->topics, i); - tlen += tEncodeSTqTopic(buf, pTopic); - } - return tlen; -} - -static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* pConsumer) { - int32_t sz; - - buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); - buf = taosDecodeFixedI32(buf, &pConsumer->epoch); - buf = taosDecodeStringTo(buf, pConsumer->cgroup); - buf = taosDecodeFixedI32(buf, &sz); - pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic)); - if (pConsumer->topics == NULL) return NULL; - for (int32_t i = 0; i < sz; i++) { - STqTopic pTopic; - buf = tDecodeSTqTopic(buf, &pTopic); - taosArrayPush(pConsumer->topics, &pTopic); - } - return buf; -} - -int tqSerializeConsumer(const STqConsumer* pConsumer, STqSerializedHead** ppHead) { - int32_t sz = tEncodeSTqConsumer(NULL, pConsumer); - - if (sz > (*ppHead)->ssize) { - void* tmpPtr = taosMemoryRealloc(*ppHead, sizeof(STqSerializedHead) + sz); - if (tmpPtr == NULL) { - taosMemoryFree(*ppHead); - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - *ppHead = tmpPtr; - (*ppHead)->ssize = sz; - } - - void* ptr = (*ppHead)->content; - void* abuf = ptr; - tEncodeSTqConsumer(&abuf, pConsumer); - - return 0; -} - -int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsumer** ppConsumer) { - const void* str = pHead->content; - *ppConsumer = taosMemoryCalloc(1, sizeof(STqConsumer)); - if (*ppConsumer == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - if (tDecodeSTqConsumer(str, *ppConsumer) == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - STqConsumer* pConsumer = *ppConsumer; - int32_t sz = taosArrayGetSize(pConsumer->topics); - for (int32_t i = 0; i < sz; i++) { - STqTopic* pTopic = taosArrayGet(pConsumer->topics, i); - pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); - if (pTopic->pReadhandle == NULL) { - ASSERT(false); - } - for (int j = 0; j < TQ_BUFFER_SIZE; j++) { - pTopic->buffer.output[j].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - SReadHandle handle = { - .reader = pReadHandle, - .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, - }; - pTopic->buffer.output[j].pReadHandle = pReadHandle; - pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); - } - } - return 0; } diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c deleted file mode 100644 index ca09cc1dc1..0000000000 --- a/source/dnode/vnode/src/tq/tqMetaStore.c +++ /dev/null @@ -1,622 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include "tq.h" -// #include -// #include -// #include -// #include "osDir.h" - -#define TQ_META_NAME "tq.meta" -#define TQ_IDX_NAME "tq.idx" - -static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value); -static void* tqHandleGetUncommitted(STqMetaStore*, int64_t key); - -static inline void tqLinkUnpersist(STqMetaStore* pMeta, STqMetaList* pNode) { - if (pNode->unpersistNext == NULL) { - pNode->unpersistNext = pMeta->unpersistHead->unpersistNext; - pNode->unpersistPrev = pMeta->unpersistHead; - pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode; - pMeta->unpersistHead->unpersistNext = pNode; - } -} - -static inline int64_t tqSeekLastPage(TdFilePtr pFile) { - int offset = taosLSeekFile(pFile, 0, SEEK_END); - int pageNo = offset / TQ_PAGE_SIZE; - int curPageOffset = pageNo * TQ_PAGE_SIZE; - return taosLSeekFile(pFile, curPageOffset, SEEK_SET); -} - -// TODO: the struct is tightly coupled with index entry -typedef struct STqIdxPageHead { - int16_t writeOffset; - int8_t unused[14]; -} STqIdxPageHead; - -typedef struct STqIdxPageBuf { - STqIdxPageHead head; - char buffer[TQ_IDX_PAGE_BODY_SIZE]; -} STqIdxPageBuf; - -static inline int tqReadLastPage(TdFilePtr pFile, STqIdxPageBuf* pBuf) { - int offset = tqSeekLastPage(pFile); - int nBytes; - if ((nBytes = taosReadFile(pFile, pBuf, TQ_PAGE_SIZE)) == -1) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - if (nBytes == 0) { - memset(pBuf, 0, TQ_PAGE_SIZE); - pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; - } - ASSERT(nBytes == 0 || nBytes == pBuf->head.writeOffset); - - return taosLSeekFile(pFile, offset, SEEK_SET); -} - -STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, FTqDeserialize deserializer, - FTqDelete deleter, int32_t tqConfigFlag) { - STqMetaStore* pMeta = taosMemoryCalloc(1, sizeof(STqMetaStore)); - if (pMeta == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return NULL; - } - pMeta->pTq = pTq; - - // concat data file name and index file name - size_t pathLen = strlen(path); - pMeta->dirPath = taosMemoryMalloc(pathLen + 1); - if (pMeta->dirPath == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - taosMemoryFree(pMeta); - return NULL; - } - strcpy(pMeta->dirPath, path); - - char* name = taosMemoryMalloc(pathLen + 10); - - strcpy(name, path); - if (!taosDirExist(name) && taosMkDir(name) != 0) { - terrno = TSDB_CODE_TQ_FAILED_TO_CREATE_DIR; - tqError("failed to create dir:%s since %s ", name, terrstr()); - } - strcat(name, "/" TQ_IDX_NAME); - TdFilePtr pIdxFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ); - if (pIdxFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - tqError("failed to open file:%s since %s ", name, terrstr()); - // free memory - taosMemoryFree(name); - return NULL; - } - - pMeta->pIdxFile = pIdxFile; - pMeta->unpersistHead = taosMemoryCalloc(1, sizeof(STqMetaList)); - if (pMeta->unpersistHead == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - taosMemoryFree(name); - return NULL; - } - pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead; - - strcpy(name, path); - strcat(name, "/" TQ_META_NAME); - TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - tqError("failed to open file:%s since %s", name, terrstr()); - taosMemoryFree(name); - return NULL; - } - taosMemoryFree(name); - - pMeta->pFile = pFile; - - pMeta->pSerializer = serializer; - pMeta->pDeserializer = deserializer; - pMeta->pDeleter = deleter; - pMeta->tqConfigFlag = tqConfigFlag; - - // read idx file and load into memory - STqIdxPageBuf idxBuf; - STqSerializedHead* serializedObj = taosMemoryMalloc(TQ_PAGE_SIZE); - if (serializedObj == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - } - int idxRead; - int allocated = TQ_PAGE_SIZE; - bool readEnd = false; - while ((idxRead = taosReadFile(pIdxFile, &idxBuf, TQ_PAGE_SIZE))) { - if (idxRead == -1) { - // TODO: handle error - terrno = TAOS_SYSTEM_ERROR(errno); - tqError("failed to read tq index file since %s", terrstr()); - } - ASSERT(idxBuf.head.writeOffset == idxRead); - // loop read every entry - for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { - STqMetaList* pNode = taosMemoryCalloc(1, sizeof(STqMetaList)); - if (pNode == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - // TODO: free memory - } - memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); - - taosLSeekFile(pFile, pNode->handle.offset, SEEK_SET); - if (allocated < pNode->handle.serializedSize) { - void* ptr = taosMemoryRealloc(serializedObj, pNode->handle.serializedSize); - if (ptr == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - // TODO: free memory - } - serializedObj = ptr; - allocated = pNode->handle.serializedSize; - } - serializedObj->ssize = pNode->handle.serializedSize; - if (taosReadFile(pFile, serializedObj, pNode->handle.serializedSize) != pNode->handle.serializedSize) { - // TODO: read error - } - if (serializedObj->action == TQ_ACTION_INUSE) { - if (serializedObj->ssize != sizeof(STqSerializedHead)) { - pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInUse); - } else { - pNode->handle.valueInUse = TQ_DELETE_TOKEN; - } - } else if (serializedObj->action == TQ_ACTION_INTXN) { - if (serializedObj->ssize != sizeof(STqSerializedHead)) { - pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInTxn); - } else { - pNode->handle.valueInTxn = TQ_DELETE_TOKEN; - } - } else if (serializedObj->action == TQ_ACTION_INUSE_CONT) { - if (serializedObj->ssize != sizeof(STqSerializedHead)) { - pMeta->pDeserializer(pTq, serializedObj, &pNode->handle.valueInUse); - } else { - pNode->handle.valueInUse = TQ_DELETE_TOKEN; - } - STqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize); - if (ptr->ssize != sizeof(STqSerializedHead)) { - pMeta->pDeserializer(pTq, ptr, &pNode->handle.valueInTxn); - } else { - pNode->handle.valueInTxn = TQ_DELETE_TOKEN; - } - } else { - ASSERT(0); - } - - // put into list - int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; - STqMetaList* pBucketNode = pMeta->bucket[bucketKey]; - if (pBucketNode == NULL) { - pMeta->bucket[bucketKey] = pNode; - } else if (pBucketNode->handle.key == pNode->handle.key) { - pNode->next = pBucketNode->next; - pMeta->bucket[bucketKey] = pNode; - } else { - while (pBucketNode->next && pBucketNode->next->handle.key != pNode->handle.key) { - pBucketNode = pBucketNode->next; - } - if (pBucketNode->next) { - ASSERT(pBucketNode->next->handle.key == pNode->handle.key); - STqMetaList* pNodeFound = pBucketNode->next; - pNode->next = pNodeFound->next; - pBucketNode->next = pNode; - pBucketNode = pNodeFound; - } else { - pNode->next = pMeta->bucket[bucketKey]; - pMeta->bucket[bucketKey] = pNode; - pBucketNode = NULL; - } - } - if (pBucketNode) { - if (pBucketNode->handle.valueInUse && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pBucketNode->handle.valueInUse); - } - if (pBucketNode->handle.valueInTxn && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pBucketNode->handle.valueInTxn); - } - taosMemoryFree(pBucketNode); - } - } - } - taosMemoryFree(serializedObj); - return pMeta; -} - -int32_t tqStoreClose(STqMetaStore* pMeta) { - // commit data and idx - tqStorePersist(pMeta); - ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next == NULL); - taosCloseFile(&pMeta->pFile); - taosCloseFile(&pMeta->pIdxFile); - // free memory - for (int i = 0; i < TQ_BUCKET_SIZE; i++) { - STqMetaList* pNode = pMeta->bucket[i]; - while (pNode) { - ASSERT(pNode->unpersistNext == NULL); - ASSERT(pNode->unpersistPrev == NULL); - if (pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pNode->handle.valueInTxn); - } - if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pNode->handle.valueInUse); - } - STqMetaList* next = pNode->next; - taosMemoryFree(pNode); - pNode = next; - } - } - taosMemoryFree(pMeta->dirPath); - taosMemoryFree(pMeta->unpersistHead); - taosMemoryFree(pMeta); - return 0; -} - -int32_t tqStoreDelete(STqMetaStore* pMeta) { - taosCloseFile(&pMeta->pFile); - taosCloseFile(&pMeta->pIdxFile); - // free memory - for (int i = 0; i < TQ_BUCKET_SIZE; i++) { - STqMetaList* pNode = pMeta->bucket[i]; - pMeta->bucket[i] = NULL; - while (pNode) { - if (pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pNode->handle.valueInTxn); - } - if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pNode->handle.valueInUse); - } - STqMetaList* next = pNode->next; - taosMemoryFree(pNode); - pNode = next; - } - } - taosMemoryFree(pMeta->unpersistHead); - taosRemoveDir(pMeta->dirPath); - taosMemoryFree(pMeta->dirPath); - taosMemoryFree(pMeta); - return 0; -} - -int32_t tqStorePersist(STqMetaStore* pMeta) { - STqIdxPageBuf idxBuf; - int64_t* bufPtr = (int64_t*)idxBuf.buffer; - STqMetaList* pHead = pMeta->unpersistHead; - STqMetaList* pNode = pHead->unpersistNext; - STqSerializedHead* pSHead = taosMemoryMalloc(sizeof(STqSerializedHead)); - if (pSHead == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - pSHead->ver = TQ_SVER; - pSHead->checksum = 0; - pSHead->ssize = sizeof(STqSerializedHead); - /*int allocatedSize = sizeof(STqSerializedHead);*/ - int offset = taosLSeekFile(pMeta->pFile, 0, SEEK_CUR); - - tqReadLastPage(pMeta->pIdxFile, &idxBuf); - - if (idxBuf.head.writeOffset == TQ_PAGE_SIZE) { - taosLSeekFile(pMeta->pIdxFile, 0, SEEK_END); - memset(&idxBuf, 0, TQ_PAGE_SIZE); - idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; - } else { - bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset); - } - - while (pHead != pNode) { - int nBytes = 0; - - if (pNode->handle.valueInUse) { - if (pNode->handle.valueInTxn) { - pSHead->action = TQ_ACTION_INUSE_CONT; - } else { - pSHead->action = TQ_ACTION_INUSE; - } - - if (pNode->handle.valueInUse == TQ_DELETE_TOKEN) { - pSHead->ssize = sizeof(STqSerializedHead); - } else { - pMeta->pSerializer(pNode->handle.valueInUse, &pSHead); - } - nBytes = taosWriteFile(pMeta->pFile, pSHead, pSHead->ssize); - ASSERT(nBytes == pSHead->ssize); - } - - if (pNode->handle.valueInTxn) { - pSHead->action = TQ_ACTION_INTXN; - if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) { - pSHead->ssize = sizeof(STqSerializedHead); - } else { - pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead); - } - int nBytesTxn = taosWriteFile(pMeta->pFile, pSHead, pSHead->ssize); - ASSERT(nBytesTxn == pSHead->ssize); - nBytes += nBytesTxn; - } - pNode->handle.offset = offset; - offset += nBytes; - - // write idx file - // TODO: endian check and convert - *(bufPtr++) = pNode->handle.key; - *(bufPtr++) = pNode->handle.offset; - *(bufPtr++) = (int64_t)nBytes; - idxBuf.head.writeOffset += TQ_IDX_SIZE; - - if (idxBuf.head.writeOffset >= TQ_PAGE_SIZE) { - nBytes = taosWriteFile(pMeta->pIdxFile, &idxBuf, TQ_PAGE_SIZE); - // TODO: handle error with tfile - ASSERT(nBytes == TQ_PAGE_SIZE); - memset(&idxBuf, 0, TQ_PAGE_SIZE); - idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; - bufPtr = (int64_t*)&idxBuf.buffer; - } - // remove from unpersist list - pHead->unpersistNext = pNode->unpersistNext; - pHead->unpersistNext->unpersistPrev = pHead; - pNode->unpersistPrev = pNode->unpersistNext = NULL; - pNode = pHead->unpersistNext; - - // remove from bucket - if (pNode->handle.valueInUse == TQ_DELETE_TOKEN && pNode->handle.valueInTxn == NULL) { - int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; - STqMetaList* pBucketHead = pMeta->bucket[bucketKey]; - if (pBucketHead == pNode) { - pMeta->bucket[bucketKey] = pNode->next; - } else { - STqMetaList* pBucketNode = pBucketHead; - while (pBucketNode->next != NULL && pBucketNode->next != pNode) { - pBucketNode = pBucketNode->next; - } - // impossible for pBucket->next == NULL - ASSERT(pBucketNode->next == pNode); - pBucketNode->next = pNode->next; - } - taosMemoryFree(pNode); - } - } - - // write left bytes - taosMemoryFree(pSHead); - // TODO: write new version in tfile - if ((char*)bufPtr != idxBuf.buffer) { - int nBytes = taosWriteFile(pMeta->pIdxFile, &idxBuf, idxBuf.head.writeOffset); - // TODO: handle error in tfile - ASSERT(nBytes == idxBuf.head.writeOffset); - } - // TODO: using fsync in tfile - taosFsyncFile(pMeta->pIdxFile); - taosFsyncFile(pMeta->pFile); - return 0; -} - -static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* value) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pNode->handle.valueInUse); - } - // change pointer ownership - pNode->handle.valueInUse = value; - return 0; - } else { - pNode = pNode->next; - } - } - STqMetaList* pNewNode = taosMemoryCalloc(1, sizeof(STqMetaList)); - if (pNewNode == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - pNewNode->handle.key = key; - pNewNode->handle.valueInUse = value; - pNewNode->next = pMeta->bucket[bucketKey]; - // put into unpersist list - pNewNode->unpersistPrev = pMeta->unpersistHead; - pNewNode->unpersistNext = pMeta->unpersistHead->unpersistNext; - pMeta->unpersistHead->unpersistNext->unpersistPrev = pNewNode; - pMeta->unpersistHead->unpersistNext = pNewNode; - return 0; -} - -void* tqHandleGet(STqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - if (pNode->handle.valueInUse != NULL && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - return pNode->handle.valueInUse; - } else { - return NULL; - } - } else { - pNode = pNode->next; - } - } - return NULL; -} - -void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - if (pNode->handle.valueInUse != NULL && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - tqLinkUnpersist(pMeta, pNode); - return pNode->handle.valueInUse; - } else { - return NULL; - } - } else { - pNode = pNode->next; - } - } - return NULL; -} - -static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* value) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - if (pNode->handle.valueInTxn) { - if (tqDupIntxnReject(pMeta->tqConfigFlag)) { - terrno = TSDB_CODE_TQ_META_KEY_DUP_IN_TXN; - return -1; - } - if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pNode->handle.valueInTxn); - } - } - pNode->handle.valueInTxn = value; - tqLinkUnpersist(pMeta, pNode); - return 0; - } else { - pNode = pNode->next; - } - } - STqMetaList* pNewNode = taosMemoryCalloc(1, sizeof(STqMetaList)); - if (pNewNode == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - pNewNode->handle.key = key; - pNewNode->handle.valueInTxn = value; - pNewNode->next = pMeta->bucket[bucketKey]; - pMeta->bucket[bucketKey] = pNewNode; - tqLinkUnpersist(pMeta, pNewNode); - return 0; -} - -int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return tqHandlePutImpl(pMeta, key, value); } - -int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { - void* vmem = taosMemoryMalloc(vsize); - if (vmem == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - memcpy(vmem, value, vsize); - return tqHandlePutImpl(pMeta, key, vmem); -} - -static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - if (pNode->handle.valueInTxn != NULL && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - return pNode->handle.valueInTxn; - } else { - return NULL; - } - } else { - pNode = pNode->next; - } - } - return NULL; -} - -int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - if (pNode->handle.valueInTxn == NULL) { - terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN; - return -1; - } - if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pNode->handle.valueInUse); - } - pNode->handle.valueInUse = pNode->handle.valueInTxn; - pNode->handle.valueInTxn = NULL; - tqLinkUnpersist(pMeta, pNode); - return 0; - } else { - pNode = pNode->next; - } - } - terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; - return -1; -} - -int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - if (pNode->handle.valueInTxn) { - if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->pDeleter(pNode->handle.valueInTxn); - } - pNode->handle.valueInTxn = NULL; - tqLinkUnpersist(pMeta, pNode); - return 0; - } - terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN; - return -1; - } else { - pNode = pNode->next; - } - } - terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; - return -1; -} - -int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - if (pNode->handle.valueInTxn) { - pMeta->pDeleter(pNode->handle.valueInTxn); - } - - pNode->handle.valueInTxn = TQ_DELETE_TOKEN; - tqLinkUnpersist(pMeta, pNode); - return 0; - } - } else { - pNode = pNode->next; - } - } - terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; - return -1; -} - -int32_t tqHandlePurge(STqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_MASK; - STqMetaList* pNode = pMeta->bucket[bucketKey]; - while (pNode) { - if (pNode->handle.key == key) { - pNode->handle.valueInUse = TQ_DELETE_TOKEN; - tqLinkUnpersist(pMeta, pNode); - return 0; - } else { - pNode = pNode->next; - } - } - terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; - return -1; -} - -// TODO: clean deleted idx and data from persistent file -int32_t tqStoreCompact(STqMetaStore* pMeta) { return 0; } diff --git a/source/dnode/vnode/test/tqMetaTest.cpp b/source/dnode/vnode/test/tqMetaTest.cpp deleted file mode 100644 index 168daeb19f..0000000000 --- a/source/dnode/vnode/test/tqMetaTest.cpp +++ /dev/null @@ -1,279 +0,0 @@ -#include -#include -#include -#include - -#include "tqMetaStore.h" - -struct Foo { - int32_t a; -}; - -int FooSerializer(const void* pObj, STqSerializedHead** ppHead) { - Foo* foo = (Foo*)pObj; - if ((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) { - *ppHead = (STqSerializedHead*)taosMemoryRealloc(*ppHead, sizeof(STqSerializedHead) + sizeof(int32_t)); - (*ppHead)->ssize = sizeof(STqSerializedHead) + sizeof(int32_t); - } - *(int32_t*)(*ppHead)->content = foo->a; - return (*ppHead)->ssize; -} - -const void* FooDeserializer(const STqSerializedHead* pHead, void** ppObj) { - if (*ppObj == NULL) { - *ppObj = taosMemoryRealloc(*ppObj, sizeof(int32_t)); - } - Foo* pFoo = *(Foo**)ppObj; - pFoo->a = *(int32_t*)pHead->content; - return NULL; -} - -void FooDeleter(void* pObj) { taosMemoryFree(pObj); } - -class TqMetaUpdateAppendTest : public ::testing::Test { - protected: - void SetUp() override { - taosRemoveDir(pathName); - pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); - ASSERT(pMeta); - } - - void TearDown() override { tqStoreClose(pMeta); } - - STqMetaStore* pMeta; - const char* pathName = TD_TMP_DIR_PATH "tq_test"; -}; - -TEST_F(TqMetaUpdateAppendTest, copyPutTest) { - Foo foo; - foo.a = 3; - tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo)); - - Foo* pFoo = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo == NULL, true); - - tqHandleCommit(pMeta, 1); - pFoo = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo->a, 3); -} - -TEST_F(TqMetaUpdateAppendTest, persistTest) { - Foo* pFoo = (Foo*)taosMemoryMalloc(sizeof(Foo)); - pFoo->a = 2; - tqHandleMovePut(pMeta, 1, pFoo); - Foo* pBar = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pBar == NULL, true); - tqHandleCommit(pMeta, 1); - pBar = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pBar->a, pFoo->a); - pBar = (Foo*)tqHandleGet(pMeta, 2); - EXPECT_EQ(pBar == NULL, true); - - tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); - ASSERT(pMeta); - - pBar = (Foo*)tqHandleGet(pMeta, 1); - ASSERT_EQ(pBar != NULL, true); - EXPECT_EQ(pBar->a, 2); - - pBar = (Foo*)tqHandleGet(pMeta, 2); - EXPECT_EQ(pBar == NULL, true); -} - -TEST_F(TqMetaUpdateAppendTest, uncommittedTest) { - Foo* pFoo = (Foo*)taosMemoryMalloc(sizeof(Foo)); - pFoo->a = 3; - tqHandleMovePut(pMeta, 1, pFoo); - - pFoo = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo == NULL, true); -} - -TEST_F(TqMetaUpdateAppendTest, abortTest) { - Foo* pFoo = (Foo*)taosMemoryMalloc(sizeof(Foo)); - pFoo->a = 3; - tqHandleMovePut(pMeta, 1, pFoo); - - pFoo = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo == NULL, true); - - tqHandleAbort(pMeta, 1); - pFoo = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo == NULL, true); -} - -TEST_F(TqMetaUpdateAppendTest, deleteTest) { - Foo* pFoo = (Foo*)taosMemoryMalloc(sizeof(Foo)); - pFoo->a = 3; - tqHandleMovePut(pMeta, 1, pFoo); - - pFoo = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo == NULL, true); - - tqHandleCommit(pMeta, 1); - - pFoo = (Foo*)tqHandleGet(pMeta, 1); - ASSERT_EQ(pFoo != NULL, true); - EXPECT_EQ(pFoo->a, 3); - - tqHandleDel(pMeta, 1); - pFoo = (Foo*)tqHandleGet(pMeta, 1); - ASSERT_EQ(pFoo != NULL, true); - EXPECT_EQ(pFoo->a, 3); - - tqHandleCommit(pMeta, 1); - pFoo = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo == NULL, true); - - tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); - ASSERT(pMeta); - - pFoo = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo == NULL, true); -} - -TEST_F(TqMetaUpdateAppendTest, intxnPersist) { - Foo* pFoo = (Foo*)taosMemoryMalloc(sizeof(Foo)); - pFoo->a = 3; - tqHandleMovePut(pMeta, 1, pFoo); - tqHandleCommit(pMeta, 1); - - Foo* pBar = (Foo*)taosMemoryMalloc(sizeof(Foo)); - pBar->a = 4; - tqHandleMovePut(pMeta, 1, pBar); - - Foo* pFoo1 = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo1->a, 3); - - tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); - ASSERT(pMeta); - - pFoo1 = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo1->a, 3); - - tqHandleCommit(pMeta, 1); - - pFoo1 = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo1->a, 4); - - tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); - ASSERT(pMeta); - - pFoo1 = (Foo*)tqHandleGet(pMeta, 1); - EXPECT_EQ(pFoo1->a, 4); -} - -TEST_F(TqMetaUpdateAppendTest, multiplePage) { - taosSeedRand(0); - std::vector v; - for (int i = 0; i < 1000; i++) { - v.push_back(taosRand()); - Foo foo; - foo.a = v[i]; - tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); - } - for (int i = 0; i < 500; i++) { - tqHandleCommit(pMeta, i); - Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); - ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; - EXPECT_EQ(pFoo->a, v[i]); - } - - tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); - ASSERT(pMeta); - - for (int i = 500; i < 1000; i++) { - tqHandleCommit(pMeta, i); - Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); - ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; - EXPECT_EQ(pFoo->a, v[i]); - } - - for (int i = 0; i < 1000; i++) { - Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); - ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; - EXPECT_EQ(pFoo->a, v[i]); - } -} - -TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { - taosSeedRand(0); - std::vector v; - for (int i = 0; i < 1000; i++) { - v.push_back(taosRand()); - Foo foo; - foo.a = v[i]; - tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); - } - - for (int i = 0; i < 500; i++) { - tqHandleCommit(pMeta, i); - v[i] = taosRand(); - Foo foo; - foo.a = v[i]; - tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); - } - - for (int i = 500; i < 1000; i++) { - v[i] = taosRand(); - Foo foo; - foo.a = v[i]; - tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); - } - - for (int i = 0; i < 1000; i++) { - tqHandleCommit(pMeta, i); - } - - tqStoreClose(pMeta); - pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND); - ASSERT(pMeta); - - for (int i = 500; i < 1000; i++) { - v[i] = taosRand(); - Foo foo; - foo.a = v[i]; - tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); - tqHandleCommit(pMeta, i); - } - - for (int i = 0; i < 1000; i++) { - Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); - ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; - EXPECT_EQ(pFoo->a, v[i]); - } -} - -TEST_F(TqMetaUpdateAppendTest, dupCommit) { - taosSeedRand(0); - std::vector v; - for (int i = 0; i < 1000; i++) { - v.push_back(taosRand()); - Foo foo; - foo.a = v[i]; - tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); - } - - for (int i = 0; i < 1000; i++) { - int ret = tqHandleCommit(pMeta, i); - EXPECT_EQ(ret, 0); - ret = tqHandleCommit(pMeta, i); - EXPECT_EQ(ret, -1); - } - - for (int i = 0; i < 1000; i++) { - int ret = tqHandleCommit(pMeta, i); - EXPECT_EQ(ret, -1); - } - - for (int i = 0; i < 1000; i++) { - Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); - ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; - EXPECT_EQ(pFoo->a, v[i]); - } -}