refact FGroup Iter make it multi-thread safe
This commit is contained in:
parent
89c8104238
commit
0271f94394
|
@ -183,10 +183,10 @@ typedef struct {
|
||||||
} STsdbFileH;
|
} STsdbFileH;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int numOfFGroups;
|
|
||||||
SFileGroup *base;
|
|
||||||
SFileGroup *pFileGroup;
|
|
||||||
int direction;
|
int direction;
|
||||||
|
STsdbFileH* pFileH;
|
||||||
|
int fileId;
|
||||||
|
int index;
|
||||||
} SFileGroupIter;
|
} SFileGroupIter;
|
||||||
|
|
||||||
// ------------------ tsdbMain.c
|
// ------------------ tsdbMain.c
|
||||||
|
|
|
@ -156,8 +156,10 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_rwlock_wrlock(&pFileH->fhlock);
|
||||||
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
|
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
|
||||||
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
||||||
|
pthread_rwlock_unlock(&pFileH->fhlock);
|
||||||
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,54 +170,72 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO
|
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) {
|
||||||
|
pIter->pFileH = pFileH;
|
||||||
pIter->direction = direction;
|
pIter->direction = direction;
|
||||||
pIter->base = pFileH->pFGroup;
|
|
||||||
pIter->numOfFGroups = pFileH->nFGroups;
|
|
||||||
if (pFileH->nFGroups == 0) {
|
if (pFileH->nFGroups == 0) {
|
||||||
pIter->pFileGroup = NULL;
|
pIter->index = -1;
|
||||||
|
pIter->fileId = -1;
|
||||||
} else {
|
} else {
|
||||||
if (direction == TSDB_FGROUP_ITER_FORWARD) {
|
if (direction == TSDB_FGROUP_ITER_FORWARD) {
|
||||||
pIter->pFileGroup = pFileH->pFGroup;
|
pIter->index = 0;
|
||||||
} else {
|
} else {
|
||||||
pIter->pFileGroup = pFileH->pFGroup + pFileH->nFGroups - 1;
|
pIter->index = pFileH->nFGroups - 1;
|
||||||
}
|
}
|
||||||
|
pIter->fileId = pFileH->pFGroup[pIter->index].fileId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { // TODO
|
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
|
||||||
if (pIter->numOfFGroups == 0) {
|
STsdbFileH *pFileH = pIter->pFileH;
|
||||||
assert(pIter->pFileGroup == NULL);
|
|
||||||
|
if (pFileH->nFGroups == 0) {
|
||||||
|
pIter->index = -1;
|
||||||
|
pIter->fileId = -1;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
|
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
|
||||||
void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
|
void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
pIter->pFileGroup = NULL;
|
pIter->index = -1;
|
||||||
|
pIter->fileId = -1;
|
||||||
} else {
|
} else {
|
||||||
pIter->pFileGroup = (SFileGroup *)ptr;
|
pIter->index = POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup);
|
||||||
|
pIter->fileId = ((SFileGroup *)ptr)->fileId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {//TODO
|
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
|
||||||
SFileGroup *ret = pIter->pFileGroup;
|
STsdbFileH *pFileH = pIter->pFileH;
|
||||||
if (ret == NULL) return NULL;
|
SFileGroup *pFGroup = NULL;
|
||||||
|
|
||||||
|
if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL;
|
||||||
|
|
||||||
|
pFGroup = &pFileH->pFGroup[pIter->index];
|
||||||
|
if (pFGroup->fileId != pIter->fileId) {
|
||||||
|
tsdbSeekFileGroupIter(pIter, pIter->fileId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIter->index < 0) return NULL;
|
||||||
|
|
||||||
|
pFGroup = &pFileH->pFGroup[pIter->index];
|
||||||
|
ASSERT(pFGroup->fileId == pIter->fileId);
|
||||||
|
|
||||||
if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) {
|
if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) {
|
||||||
if ((pIter->pFileGroup + 1) == (pIter->base + pIter->numOfFGroups)) {
|
pIter->index++;
|
||||||
pIter->pFileGroup = NULL;
|
|
||||||
} else {
|
} else {
|
||||||
pIter->pFileGroup += 1;
|
pIter->index--;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) {
|
||||||
|
pIter->fileId = pFileH->pFGroup[pIter->index].fileId;
|
||||||
} else {
|
} else {
|
||||||
if (pIter->pFileGroup == pIter->base) {
|
pIter->fileId = -1;
|
||||||
pIter->pFileGroup = NULL;
|
|
||||||
} else {
|
|
||||||
pIter->pFileGroup -= 1;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return ret;
|
return pFGroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbOpenFile(SFile *pFile, int oflag) {
|
int tsdbOpenFile(SFile *pFile, int oflag) {
|
||||||
|
|
|
@ -1539,8 +1539,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
||||||
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
||||||
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
|
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
|
||||||
|
|
||||||
|
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
||||||
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
||||||
|
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
|
|
||||||
return getDataBlocksInFilesImpl(pQueryHandle, exists);
|
return getDataBlocksInFilesImpl(pQueryHandle, exists);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue