commit
f7d21f6c2c
|
@ -3191,7 +3191,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
||||||
} else { \
|
} else { \
|
||||||
*(type *)(ctx)->aOutputBuf = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64Key)); \
|
*(type *)(ctx)->aOutputBuf = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64Key)); \
|
||||||
*(type *)(&(ctx)->param[1].i64Key) = *(type *)(d); \
|
*(type *)(&(ctx)->param[1].i64Key) = *(type *)(d); \
|
||||||
*(int64_t *)(ctx)->ptsOutputBuf = *(int64_t *)((ctx)->ptsList + (TSDB_KEYSIZE)*index); \
|
*(int64_t *)(ctx)->ptsOutputBuf = (ctx)->ptsList[index]; \
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
|
|
@ -26,12 +26,14 @@ extern "C" {
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
#ifndef STDERR_FILENO
|
#ifndef STDERR_FILENO
|
||||||
#define VALIDFD(x) ((x) > 2)
|
#define STDERR_FILENO (2)
|
||||||
#else
|
|
||||||
#define VALIDFD(x) ((x) > STDERR_FILENO)
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define FD_VALID(x) ((x) > STDERR_FILENO)
|
||||||
|
#define FD_INITIALIZER ((int32_t)-1)
|
||||||
|
|
||||||
#define WCHAR wchar_t
|
#define WCHAR wchar_t
|
||||||
|
|
||||||
#define tfree(x) \
|
#define tfree(x) \
|
||||||
{ \
|
{ \
|
||||||
if (x) { \
|
if (x) { \
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
|
|
||||||
#define taosCloseSocket(x) \
|
#define taosCloseSocket(x) \
|
||||||
{ \
|
{ \
|
||||||
if (VALIDFD(x)) { \
|
if (FD_VALID(x)) { \
|
||||||
close(x); \
|
close(x); \
|
||||||
x = -1; \
|
x = -1; \
|
||||||
} \
|
} \
|
||||||
|
|
|
@ -75,11 +75,12 @@ extern "C" {
|
||||||
|
|
||||||
#define taosCloseSocket(x) \
|
#define taosCloseSocket(x) \
|
||||||
{ \
|
{ \
|
||||||
if (VALIDFD(x)) { \
|
if (FD_VALID(x)) { \
|
||||||
close(x); \
|
close(x); \
|
||||||
x = -1; \
|
x = -1; \
|
||||||
} \
|
} \
|
||||||
}
|
}
|
||||||
|
|
||||||
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
|
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
|
||||||
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
|
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
|
||||||
|
|
||||||
|
|
|
@ -192,9 +192,9 @@ int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInf
|
||||||
|
|
||||||
uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData,
|
uint32_t getDataBlocksForMeters(SMeterQuerySupportObj* pSupporter, SQuery* pQuery, char* pHeaderData,
|
||||||
int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo);
|
int32_t numOfMeters, SQueryFileInfo* pQueryFileInfo, SMeterDataInfo** pMeterDataInfo);
|
||||||
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, int8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
|
int32_t LoadDatablockOnDemand(SCompBlock* pBlock, SField** pFields, uint8_t* blkStatus, SQueryRuntimeEnv* pRuntimeEnv,
|
||||||
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
|
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand);
|
||||||
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex);
|
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create SMeterQueryInfo.
|
* Create SMeterQueryInfo.
|
||||||
|
|
|
@ -52,9 +52,9 @@ typedef struct SQueryLoadCompBlockInfo {
|
||||||
*/
|
*/
|
||||||
typedef struct SQueryFileInfo {
|
typedef struct SQueryFileInfo {
|
||||||
int32_t fileID; /* file id */
|
int32_t fileID; /* file id */
|
||||||
char headerFilePath[256]; /* full file name */
|
char headerFilePath[PATH_MAX]; /* full file name */
|
||||||
char dataFilePath[256];
|
char dataFilePath[PATH_MAX];
|
||||||
char lastFilePath[256];
|
char lastFilePath[PATH_MAX];
|
||||||
int32_t defaultMappingSize; /* default mapping size */
|
int32_t defaultMappingSize; /* default mapping size */
|
||||||
|
|
||||||
int32_t headerFd; /* file handler */
|
int32_t headerFd; /* file handler */
|
||||||
|
|
|
@ -81,7 +81,7 @@ int vnodeRenewCommitLog(int vnode) {
|
||||||
|
|
||||||
pthread_mutex_lock(&(pVnode->logMutex));
|
pthread_mutex_lock(&(pVnode->logMutex));
|
||||||
|
|
||||||
if (VALIDFD(pVnode->logFd)) {
|
if (FD_VALID(pVnode->logFd)) {
|
||||||
munmap(pVnode->pMem, pVnode->mappingSize);
|
munmap(pVnode->pMem, pVnode->mappingSize);
|
||||||
close(pVnode->logFd);
|
close(pVnode->logFd);
|
||||||
rename(fileName, oldName);
|
rename(fileName, oldName);
|
||||||
|
@ -243,7 +243,7 @@ int vnodeInitCommit(int vnode) {
|
||||||
void vnodeCleanUpCommit(int vnode) {
|
void vnodeCleanUpCommit(int vnode) {
|
||||||
SVnodeObj *pVnode = vnodeList + vnode;
|
SVnodeObj *pVnode = vnodeList + vnode;
|
||||||
|
|
||||||
if (VALIDFD(pVnode->logFd)) close(pVnode->logFd);
|
if (FD_VALID(pVnode->logFd)) close(pVnode->logFd);
|
||||||
|
|
||||||
if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) {
|
if (pVnode->cfg.commitLog && (pVnode->logFd > 0 && remove(pVnode->logFn) < 0)) {
|
||||||
dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn);
|
dError("vid:%d, failed to remove:%s", vnode, pVnode->logFn);
|
||||||
|
|
|
@ -247,6 +247,47 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) {
|
||||||
pBlockLoadInfo->fileListIndex = -1;
|
pBlockLoadInfo->fileListIndex = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* if the header is smaller than a threshold value(header size + initial offset value)
|
||||||
|
*
|
||||||
|
* @param vnodeId
|
||||||
|
* @param headerFileSize
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
static bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) {
|
||||||
|
SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg;
|
||||||
|
return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doCloseQueryFileInfoFD(SQueryFileInfo* pVnodeFiles) {
|
||||||
|
tclose(pVnodeFiles->headerFd);
|
||||||
|
tclose(pVnodeFiles->dataFd);
|
||||||
|
tclose(pVnodeFiles->lastFd);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doOpenQueryFileInfoDF(SQInfo* pQInfo, SQueryFileInfo* pVnodeFiles) {
|
||||||
|
// if the header is smaller than a threshold value, this file is empty, no need to
|
||||||
|
pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY);
|
||||||
|
if (!FD_VALID(pVnodeFiles->headerFd)) {
|
||||||
|
dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
|
||||||
|
if (!FD_VALID(pVnodeFiles->headerFd)) {
|
||||||
|
dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFiles->dataFilePath, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
|
||||||
|
if (!FD_VALID(pVnodeFiles->headerFd)) {
|
||||||
|
dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFiles->lastFilePath, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
|
static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
if (pRuntimeEnv->mmapedHFileIndex >= 0) {
|
if (pRuntimeEnv->mmapedHFileIndex >= 0) {
|
||||||
assert(pRuntimeEnv->mmapedHFileIndex < pRuntimeEnv->numOfFiles && pRuntimeEnv->mmapedHFileIndex >= 0);
|
assert(pRuntimeEnv->mmapedHFileIndex < pRuntimeEnv->numOfFiles && pRuntimeEnv->mmapedHFileIndex >= 0);
|
||||||
|
@ -255,6 +296,8 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
munmap(otherVnodeFiles->pHeaderFileData, otherVnodeFiles->headFileSize);
|
munmap(otherVnodeFiles->pHeaderFileData, otherVnodeFiles->headFileSize);
|
||||||
|
|
||||||
otherVnodeFiles->pHeaderFileData = NULL;
|
otherVnodeFiles->pHeaderFileData = NULL;
|
||||||
|
doCloseQueryFileInfoFD(otherVnodeFiles);
|
||||||
|
|
||||||
pRuntimeEnv->mmapedHFileIndex = -1;
|
pRuntimeEnv->mmapedHFileIndex = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,34 +306,46 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* mmap the data file into memory. For each query, only one header file is allowed to mmap into memory, in order to
|
* mmap the data file into memory. For each query, only one header file is allowed to mmap into memory, in order to
|
||||||
* avoid too many mmapped files at the save time to cause OS return the message of "Cannot allocate memory",
|
* avoid too many memory mapped files at the save time to cause OS return the message of "Cannot allocate memory",
|
||||||
* during query processing.
|
* during query processing.
|
||||||
*
|
*
|
||||||
* @param pRuntimeEnv
|
* @param pRuntimeEnv
|
||||||
* @param fileIndex
|
* @param fileIndex
|
||||||
* @return the return value may be null, so any invoker needs to check the returned value
|
* @return the return value may be null, so any invoker needs to check the returned value
|
||||||
*/
|
*/
|
||||||
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex) {
|
char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int32_t fileIndex) {
|
||||||
assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->numOfFiles);
|
assert(fileIndex >= 0 && fileIndex < pRuntimeEnv->numOfFiles);
|
||||||
|
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output
|
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output
|
||||||
|
|
||||||
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[fileIndex];
|
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[fileIndex];
|
||||||
size_t size = pVnodeFiles->headFileSize;
|
|
||||||
|
|
||||||
if (pVnodeFiles->pHeaderFileData == NULL) {
|
if (pVnodeFiles->pHeaderFileData == NULL) {
|
||||||
assert(pRuntimeEnv->mmapedHFileIndex != fileIndex);
|
assert(pRuntimeEnv->mmapedHFileIndex != fileIndex);
|
||||||
doUnmapHeaderFileData(pRuntimeEnv); // do close the other mmaped header file
|
doUnmapHeaderFileData(pRuntimeEnv); // do close the other memory mapped header file
|
||||||
|
|
||||||
pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0);
|
assert(pVnodeFiles->pHeaderFileData == NULL);
|
||||||
|
|
||||||
|
// current header file is empty or broken, return directly
|
||||||
|
if (isHeaderFileEmpty(vnodeId, pVnodeFiles->headFileSize)) {
|
||||||
|
return pVnodeFiles->pHeaderFileData;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (doOpenQueryFileInfoDF(pQInfo, pVnodeFiles) != TSDB_CODE_SUCCESS) {
|
||||||
|
return pVnodeFiles->pHeaderFileData;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVnodeFiles->pHeaderFileData = mmap(NULL, pVnodeFiles->headFileSize, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0);
|
||||||
if (pVnodeFiles->pHeaderFileData == MAP_FAILED) {
|
if (pVnodeFiles->pHeaderFileData == MAP_FAILED) {
|
||||||
pVnodeFiles->pHeaderFileData = NULL;
|
pVnodeFiles->pHeaderFileData = NULL;
|
||||||
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, size,
|
doCloseQueryFileInfoFD(pVnodeFiles);
|
||||||
strerror(errno));
|
|
||||||
|
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath,
|
||||||
|
pVnodeFiles->headFileSize, strerror(errno));
|
||||||
} else {
|
} else {
|
||||||
pRuntimeEnv->mmapedHFileIndex = fileIndex; // set the value in case of success mmap file
|
pRuntimeEnv->mmapedHFileIndex = fileIndex; // set the value in case of success mmap file
|
||||||
if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) {
|
if (madvise(pVnodeFiles->pHeaderFileData, pVnodeFiles->headFileSize, MADV_SEQUENTIAL) == -1) {
|
||||||
dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
|
dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -326,7 +381,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
|
||||||
pSummary->numOfSeek++;
|
pSummary->numOfSeek++;
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
char *data = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex);
|
char *data = vnodeGetHeaderFileData(pRuntimeEnv, pMeterObj->vnode, fileIndex);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
return -1; // failed to load the header file data into memory
|
return -1; // failed to load the header file data into memory
|
||||||
}
|
}
|
||||||
|
@ -2928,34 +2983,24 @@ static int file_order_comparator(const void *p1, const void *p2) {
|
||||||
* @param prefix
|
* @param prefix
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId,
|
static int32_t vnodeOpenVnodeDBFiles(SQueryFileInfo *pVnodeFiles, int32_t fid, int32_t vnodeId, char *fileName,
|
||||||
char *fileName, char *prefix) {
|
char *prefix) {
|
||||||
|
|
||||||
pVnodeFiles->fileID = fid;
|
pVnodeFiles->fileID = fid;
|
||||||
pVnodeFiles->defaultMappingSize = DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE;
|
pVnodeFiles->defaultMappingSize = DEFAULT_DATA_FILE_MMAP_WINDOW_SIZE;
|
||||||
|
|
||||||
snprintf(pVnodeFiles->headerFilePath, 256, "%s%s", prefix, fileName);
|
snprintf(pVnodeFiles->headerFilePath, PATH_MAX, "%s%s", prefix, fileName);
|
||||||
pVnodeFiles->headerFd = open(pVnodeFiles->headerFilePath, O_RDONLY);
|
|
||||||
|
|
||||||
if (!VALIDFD(pVnodeFiles->headerFd)) {
|
|
||||||
dError("QInfo:%p failed open header file:%s reason:%s", pQInfo, pVnodeFiles->headerFilePath, strerror(errno));
|
|
||||||
goto _clean;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct stat fstat = {0};
|
struct stat fstat = {0};
|
||||||
if (stat(pVnodeFiles->headerFilePath, &fstat) < 0) return -1;
|
if (stat(pVnodeFiles->headerFilePath, &fstat) < 0) return -1;
|
||||||
pVnodeFiles->headFileSize = fstat.st_size;
|
pVnodeFiles->headFileSize = fstat.st_size;
|
||||||
|
|
||||||
snprintf(pVnodeFiles->dataFilePath, 256, "%sv%df%d.data", prefix, vnodeId, fid);
|
snprintf(pVnodeFiles->dataFilePath, PATH_MAX, "%sv%df%d.data", prefix, vnodeId, fid);
|
||||||
snprintf(pVnodeFiles->lastFilePath, 256, "%sv%df%d.last", prefix, vnodeId, fid);
|
snprintf(pVnodeFiles->lastFilePath, PATH_MAX, "%sv%df%d.last", prefix, vnodeId, fid);
|
||||||
|
|
||||||
pVnodeFiles->dataFd = open(pVnodeFiles->dataFilePath, O_RDONLY);
|
pVnodeFiles->headerFd = FD_INITIALIZER;
|
||||||
pVnodeFiles->lastFd = open(pVnodeFiles->lastFilePath, O_RDONLY);
|
pVnodeFiles->dataFd = FD_INITIALIZER;
|
||||||
|
pVnodeFiles->lastFd = FD_INITIALIZER;
|
||||||
// if (stat(pVnodeFiles->dataFilePath, &fstat) < 0) return -1;
|
|
||||||
// pVnodeFiles->dataFileSize = fstat.st_size;
|
|
||||||
//
|
|
||||||
// if (stat(pVnodeFiles->lastFilePath, &fstat) < 0) return -1;
|
|
||||||
// pVnodeFiles->lastFileSize = fstat.st_size;
|
|
||||||
|
|
||||||
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
|
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
|
||||||
/* enforce kernel to preload data when the file is mapping */
|
/* enforce kernel to preload data when the file is mapping */
|
||||||
|
@ -2975,19 +3020,19 @@ static int32_t vnodeOpenVnodeDBFiles(SQInfo *pQInfo, SQueryFileInfo *pVnodeFiles
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_clean:
|
|
||||||
|
|
||||||
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
|
#if DEFAULT_IO_ENGINE == IO_ENGINE_MMAP
|
||||||
|
_clean:
|
||||||
if (pVnodeFiles->pDataFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) {
|
if (pVnodeFiles->pDataFileData != MAP_FAILED && pVnodeFiles->pDataFileData != NULL) {
|
||||||
munmap(pVnodeFiles->pDataFileData, pVnodeFiles->defaultMappingSize);
|
munmap(pVnodeFiles->pDataFileData, pVnodeFiles->defaultMappingSize);
|
||||||
pVnodeFiles->pDataFileData = NULL;
|
pVnodeFiles->pDataFileData = NULL;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
tclose(pVnodeFiles->headerFd);
|
tclose(pVnodeFiles->headerFd);
|
||||||
tclose(pVnodeFiles->dataFd);
|
tclose(pVnodeFiles->dataFd);
|
||||||
tclose(pVnodeFiles->lastFd);
|
tclose(pVnodeFiles->lastFd);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
|
static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
|
||||||
|
@ -3047,7 +3092,7 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1];
|
SQueryFileInfo *pVnodeFiles = &pRuntimeEnv->pVnodeFiles[pRuntimeEnv->numOfFiles - 1];
|
||||||
int32_t ret = vnodeOpenVnodeDBFiles(pQInfo, pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix);
|
int32_t ret = vnodeOpenVnodeDBFiles(pVnodeFiles, fid, vnodeId, pEntry->d_name, dbFilePathPrefix);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
memset(pVnodeFiles, 0, sizeof(SQueryFileInfo)); // reset information
|
memset(pVnodeFiles, 0, sizeof(SQueryFileInfo)); // reset information
|
||||||
pRuntimeEnv->numOfFiles -= 1;
|
pRuntimeEnv->numOfFiles -= 1;
|
||||||
|
@ -3767,7 +3812,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (VALIDFD(pSupporter->meterOutputFd)) {
|
if (FD_VALID(pSupporter->meterOutputFd)) {
|
||||||
assert(pSupporter->meterOutputMMapBuf != NULL);
|
assert(pSupporter->meterOutputMMapBuf != NULL);
|
||||||
dTrace("QInfo:%p disk-based output buffer during query:%lld bytes", pQInfo, pSupporter->bufSize);
|
dTrace("QInfo:%p disk-based output buffer during query:%lld bytes", pQInfo, pSupporter->bufSize);
|
||||||
munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
|
munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
|
||||||
|
@ -3868,7 +3913,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
|
||||||
getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile);
|
getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile);
|
||||||
pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666);
|
pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666);
|
||||||
|
|
||||||
if (!VALIDFD(pSupporter->meterOutputFd)) {
|
if (!FD_VALID(pSupporter->meterOutputFd)) {
|
||||||
dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
|
dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
|
||||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -5492,7 +5537,7 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
|
||||||
|
|
||||||
SVnodeObj *pVnode = &vnodeList[vid];
|
SVnodeObj *pVnode = &vnodeList[vid];
|
||||||
|
|
||||||
char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex);
|
char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex);
|
||||||
if (pHeaderFileData == NULL) { // failed to load header file into buffer
|
if (pHeaderFileData == NULL) { // failed to load header file into buffer
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -6491,7 +6536,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) {
|
||||||
return loadPrimaryTS;
|
return loadPrimaryTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, int8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv,
|
int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv,
|
||||||
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) {
|
int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) {
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
|
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
|
||||||
|
@ -6979,7 +7024,7 @@ int32_t vnodeCopyQueryResultToMsg(void *handle, char *data, int32_t numOfRows) {
|
||||||
int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666);
|
int32_t fd = open(pQuery->sdata[0]->data, O_RDONLY, 0666);
|
||||||
|
|
||||||
// make sure file exist
|
// make sure file exist
|
||||||
if (VALIDFD(fd)) {
|
if (FD_VALID(fd)) {
|
||||||
size_t s = lseek(fd, 0, SEEK_END);
|
size_t s = lseek(fd, 0, SEEK_END);
|
||||||
dTrace("QInfo:%p ts comp data return, file:%s, size:%lld", pQInfo, pQuery->sdata[0]->data, s);
|
dTrace("QInfo:%p ts comp data return, file:%s, size:%lld", pQInfo, pQuery->sdata[0]->data, s);
|
||||||
|
|
||||||
|
|
|
@ -290,8 +290,9 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
|
||||||
pSummary->numOfFiles++;
|
pSummary->numOfFiles++;
|
||||||
|
|
||||||
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
|
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
|
||||||
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx);
|
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, vnodeId, fileIdx);
|
||||||
if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next
|
if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next
|
||||||
|
fid += step;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue