enh: use hash table for filtering fset in tsdbSnapReaderOpen and tsdbSnapWriterOpen
This commit is contained in:
parent
4e74533878
commit
eb8c4d3e8c
|
@ -680,6 +680,7 @@ typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges
|
||||||
int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
|
int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
|
||||||
int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
|
int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
|
||||||
void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
|
void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
|
||||||
|
SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges);
|
||||||
|
|
||||||
// snap partition list
|
// snap partition list
|
||||||
typedef TARRAY2(SVersionRange) SVerRangeList;
|
typedef TARRAY2(SVersionRange) SVerRangeList;
|
||||||
|
|
|
@ -991,11 +991,12 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclude, TFileSetArray **fsetArr,
|
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRanges, TFileSetArray **fsetArr,
|
||||||
TFileOpArray *fopArr) {
|
TFileOpArray *fopArr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
STFileSet *fset1;
|
STFileSet *fset1;
|
||||||
|
SHashObj *pHash = NULL;
|
||||||
|
|
||||||
fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
|
fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
|
||||||
if (fsetArr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (fsetArr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1003,21 +1004,19 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclu
|
||||||
TARRAY2_INIT(fsetArr[0]);
|
TARRAY2_INIT(fsetArr[0]);
|
||||||
|
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
if (pRanges) {
|
||||||
|
pHash = tsdbGetSnapRangeHash(pRanges);
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
||||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||||
int64_t ever = VERSION_MAX;
|
int64_t ever = VERSION_MAX;
|
||||||
while (pExclude && i < TARRAY2_SIZE(pExclude)) {
|
if (pHash) {
|
||||||
STSnapRange *u = TARRAY2_GET(pExclude, i);
|
int32_t fid = fset->fid;
|
||||||
if (fset->fid > u->fid) {
|
STSnapRange *u = taosHashGet(pHash, &fid, sizeof(fid));
|
||||||
i++;
|
if (u) {
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (fset->fid == u->fid) {
|
|
||||||
ever = u->sver - 1;
|
ever = u->sver - 1;
|
||||||
i++;
|
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
|
code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
|
||||||
|
@ -1033,14 +1032,37 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclu
|
||||||
taosMemoryFree(fsetArr[0]);
|
taosMemoryFree(fsetArr[0]);
|
||||||
fsetArr[0] = NULL;
|
fsetArr[0] = NULL;
|
||||||
}
|
}
|
||||||
|
if (pHash) {
|
||||||
|
taosHashCleanup(pHash);
|
||||||
|
pHash = NULL;
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges) {
|
||||||
|
int32_t capacity = TARRAY2_SIZE(pRanges) * 2;
|
||||||
|
SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||||
|
if (pHash == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) {
|
||||||
|
STSnapRange *u = TARRAY2_GET(pRanges, i);
|
||||||
|
int32_t fid = u->fid;
|
||||||
|
int32_t code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u));
|
||||||
|
ASSERT(code == 0);
|
||||||
|
tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
|
||||||
|
}
|
||||||
|
return pHash;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges,
|
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges,
|
||||||
TSnapRangeArray **fsrArr) {
|
TSnapRangeArray **fsrArr) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
STSnapRange *fsr1 = NULL;
|
STSnapRange *fsr1 = NULL;
|
||||||
|
SHashObj *pHash = NULL;
|
||||||
|
|
||||||
fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
|
fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
|
||||||
if (fsrArr[0] == NULL) {
|
if (fsrArr[0] == NULL) {
|
||||||
|
@ -1048,30 +1070,32 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t i = 0;
|
tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges)));
|
||||||
code = 0;
|
code = 0;
|
||||||
|
if (pRanges) {
|
||||||
|
pHash = tsdbGetSnapRangeHash(pRanges);
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
||||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||||
int64_t sver1 = sver;
|
int64_t sver1 = sver;
|
||||||
int64_t ever1 = ever;
|
int64_t ever1 = ever;
|
||||||
|
|
||||||
while (pRanges && i < TARRAY2_SIZE(pRanges)) {
|
if (pHash) {
|
||||||
STSnapRange *u = TARRAY2_GET(pRanges, i);
|
int32_t fid = fset->fid;
|
||||||
if (fset->fid > u->fid) {
|
STSnapRange *u = taosHashGet(pHash, &fid, sizeof(fid));
|
||||||
i++;
|
if (u) {
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (fset->fid == u->fid) {
|
|
||||||
sver1 = u->sver;
|
sver1 = u->sver;
|
||||||
i++;
|
tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sver1 > ever1) continue;
|
if (sver1 > ever1) {
|
||||||
|
tsdbDebug("skip fid:%d, sver:%" PRId64 ", ever:%" PRId64, fset->fid, sver1, ever1);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
|
tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
|
||||||
|
|
||||||
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
|
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
|
||||||
if (code) break;
|
if (code) break;
|
||||||
|
@ -1090,6 +1114,10 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
|
||||||
}
|
}
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
|
if (pHash) {
|
||||||
|
taosHashCleanup(pHash);
|
||||||
|
pHash = NULL;
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1175,6 +1175,12 @@ static int32_t tVersionRangeCmprFn(SVersionRange* x, SVersionRange* y) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSnapRangeCmprFn(STSnapRange* x, STSnapRange* y) {
|
||||||
|
if (x->fid < y->fid) return -1;
|
||||||
|
if (x->fid > y->fid) return 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
STsdbSnapPartition* tsdbSnapPartitionCreate() {
|
STsdbSnapPartition* tsdbSnapPartitionCreate() {
|
||||||
STsdbSnapPartition* pSP = taosMemoryCalloc(1, sizeof(STsdbSnapPartition));
|
STsdbSnapPartition* pSP = taosMemoryCalloc(1, sizeof(STsdbSnapPartition));
|
||||||
if (pSP == NULL) {
|
if (pSP == NULL) {
|
||||||
|
@ -1478,10 +1484,13 @@ int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList* pList, TSnapRangeArray**
|
||||||
r->fid = part->fid;
|
r->fid = part->fid;
|
||||||
r->sver = maxVerValid + 1;
|
r->sver = maxVerValid + 1;
|
||||||
r->ever = VERSION_MAX;
|
r->ever = VERSION_MAX;
|
||||||
tsdbInfo("range diff fid:%" PRId64 ", sver:%" PRId64 ", ever:%" PRId64, part->fid, r->sver, r->ever);
|
tsdbDebug("range diff fid:%" PRId64 ", sver:%" PRId64 ", ever:%" PRId64, part->fid, r->sver, r->ever);
|
||||||
TARRAY2_APPEND(pDiff, r);
|
int32_t code = TARRAY2_SORT_INSERT(pDiff, r, tsdbSnapRangeCmprFn);
|
||||||
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
ppRanges[0] = pDiff;
|
ppRanges[0] = pDiff;
|
||||||
|
|
||||||
|
tsdbInfo("pDiff size:%d", TARRAY2_SIZE(pDiff));
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
Loading…
Reference in New Issue