diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index acf83e4a9c..08cf652d20 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -50,9 +50,8 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); /* SMemDataIter */ -void tsdbMemDataIterOpen(SMemDataIter *pIter, TSDBKEY *pKey, int8_t backward); -void tsdbMemDataIterClose(SMemDataIter *pIter); -void tsdbMemDataIterNext(SMemDataIter *pIter); +void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter); +bool tsdbMemDataIterNext(SMemDataIter *pIter); void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow); // tsdbCommit2.c ============================================================================================== @@ -928,8 +927,11 @@ struct SMemData { }; struct SMemDataIter { - SMemData *pMemData; - TSDBROW row; + SMemData *pMemData; + int8_t backward; + TSDBROW *pRow; + SMemSkipListNode *pNode; // current node + TSDBROW row; }; #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 38e4795a83..afd5cd72ee 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -137,7 +137,7 @@ static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) { // check if there are data in the time range for (; iMemData < nMemData; iMemData++) { SMemData *pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData); - tsdbMemDataIterOpen(&iter, &(TSDBKEY){.ts = fidSKey, .version = 0}, 0); + tsdbMemDataIterOpen(pMemData, &(TSDBKEY){.ts = fidSKey, .version = 0}, 0, &iter); tsdbMemDataIterGet(&iter, &pRow); if (pRow->tsRow.ts >= fidSKey && pRow->tsRow.ts <= fidEKey) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index dee025cae6..e581e9fe3d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -42,6 +42,7 @@ static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version, SVSubmitBlk *pSubmitBlk); +static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); // SMemTable ============================================== int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) { @@ -160,6 +161,92 @@ _err: return code; } +void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter) { + SMemSkipListNode *pos[SL_MAX_LEVEL]; + + pIter->pMemData = pMemData; + pIter->backward = backward; + pIter->pRow = NULL; + if (pKey == NULL) { + // create from head or tail + if (backward) { + pIter->pNode = SL_NODE_BACKWARD(pMemData->sl.pTail, 0); + } else { + pIter->pNode = SL_NODE_FORWARD(pMemData->sl.pHead, 0); + } + } else { + // create from a key + if (backward) { + memDataMovePosTo(pMemData, pos, pKey, SL_MOVE_BACKWARD); + pIter->pNode = SL_NODE_BACKWARD(pos[0], 0); + } else { + memDataMovePosTo(pMemData, pos, pKey, 0); + pIter->pNode = SL_NODE_FORWARD(pos[0], 0); + } + } +} + +bool tsdbMemDataIterNext(SMemDataIter *pIter) { + SMemSkipListNode *pHead = pIter->pMemData->sl.pHead; + SMemSkipListNode *pTail = pIter->pMemData->sl.pTail; + + pIter->pRow = NULL; + if (pIter->backward) { + ASSERT(pIter->pNode != pTail); + + if (pIter->pNode == pHead) { + return false; + } + + pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0); + if (pIter->pNode == pHead) { + return false; + } + } else { + ASSERT(pIter->pNode != pHead); + + if (pIter->pNode == pTail) { + return false; + } + + pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0); + if (pIter->pNode == pTail) { + return false; + } + } + + return true; +} + +void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow) { + if (pIter->pRow) { + *ppRow = pIter->pRow; + } else { + SMemSkipListNode *pHead = pIter->pMemData->sl.pHead; + SMemSkipListNode *pTail = pIter->pMemData->sl.pTail; + + if (pIter->backward) { + ASSERT(pIter->pNode != pTail); + + if (pIter->pNode == pHead) { + *ppRow = NULL; + } else { + tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row); + *ppRow = &pIter->row; + } + } else { + ASSERT(pIter->pNode != pHead); + + if (pIter->pNode == pTail) { + *ppRow = NULL; + } else { + tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row); + *ppRow = &pIter->row; + } + } + } +} + static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) { int32_t code = 0; int32_t idx = 0;