commit
18dba544c3
|
@ -183,10 +183,10 @@ typedef struct {
|
||||||
} STsdbFileH;
|
} STsdbFileH;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int numOfFGroups;
|
|
||||||
SFileGroup *base;
|
|
||||||
SFileGroup *pFileGroup;
|
|
||||||
int direction;
|
int direction;
|
||||||
|
STsdbFileH* pFileH;
|
||||||
|
int fileId;
|
||||||
|
int index;
|
||||||
} SFileGroupIter;
|
} SFileGroupIter;
|
||||||
|
|
||||||
// ------------------ tsdbMain.c
|
// ------------------ tsdbMain.c
|
||||||
|
|
|
@ -156,8 +156,10 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_rwlock_wrlock(&pFileH->fhlock);
|
||||||
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
|
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
|
||||||
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
||||||
|
pthread_rwlock_unlock(&pFileH->fhlock);
|
||||||
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,54 +170,72 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO
|
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) {
|
||||||
|
pIter->pFileH = pFileH;
|
||||||
pIter->direction = direction;
|
pIter->direction = direction;
|
||||||
pIter->base = pFileH->pFGroup;
|
|
||||||
pIter->numOfFGroups = pFileH->nFGroups;
|
|
||||||
if (pFileH->nFGroups == 0) {
|
if (pFileH->nFGroups == 0) {
|
||||||
pIter->pFileGroup = NULL;
|
pIter->index = -1;
|
||||||
|
pIter->fileId = -1;
|
||||||
} else {
|
} else {
|
||||||
if (direction == TSDB_FGROUP_ITER_FORWARD) {
|
if (direction == TSDB_FGROUP_ITER_FORWARD) {
|
||||||
pIter->pFileGroup = pFileH->pFGroup;
|
pIter->index = 0;
|
||||||
} else {
|
} else {
|
||||||
pIter->pFileGroup = pFileH->pFGroup + pFileH->nFGroups - 1;
|
pIter->index = pFileH->nFGroups - 1;
|
||||||
}
|
}
|
||||||
|
pIter->fileId = pFileH->pFGroup[pIter->index].fileId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { // TODO
|
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
|
||||||
if (pIter->numOfFGroups == 0) {
|
STsdbFileH *pFileH = pIter->pFileH;
|
||||||
assert(pIter->pFileGroup == NULL);
|
|
||||||
|
if (pFileH->nFGroups == 0) {
|
||||||
|
pIter->index = -1;
|
||||||
|
pIter->fileId = -1;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
|
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
|
||||||
void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
|
void *ptr = taosbsearch(&fid, (void *)pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
pIter->pFileGroup = NULL;
|
pIter->index = -1;
|
||||||
|
pIter->fileId = -1;
|
||||||
} else {
|
} else {
|
||||||
pIter->pFileGroup = (SFileGroup *)ptr;
|
pIter->index = POINTER_DISTANCE(ptr, pFileH->pFGroup) / sizeof(SFileGroup);
|
||||||
|
pIter->fileId = ((SFileGroup *)ptr)->fileId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {//TODO
|
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
|
||||||
SFileGroup *ret = pIter->pFileGroup;
|
STsdbFileH *pFileH = pIter->pFileH;
|
||||||
if (ret == NULL) return NULL;
|
SFileGroup *pFGroup = NULL;
|
||||||
|
|
||||||
|
if (pIter->index < 0 || pIter->index >= pFileH->nFGroups || pIter->fileId < 0) return NULL;
|
||||||
|
|
||||||
|
pFGroup = &pFileH->pFGroup[pIter->index];
|
||||||
|
if (pFGroup->fileId != pIter->fileId) {
|
||||||
|
tsdbSeekFileGroupIter(pIter, pIter->fileId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIter->index < 0) return NULL;
|
||||||
|
|
||||||
|
pFGroup = &pFileH->pFGroup[pIter->index];
|
||||||
|
ASSERT(pFGroup->fileId == pIter->fileId);
|
||||||
|
|
||||||
if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) {
|
if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) {
|
||||||
if ((pIter->pFileGroup + 1) == (pIter->base + pIter->numOfFGroups)) {
|
pIter->index++;
|
||||||
pIter->pFileGroup = NULL;
|
|
||||||
} else {
|
|
||||||
pIter->pFileGroup += 1;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if (pIter->pFileGroup == pIter->base) {
|
pIter->index--;
|
||||||
pIter->pFileGroup = NULL;
|
|
||||||
} else {
|
|
||||||
pIter->pFileGroup -= 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret;
|
|
||||||
|
if (pIter->index >= 0 && pIter->index < pFileH->nFGroups) {
|
||||||
|
pIter->fileId = pFileH->pFGroup[pIter->index].fileId;
|
||||||
|
} else {
|
||||||
|
pIter->fileId = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pFGroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbOpenFile(SFile *pFile, int oflag) {
|
int tsdbOpenFile(SFile *pFile, int oflag) {
|
||||||
|
|
|
@ -787,6 +787,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||||
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC);
|
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC);
|
||||||
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
|
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
|
||||||
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
|
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
|
||||||
|
if (tsdbLoadCompIdx(&rhelper, NULL) < 0) goto _err;
|
||||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||||
STable *pTable = pMeta->tables[i];
|
STable *pTable = pMeta->tables[i];
|
||||||
if (pTable == NULL) continue;
|
if (pTable == NULL) continue;
|
||||||
|
|
|
@ -603,6 +603,11 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
// Loop to commit data in each table
|
// Loop to commit data in each table
|
||||||
for (int tid = 1; tid < pMem->maxTables; tid++) {
|
for (int tid = 1; tid < pMem->maxTables; tid++) {
|
||||||
SCommitIter *pIter = iters + tid;
|
SCommitIter *pIter = iters + tid;
|
||||||
|
@ -651,12 +656,20 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
tsdbCloseHelperFile(pHelper, 0);
|
tsdbCloseHelperFile(pHelper, 0);
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
|
|
||||||
#ifdef TSDB_IDX
|
#ifdef TSDB_IDX
|
||||||
pGroup->files[TSDB_FILE_TYPE_IDX] = *(helperIdxF(pHelper));
|
rename(helperNewIdxF(pHelper)->fname, helperIdxF(pHelper)->fname);
|
||||||
|
pGroup->files[TSDB_FILE_TYPE_IDX].info = helperNewIdxF(pHelper)->info;
|
||||||
#endif
|
#endif
|
||||||
pGroup->files[TSDB_FILE_TYPE_HEAD] = *(helperHeadF(pHelper));
|
|
||||||
pGroup->files[TSDB_FILE_TYPE_DATA] = *(helperDataF(pHelper));
|
rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
|
||||||
pGroup->files[TSDB_FILE_TYPE_LAST] = *(helperLastF(pHelper));
|
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
|
||||||
|
|
||||||
|
rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
|
||||||
|
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
|
||||||
|
|
||||||
|
pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
|
||||||
|
|
||||||
pthread_rwlock_unlock(&(pFileH->fhlock));
|
pthread_rwlock_unlock(&(pFileH->fhlock));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -118,12 +118,12 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
|
|
||||||
// Open the files
|
// Open the files
|
||||||
#ifdef TSDB_IDX
|
#ifdef TSDB_IDX
|
||||||
if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) return -1;
|
||||||
#endif
|
#endif
|
||||||
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1;
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) goto _err;
|
if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) return -1;
|
||||||
if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) goto _err;
|
if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) return -1;
|
||||||
|
|
||||||
#ifdef TSDB_IDX
|
#ifdef TSDB_IDX
|
||||||
// Create and open .i file
|
// Create and open .i file
|
||||||
|
@ -144,23 +144,20 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
// Create and open .l file if should
|
// Create and open .l file if should
|
||||||
if (tsdbShouldCreateNewLast(pHelper)) {
|
if (tsdbShouldCreateNewLast(pHelper)) {
|
||||||
pFile = helperNewLastF(pHelper);
|
pFile = helperNewLastF(pHelper);
|
||||||
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) goto _err;
|
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
|
||||||
pFile->info.size = TSDB_FILE_HEAD_SIZE;
|
pFile->info.size = TSDB_FILE_HEAD_SIZE;
|
||||||
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||||
pFile->info.len = 0;
|
pFile->info.len = 0;
|
||||||
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
|
if (tsdbUpdateFileHeader(pFile, 0) < 0) return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) return -1;
|
||||||
if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);
|
helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);
|
||||||
|
|
||||||
return tsdbLoadCompIdx(pHelper, NULL);
|
return 0;
|
||||||
|
|
||||||
_err:
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
||||||
|
@ -183,8 +180,12 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
||||||
pFile = helperDataF(pHelper);
|
pFile = helperDataF(pHelper);
|
||||||
if (pFile->fd > 0) {
|
if (pFile->fd > 0) {
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
tsdbUpdateFileHeader(pFile, 0);
|
if (!hasError) {
|
||||||
fsync(pFile->fd);
|
tsdbUpdateFileHeader(pFile, 0);
|
||||||
|
fsync(pFile->fd);
|
||||||
|
} else {
|
||||||
|
// TODO: shrink back to origin
|
||||||
|
}
|
||||||
}
|
}
|
||||||
close(pFile->fd);
|
close(pFile->fd);
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
|
@ -193,7 +194,12 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
||||||
pFile = helperLastF(pHelper);
|
pFile = helperLastF(pHelper);
|
||||||
if (pFile->fd > 0) {
|
if (pFile->fd > 0) {
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) {
|
||||||
fsync(pFile->fd);
|
if (!hasError) {
|
||||||
|
tsdbUpdateFileHeader(pFile, 0);
|
||||||
|
fsync(pFile->fd);
|
||||||
|
} else {
|
||||||
|
// TODO: shrink back to origin
|
||||||
|
}
|
||||||
}
|
}
|
||||||
close(pFile->fd);
|
close(pFile->fd);
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
|
@ -203,60 +209,36 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
||||||
#ifdef TSDB_IDX
|
#ifdef TSDB_IDX
|
||||||
pFile = helperNewIdxF(pHelper);
|
pFile = helperNewIdxF(pHelper);
|
||||||
if (pFile->fd > 0) {
|
if (pFile->fd > 0) {
|
||||||
if (!hasError) tsdbUpdateFileHeader(pFile, 0);
|
if (!hasError) {
|
||||||
fsync(pFile->fd);
|
tsdbUpdateFileHeader(pFile, 0);
|
||||||
|
fsync(pFile->fd);
|
||||||
|
}
|
||||||
close(pFile->fd);
|
close(pFile->fd);
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
if (hasError) {
|
if (hasError) (void)remove(pFile->fname);
|
||||||
(void)remove(pFile->fname);
|
|
||||||
} else {
|
|
||||||
if (rename(pFile->fname, helperIdxF(pHelper)->fname) < 0) {
|
|
||||||
tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperIdxF(pHelper)->fname,
|
|
||||||
strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
helperIdxF(pHelper)->info = pFile->info;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
pFile = helperNewHeadF(pHelper);
|
pFile = helperNewHeadF(pHelper);
|
||||||
if (pFile->fd > 0) {
|
if (pFile->fd > 0) {
|
||||||
if (!hasError) tsdbUpdateFileHeader(pFile, 0);
|
if (!hasError) {
|
||||||
fsync(pFile->fd);
|
tsdbUpdateFileHeader(pFile, 0);
|
||||||
|
fsync(pFile->fd);
|
||||||
|
}
|
||||||
close(pFile->fd);
|
close(pFile->fd);
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
if (hasError) {
|
if (hasError) (void)remove(pFile->fname);
|
||||||
(void)remove(pFile->fname);
|
|
||||||
} else {
|
|
||||||
if (rename(pFile->fname, helperHeadF(pHelper)->fname) < 0) {
|
|
||||||
tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperHeadF(pHelper)->fname,
|
|
||||||
strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
helperHeadF(pHelper)->info = pFile->info;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pFile = helperNewLastF(pHelper);
|
pFile = helperNewLastF(pHelper);
|
||||||
if (pFile->fd > 0) {
|
if (pFile->fd > 0) {
|
||||||
if (!hasError) tsdbUpdateFileHeader(pFile, 0);
|
if (!hasError) {
|
||||||
fsync(pFile->fd);
|
tsdbUpdateFileHeader(pFile, 0);
|
||||||
|
fsync(pFile->fd);
|
||||||
|
}
|
||||||
close(pFile->fd);
|
close(pFile->fd);
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
if (hasError) {
|
if (hasError) (void)remove(pFile->fname);
|
||||||
(void)remove(pFile->fname);
|
|
||||||
} else {
|
|
||||||
if (rename(pFile->fname, helperLastF(pHelper)->fname) < 0) {
|
|
||||||
tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperLastF(pHelper)->fname,
|
|
||||||
strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
helperLastF(pHelper)->info = helperNewLastF(pHelper)->info;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -555,16 +555,6 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) {
|
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks) {
|
||||||
SFileGroup* fileGroup = pQueryHandle->pFileGroup;
|
|
||||||
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
|
|
||||||
|
|
||||||
int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
|
|
||||||
|
|
||||||
//open file failed, return error code to client
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// load all the comp offset value for all tables in this file
|
// load all the comp offset value for all tables in this file
|
||||||
*numOfBlocks = 0;
|
*numOfBlocks = 0;
|
||||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
|
@ -1461,12 +1451,20 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
|
||||||
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
||||||
STimeWindow win = TSWINDOW_INITIALIZER;
|
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
while (true) {
|
||||||
|
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
|
|
||||||
|
if ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) == NULL) {
|
||||||
|
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey);
|
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey);
|
||||||
|
|
||||||
// current file are not overlapped with query time window, ignore remain files
|
// current file are not overlapped with query time window, ignore remain files
|
||||||
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
|
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
|
||||||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
|
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
|
||||||
|
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle,
|
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %p", pQueryHandle,
|
||||||
pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo);
|
pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo);
|
||||||
pQueryHandle->pFileGroup = NULL;
|
pQueryHandle->pFileGroup = NULL;
|
||||||
|
@ -1474,6 +1472,19 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, pQueryHandle->pFileGroup) < 0) {
|
||||||
|
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
|
code = terrno;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
|
|
||||||
|
if (tsdbLoadCompIdx(&pQueryHandle->rhelper, NULL) < 0) {
|
||||||
|
code = terrno;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
|
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1528,8 +1539,10 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
||||||
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
||||||
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
|
int32_t fid = getFileIdFromKey(pQueryHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
|
||||||
|
|
||||||
|
pthread_rwlock_rdlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
||||||
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
||||||
|
pthread_rwlock_unlock(&pQueryHandle->pTsdb->tsdbFileH->fhlock);
|
||||||
|
|
||||||
return getDataBlocksInFilesImpl(pQueryHandle, exists);
|
return getDataBlocksInFilesImpl(pQueryHandle, exists);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue