diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 829d3a0ca0..534503ad56 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -158,6 +158,7 @@ int32_t tPutDelIdx(uint8_t *p, SDelIdx *pDelIdx); int32_t tGetDelIdx(uint8_t *p, SDelIdx *pDelIdx); // SDelData +int32_t tDelDataClear(SDelData *pDelData); int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem); int32_t tDelDataGetItemByIdx(SDelData *pDelData, SDelDataItem *pItem, int32_t idx); int32_t tDelDataGetItem(SDelData *pDelData, SDelDataItem *pItem, int64_t version); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 60bd9054d4..90546469e1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -37,13 +37,14 @@ struct SCommitter { SArray *aOBlockIdx; SArray *aBlockIdx; // commit table data - STbData *pTbData; SBlockIdx *pBlockIdx; /* commit del */ SDelFReader *pDelFReader; SDelFWriter *pDelFWriter; SDelIdx delIdxOld; SDelIdx delIdxNew; + STbData *pTbData; + SDelIdxItem *pDelIdxItem; SDelData delDataOld; SDelData delDataNew; SDelIdxItem delIdxItem; @@ -207,30 +208,59 @@ _err: static int32_t tsdbCommitTableDel(SCommitter *pCommitter); static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { - int32_t code = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - int32_t iTbData = 0; - int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); - int32_t iDelIdx = 0; - int32_t nDelIdx = 0; // TODO - STbData *pTbData = NULL; - SDelIdx *pDelIdx = NULL; - SDelIdx delIdx; + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SMemTable *pMemTable = pTsdb->imem; + int32_t c; + int32_t iTbData = 0; + int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); + int32_t iDelIdxItem = 0; + int32_t nDelIdxItem = pCommitter->delIdxOld.offset.nOffset; + STbData *pTbData = NULL; + SDelIdxItem *pDelIdxItem = NULL; + SDelIdxItem item; - // if (iTbData < nTbData) { - // pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - // } - // if (iDelIdx < nDelIdx) { - // // tIMapGet(); - // pDelIdx = &delIdx; - // } - - while (iTbData < nTbData || iDelIdx < nDelIdx) { - if (pTbData && pDelIdx) { - } else { + while (iTbData < nTbData || iDelIdxItem < nDelIdxItem) { + pTbData = NULL; + pDelIdxItem = NULL; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } + if (iDelIdxItem < nDelIdxItem) { + tDelIdxGetItemByIdx(&pCommitter->delIdxOld, &item, iDelIdxItem); + pDelIdxItem = &item; } + if (pTbData && pDelIdxItem) { + c = tTABLEIDCmprFn(pTbData, pDelIdxItem); + if (c == 0) { + iTbData++; + iDelIdxItem++; + } else if (c < 0) { + iTbData++; + pDelIdxItem = NULL; + } else { + iDelIdxItem++; + pTbData = NULL; + } + } else { + if (pTbData) { + iTbData++; + } + if (pDelIdxItem) { + iDelIdxItem++; + } + } + + if (pTbData && pTbData->pHead == NULL) { + pTbData = NULL; + } + + if (pTbData == NULL && pDelIdxItem == NULL) continue; + + // do merge + pCommitter->pTbData = pTbData; + pCommitter->pDelIdxItem = pDelIdxItem; code = tsdbCommitTableDel(pCommitter); if (code) goto _err; } @@ -542,17 +572,41 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) { } static int32_t tsdbCommitTableDelStart(SCommitter *pCommitter) { - int32_t code = 0; + int32_t code = 0; + tb_uid_t suid; + tb_uid_t uid; + + if (pCommitter->pTbData) { + suid = pCommitter->pTbData->suid; + uid = pCommitter->pTbData->uid; + } // load old pCommitter->delDataOld = (SDelData){0}; - if (0) { - code = tsdbReadDelData(pCommitter->pDelFReader, NULL /*TODO*/, &pCommitter->delDataOld, &pCommitter->pBuf4); + if (pCommitter->pDelIdxItem) { + suid = pCommitter->pDelIdxItem->suid; + uid = pCommitter->pDelIdxItem->uid; + code = + tsdbReadDelData(pCommitter->pDelFReader, pCommitter->pDelIdxItem, &pCommitter->delDataOld, &pCommitter->pBuf5); if (code) goto _err; } // prepare new - pCommitter->delDataNew = (SDelData){0}; + pCommitter->delDataNew.suid = suid; + pCommitter->delDataNew.uid = uid; + pCommitter->delDataNew.offset.flag = 0; + pCommitter->delDataNew.offset.nOffset = 0; + pCommitter->delDataNew.nData = 0; + pCommitter->delIdxItem = (SDelIdxItem){ + .suid = suid, + .uid = uid, + .minKey = TSKEY_MAX, + .maxKey = TSKEY_MIN, + .minVersion = INT64_MAX, + .maxVersion = INT64_MIN, + .offset = -1, + .size = -1, + }; return code; @@ -562,14 +616,40 @@ _err: } static int32_t tsdbCommitTableDelImpl(SCommitter *pCommitter) { - int32_t code = 0; - SDelOp *pDelOp = NULL; + int32_t code = 0; + SDelDataItem item; - for (; pDelOp; pDelOp = pDelOp->pNext) { - SDelDataItem item = {.version = pDelOp->version, .sKey = pDelOp->sKey, .eKey = pDelOp->eKey}; + // old + if (pCommitter->pDelIdxItem) { + for (int32_t iDelIdxItem = 0; iDelIdxItem < pCommitter->delDataOld.offset.nOffset; iDelIdxItem++) { + code = tDelDataGetItemByIdx(&pCommitter->delDataOld, &item, iDelIdxItem); + if (code) goto _err; - code = tDelDataPutItem(&pCommitter->delDataNew, &item); - if (code) goto _err; + code = tDelDataPutItem(&pCommitter->delDataNew, &item); + if (code) goto _err; + + // update index + if (item.version < pCommitter->delIdxItem.minVersion) pCommitter->delIdxItem.minVersion = item.version; + if (item.version > pCommitter->delIdxItem.maxVersion) pCommitter->delIdxItem.maxVersion = item.version; + if (item.sKey < pCommitter->delIdxItem.minKey) pCommitter->delIdxItem.minKey = item.sKey; + if (item.eKey > pCommitter->delIdxItem.maxKey) pCommitter->delIdxItem.maxKey = item.eKey; + } + } + + // new + if (pCommitter->pTbData) { + for (SDelOp *pDelOp = pCommitter->pTbData->pHead; pDelOp; pDelOp = pDelOp->pNext) { + item = (SDelDataItem){.version = pDelOp->version, .sKey = pDelOp->sKey, .eKey = pDelOp->eKey}; + + code = tDelDataPutItem(&pCommitter->delDataNew, &item); + if (code) goto _err; + + // update index + if (item.version < pCommitter->delIdxItem.minVersion) pCommitter->delIdxItem.minVersion = item.version; + if (item.version > pCommitter->delIdxItem.maxVersion) pCommitter->delIdxItem.maxVersion = item.version; + if (item.sKey < pCommitter->delIdxItem.minKey) pCommitter->delIdxItem.minKey = item.sKey; + if (item.eKey > pCommitter->delIdxItem.maxKey) pCommitter->delIdxItem.maxKey = item.eKey; + } } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 2f8290987b..22dd72d92b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -147,7 +147,6 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SDelData *pDelData, uint8_t **ppB // prepare pDelData->delimiter = TSDB_FILE_DLMT; - // todo // alloc if (!ppBuf) ppBuf = &pBuf; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index a322e786a0..fe7700a283 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -350,6 +350,13 @@ static FORCE_INLINE int32_t tGetDelDataItem(uint8_t *p, SDelDataItem *pItem) { } // SDelData ====================================================== +int32_t tDelDataClear(SDelData *pDelData) { + int32_t code = 0; + tsdbFree(pDelData->offset.pOffset); + tsdbFree(pDelData->pData); + return code; +} + int32_t tDelDataPutItem(SDelData *pDelData, SDelDataItem *pItem) { int32_t code = 0; uint32_t offset = pDelData->nData;