From 83edf4d611f5fde41acc92995260c02d897862cd Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 14 Jun 2022 09:55:04 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 318 ++++++++++------------- 1 file changed, 144 insertions(+), 174 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 19f65b2660..00eff4f84f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -181,21 +181,15 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { // load old if (pDelFileR) { code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL); - if (code) { - goto _err; - } + if (code) goto _err; - code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, &pCommitter->pBuf1); - if (code) { - goto _err; - } + code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, NULL); + if (code) goto _err; } // prepare new code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb); - if (code) { - goto _err; - } + if (code) goto _err; _exit: tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode)); @@ -206,64 +200,89 @@ _err: return code; } -static int32_t tsdbCommitTableDel(SCommitter *pCommitter); - static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { - int32_t code = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - int32_t c; - int32_t iTbData = 0; - int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); - int32_t iDelIdxItem = 0; - int32_t nDelIdxItem = pCommitter->delIdxOld.offset.nOffset; - STbData *pTbData = NULL; - SDelIdxItem *pDelIdxItem = NULL; - SDelIdxItem item; + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SMemTable *pMemTable = pTsdb->imem; + int32_t iDelIdx = 0; + int32_t nDelIdx = pCommitter->oDelIdxMap.nItem; + int32_t iTbData = 0; + int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); + STbData *pTbData; + SDelIdx *pDelIdx; + SDelIdx delIdx; + int32_t c; - while (iTbData < nTbData || iDelIdxItem < nDelIdxItem) { - pTbData = NULL; - pDelIdxItem = NULL; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } - if (iDelIdxItem < nDelIdxItem) { - tDelIdxGetItemByIdx(&pCommitter->delIdxOld, &item, iDelIdxItem); - pDelIdxItem = &item; - } + ASSERT(nTbData > 0); - if (pTbData && pDelIdxItem) { - c = tTABLEIDCmprFn(pTbData, pDelIdxItem); + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + if (iDelIdx < nDelIdx) { + code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx); + if (code) goto _err; + pDelIdx = &delIdx; + } else { + pDelIdx = NULL; + } + + while (true) { + if (pTbData == NULL && pDelIdx == NULL) break; + + if (pTbData && pDelIdx) { + c = tTABLEIDCmprFn(pTbData, pDelIdx); if (c == 0) { - iTbData++; - iDelIdxItem++; + goto _commit_mem_and_disk_del; } else if (c < 0) { - iTbData++; - pDelIdxItem = NULL; + goto _commit_mem_del; } else { - iDelIdxItem++; - pTbData = NULL; + goto _commit_disk_del; } } else { - if (pTbData) { - iTbData++; - } - if (pDelIdxItem) { - iDelIdxItem++; - } + if (pTbData) goto _commit_mem_del; + if (pDelIdx) goto _commit_disk_del; } - if (pTbData && pTbData->pHead == NULL) { + _commit_mem_del: + code = tsdbCommitTableDel(pCommitter, pTbData, NULL); + if (code) goto _err; + iTbData++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { pTbData = NULL; } + continue; - if (pTbData == NULL && pDelIdxItem == NULL) continue; - - // do merge - pCommitter->pTbData = pTbData; - pCommitter->pDelIdxItem = pDelIdxItem; - code = tsdbCommitTableDel(pCommitter); + _commit_disk_del: + code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx); if (code) goto _err; + iDelIdx++; + if (iDelIdx < nDelIdx) { + code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx); + if (code) goto _err; + pDelIdx = &delIdx; + } else { + pDelIdx = NULL; + } + continue; + + _commit_mem_and_disk_del: + code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx); + if (code) goto _err; + iTbData++; + iDelIdx++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } + if (iDelIdx < nDelIdx) { + code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx); + if (code) goto _err; + pDelIdx = &delIdx; + } else { + pDelIdx = NULL; + } + continue; } return code; @@ -276,29 +295,20 @@ _err: static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { int32_t code = 0; - code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->delIdxNew, NULL); - if (code) { - goto _err; - } + code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdxMap, NULL); + if (code) goto _err; code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL); - if (code) { - goto _err; - } + if (code) goto _err; code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1); - if (code) { - goto _err; - } + if (code) goto _err; if (pCommitter->pDelFReader) { code = tsdbDelFReaderClose(pCommitter->pDelFReader); if (code) goto _err; } - tDelDataClear(&pCommitter->delDataNew); - tDelIdxClear(&pCommitter->delIdxNew); - return code; _err: @@ -609,129 +619,89 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) { return code; } -static int32_t tsdbCommitTableDelStart(SCommitter *pCommitter) { - int32_t code = 0; - tb_uid_t suid; - tb_uid_t uid; +static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) { + int32_t code = 0; + SDelData delData; + SDelOp *pDelOp; + tb_uid_t suid; + tb_uid_t uid; + SDelIdx delIdx; // TODO + SDelDataInfo info; // TODO - if (pCommitter->pTbData) { - suid = pCommitter->pTbData->suid; - uid = pCommitter->pTbData->uid; + // check no del data, just return + if (pTbData && pTbData->pHead == NULL) { + pTbData = NULL; } + if (pTbData == NULL && pDelIdx == NULL) goto _exit; - // load old - pCommitter->delDataOld = (SDelData){0}; - if (pCommitter->pDelIdxItem) { - suid = pCommitter->pDelIdxItem->suid; - uid = pCommitter->pDelIdxItem->uid; - code = - tsdbReadDelData(pCommitter->pDelFReader, pCommitter->pDelIdxItem, &pCommitter->delDataOld, &pCommitter->pBuf5); + // prepare + if (pTbData) { + info.suid = pTbData->suid; + info.uid = pTbData->uid; + } else { + info.suid = pDelIdx->suid; + info.uid = pDelIdx->uid; + } + delIdx.suid = info.suid; + delIdx.uid = info.uid; + delIdx.minKey = TSKEY_MAX; + delIdx.maxKey = TSKEY_MIN; + delIdx.minVersion = INT64_MAX; + delIdx.maxVersion = -1; + + // start + tMapDataReset(&pCommitter->oDelDataMap); + tMapDataReset(&pCommitter->nDelDataMap); + + if (pDelIdx) { + code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, &pCommitter->oDelDataMap, NULL); if (code) goto _err; } - // prepare new - pCommitter->delDataNew.suid = suid; - pCommitter->delDataNew.uid = uid; - pCommitter->delDataNew.offset.flag = 0; - pCommitter->delDataNew.offset.nOffset = 0; - pCommitter->delDataNew.nData = 0; - pCommitter->delIdxItem = (SDelIdxItem){ - .suid = suid, - .uid = uid, - .minKey = TSKEY_MAX, - .maxKey = TSKEY_MIN, - .minVersion = INT64_MAX, - .maxVersion = INT64_MIN, - .offset = -1, - .size = -1, - }; + // disk + for (int32_t iDelData = 0; iDelData < pCommitter->oDelDataMap.nItem; iDelData++) { + code = tMapDataGetItemByIdx(&pCommitter->oDelDataMap, iDelData, &delData, tGetDelData); + if (code) goto _err; - return code; + code = tMapDataPutItem(&pCommitter->nDelDataMap, &delData, tPutDelData); + if (code) goto _err; -_err: - tsdbError("vgId:%d commit table del start failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitTableDelImpl(SCommitter *pCommitter) { - int32_t code = 0; - SDelDataItem item; - - // old - if (pCommitter->pDelIdxItem) { - for (int32_t iDelIdxItem = 0; iDelIdxItem < pCommitter->delDataOld.offset.nOffset; iDelIdxItem++) { - code = tDelDataGetItemByIdx(&pCommitter->delDataOld, &item, iDelIdxItem); - if (code) goto _err; - - code = tDelDataPutItem(&pCommitter->delDataNew, &item); - if (code) goto _err; - - // update index - if (item.version < pCommitter->delIdxItem.minVersion) pCommitter->delIdxItem.minVersion = item.version; - if (item.version > pCommitter->delIdxItem.maxVersion) pCommitter->delIdxItem.maxVersion = item.version; - if (item.sKey < pCommitter->delIdxItem.minKey) pCommitter->delIdxItem.minKey = item.sKey; - if (item.eKey > pCommitter->delIdxItem.maxKey) pCommitter->delIdxItem.maxKey = item.eKey; - } + if (delIdx.minKey > delData.sKey) delIdx.minKey = delData.sKey; + if (delIdx.maxKey < delData.eKey) delIdx.maxKey = delData.eKey; + if (delIdx.minVersion > delData.version) delIdx.minVersion = delData.version; + if (delIdx.maxVersion < delData.version) delIdx.maxVersion = delData.version; } - // new - if (pCommitter->pTbData) { - for (SDelOp *pDelOp = pCommitter->pTbData->pHead; pDelOp; pDelOp = pDelOp->pNext) { - item = (SDelDataItem){.version = pDelOp->version, .sKey = pDelOp->sKey, .eKey = pDelOp->eKey}; + // memory + pDelOp = pTbData ? pTbData->pHead : NULL; + for (; pDelOp; pDelOp = pDelOp->pNext) { + delData.version = pDelOp->version; + delData.sKey = pDelOp->sKey; + delData.eKey = pDelOp->eKey; - code = tDelDataPutItem(&pCommitter->delDataNew, &item); - if (code) goto _err; + code = tMapDataPutItem(&pCommitter->nDelDataMap, &delData, tPutDelData); + if (code) goto _err; - // update index - if (item.version < pCommitter->delIdxItem.minVersion) pCommitter->delIdxItem.minVersion = item.version; - if (item.version > pCommitter->delIdxItem.maxVersion) pCommitter->delIdxItem.maxVersion = item.version; - if (item.sKey < pCommitter->delIdxItem.minKey) pCommitter->delIdxItem.minKey = item.sKey; - if (item.eKey > pCommitter->delIdxItem.maxKey) pCommitter->delIdxItem.maxKey = item.eKey; - } + if (delIdx.minKey > delData.sKey) delIdx.minKey = delData.sKey; + if (delIdx.maxKey < delData.eKey) delIdx.maxKey = delData.eKey; + if (delIdx.minVersion > delData.version) delIdx.minVersion = delData.version; + if (delIdx.maxVersion < delData.version) delIdx.maxVersion = delData.version; } + ASSERT(pCommitter->nDelDataMap.nItem > 0); + + // write + code = tsdbWriteDelData(pCommitter->pDelFWriter, &info, &pCommitter->nDelDataMap, NULL, &delIdx.offset, &delIdx.size); + if (code) goto _err; + + // put delIdx + code = tMapDataPutItem(&pCommitter->nDelIdxMap, &delIdx, tPutDelIdx); + if (code) goto _err; + +_exit: return code; _err: - return code; -} - -static int32_t tsdbCommitTableDelEnd(SCommitter *pCommitter) { - int32_t code = 0; - - // write table del data - code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->delDataNew, NULL, &pCommitter->delIdxItem.offset, - &pCommitter->delIdxItem.size); - if (code) goto _err; - - // add SDelIdxItem - code = tDelIdxPutItem(&pCommitter->delIdxNew, &pCommitter->delIdxItem); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d commit table del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitTableDel(SCommitter *pCommitter) { - int32_t code = 0; - - // start - code = tsdbCommitTableDelStart(pCommitter); - if (code) goto _err; - - // impl - code = tsdbCommitTableDelImpl(pCommitter); - if (code) goto _err; - - // end - code = tsdbCommitTableDelEnd(pCommitter); - if (code) goto _err; - - return code; - -_err: + tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } \ No newline at end of file