fix: skiplist concurrent access
This commit is contained in:
parent
5660199016
commit
1567fe2f67
|
@ -772,8 +772,8 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
// #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||||
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
// #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
||||||
|
|
||||||
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
|
||||||
if (pIter == NULL) return NULL;
|
if (pIter == NULL) return NULL;
|
||||||
|
|
|
@ -22,6 +22,10 @@
|
||||||
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4))
|
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4))
|
||||||
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||||
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
||||||
|
#define SL_GET_NODE_FORWARD(n, l) ((SMemSkipListNode *)atomic_load_64((int64_t *)&SL_NODE_FORWARD(n, l)))
|
||||||
|
#define SL_GET_NODE_BACKWARD(n, l) ((SMemSkipListNode *)atomic_load_64((int64_t *)&SL_NODE_BACKWARD(n, l)))
|
||||||
|
#define SL_SET_NODE_FORWARD(n, l, p) atomic_store_64((int64_t *)&SL_NODE_FORWARD(n, l), (int64_t)(p))
|
||||||
|
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_64((int64_t *)&SL_NODE_BACKWARD(n, l), (int64_t)(p))
|
||||||
|
|
||||||
#define SL_MOVE_BACKWARD 0x1
|
#define SL_MOVE_BACKWARD 0x1
|
||||||
#define SL_MOVE_FROM_POS 0x2
|
#define SL_MOVE_FROM_POS 0x2
|
||||||
|
@ -246,18 +250,18 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
|
||||||
if (pFrom == NULL) {
|
if (pFrom == NULL) {
|
||||||
// create from head or tail
|
// create from head or tail
|
||||||
if (backward) {
|
if (backward) {
|
||||||
pIter->pNode = SL_NODE_BACKWARD(pTbData->sl.pTail, 0);
|
pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
|
||||||
} else {
|
} else {
|
||||||
pIter->pNode = SL_NODE_FORWARD(pTbData->sl.pHead, 0);
|
pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// create from a key
|
// create from a key
|
||||||
if (backward) {
|
if (backward) {
|
||||||
tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
|
tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
|
||||||
pIter->pNode = SL_NODE_BACKWARD(pos[0], 0);
|
pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
|
||||||
} else {
|
} else {
|
||||||
tbDataMovePosTo(pTbData, pos, pFrom, 0);
|
tbDataMovePosTo(pTbData, pos, pFrom, 0);
|
||||||
pIter->pNode = SL_NODE_FORWARD(pos[0], 0);
|
pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -271,7 +275,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0);
|
pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
|
||||||
if (pIter->pNode == pIter->pTbData->sl.pHead) {
|
if (pIter->pNode == pIter->pTbData->sl.pHead) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -282,7 +286,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0);
|
pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
|
||||||
if (pIter->pNode == pIter->pTbData->sl.pTail) {
|
if (pIter->pNode == pIter->pTbData->sl.pTail) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -335,7 +339,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
||||||
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
|
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
|
||||||
|
|
||||||
ASSERT(pPool != NULL);
|
ASSERT(pPool != NULL);
|
||||||
pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
|
pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
|
||||||
if (pTbData == NULL) {
|
if (pTbData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -408,7 +412,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
||||||
if (fromPos) px = pos[pTbData->sl.level - 1];
|
if (fromPos) px = pos[pTbData->sl.level - 1];
|
||||||
|
|
||||||
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
||||||
pn = SL_NODE_BACKWARD(px, iLevel);
|
pn = SL_GET_NODE_BACKWARD(px, iLevel);
|
||||||
while (pn != pTbData->sl.pHead) {
|
while (pn != pTbData->sl.pHead) {
|
||||||
tKey.version = pn->version;
|
tKey.version = pn->version;
|
||||||
tKey.ts = pn->pTSRow->ts;
|
tKey.ts = pn->pTSRow->ts;
|
||||||
|
@ -418,7 +422,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
px = pn;
|
px = pn;
|
||||||
pn = SL_NODE_BACKWARD(px, iLevel);
|
pn = SL_GET_NODE_BACKWARD(px, iLevel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -438,7 +442,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
||||||
if (fromPos) px = pos[pTbData->sl.level - 1];
|
if (fromPos) px = pos[pTbData->sl.level - 1];
|
||||||
|
|
||||||
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
|
||||||
pn = SL_NODE_FORWARD(px, iLevel);
|
pn = SL_GET_NODE_FORWARD(px, iLevel);
|
||||||
while (pn != pTbData->sl.pTail) {
|
while (pn != pTbData->sl.pTail) {
|
||||||
tKey.version = pn->version;
|
tKey.version = pn->version;
|
||||||
tKey.ts = pn->pTSRow->ts;
|
tKey.ts = pn->pTSRow->ts;
|
||||||
|
@ -448,7 +452,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
px = pn;
|
px = pn;
|
||||||
pn = SL_NODE_FORWARD(px, iLevel);
|
pn = SL_GET_NODE_FORWARD(px, iLevel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,59 +478,54 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
|
||||||
int8_t level;
|
int8_t level;
|
||||||
SMemSkipListNode *pNode;
|
SMemSkipListNode *pNode;
|
||||||
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
|
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
|
||||||
|
int64_t nSize;
|
||||||
|
|
||||||
// node
|
// create node
|
||||||
level = tsdbMemSkipListRandLevel(&pTbData->sl);
|
level = tsdbMemSkipListRandLevel(&pTbData->sl);
|
||||||
ASSERT(pPool != NULL);
|
nSize = SL_NODE_SIZE(level);
|
||||||
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level));
|
pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->len);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
pNode->level = level;
|
pNode->level = level;
|
||||||
pNode->version = version;
|
pNode->version = version;
|
||||||
pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len);
|
pNode->pTSRow = (STSRow *)((char *)pNode + nSize);
|
||||||
if (NULL == pNode->pTSRow) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
memcpy(pNode->pTSRow, pRow, pRow->len);
|
memcpy(pNode->pTSRow, pRow, pRow->len);
|
||||||
|
|
||||||
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
|
// set node
|
||||||
SMemSkipListNode *pn = pos[iLevel];
|
|
||||||
SMemSkipListNode *px;
|
|
||||||
|
|
||||||
if (forward) {
|
if (forward) {
|
||||||
px = SL_NODE_FORWARD(pn, iLevel);
|
for (int8_t iLevel = 0; iLevel < level; iLevel++) {
|
||||||
|
SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
|
||||||
SL_NODE_BACKWARD(pNode, iLevel) = pn;
|
SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel];
|
||||||
SL_NODE_FORWARD(pNode, iLevel) = px;
|
}
|
||||||
} else {
|
} else {
|
||||||
px = SL_NODE_BACKWARD(pn, iLevel);
|
for (int8_t iLevel = 0; iLevel < level; iLevel++) {
|
||||||
|
SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel];
|
||||||
SL_NODE_BACKWARD(pNode, iLevel) = px;
|
SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
|
||||||
SL_NODE_FORWARD(pNode, iLevel) = pn;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
|
// set forward and backward
|
||||||
SMemSkipListNode *pn = pos[iLevel];
|
|
||||||
SMemSkipListNode *px;
|
|
||||||
|
|
||||||
if (forward) {
|
if (forward) {
|
||||||
px = SL_NODE_FORWARD(pn, iLevel);
|
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
|
||||||
|
SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel];
|
||||||
|
|
||||||
SL_NODE_FORWARD(pn, iLevel) = pNode;
|
SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode);
|
||||||
SL_NODE_BACKWARD(px, iLevel) = pNode;
|
SL_SET_NODE_BACKWARD(pNext, iLevel, pNode);
|
||||||
} else {
|
|
||||||
px = SL_NODE_BACKWARD(pn, iLevel);
|
|
||||||
|
|
||||||
SL_NODE_FORWARD(px, iLevel) = pNode;
|
|
||||||
SL_NODE_BACKWARD(pn, iLevel) = pNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
pos[iLevel] = pNode;
|
pos[iLevel] = pNode;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
|
||||||
|
SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel];
|
||||||
|
|
||||||
|
SL_SET_NODE_FORWARD(pPrev, iLevel, pNode);
|
||||||
|
SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode);
|
||||||
|
|
||||||
|
pos[iLevel] = pNode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pTbData->sl.size++;
|
pTbData->sl.size++;
|
||||||
if (pTbData->sl.level < pNode->level) {
|
if (pTbData->sl.level < pNode->level) {
|
||||||
|
|
Loading…
Reference in New Issue