more work
This commit is contained in:
parent
e6e629ae84
commit
bb0f20f6e6
|
@ -59,7 +59,7 @@ void tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t
|
|||
// STbDataIter
|
||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
|
||||
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
|
||||
bool tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
|
||||
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
||||
bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow);
|
||||
|
||||
|
@ -172,6 +172,7 @@ int32_t tPutBlockIdx(uint8_t *p, SBlockIdx *pBlockIdx);
|
|||
int32_t tGetBlockIdx(uint8_t *p, SBlockIdx *pBlockIdx);
|
||||
|
||||
// SBlock
|
||||
int32_t tBlockCmprFn(const void *p1, const void *p2);
|
||||
|
||||
// SDelIdx
|
||||
int32_t tDelIdxClear(SDelIdx *pDelIdx);
|
||||
|
|
|
@ -39,6 +39,8 @@ struct SCommitter {
|
|||
SMapData oBlockIdx;
|
||||
SMapData nBlockIdx;
|
||||
// commit table data
|
||||
STbDataIter iter;
|
||||
STbDataIter *pIter;
|
||||
SBlockIdx *pBlockIdx;
|
||||
SMapData oBlock;
|
||||
SMapData nBlock;
|
||||
|
@ -503,7 +505,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pTbData && pTbData->sl.size == 0) {
|
||||
if (pTbData && !tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}), 0,
|
||||
&pCommitter->iter) {
|
||||
pTbData = NULL;
|
||||
}
|
||||
|
||||
|
@ -613,26 +616,23 @@ static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) {
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
// merge loop
|
||||
for (;;) {
|
||||
tsdbTbDataIterGet(pIter, pRow);
|
||||
|
||||
code = tsdbColDataBlockAppend(&pCommitter->nColDataBlock, pRow, pTSchema);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pCommitter->nColDataBlock.nRow >= pCommitter->maxRow) {
|
||||
code = tsdbWriteColDataBlock(pCommitter->pWriter, &pCommitter->nColDataBlock, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
tsdbColDataBlockReset(&pCommitter->nColDataBlock);
|
||||
}
|
||||
|
||||
if (!tsdbTbDataIterNext(pIter)) break;
|
||||
if (iBlock < nBlock) {
|
||||
pBlock = █
|
||||
} else {
|
||||
pBlock = NULL;
|
||||
}
|
||||
|
||||
if (pCommitter->nColDataBlock.nRow) {
|
||||
code = tsdbWriteColDataBlock(pCommitter->pWriter, &pCommitter->nColDataBlock, NULL);
|
||||
if (code) goto _err;
|
||||
tsdbTbDataIterGet(pIter, pRow);
|
||||
|
||||
// loop to merge memory data and disk data
|
||||
for (; pBlock == NULL || (pRow && pRow->pTSRow->ts <= pCommitter->maxKey);) {
|
||||
if (pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) {
|
||||
// only has block data, then move to new index file
|
||||
} else if (0) {
|
||||
// only commit memory data
|
||||
} else {
|
||||
// merge memory and block data
|
||||
}
|
||||
}
|
||||
|
||||
tsdbTbDataIterDestroy(pIter);
|
||||
|
|
|
@ -211,9 +211,15 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) {
|
||||
bool tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter) {
|
||||
SMemSkipListNode *pos[SL_MAX_LEVEL];
|
||||
SMemSkipListNode *pHead;
|
||||
SMemSkipListNode *pTail;
|
||||
|
||||
if (pTbData == NULL) return false;
|
||||
|
||||
pHead = pTbData->sl.pHead;
|
||||
pTail = pTbData->sl.pTail;
|
||||
pIter->pTbData = pTbData;
|
||||
pIter->backward = backward;
|
||||
if (pFrom == NULL) {
|
||||
|
@ -233,6 +239,12 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
|
|||
pIter->pNode = SL_NODE_FORWARD(pos[0], 0);
|
||||
}
|
||||
}
|
||||
|
||||
if ((backward && pIter->pNode == pHead) || (!backward && pIter->pNode == pTail)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool tsdbTbDataIterNext(STbDataIter *pIter) {
|
||||
|
|
|
@ -484,6 +484,21 @@ int32_t tGetBlockIdx(uint8_t *p, SBlockIdx *pBlockIdx) {
|
|||
return n;
|
||||
}
|
||||
|
||||
// SBlock ======================================================
|
||||
int32_t tBlockCmprFn(const void *p1, const void *p2) {
|
||||
int32_t c;
|
||||
SBlock *pBlock1 = (SBlock *)p1;
|
||||
SBlock *pBlock2 = (SBlock *)p2;
|
||||
|
||||
if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) {
|
||||
return -1;
|
||||
} else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) {
|
||||
return 1
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// SDelIdx ======================================================
|
||||
int32_t tDelIdxClear(SDelIdx *pDelIdx) {
|
||||
int32_t code = 0;
|
||||
|
|
Loading…
Reference in New Issue