Merge pull request #948 from taosdata/feature/liaohj
fix bug in issue #946 [tbase-1363]
This commit is contained in:
commit
b9e05f05a7
|
@ -264,7 +264,6 @@ void *taosProcessAlarmSignal(void *tharg) {
|
||||||
callback(0);
|
callback(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(0);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -372,24 +372,24 @@ static int32_t doOpenQueryFileData(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileIn
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnodeFileInfo->pHeaderFileData =
|
// pVnodeFileInfo->pHeaderFileData =
|
||||||
mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, pVnodeFileInfo->headerFd, 0);
|
// mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, pVnodeFileInfo->headerFd, 0);
|
||||||
|
//
|
||||||
if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) {
|
// if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) {
|
||||||
pVnodeFileInfo->pHeaderFileData = NULL;
|
// pVnodeFileInfo->pHeaderFileData = NULL;
|
||||||
|
//
|
||||||
doCloseQueryFileInfoFD(pVnodeFileInfo);
|
// doCloseQueryFileInfoFD(pVnodeFileInfo);
|
||||||
doInitQueryFileInfoFD(pVnodeFileInfo);
|
// doInitQueryFileInfoFD(pVnodeFileInfo);
|
||||||
|
//
|
||||||
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath,
|
// dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath,
|
||||||
pVnodeFileInfo->headFileSize, strerror(errno));
|
// pVnodeFileInfo->headFileSize, strerror(errno));
|
||||||
|
//
|
||||||
return -1;
|
// return -1;
|
||||||
} else {
|
// } else {
|
||||||
if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) {
|
// if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) {
|
||||||
dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
|
// dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -431,7 +431,7 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int
|
||||||
|
|
||||||
if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) {
|
if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) {
|
||||||
if (pVnodeFileInfo->current >= 0) {
|
if (pVnodeFileInfo->current >= 0) {
|
||||||
assert(pVnodeFileInfo->pHeaderFileData != NULL);
|
// assert(pVnodeFileInfo->pHeaderFileData != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// do close the current memory mapped header file and corresponding fd
|
// do close the current memory mapped header file and corresponding fd
|
||||||
|
@ -450,7 +450,7 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return pVnodeFileInfo->pHeaderFileData;
|
return 1;//pVnodeFileInfo->pHeaderFileData;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -488,15 +488,16 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
|
||||||
read(fd, data, tmsize + TSDB_FILE_HEADER_LEN);
|
read(fd, data, tmsize + TSDB_FILE_HEADER_LEN);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
int64_t offset = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pMeterObj->sid;
|
||||||
|
|
||||||
|
#if 0
|
||||||
// check the offset value integrity
|
// check the offset value integrity
|
||||||
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, data,
|
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, data,
|
||||||
getCompHeaderSegSize(pCfg)) < 0) {
|
getCompHeaderSegSize(pCfg)) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t offset = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pMeterObj->sid;
|
|
||||||
SCompHeader *compHeader = (SCompHeader *)(data + offset);
|
SCompHeader *compHeader = (SCompHeader *)(data + offset);
|
||||||
|
|
||||||
// no data in this file for specified meter, abort
|
// no data in this file for specified meter, abort
|
||||||
if (compHeader->compInfoOffset == 0) {
|
if (compHeader->compInfoOffset == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -507,14 +508,29 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
|
||||||
getCompHeaderStartPosition(pCfg)) < 0) {
|
getCompHeaderStartPosition(pCfg)) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
char* buf = calloc(1, getCompHeaderSegSize(pCfg));
|
||||||
|
SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo;
|
||||||
|
|
||||||
#if 1
|
lseek(pVnodeFileInfo->headerFd, TSDB_FILE_HEADER_LEN, SEEK_SET);
|
||||||
|
read(pVnodeFileInfo->headerFd, buf, getCompHeaderSegSize(pCfg));
|
||||||
|
|
||||||
|
// check the offset value integrity
|
||||||
|
if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, pMeterObj->vnode, buf - TSDB_FILE_HEADER_LEN,
|
||||||
|
getCompHeaderSegSize(pCfg)) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if 0
|
||||||
SCompInfo *compInfo = (SCompInfo *)(data + compHeader->compInfoOffset);
|
SCompInfo *compInfo = (SCompInfo *)(data + compHeader->compInfoOffset);
|
||||||
#else
|
#else
|
||||||
lseek(fd, compHeader->compInfoOffset, SEEK_SET);
|
SCompHeader *compHeader = (SCompHeader *)(buf + sizeof(SCompHeader) * pMeterObj->sid);
|
||||||
|
lseek(pVnodeFileInfo->headerFd, compHeader->compInfoOffset, SEEK_SET);
|
||||||
|
|
||||||
SCompInfo CompInfo = {0};
|
SCompInfo CompInfo = {0};
|
||||||
SCompInfo *compInfo = &CompInfo;
|
SCompInfo *compInfo = &CompInfo;
|
||||||
read(fd, compInfo, sizeof(SCompInfo));
|
read(pVnodeFileInfo->headerFd, compInfo, sizeof(SCompInfo));
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// check compblock info integrity
|
// check compblock info integrity
|
||||||
|
@ -542,19 +558,19 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
|
||||||
|
|
||||||
memset(pQuery->pBlock, 0, (size_t)pQuery->blockBufferSize);
|
memset(pQuery->pBlock, 0, (size_t)pQuery->blockBufferSize);
|
||||||
|
|
||||||
#if 1
|
#if 0
|
||||||
memcpy(pQuery->pBlock, (char *)compInfo + sizeof(SCompInfo), (size_t)compBlockSize);
|
memcpy(pQuery->pBlock, (char *)compInfo + sizeof(SCompInfo), (size_t)compBlockSize);
|
||||||
TSCKSUM checksum = *(TSCKSUM *)((char *)compInfo + sizeof(SCompInfo) + compBlockSize);
|
TSCKSUM checksum = *(TSCKSUM *)((char *)compInfo + sizeof(SCompInfo) + compBlockSize);
|
||||||
#else
|
#else
|
||||||
TSCKSUM checksum;
|
TSCKSUM checksum;
|
||||||
read(fd, pQuery->pBlock, compBlockSize);
|
read(pVnodeFileInfo->headerFd, pQuery->pBlock, compBlockSize);
|
||||||
read(fd, &checksum, sizeof(TSCKSUM));
|
read(pVnodeFileInfo->headerFd, &checksum, sizeof(TSCKSUM));
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// check comp block integrity
|
// check comp block integrity
|
||||||
if (validateCompBlockSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, compInfo, (char *)pQuery->pBlock,
|
if (validateCompBlockSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, compInfo, (char *)pQuery->pBlock,
|
||||||
pMeterObj->vnode, checksum) < 0) {
|
pMeterObj->vnode, checksum) < 0) {
|
||||||
return -1;
|
return -1; //TODO free resource in error process
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->pFields = (SField **)((char *)pQuery->pBlock + compBlockSize);
|
pQuery->pFields = (SField **)((char *)pQuery->pBlock + compBlockSize);
|
||||||
|
@ -568,6 +584,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
|
||||||
pSummary->totalCompInfoSize += compBlockSize;
|
pSummary->totalCompInfoSize += compBlockSize;
|
||||||
pSummary->loadCompInfoUs += (et - st);
|
pSummary->loadCompInfoUs += (et - st);
|
||||||
|
|
||||||
|
free(buf);
|
||||||
return pQuery->numOfBlocks;
|
return pQuery->numOfBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1411,11 +1428,12 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
||||||
*
|
*
|
||||||
* first filter the data block according to the value filter condition, then, if the top/bottom query applied,
|
* first filter the data block according to the value filter condition, then, if the top/bottom query applied,
|
||||||
* invoke the filter function to decide if the data block need to be accessed or not.
|
* invoke the filter function to decide if the data block need to be accessed or not.
|
||||||
|
* TODO handle the whole data block is NULL situation
|
||||||
* @param pQuery
|
* @param pQuery
|
||||||
* @param pField
|
* @param pField
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *pCtx) {
|
static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *pCtx, int32_t numOfTotalPoints) {
|
||||||
if (pField == NULL) {
|
if (pField == NULL) {
|
||||||
return false; // no need to load data
|
return false; // no need to load data
|
||||||
}
|
}
|
||||||
|
@ -1434,6 +1452,11 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all points in current column are NULL, no need to check its boundary value
|
||||||
|
if (pField[colIndex].numOfNullPoints == numOfTotalPoints) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pFilterInfo->info.data.type == TSDB_DATA_TYPE_FLOAT) {
|
if (pFilterInfo->info.data.type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
float minval = *(double *)(&pField[colIndex].min);
|
float minval = *(double *)(&pField[colIndex].min);
|
||||||
float maxval = *(double *)(&pField[colIndex].max);
|
float maxval = *(double *)(&pField[colIndex].max);
|
||||||
|
@ -1968,6 +1991,7 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// failed to mmap header file into memory will cause the retrieval of compblock info failed
|
// failed to mmap header file into memory will cause the retrieval of compblock info failed
|
||||||
if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fid) > 0) {
|
if (vnodeGetCompBlockInfo(pMeterObj, pRuntimeEnv, fid) > 0) {
|
||||||
break;
|
break;
|
||||||
|
@ -6724,7 +6748,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
|
||||||
* filter the data block according to the value filter condition.
|
* filter the data block according to the value filter condition.
|
||||||
* no need to load the data block, continue for next block
|
* no need to load the data block, continue for next block
|
||||||
*/
|
*/
|
||||||
if (!needToLoadDataBlock(pQuery, *pFields, pRuntimeEnv->pCtx)) {
|
if (!needToLoadDataBlock(pQuery, *pFields, pRuntimeEnv->pCtx, pBlock->numOfPoints)) {
|
||||||
#if defined(_DEBUG_VIEW)
|
#if defined(_DEBUG_VIEW)
|
||||||
dTrace("QInfo:%p fileId:%d, slot:%d, block discarded by per-filter, ", GET_QINFO_ADDR(pQuery), pQuery->fileId,
|
dTrace("QInfo:%p fileId:%d, slot:%d, block discarded by per-filter, ", GET_QINFO_ADDR(pQuery), pQuery->fileId,
|
||||||
pQuery->slot);
|
pQuery->slot);
|
||||||
|
|
Loading…
Reference in New Issue