From 5ea611f2c8284c22f5895cdb3ae1cc69a53a505d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 22 Sep 2020 13:44:22 +0800 Subject: [PATCH] TD-1194 --- src/tsdb/src/tsdbMemTable.c | 7 ------ src/util/inc/tskiplist.h | 2 ++ src/util/src/tskiplist.c | 43 ++++++++++++++++++++++++++++--------- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 9c76a09ca3..e139b29d5e 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -40,13 +40,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { TSKEY key = dataRowKey(row); SMemTable * pMemTable = pRepo->mem; STableData *pTableData = NULL; - // SSkipList * pSList = NULL; - - // if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && - // pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { - // pTableData = pMemTable->tData[TABLE_TID(pTable)]; - // pSList = pTableData->pData; - // } void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); if (pRow == NULL) { diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 1eccac6646..030a9d69c1 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -145,6 +145,8 @@ SSkipListNode * tSkipListIterGet(SSkipListIterator *iter); void * tSkipListDestroyIter(SSkipListIterator *iter); uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key); void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode); +SSkipListKey tSkipListGetMinKey(SSkipList *pSkipList); +SSkipListKey tSkipListGetMaxKey(SSkipList *pSkipList); #ifdef __cplusplus } diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 3da137140f..603a9759b8 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -19,8 +19,6 @@ #include "tulog.h" #include "tutil.h" -#define DO_MEMSET_PTR_AREA(n) memset((n)->forwards, 0, ((n)->level * 2)) - static int initForwardBackwardPtr(SSkipList *pSkipList); static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, int32_t order); static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); @@ -117,7 +115,10 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { if (dupMode == SL_UPDATE_DUP_KEY) { pNode = SL_NODE_GET_FORWARD_POINTER(forward[0], 0); atomic_store_ptr(&(pNode->pData), pData); - pNode->flags &= (~(SL_NODE_DELETED_FLAG)); + if (SL_IS_NODE_DELETED(pNode)) { + pNode->flags &= (~(SL_NODE_DELETED_FLAG)); + pSkipList->size++; + } } } else { pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList)); @@ -136,6 +137,8 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { uint32_t count = 0; + if (SL_DUP_MODE(pSkipList) == SL_DISCARD_DUP_KEY) return 0; + tSkipListWLock(pSkipList); SSkipListNode *pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); @@ -199,8 +202,8 @@ SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) { } SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *val, int32_t type, int32_t order) { - assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - assert(pSkipList != NULL); + ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); + ASSERT(pSkipList != NULL); SSkipListIterator *iter = doCreateSkipListIterator(pSkipList, order); if (val == NULL) { @@ -274,7 +277,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { while (p != pSkipList->pTail) { char *key = SL_GET_NODE_KEY(pSkipList, p); if (prev != NULL) { - assert(pSkipList->comparFn(prev, key) < 0); + ASSERT(pSkipList->comparFn(prev, key) < 0); } switch (pSkipList->type) { @@ -302,9 +305,29 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { } } -static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { - DO_MEMSET_PTR_AREA(pNode); +SSkipListKey tSkipListGetMinKey(SSkipList *pSkipList) { + if (pSkipList == NULL || SL_SIZE(pSkipList) == 0) return NULL; + SSkipListNode *pNode = pSkipList->pHead; + while ((pNode = SL_NODE_GET_FORWARD_POINTER(pNode, 0)) != pSkipList->pTail) { + if (!SL_IS_NODE_DELETED(pNode)) return pSkipList->keyFn(pNode->pData); + } + + return NULL; +} + +SSkipListKey tSkipListGetMaxKey(SSkipList *pSkipList) { + if (pSkipList == NULL || SL_SIZE(pSkipList) == 0) return NULL; + + SSkipListNode *pNode = pSkipList->pTail; + while ((pNode = SL_NODE_GET_BACKWARD_POINTER(pNode, 0)) != pSkipList->pHead) { + if (!SL_IS_NODE_DELETED(pNode)) return pSkipList->keyFn(pNode->pData); + } + + return NULL; +} + +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSkipListNode *pNode) { for (int32_t i = 0; i < pNode->level; ++i) { if (i >= pSkipList->level) { SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; @@ -365,7 +388,7 @@ static FORCE_INLINE int tSkipListUnlock(SSkipList *pSkipList) { } static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward, void *pData) { - int compare = 1; + int compare = 0; bool hasDupKey = false; char * pDataKey = pSkipList->keyFn(pData); @@ -497,7 +520,7 @@ static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) { } } - assert(level <= pSkipList->maxLevel); + ASSERT(level <= pSkipList->maxLevel); return level; }