From 14be179081d795e5b293ad6b343dc1ce0f11b455 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 16 Dec 2019 16:18:51 +0800 Subject: [PATCH] fix bug #929.[TBASE-1349] --- src/system/detail/inc/vnode.h | 2 - src/system/detail/inc/vnodeRead.h | 1 + src/system/detail/src/vnodeQueryImpl.c | 356 ++++++++++++++----------- src/system/detail/src/vnodeRead.c | 2 - 4 files changed, 203 insertions(+), 158 deletions(-) diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 3a5e7e7260..3ffbb6ab55 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -267,9 +267,7 @@ typedef struct SQuery { int16_t checkBufferInLoop; // check if the buffer is full during scan each block SLimitVal limit; int32_t rowSize; - int32_t dataRowSize; // row size of each loaded data from disk, the value is - // used for prepare buffer SSqlGroupbyExpr * pGroupbyExpr; SSqlFunctionExpr * pSelectExpr; SColumnInfoEx * colList; diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 3a1b6f7949..8011595e41 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -40,6 +40,7 @@ typedef struct SQueryLoadBlockInfo { int32_t fileId; int32_t slotIdx; int32_t sid; + bool tsLoaded; // if timestamp column of current block is loaded or not } SQueryLoadBlockInfo; typedef struct SQueryLoadCompBlockInfo { diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 45c3385ed0..045f79fa75 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -38,9 +38,16 @@ enum { TS_JOIN_TAG_NOT_EQUALS = 2, }; +enum { + DISK_BLOCK_NO_NEED_TO_LOAD = 0, + DISK_BLOCK_LOAD_TS = 1, + DISK_BLOCK_LOAD_BLOCK = 2, +}; + #define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0) -//static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, +// static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t +// offset, // int32_t size); static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, int32_t size); @@ -68,17 +75,17 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete __block_search_fn_t searchFn); static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); -static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, - int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, - SField *pFields, __block_search_fn_t searchFn); +static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, + int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, + SField *pFields, __block_search_fn_t searchFn); -static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); +static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, - const SQueryRuntimeEnv *pRuntimeEnv); -static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); -static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); -static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); -static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); + const SQueryRuntimeEnv *pRuntimeEnv); +static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); +static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); +static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); +static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); // check the offset value integrity static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data, @@ -121,8 +128,8 @@ static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, const c return 0; } -static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo, char *pBlock, - int32_t vid, TSCKSUM checksum) { +static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo, + char *pBlock, int32_t vid, TSCKSUM checksum) { uint32_t size = compInfo->numOfBlocks * sizeof(SCompBlock); if (checksum != taosCalcChecksum(0, (uint8_t *)pBlock, size)) { @@ -195,7 +202,8 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj // if vnodeFreeFields is called, the pQuery->pFields is NULL if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid && pQuery->pFields != NULL && pQuery->fileId > 0) { - assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0); + assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId && + pQuery->numOfBlocks > 0); return true; } @@ -216,7 +224,8 @@ static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadIn pCompBlockLoadInfo->fileListIndex = -1; } -static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex) { +static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, + bool loadPrimaryTS) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; @@ -224,13 +233,20 @@ static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMe if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 && pLoadInfo->sid == pMeterObj->sid) { assert(fileIndex == pLoadInfo->fileListIndex); - return true; + + // previous load operation does not load the primary timestamp column, we only need to load the timestamp column + if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadPrimaryTS) { + return DISK_BLOCK_LOAD_TS; + } else { + return DISK_BLOCK_NO_NEED_TO_LOAD; + } } - return false; + return DISK_BLOCK_LOAD_BLOCK; } -static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex) { +static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, + bool tsLoaded) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; @@ -238,6 +254,7 @@ static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj pLoadInfo->slotIdx = pQuery->slot; pLoadInfo->fileListIndex = fileIndex; pLoadInfo->sid = pMeterObj->sid; + pLoadInfo->tsLoaded = tsLoaded; } static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { @@ -247,35 +264,35 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { pBlockLoadInfo->fileListIndex = -1; } -static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) { +static void vnodeSetOpenedFileNames(SQueryFilesInfo *pVnodeFilesInfo) { assert(pVnodeFilesInfo->current >= 0 && pVnodeFilesInfo->current < pVnodeFilesInfo->numOfFiles); - - SHeaderFileInfo* pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current]; - + + SHeaderFileInfo *pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current]; + /* * set the full file path for current opened files * the maximum allowed path string length is PATH_MAX in Linux, 100 bytes is used to * suppress the compiler warnings */ - char str[PATH_MAX + 100] = {0}; + char str[PATH_MAX + 100] = {0}; int32_t PATH_WITH_EXTRA = PATH_MAX + 100; - + int32_t vnodeId = pVnodeFilesInfo->vnodeId; int32_t fileId = pCurrentFileInfo->fileID; - + int32_t len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.head", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); assert(len <= PATH_MAX); - + strncpy(pVnodeFilesInfo->headerFilePath, str, PATH_MAX); - + len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.data", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); assert(len <= PATH_MAX); - + strncpy(pVnodeFilesInfo->dataFilePath, str, PATH_MAX); - + len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.last", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); assert(len <= PATH_MAX); - + strncpy(pVnodeFilesInfo->lastFilePath, str, PATH_MAX); } @@ -287,31 +304,31 @@ static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) { * @return */ static FORCE_INLINE bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) { - SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg; + SVnodeCfg *pVnodeCfg = &vnodeList[vnodeId].cfg; return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg); } -static bool checkIsHeaderFileEmpty(SQueryFilesInfo* pVnodeFilesInfo, int32_t vnodeId) { +static bool checkIsHeaderFileEmpty(SQueryFilesInfo *pVnodeFilesInfo, int32_t vnodeId) { struct stat fstat = {0}; if (stat(pVnodeFilesInfo->headerFilePath, &fstat) < 0) { return true; } - + pVnodeFilesInfo->headFileSize = fstat.st_size; - + return isHeaderFileEmpty(vnodeId, pVnodeFilesInfo->headFileSize); } -static void doCloseQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { +static void doCloseQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { tclose(pVnodeFilesInfo->headerFd); tclose(pVnodeFilesInfo->dataFd); tclose(pVnodeFilesInfo->lastFd); } -static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { +static void doInitQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { pVnodeFilesInfo->current = -1; pVnodeFilesInfo->headFileSize = -1; - + pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value pVnodeFilesInfo->dataFd = FD_INITIALIZER; pVnodeFilesInfo->lastFd = FD_INITIALIZER; @@ -320,15 +337,15 @@ static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { /* * clean memory and other corresponding resources are delegated to invoker */ -static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileInfo, int32_t vnodeId) { - SHeaderFileInfo* pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current]; - +static int32_t doOpenQueryFileData(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo, int32_t vnodeId) { + SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current]; + pVnodeFileInfo->headerFd = open(pVnodeFileInfo->headerFilePath, O_RDONLY); if (!FD_VALID(pVnodeFileInfo->headerFd)) { dError("QInfo:%p failed open head file:%s reason:%s", pQInfo, pVnodeFileInfo->headerFilePath, strerror(errno)); return -1; } - + /* * current header file is empty or broken, return directly. * @@ -339,55 +356,54 @@ static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileIn if (checkIsHeaderFileEmpty(pVnodeFileInfo, vnodeId)) { qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo, pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headFileSize); - + return -1; } - + pVnodeFileInfo->dataFd = open(pVnodeFileInfo->dataFilePath, O_RDONLY); if (!FD_VALID(pVnodeFileInfo->dataFd)) { dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFileInfo->dataFilePath, strerror(errno)); return -1; } - + pVnodeFileInfo->lastFd = open(pVnodeFileInfo->lastFilePath, O_RDONLY); if (!FD_VALID(pVnodeFileInfo->lastFd)) { dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFileInfo->lastFilePath, strerror(errno)); return -1; } - - pVnodeFileInfo->pHeaderFileData = mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, - pVnodeFileInfo->headerFd, 0); - + + pVnodeFileInfo->pHeaderFileData = + mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, pVnodeFileInfo->headerFd, 0); + if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) { pVnodeFileInfo->pHeaderFileData = NULL; - + doCloseQueryFileInfoFD(pVnodeFileInfo); doInitQueryFileInfoFD(pVnodeFileInfo); - + dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath, pVnodeFileInfo->headFileSize, strerror(errno)); - + return -1; } else { if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) { dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno)); } } - + return TSDB_CODE_SUCCESS; } -static void doUnmapHeaderFile(SQueryFilesInfo* pVnodeFileInfo) { +static void doUnmapHeaderFile(SQueryFilesInfo *pVnodeFileInfo) { munmap(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize); pVnodeFileInfo->pHeaderFileData = NULL; pVnodeFileInfo->headFileSize = -1; } -static void doCloseOpenedFileData(SQueryFilesInfo* pVnodeFileInfo) { +static void doCloseOpenedFileData(SQueryFilesInfo *pVnodeFileInfo) { if (pVnodeFileInfo->current >= 0) { - assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0); - + doUnmapHeaderFile(pVnodeFileInfo); doCloseQueryFileInfoFD(pVnodeFileInfo); doInitQueryFileInfoFD(pVnodeFileInfo); @@ -412,22 +428,22 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; - + if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) { if (pVnodeFileInfo->current >= 0) { assert(pVnodeFileInfo->pHeaderFileData != NULL); } - + // do close the current memory mapped header file and corresponding fd doCloseOpenedFileData(pVnodeFileInfo); assert(pVnodeFileInfo->pHeaderFileData == NULL); - + // set current opened file Index pVnodeFileInfo->current = fileIndex; - + // set the current opened files(header, data, last) path vnodeSetOpenedFileNames(pVnodeFileInfo); - + if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) { doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway. return pVnodeFileInfo->pHeaderFileData; @@ -445,7 +461,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); - SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg; + SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg; SHeaderFileInfo *pHeadeFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex]; int64_t st = taosGetTimestampUs(); @@ -466,7 +482,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim if (data == NULL) { return -1; // failed to load the header file data into memory } - + #else char *data = calloc(1, tmsize + TSDB_FILE_HEADER_LEN); read(fd, data, tmsize + TSDB_FILE_HEADER_LEN); @@ -487,7 +503,8 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim } // corrupted file may cause the invalid compInfoOffset, check needs - if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, getCompHeaderStartPosition(pCfg)) < 0) { + if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, + getCompHeaderStartPosition(pCfg)) < 0) { return -1; } @@ -751,8 +768,7 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo // load checksum TSCKSUM checksum = 0; - ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, - sizeof(TSCKSUM)); + ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, sizeof(TSCKSUM)); if (ret != 0) { return ret; } @@ -774,11 +790,11 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo } static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); - SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; + SQuery * pQuery = pRuntimeEnv->pQuery; + SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); + SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; SQueryFilesInfo *pVnodeFilesInfo = &pRuntimeEnv->vnodeFileInfo; - + size_t size = sizeof(SField) * (pBlock->numOfCols) + sizeof(TSCKSUM); // if *pField != NULL, this block is loaded once, in current query do nothing @@ -822,6 +838,21 @@ static void fillWithNull(SQuery *pQuery, char *dst, int32_t col, int32_t numOfPo setNullN(dst, type, bytes, numOfPoints); } +static int32_t loadPrimaryTSColumn(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField, + int32_t *columnBytes) { + SQuery *pQuery = pRuntimeEnv->pQuery; + assert(PRIMARY_TSCOL_LOADED(pQuery) == false); + + if (columnBytes != NULL) { + (*columnBytes) += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM); + } + + int32_t ret = loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX, + pRuntimeEnv->primaryColBuffer, pRuntimeEnv->unzipBuffer, + pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize); + return ret; +} + static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx, bool loadPrimaryCol, bool loadSField) { int32_t i = 0, j = 0; @@ -831,16 +862,40 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR SData ** sdata = pRuntimeEnv->colDataBuffer; assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current); + + SData **primaryTSBuf = &pRuntimeEnv->primaryColBuffer; + void * tmpBuf = pRuntimeEnv->unzipBuffer; + int32_t columnBytes = 0; + + SQueryCostSummary *pSummary = &pRuntimeEnv->summary; + + int32_t status = vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol); + if (status == DISK_BLOCK_NO_NEED_TO_LOAD) { + dTrace( + "QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, no need to load again, ts:%d, slot:%d, " + "brange:%lld-%lld, rows:%d", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, loadPrimaryCol, + pQuery->slot, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints); - SData ** primaryTSBuf = &pRuntimeEnv->primaryColBuffer; - void * tmpBuf = pRuntimeEnv->unzipBuffer; + if (loadSField && (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL)) { + loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); + } + + return TSDB_CODE_SUCCESS; + } else if (status == DISK_BLOCK_LOAD_TS) { + dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, incrementally load ts", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId); + + assert(PRIMARY_TSCOL_LOADED(pQuery) == false && loadSField == true); + if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) { + loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); + } - if (vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx)) { - dTrace("QInfo:%p vid:%d sid:%d id:%s, data block has been loaded, ts:%d, slot:%d, brange:%lld-%lld, rows:%d", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, loadPrimaryCol, pQuery->slot, - pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints); - - return 0; + // load primary timestamp + int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes); + + vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol); + return ret; } /* failed to load fields info, return with error info */ @@ -848,21 +903,15 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR return -1; } - SQueryCostSummary *pSummary = &pRuntimeEnv->summary; - int32_t columnBytes = 0; - int64_t st = taosGetTimestampUs(); if (loadPrimaryCol) { if (PRIMARY_TSCOL_LOADED(pQuery)) { *primaryTSBuf = sdata[0]; } else { - columnBytes += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM); - int32_t ret = - loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX, *primaryTSBuf, - tmpBuf, pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize); - if (ret != 0) { - return -1; + int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes); + if (ret != TSDB_CODE_SUCCESS) { + return ret; } pSummary->numOfSeek++; @@ -936,7 +985,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR pSummary->loadBlocksUs += (et - st); pSummary->readDiskBlocks++; - vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx); + vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol); return ret; } @@ -1848,8 +1897,8 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, return -1; } - SQueryFilesInfo* pVnodeFiles = &pRuntimeEnv->vnodeFileInfo; - + SQueryFilesInfo *pVnodeFiles = &pRuntimeEnv->vnodeFileInfo; + /* set the initial file for current query */ if (order == TSQL_SO_ASC && *fid < pVnodeFiles->pFileInfo[0].fileID) { *fid = pVnodeFiles->pFileInfo[0].fileID; @@ -2967,7 +3016,7 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p * currently opened file is not the start file, reset to the start file */ int32_t fileIdx = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order); - if (fileIdx < 0) { // ignore the files on disk + if (fileIdx < 0) { // ignore the files on disk dError("QInfo:%p failed to get data file:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId); position->fileId = -1; return -1; @@ -3061,7 +3110,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { SQueryFilesInfo *pVnodeFilesInfo = &(pQInfo->pMeterQuerySupporter->runtimeEnv.vnodeFileInfo); pVnodeFilesInfo->vnodeId = vnodeId; - + sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); if (pDir == NULL) { @@ -3116,10 +3165,11 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { closedir(pDir); dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles, - pVnodeFilesInfo->dbFilePathPrefix); + pVnodeFilesInfo->dbFilePathPrefix); /* order the files information according their names */ - qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo), file_order_comparator); + qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo), + file_order_comparator); } static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, void *pBlock) { @@ -3602,9 +3652,9 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu int32_t offset = 0; - for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i, ++j) { - pInterpoSupport->pPrevPoint[j] = prev + offset; - pInterpoSupport->pNextPoint[j] = next + offset; + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { + pInterpoSupport->pPrevPoint[i] = prev + offset; + pInterpoSupport->pNextPoint[i] = next + offset; offset += pQuery->colList[i].data.bytes; } @@ -3702,7 +3752,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete pQuery->lastKey = pQuery->skey; doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); - + vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); @@ -3871,7 +3921,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQuery->pointsRead = 0; changeExecuteScanOrder(pQuery, true); - + doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); @@ -3933,16 +3983,16 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - + pSupporter->numOfPages = pSupporter->numOfMeters; ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); if (ret != TSDB_CODE_SUCCESS) { dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, - strerror(errno)); + strerror(errno)); return TSDB_CODE_SERV_NO_DISKSPACE; } - + pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; pSupporter->lastPageId = -1; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; @@ -4026,7 +4076,7 @@ TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index) { /* * NOTE: pQuery->pos will not change, the corresponding data block will be loaded into buffer * loadDataBlockOnDemand will change the value of pQuery->pos, according to the pQuery->lastKey - * */ + */ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -4045,23 +4095,16 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { bool loadTimestamp = true; int32_t fileId = pQuery->fileId; int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&fileId, pRuntimeEnv, pQuery->order.order); - - if (!vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIndex)) { - dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot); - - // todo handle failed to load data, file corrupted - // todo refactor the return value - int32_t ret = + + dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot); + + int32_t ret = loadDataBlockIntoMem(pBlock, &pQuery->pFields[pQuery->slot], pRuntimeEnv, fileIndex, loadTimestamp, true); - UNUSED(ret); + if (ret != TSDB_CODE_SUCCESS) { + return -1; } - - // the fields info is not loaded, load it into memory - if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) { - loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); - } - + SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus); SET_FILE_BLOCK_FLAG(pRuntimeEnv->blockStatus); @@ -4757,17 +4800,17 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) { int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1]; ret = doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end); - if (ret < 0) { // not enough disk space to save the data into disk + if (ret < 0) { // not enough disk space to save the data into disk return -1; } - + pSupporter->subgroupIdx += 1; // this group generates at least one result, return results if (ret > 0) { break; } - + assert(pSupporter->numOfGroupResultPages == 0); dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1); } @@ -4784,7 +4827,7 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) // current results of group has been sent to client, try next group if (mergeMetersResultToOneGroups(pSupporter) != TSDB_CODE_SUCCESS) { - return; // failed to save data in the disk + return; // failed to save data in the disk } // set current query completed @@ -4866,7 +4909,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { return -1; } - + resetMergeResultBuf(pQuery, pCtx); } @@ -4914,11 +4957,12 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery if (buffer[0]->numOfElems != 0) { // there are data in buffer if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { - dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), pSupporter->extBufFile); + dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), + pSupporter->extBufFile); tfree(pTree); tfree(pValidMeter); tfree(posArray); - + return -1; } } @@ -4939,11 +4983,11 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery return pSupporter->numOfGroupResultPages; } -static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { +static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); - - SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pQuery); - + + SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); + int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); pSupporter->numOfPages = numOfPages; @@ -4957,26 +5001,27 @@ static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSuppo strerror(errno)); pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE; pQInfo->killed = 1; - + return pQInfo->code; } pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; pSupporter->meterOutputMMapBuf = mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); - + if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->killed = 1; - + return pQInfo->code; } - + return TSDB_CODE_SUCCESS; } -int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { +int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, + const SQueryRuntimeEnv *pRuntimeEnv) { int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); @@ -5039,7 +5084,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { if (ret != TSDB_CODE_SUCCESS) { return ret; } - + ret = saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows); if (ret != TSDB_CODE_SUCCESS) { return ret; @@ -5047,7 +5092,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { } } } - + return TSDB_CODE_SUCCESS; } @@ -5355,6 +5400,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { // usually this load operation will incure load disk block operation TSKEY endKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->endPos); + assert((QUERY_IS_ASC_QUERY(pQuery) && endKey <= pQuery->ekey) || (!QUERY_IS_ASC_QUERY(pQuery) && endKey >= pQuery->ekey)); @@ -5600,18 +5646,19 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; - + SVnodeObj *pVnode = &vnodeList[vid]; - char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex); - if (pHeaderFileData == NULL) { // failed to load header file into buffer + char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex); + if (pHeaderFileData == NULL) { // failed to load header file into buffer return 0; } - + int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM); // file is corrupted, abort query in current file - if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) < 0) { + if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) < + 0) { *numOfMeters = 0; return 0; } @@ -5754,7 +5801,7 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY } } -static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { +static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) { return NULL; @@ -5765,9 +5812,10 @@ static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter return getFilePage(pSupporter, *pageId); } -tFilePage *addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) { - uint32_t pageId = 0; - +tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo, + SMeterQuerySupportObj *pSupporter) { + uint32_t pageId = 0; + tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId); if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results return NULL; @@ -5907,7 +5955,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery, * @return */ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, char *pHeaderData, - int32_t numOfMeters, const char* filePath, SMeterDataInfo **pMeterDataInfo) { + int32_t numOfMeters, const char *filePath, SMeterDataInfo **pMeterDataInfo) { uint32_t numOfBlocks = 0; SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary; @@ -6273,8 +6321,8 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; tFilePage * pData = NULL; - SQuery* pQuery = pRuntimeEnv->pQuery; - + SQuery *pQuery = pRuntimeEnv->pQuery; + // in the first scan, new space needed for results if (pMeterQueryInfo->numOfPages == 0) { pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter); @@ -6289,7 +6337,7 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete } } } - + if (pData == NULL) { return -1; } @@ -6298,12 +6346,12 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i); pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i]; } - + return TSDB_CODE_SUCCESS; } int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx, - SMeterQueryInfo *pMeterQueryInfo) { + SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; if (IS_MASTER_SCAN(pRuntimeEnv)) { @@ -6620,8 +6668,8 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) { int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; + SQuery * pQuery = pRuntimeEnv->pQuery; + SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; @@ -6826,7 +6874,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage); #endif } - + return TSDB_CODE_SUCCESS; } @@ -7077,7 +7125,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data SQuery * pQuery = &pQInfo->query; int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock; - + // for metric query, bufIndex always be 0. for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0 int32_t bytes = pQuery->pSelectExpr[col].resBytes; diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index ccd59f356b..d062f1c712 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -193,8 +193,6 @@ static SQInfo *vnodeAllocateQInfoCommon(SQueryMeterMsg *pQueryMsg, SMeterObj *pM } else { pQuery->colList[i].data.filters = NULL; } - - pQuery->dataRowSize += colList[i].bytes; } vnodeUpdateQueryColumnIndex(pQuery, pMeterObj);