From 14be179081d795e5b293ad6b343dc1ce0f11b455 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Mon, 16 Dec 2019 16:18:51 +0800 Subject: [PATCH 1/8] fix bug #929.[TBASE-1349] --- src/system/detail/inc/vnode.h | 2 - src/system/detail/inc/vnodeRead.h | 1 + src/system/detail/src/vnodeQueryImpl.c | 356 ++++++++++++++----------- src/system/detail/src/vnodeRead.c | 2 - 4 files changed, 203 insertions(+), 158 deletions(-) diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 3a5e7e7260..3ffbb6ab55 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -267,9 +267,7 @@ typedef struct SQuery { int16_t checkBufferInLoop; // check if the buffer is full during scan each block SLimitVal limit; int32_t rowSize; - int32_t dataRowSize; // row size of each loaded data from disk, the value is - // used for prepare buffer SSqlGroupbyExpr * pGroupbyExpr; SSqlFunctionExpr * pSelectExpr; SColumnInfoEx * colList; diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index 3a1b6f7949..8011595e41 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -40,6 +40,7 @@ typedef struct SQueryLoadBlockInfo { int32_t fileId; int32_t slotIdx; int32_t sid; + bool tsLoaded; // if timestamp column of current block is loaded or not } SQueryLoadBlockInfo; typedef struct SQueryLoadCompBlockInfo { diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 45c3385ed0..045f79fa75 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -38,9 +38,16 @@ enum { TS_JOIN_TAG_NOT_EQUALS = 2, }; +enum { + DISK_BLOCK_NO_NEED_TO_LOAD = 0, + DISK_BLOCK_LOAD_TS = 1, + DISK_BLOCK_LOAD_BLOCK = 2, +}; + #define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0) -//static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, +// static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t +// offset, // int32_t size); static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, int32_t size); @@ -68,17 +75,17 @@ static void doApplyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMete __block_search_fn_t searchFn); static int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t numOfResult); -static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, - int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, - SField *pFields, __block_search_fn_t searchFn); +static void applyIntervalQueryOnBlock(SMeterQuerySupportObj *pSupporter, SMeterDataInfo *pInfoEx, char *data, + int64_t *pPrimaryData, SBlockInfo *pBlockInfo, int32_t blockStatus, + SField *pFields, __block_search_fn_t searchFn); -static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); +static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx); static int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, - const SQueryRuntimeEnv *pRuntimeEnv); -static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); -static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); -static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); -static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); + const SQueryRuntimeEnv *pRuntimeEnv); +static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes); +static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid); +static void getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn); +static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); // check the offset value integrity static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data, @@ -121,8 +128,8 @@ static FORCE_INLINE int32_t validateCompBlockInfoSegment(SQInfo *pQInfo, const c return 0; } -static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo, char *pBlock, - int32_t vid, TSCKSUM checksum) { +static FORCE_INLINE int32_t validateCompBlockSegment(SQInfo *pQInfo, const char *filePath, SCompInfo *compInfo, + char *pBlock, int32_t vid, TSCKSUM checksum) { uint32_t size = compInfo->numOfBlocks * sizeof(SCompBlock); if (checksum != taosCalcChecksum(0, (uint8_t *)pBlock, size)) { @@ -195,7 +202,8 @@ static bool vnodeIsCompBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj // if vnodeFreeFields is called, the pQuery->pFields is NULL if (pLoadCompBlockInfo->fileListIndex == fileIndex && pLoadCompBlockInfo->sid == pMeterObj->sid && pQuery->pFields != NULL && pQuery->fileId > 0) { - assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId && pQuery->numOfBlocks > 0); + assert(pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex].fileID == pLoadCompBlockInfo->fileId && + pQuery->numOfBlocks > 0); return true; } @@ -216,7 +224,8 @@ static void vnodeInitLoadCompBlockInfo(SQueryLoadCompBlockInfo *pCompBlockLoadIn pCompBlockLoadInfo->fileListIndex = -1; } -static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex) { +static int32_t vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, + bool loadPrimaryTS) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; @@ -224,13 +233,20 @@ static bool vnodeIsDatablockLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMe if (pLoadInfo->fileId == pQuery->fileId && pLoadInfo->slotIdx == pQuery->slot && pQuery->slot != -1 && pLoadInfo->sid == pMeterObj->sid) { assert(fileIndex == pLoadInfo->fileListIndex); - return true; + + // previous load operation does not load the primary timestamp column, we only need to load the timestamp column + if (pLoadInfo->tsLoaded == false && pLoadInfo->tsLoaded != loadPrimaryTS) { + return DISK_BLOCK_LOAD_TS; + } else { + return DISK_BLOCK_NO_NEED_TO_LOAD; + } } - return false; + return DISK_BLOCK_LOAD_BLOCK; } -static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex) { +static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj, int32_t fileIndex, + bool tsLoaded) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryLoadBlockInfo *pLoadInfo = &pRuntimeEnv->loadBlockInfo; @@ -238,6 +254,7 @@ static void vnodeSetDataBlockInfoLoaded(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj pLoadInfo->slotIdx = pQuery->slot; pLoadInfo->fileListIndex = fileIndex; pLoadInfo->sid = pMeterObj->sid; + pLoadInfo->tsLoaded = tsLoaded; } static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { @@ -247,35 +264,35 @@ static void vnodeInitDataBlockInfo(SQueryLoadBlockInfo *pBlockLoadInfo) { pBlockLoadInfo->fileListIndex = -1; } -static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) { +static void vnodeSetOpenedFileNames(SQueryFilesInfo *pVnodeFilesInfo) { assert(pVnodeFilesInfo->current >= 0 && pVnodeFilesInfo->current < pVnodeFilesInfo->numOfFiles); - - SHeaderFileInfo* pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current]; - + + SHeaderFileInfo *pCurrentFileInfo = &pVnodeFilesInfo->pFileInfo[pVnodeFilesInfo->current]; + /* * set the full file path for current opened files * the maximum allowed path string length is PATH_MAX in Linux, 100 bytes is used to * suppress the compiler warnings */ - char str[PATH_MAX + 100] = {0}; + char str[PATH_MAX + 100] = {0}; int32_t PATH_WITH_EXTRA = PATH_MAX + 100; - + int32_t vnodeId = pVnodeFilesInfo->vnodeId; int32_t fileId = pCurrentFileInfo->fileID; - + int32_t len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.head", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); assert(len <= PATH_MAX); - + strncpy(pVnodeFilesInfo->headerFilePath, str, PATH_MAX); - + len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.data", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); assert(len <= PATH_MAX); - + strncpy(pVnodeFilesInfo->dataFilePath, str, PATH_MAX); - + len = snprintf(str, PATH_WITH_EXTRA, "%sv%df%d.last", pVnodeFilesInfo->dbFilePathPrefix, vnodeId, fileId); assert(len <= PATH_MAX); - + strncpy(pVnodeFilesInfo->lastFilePath, str, PATH_MAX); } @@ -287,31 +304,31 @@ static void vnodeSetOpenedFileNames(SQueryFilesInfo* pVnodeFilesInfo) { * @return */ static FORCE_INLINE bool isHeaderFileEmpty(int32_t vnodeId, size_t headerFileSize) { - SVnodeCfg* pVnodeCfg = &vnodeList[vnodeId].cfg; + SVnodeCfg *pVnodeCfg = &vnodeList[vnodeId].cfg; return headerFileSize <= getCompHeaderStartPosition(pVnodeCfg); } -static bool checkIsHeaderFileEmpty(SQueryFilesInfo* pVnodeFilesInfo, int32_t vnodeId) { +static bool checkIsHeaderFileEmpty(SQueryFilesInfo *pVnodeFilesInfo, int32_t vnodeId) { struct stat fstat = {0}; if (stat(pVnodeFilesInfo->headerFilePath, &fstat) < 0) { return true; } - + pVnodeFilesInfo->headFileSize = fstat.st_size; - + return isHeaderFileEmpty(vnodeId, pVnodeFilesInfo->headFileSize); } -static void doCloseQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { +static void doCloseQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { tclose(pVnodeFilesInfo->headerFd); tclose(pVnodeFilesInfo->dataFd); tclose(pVnodeFilesInfo->lastFd); } -static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { +static void doInitQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { pVnodeFilesInfo->current = -1; pVnodeFilesInfo->headFileSize = -1; - + pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value pVnodeFilesInfo->dataFd = FD_INITIALIZER; pVnodeFilesInfo->lastFd = FD_INITIALIZER; @@ -320,15 +337,15 @@ static void doInitQueryFileInfoFD(SQueryFilesInfo* pVnodeFilesInfo) { /* * clean memory and other corresponding resources are delegated to invoker */ -static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileInfo, int32_t vnodeId) { - SHeaderFileInfo* pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current]; - +static int32_t doOpenQueryFileData(SQInfo *pQInfo, SQueryFilesInfo *pVnodeFileInfo, int32_t vnodeId) { + SHeaderFileInfo *pHeaderFileInfo = &pVnodeFileInfo->pFileInfo[pVnodeFileInfo->current]; + pVnodeFileInfo->headerFd = open(pVnodeFileInfo->headerFilePath, O_RDONLY); if (!FD_VALID(pVnodeFileInfo->headerFd)) { dError("QInfo:%p failed open head file:%s reason:%s", pQInfo, pVnodeFileInfo->headerFilePath, strerror(errno)); return -1; } - + /* * current header file is empty or broken, return directly. * @@ -339,55 +356,54 @@ static int32_t doOpenQueryFileData(SQInfo* pQInfo, SQueryFilesInfo* pVnodeFileIn if (checkIsHeaderFileEmpty(pVnodeFileInfo, vnodeId)) { qTrace("QInfo:%p vid:%d, fileId:%d, index:%d, size:%d, ignore file, empty or broken", pQInfo, pVnodeFileInfo->vnodeId, pHeaderFileInfo->fileID, pVnodeFileInfo->current, pVnodeFileInfo->headFileSize); - + return -1; } - + pVnodeFileInfo->dataFd = open(pVnodeFileInfo->dataFilePath, O_RDONLY); if (!FD_VALID(pVnodeFileInfo->dataFd)) { dError("QInfo:%p failed open data file:%s reason:%s", pQInfo, pVnodeFileInfo->dataFilePath, strerror(errno)); return -1; } - + pVnodeFileInfo->lastFd = open(pVnodeFileInfo->lastFilePath, O_RDONLY); if (!FD_VALID(pVnodeFileInfo->lastFd)) { dError("QInfo:%p failed open last file:%s reason:%s", pQInfo, pVnodeFileInfo->lastFilePath, strerror(errno)); return -1; } - - pVnodeFileInfo->pHeaderFileData = mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, - pVnodeFileInfo->headerFd, 0); - + + pVnodeFileInfo->pHeaderFileData = + mmap(NULL, pVnodeFileInfo->headFileSize, PROT_READ, MAP_SHARED, pVnodeFileInfo->headerFd, 0); + if (pVnodeFileInfo->pHeaderFileData == MAP_FAILED) { pVnodeFileInfo->pHeaderFileData = NULL; - + doCloseQueryFileInfoFD(pVnodeFileInfo); doInitQueryFileInfoFD(pVnodeFileInfo); - + dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFileInfo->headerFilePath, pVnodeFileInfo->headFileSize, strerror(errno)); - + return -1; } else { if (madvise(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize, MADV_SEQUENTIAL) == -1) { dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno)); } } - + return TSDB_CODE_SUCCESS; } -static void doUnmapHeaderFile(SQueryFilesInfo* pVnodeFileInfo) { +static void doUnmapHeaderFile(SQueryFilesInfo *pVnodeFileInfo) { munmap(pVnodeFileInfo->pHeaderFileData, pVnodeFileInfo->headFileSize); pVnodeFileInfo->pHeaderFileData = NULL; pVnodeFileInfo->headFileSize = -1; } -static void doCloseOpenedFileData(SQueryFilesInfo* pVnodeFileInfo) { +static void doCloseOpenedFileData(SQueryFilesInfo *pVnodeFileInfo) { if (pVnodeFileInfo->current >= 0) { - assert(pVnodeFileInfo->current < pVnodeFileInfo->numOfFiles && pVnodeFileInfo->current >= 0); - + doUnmapHeaderFile(pVnodeFileInfo); doCloseQueryFileInfoFD(pVnodeFileInfo); doInitQueryFileInfoFD(pVnodeFileInfo); @@ -412,22 +428,22 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t vnodeId, int SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); // only for log output SQueryFilesInfo *pVnodeFileInfo = &pRuntimeEnv->vnodeFileInfo; - + if (pVnodeFileInfo->current != fileIndex || pVnodeFileInfo->pHeaderFileData == NULL) { if (pVnodeFileInfo->current >= 0) { assert(pVnodeFileInfo->pHeaderFileData != NULL); } - + // do close the current memory mapped header file and corresponding fd doCloseOpenedFileData(pVnodeFileInfo); assert(pVnodeFileInfo->pHeaderFileData == NULL); - + // set current opened file Index pVnodeFileInfo->current = fileIndex; - + // set the current opened files(header, data, last) path vnodeSetOpenedFileNames(pVnodeFileInfo); - + if (doOpenQueryFileData(pQInfo, pVnodeFileInfo, vnodeId) != TSDB_CODE_SUCCESS) { doCloseOpenedFileData(pVnodeFileInfo); // all the fds may be partially opened, close them anyway. return pVnodeFileInfo->pHeaderFileData; @@ -445,7 +461,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); - SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg; + SVnodeCfg * pCfg = &vnodeList[pMeterObj->vnode].cfg; SHeaderFileInfo *pHeadeFileInfo = &pRuntimeEnv->vnodeFileInfo.pFileInfo[fileIndex]; int64_t st = taosGetTimestampUs(); @@ -466,7 +482,7 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim if (data == NULL) { return -1; // failed to load the header file data into memory } - + #else char *data = calloc(1, tmsize + TSDB_FILE_HEADER_LEN); read(fd, data, tmsize + TSDB_FILE_HEADER_LEN); @@ -487,7 +503,8 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim } // corrupted file may cause the invalid compInfoOffset, check needs - if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, getCompHeaderStartPosition(pCfg)) < 0) { + if (validateCompBlockOffset(pQInfo, pMeterObj, compHeader, &pRuntimeEnv->vnodeFileInfo, + getCompHeaderStartPosition(pCfg)) < 0) { return -1; } @@ -751,8 +768,7 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo // load checksum TSCKSUM checksum = 0; - ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, - sizeof(TSCKSUM)); + ret = readDataFromDiskFile(fd, pQInfo, pQueryFileInfo, (char *)&checksum, offset + pFields[col].len, sizeof(TSCKSUM)); if (ret != 0) { return ret; } @@ -774,11 +790,11 @@ static int32_t loadColumnIntoMem(SQuery *pQuery, SQueryFilesInfo *pQueryFileInfo } static int32_t loadDataBlockFieldsInfo(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); - SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; + SQuery * pQuery = pRuntimeEnv->pQuery; + SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); + SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; SQueryFilesInfo *pVnodeFilesInfo = &pRuntimeEnv->vnodeFileInfo; - + size_t size = sizeof(SField) * (pBlock->numOfCols) + sizeof(TSCKSUM); // if *pField != NULL, this block is loaded once, in current query do nothing @@ -822,6 +838,21 @@ static void fillWithNull(SQuery *pQuery, char *dst, int32_t col, int32_t numOfPo setNullN(dst, type, bytes, numOfPoints); } +static int32_t loadPrimaryTSColumn(SQueryRuntimeEnv *pRuntimeEnv, SCompBlock *pBlock, SField **pField, + int32_t *columnBytes) { + SQuery *pQuery = pRuntimeEnv->pQuery; + assert(PRIMARY_TSCOL_LOADED(pQuery) == false); + + if (columnBytes != NULL) { + (*columnBytes) += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM); + } + + int32_t ret = loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX, + pRuntimeEnv->primaryColBuffer, pRuntimeEnv->unzipBuffer, + pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize); + return ret; +} + static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx, bool loadPrimaryCol, bool loadSField) { int32_t i = 0, j = 0; @@ -831,16 +862,40 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR SData ** sdata = pRuntimeEnv->colDataBuffer; assert(fileIdx == pRuntimeEnv->vnodeFileInfo.current); + + SData **primaryTSBuf = &pRuntimeEnv->primaryColBuffer; + void * tmpBuf = pRuntimeEnv->unzipBuffer; + int32_t columnBytes = 0; + + SQueryCostSummary *pSummary = &pRuntimeEnv->summary; + + int32_t status = vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol); + if (status == DISK_BLOCK_NO_NEED_TO_LOAD) { + dTrace( + "QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, no need to load again, ts:%d, slot:%d, " + "brange:%lld-%lld, rows:%d", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, loadPrimaryCol, + pQuery->slot, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints); - SData ** primaryTSBuf = &pRuntimeEnv->primaryColBuffer; - void * tmpBuf = pRuntimeEnv->unzipBuffer; + if (loadSField && (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL)) { + loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); + } + + return TSDB_CODE_SUCCESS; + } else if (status == DISK_BLOCK_LOAD_TS) { + dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, data block has been loaded, incrementally load ts", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId); + + assert(PRIMARY_TSCOL_LOADED(pQuery) == false && loadSField == true); + if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) { + loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); + } - if (vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIdx)) { - dTrace("QInfo:%p vid:%d sid:%d id:%s, data block has been loaded, ts:%d, slot:%d, brange:%lld-%lld, rows:%d", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, loadPrimaryCol, pQuery->slot, - pBlock->keyFirst, pBlock->keyLast, pBlock->numOfPoints); - - return 0; + // load primary timestamp + int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes); + + vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol); + return ret; } /* failed to load fields info, return with error info */ @@ -848,21 +903,15 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR return -1; } - SQueryCostSummary *pSummary = &pRuntimeEnv->summary; - int32_t columnBytes = 0; - int64_t st = taosGetTimestampUs(); if (loadPrimaryCol) { if (PRIMARY_TSCOL_LOADED(pQuery)) { *primaryTSBuf = sdata[0]; } else { - columnBytes += (*pField)[PRIMARYKEY_TIMESTAMP_COL_INDEX].len + sizeof(TSCKSUM); - int32_t ret = - loadColumnIntoMem(pQuery, &pRuntimeEnv->vnodeFileInfo, pBlock, *pField, PRIMARYKEY_TIMESTAMP_COL_INDEX, *primaryTSBuf, - tmpBuf, pRuntimeEnv->secondaryUnzipBuffer, pRuntimeEnv->unzipBufSize); - if (ret != 0) { - return -1; + int32_t ret = loadPrimaryTSColumn(pRuntimeEnv, pBlock, pField, &columnBytes); + if (ret != TSDB_CODE_SUCCESS) { + return ret; } pSummary->numOfSeek++; @@ -936,7 +985,7 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR pSummary->loadBlocksUs += (et - st); pSummary->readDiskBlocks++; - vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx); + vnodeSetDataBlockInfoLoaded(pRuntimeEnv, pMeterObj, fileIdx, loadPrimaryCol); return ret; } @@ -1848,8 +1897,8 @@ int32_t vnodeGetVnodeHeaderFileIdx(int32_t *fid, SQueryRuntimeEnv *pRuntimeEnv, return -1; } - SQueryFilesInfo* pVnodeFiles = &pRuntimeEnv->vnodeFileInfo; - + SQueryFilesInfo *pVnodeFiles = &pRuntimeEnv->vnodeFileInfo; + /* set the initial file for current query */ if (order == TSQL_SO_ASC && *fid < pVnodeFiles->pFileInfo[0].fileID) { *fid = pVnodeFiles->pFileInfo[0].fileID; @@ -2967,7 +3016,7 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p * currently opened file is not the start file, reset to the start file */ int32_t fileIdx = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order); - if (fileIdx < 0) { // ignore the files on disk + if (fileIdx < 0) { // ignore the files on disk dError("QInfo:%p failed to get data file:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId); position->fileId = -1; return -1; @@ -3061,7 +3110,7 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { SQueryFilesInfo *pVnodeFilesInfo = &(pQInfo->pMeterQuerySupporter->runtimeEnv.vnodeFileInfo); pVnodeFilesInfo->vnodeId = vnodeId; - + sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); if (pDir == NULL) { @@ -3116,10 +3165,11 @@ static void vnodeRecordAllFiles(SQInfo *pQInfo, int32_t vnodeId) { closedir(pDir); dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles, - pVnodeFilesInfo->dbFilePathPrefix); + pVnodeFilesInfo->dbFilePathPrefix); /* order the files information according their names */ - qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo), file_order_comparator); + qsort(pVnodeFilesInfo->pFileInfo, (size_t)pVnodeFilesInfo->numOfFiles, sizeof(SHeaderFileInfo), + file_order_comparator); } static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, void *pBlock) { @@ -3602,9 +3652,9 @@ void pointInterpSupporterInit(SQuery *pQuery, SPointInterpoSupporter *pInterpoSu int32_t offset = 0; - for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i, ++j) { - pInterpoSupport->pPrevPoint[j] = prev + offset; - pInterpoSupport->pNextPoint[j] = next + offset; + for (int32_t i = 0; i < pQuery->numOfCols; ++i) { + pInterpoSupport->pPrevPoint[i] = prev + offset; + pInterpoSupport->pNextPoint[i] = next + offset; offset += pQuery->colList[i].data.bytes; } @@ -3702,7 +3752,7 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete pQuery->lastKey = pQuery->skey; doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); - + vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); @@ -3871,7 +3921,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) pQuery->pointsRead = 0; changeExecuteScanOrder(pQuery, true); - + doInitQueryFileInfoFD(&pSupporter->runtimeEnv.vnodeFileInfo); vnodeInitDataBlockInfo(&pSupporter->runtimeEnv.loadBlockInfo); vnodeInitLoadCompBlockInfo(&pSupporter->runtimeEnv.loadCompBlockInfo); @@ -3933,16 +3983,16 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); return TSDB_CODE_SERV_OUT_OF_MEMORY; } - + pSupporter->numOfPages = pSupporter->numOfMeters; ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE); if (ret != TSDB_CODE_SUCCESS) { dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, - strerror(errno)); + strerror(errno)); return TSDB_CODE_SERV_NO_DISKSPACE; } - + pSupporter->runtimeEnv.numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize; pSupporter->lastPageId = -1; pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; @@ -4026,7 +4076,7 @@ TSKEY getTimestampInCacheBlock(SCacheBlock *pBlock, int32_t index) { /* * NOTE: pQuery->pos will not change, the corresponding data block will be loaded into buffer * loadDataBlockOnDemand will change the value of pQuery->pos, according to the pQuery->lastKey - * */ + */ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -4045,23 +4095,16 @@ TSKEY getTimestampInDiskBlock(SQueryRuntimeEnv *pRuntimeEnv, int32_t index) { bool loadTimestamp = true; int32_t fileId = pQuery->fileId; int32_t fileIndex = vnodeGetVnodeHeaderFileIdx(&fileId, pRuntimeEnv, pQuery->order.order); - - if (!vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, fileIndex)) { - dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required", - GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot); - - // todo handle failed to load data, file corrupted - // todo refactor the return value - int32_t ret = + + dTrace("QInfo:%p vid:%d sid:%d id:%s, fileId:%d, slot:%d load data block due to primary key required", + GET_QINFO_ADDR(pQuery), pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->fileId, pQuery->slot); + + int32_t ret = loadDataBlockIntoMem(pBlock, &pQuery->pFields[pQuery->slot], pRuntimeEnv, fileIndex, loadTimestamp, true); - UNUSED(ret); + if (ret != TSDB_CODE_SUCCESS) { + return -1; } - - // the fields info is not loaded, load it into memory - if (pQuery->pFields == NULL || pQuery->pFields[pQuery->slot] == NULL) { - loadDataBlockFieldsInfo(pRuntimeEnv, pBlock, &pQuery->pFields[pQuery->slot]); - } - + SET_DATA_BLOCK_LOADED(pRuntimeEnv->blockStatus); SET_FILE_BLOCK_FLAG(pRuntimeEnv->blockStatus); @@ -4757,17 +4800,17 @@ int32_t mergeMetersResultToOneGroups(SMeterQuerySupportObj *pSupporter) { int32_t end = pSupporter->pSidSet->starterPos[pSupporter->subgroupIdx + 1]; ret = doMergeMetersResultsToGroupRes(pSupporter, pQuery, pRuntimeEnv, pSupporter->pMeterDataInfo, start, end); - if (ret < 0) { // not enough disk space to save the data into disk + if (ret < 0) { // not enough disk space to save the data into disk return -1; } - + pSupporter->subgroupIdx += 1; // this group generates at least one result, return results if (ret > 0) { break; } - + assert(pSupporter->numOfGroupResultPages == 0); dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pSupporter->subgroupIdx - 1); } @@ -4784,7 +4827,7 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery) // current results of group has been sent to client, try next group if (mergeMetersResultToOneGroups(pSupporter) != TSDB_CODE_SUCCESS) { - return; // failed to save data in the disk + return; // failed to save data in the disk } // set current query completed @@ -4866,7 +4909,7 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { return -1; } - + resetMergeResultBuf(pQuery, pCtx); } @@ -4914,11 +4957,12 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery if (buffer[0]->numOfElems != 0) { // there are data in buffer if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) { - dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), pSupporter->extBufFile); + dError("QInfo:%p failed to flush data into temp file, abort query", GET_QINFO_ADDR(pQuery), + pSupporter->extBufFile); tfree(pTree); tfree(pValidMeter); tfree(posArray); - + return -1; } } @@ -4939,11 +4983,11 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery return pSupporter->numOfGroupResultPages; } -static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { +static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) { assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize); - - SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pQuery); - + + SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); + int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize); pSupporter->numOfPages = numOfPages; @@ -4957,26 +5001,27 @@ static int32_t extendDiskBuf(const SQuery* pQuery, SMeterQuerySupportObj *pSuppo strerror(errno)); pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE; pQInfo->killed = 1; - + return pQInfo->code; } pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE; pSupporter->meterOutputMMapBuf = mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0); - + if (pSupporter->meterOutputMMapBuf == MAP_FAILED) { dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; pQInfo->killed = 1; - + return pQInfo->code; } - + return TSDB_CODE_SUCCESS; } -int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, const SQueryRuntimeEnv *pRuntimeEnv) { +int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery, + const SQueryRuntimeEnv *pRuntimeEnv) { int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1; int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE + pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1); @@ -5039,7 +5084,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { if (ret != TSDB_CODE_SUCCESS) { return ret; } - + ret = saveResult(pSupporter, pMeterInfo[i].pMeterQInfo, pMeterInfo[i].pMeterQInfo->lastResRows); if (ret != TSDB_CODE_SUCCESS) { return ret; @@ -5047,7 +5092,7 @@ int32_t doCloseAllOpenedResults(SMeterQuerySupportObj *pSupporter) { } } } - + return TSDB_CODE_SUCCESS; } @@ -5355,6 +5400,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { // usually this load operation will incure load disk block operation TSKEY endKey = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->endPos); + assert((QUERY_IS_ASC_QUERY(pQuery) && endKey <= pQuery->ekey) || (!QUERY_IS_ASC_QUERY(pQuery) && endKey >= pQuery->ekey)); @@ -5600,18 +5646,19 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterSidExtInfo ** pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv; - + SVnodeObj *pVnode = &vnodeList[vid]; - char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex); - if (pHeaderFileData == NULL) { // failed to load header file into buffer + char *pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, vid, fileIndex); + if (pHeaderFileData == NULL) { // failed to load header file into buffer return 0; } - + int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM); // file is corrupted, abort query in current file - if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) < 0) { + if (validateHeaderOffsetSegment(pQInfo, pRuntimeEnv->vnodeFileInfo.headerFilePath, vid, pHeaderFileData, tmsize) < + 0) { *numOfMeters = 0; return 0; } @@ -5754,7 +5801,7 @@ void changeMeterQueryInfoForSuppleQuery(SMeterQueryInfo *pMeterQueryInfo, TSKEY } } -static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { +static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) { if (pSupporter->lastPageId == pSupporter->numOfPages - 1) { if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) { return NULL; @@ -5765,9 +5812,10 @@ static tFilePage *allocNewPage(SQuery* pQuery, SMeterQuerySupportObj *pSupporter return getFilePage(pSupporter, *pageId); } -tFilePage *addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo, SMeterQuerySupportObj *pSupporter) { - uint32_t pageId = 0; - +tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo, + SMeterQuerySupportObj *pSupporter) { + uint32_t pageId = 0; + tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId); if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results return NULL; @@ -5907,7 +5955,7 @@ static bool setCurrentQueryRange(SMeterDataInfo *pMeterDataInfo, SQuery *pQuery, * @return */ uint32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery, char *pHeaderData, - int32_t numOfMeters, const char* filePath, SMeterDataInfo **pMeterDataInfo) { + int32_t numOfMeters, const char *filePath, SMeterDataInfo **pMeterDataInfo) { uint32_t numOfBlocks = 0; SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); SQueryCostSummary *pSummary = &pSupporter->runtimeEnv.summary; @@ -6273,8 +6321,8 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; tFilePage * pData = NULL; - SQuery* pQuery = pRuntimeEnv->pQuery; - + SQuery *pQuery = pRuntimeEnv->pQuery; + // in the first scan, new space needed for results if (pMeterQueryInfo->numOfPages == 0) { pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter); @@ -6289,7 +6337,7 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete } } } - + if (pData == NULL) { return -1; } @@ -6298,12 +6346,12 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete pRuntimeEnv->pCtx[i].aOutputBuf = getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, i); pRuntimeEnv->pCtx[i].resultInfo = &pMeterQueryInfo->resultInfo[i]; } - + return TSDB_CODE_SUCCESS; } int32_t setIntervalQueryExecutionContext(SMeterQuerySupportObj *pSupporter, int32_t meterIdx, - SMeterQueryInfo *pMeterQueryInfo) { + SMeterQueryInfo *pMeterQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; if (IS_MASTER_SCAN(pRuntimeEnv)) { @@ -6620,8 +6668,8 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SBlockInfo *pBlockInfo) { int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blkStatus, SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIdx, int32_t slotIdx, __block_search_fn_t searchFn, bool onDemand) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj; + SQuery * pQuery = pRuntimeEnv->pQuery; + SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj; TSKEY *primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data; @@ -6826,7 +6874,7 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue tColModelDisplay(cm, outputPage->data, outputPage->numOfElems, pRuntimeEnv->numOfRowsPerPage); #endif } - + return TSDB_CODE_SUCCESS; } @@ -7077,7 +7125,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data SQuery * pQuery = &pQInfo->query; int tnumOfRows = vnodeList[pObj->vnode].cfg.rowsInFileBlock; - + // for metric query, bufIndex always be 0. for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { // pQInfo->bufIndex == 0 int32_t bytes = pQuery->pSelectExpr[col].resBytes; diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index ccd59f356b..d062f1c712 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -193,8 +193,6 @@ static SQInfo *vnodeAllocateQInfoCommon(SQueryMeterMsg *pQueryMsg, SMeterObj *pM } else { pQuery->colList[i].data.filters = NULL; } - - pQuery->dataRowSize += colList[i].bytes; } vnodeUpdateQueryColumnIndex(pQuery, pMeterObj); From 05ce45e76ed5aa0b18fa57c5fab455030583d271 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Dec 2019 13:46:31 +0800 Subject: [PATCH 2/8] fix the bugs in issue #932. [tbase-1353] --- src/client/src/tscServer.c | 105 +++++++++++++++++------------ src/inc/taoserror.h | 3 +- src/inc/taosmsg.h | 4 +- src/rpc/src/tstring.c | 7 +- src/system/detail/src/vnodeRead.c | 4 +- src/system/detail/src/vnodeShell.c | 32 +++++---- 6 files changed, 91 insertions(+), 64 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 40399d85b7..5b25947dc8 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1482,6 +1482,46 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) { return size; } +static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vnodeId, char* pMsg) { + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + + SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; + SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; + + tscTrace("%p vid:%d, query on %d meters", pSql, htons(vnodeId), numOfMeters); + if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { +#ifdef _DEBUG_VIEW + tscTrace("%p sid:%d, uid:%lld", pSql, pMeterMetaInfo->pMeterMeta->sid, pMeterMetaInfo->pMeterMeta->uid); +#endif + SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg; + pMeterInfo->sid = htonl(pMeterMeta->sid); + pMeterInfo->uid = htobe64(pMeterMeta->uid); + + pMsg += sizeof(SMeterSidExtInfo); + } else { + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); + + for (int32_t i = 0; i < numOfMeters; ++i) { + SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg; + SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); + + pMeterInfo->sid = htonl(pQueryMeterInfo->sid); + pMeterInfo->uid = htobe64(pQueryMeterInfo->uid); + + pMsg += sizeof(SMeterSidExtInfo); + + memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen); + pMsg += pMetricMeta->tagLen; + +#ifdef _DEBUG_VIEW + tscTrace("%p sid:%d, uid:%lld", pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid); +#endif + } + } + + return pMsg; +} + int tscBuildQueryMsg(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; @@ -1512,7 +1552,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); pQueryMsg->uid = pMeterMeta->uid; pQueryMsg->numOfTagsCols = 0; - } else { // query on metric + } else { // query on super table if (pMeterMetaInfo->vnodeIndex < 0) { tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex); return -1; @@ -1699,34 +1739,8 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pQueryMsg->colNameLen = htonl(len); - // set sids list - tscTrace("%p vid:%d, query on %d meters", pSql, htons(pQueryMsg->vnode), numOfMeters); - if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { -#ifdef _DEBUG_VIEW - - tscTrace("%p %d", pSql, pMeterMetaInfo->pMeterMeta->sid); -#endif - SMeterSidExtInfo *pSMeterTagInfo = (SMeterSidExtInfo *)pMsg; - pSMeterTagInfo->sid = htonl(pMeterMeta->sid); - pMsg += sizeof(SMeterSidExtInfo); - } else { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); - - for (int32_t i = 0; i < numOfMeters; ++i) { - SMeterSidExtInfo *pMeterTagInfo = (SMeterSidExtInfo *)pMsg; - SMeterSidExtInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); - - pMeterTagInfo->sid = htonl(pQueryMeterInfo->sid); - pMsg += sizeof(SMeterSidExtInfo); - -#ifdef _DEBUG_VIEW - tscTrace("%p %d", pSql, pQueryMeterInfo->sid); -#endif - - memcpy(pMsg, pQueryMeterInfo->tags, pMetricMeta->tagLen); - pMsg += pMetricMeta->tagLen; - } - } + // serialize the table info (sid, uid, tags) + pMsg = doSerializeTableInfo(pSql, numOfMeters, htons(pQueryMsg->vnode), pMsg); // only include the required tag column schema. If a tag is not required, it won't be sent to vnode if (pMeterMetaInfo->numOfTags > 0) { @@ -3226,44 +3240,47 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) { size += pMeta->numOfVnodes * sizeof(SVnodeSidList *) + pMeta->numOfMeters * sizeof(SMeterSidExtInfo *); - char *pStr = calloc(1, size); - if (pStr == NULL) { + char *pBuf = calloc(1, size); + if (pBuf == NULL) { pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; goto _error_clean; } - SMetricMeta *pNewMetricMeta = (SMetricMeta *)pStr; + SMetricMeta *pNewMetricMeta = (SMetricMeta *)pBuf; metricMetaList[k] = pNewMetricMeta; pNewMetricMeta->numOfMeters = pMeta->numOfMeters; pNewMetricMeta->numOfVnodes = pMeta->numOfVnodes; pNewMetricMeta->tagLen = pMeta->tagLen; - pStr = pStr + sizeof(SMetricMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *); + pBuf = pBuf + sizeof(SMetricMeta) + pNewMetricMeta->numOfVnodes * sizeof(SVnodeSidList *); for (int32_t i = 0; i < pMeta->numOfVnodes; ++i) { SVnodeSidList *pSidLists = (SVnodeSidList *)rsp; - memcpy(pStr, pSidLists, sizeof(SVnodeSidList)); + memcpy(pBuf, pSidLists, sizeof(SVnodeSidList)); - pNewMetricMeta->list[i] = pStr - (char *)pNewMetricMeta; // offset value - SVnodeSidList *pLists = (SVnodeSidList *)pStr; + pNewMetricMeta->list[i] = pBuf - (char *)pNewMetricMeta; // offset value + SVnodeSidList *pLists = (SVnodeSidList *)pBuf; tscTrace("%p metricmeta:vid:%d,numOfMeters:%d", pSql, i, pLists->numOfSids); - pStr += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSidLists->numOfSids; + pBuf += sizeof(SVnodeSidList) + sizeof(SMeterSidExtInfo *) * pSidLists->numOfSids; rsp += sizeof(SVnodeSidList); - size_t sidSize = sizeof(SMeterSidExtInfo) + pNewMetricMeta->tagLen; + size_t elemSize = sizeof(SMeterSidExtInfo) + pNewMetricMeta->tagLen; for (int32_t j = 0; j < pSidLists->numOfSids; ++j) { - pLists->pSidExtInfoList[j] = pStr - (char *)pLists; - memcpy(pStr, rsp, sidSize); - - rsp += sidSize; - pStr += sidSize; + pLists->pSidExtInfoList[j] = pBuf - (char *)pLists; + memcpy(pBuf, rsp, elemSize); + + ((SMeterSidExtInfo*) pBuf)->uid = htobe64(((SMeterSidExtInfo*) pBuf)->uid); + ((SMeterSidExtInfo*) pBuf)->sid = htonl(((SMeterSidExtInfo*) pBuf)->sid); + + rsp += elemSize; + pBuf += elemSize; } } - sizes[k] = pStr - (char *)pNewMetricMeta; + sizes[k] = pBuf - (char *)pNewMetricMeta; } for (int32_t i = 0; i < num; ++i) { diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 5fee1d7da5..2bee153955 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -136,8 +136,9 @@ extern "C" { #define TSDB_CODE_INVALID_TABLE_ID 115 #define TSDB_CODE_INVALID_VNODE_STATUS 116 #define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117 +#define TSDB_CODE_TABLE_ID_MISMATCH 118 -#define TSDB_CODE_MAX_ERROR_CODE 118 +#define TSDB_CODE_MAX_ERROR_CODE 119 #ifdef __cplusplus } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 895cf23cf8..8d2121e3de 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -487,7 +487,7 @@ typedef struct SColumnInfo { */ typedef struct SMeterSidExtInfo { int32_t sid; - void * pObj; + int64_t uid; char tags[]; } SMeterSidExtInfo; @@ -724,9 +724,7 @@ typedef struct { int32_t numOfMeters; int32_t join; int32_t joinCondLen; // for join condition - int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM]; - } SMetricMetaMsg; typedef struct { diff --git a/src/rpc/src/tstring.c b/src/rpc/src/tstring.c index 63f65b882f..a4fc2b2c71 100644 --- a/src/rpc/src/tstring.c +++ b/src/rpc/src/tstring.c @@ -238,8 +238,9 @@ char *tsError[] = {"success", "only super table has metric meta info", "tags value not unique for join", "invalid submit message", - "not active table(not created yet or dropped already)", //114 - "invalid table id", - "invalid vnode status", //116 + "not active table(not created yet or dropped already)", + "invalid table id", // 115 + "invalid vnode status", "failed to lock resources", + "table id/uid mismatch", // 118 }; diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index d062f1c712..927b5850f3 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -1097,10 +1097,12 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) { pSids[0] = (SMeterSidExtInfo *)pMsg; pSids[0]->sid = htonl(pSids[0]->sid); - + pSids[0]->uid = htobe64(pSids[0]->uid); + for (int32_t j = 1; j < pQueryMsg->numOfSids; ++j) { pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength); pSids[j]->sid = htonl(pSids[j]->sid); + pSids[j]->uid = htobe64(pSids[j]->uid); } pMsg = (char *)pSids[pQueryMsg->numOfSids - 1]; diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 5efbb41014..030bd0cdf4 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -28,7 +28,7 @@ #include "vnodeRead.h" #include "vnodeUtil.h" #include "vnodeStore.h" -#include "tstatus.h" +#include "vnodeStatus.h" extern int tsMaxQueues; @@ -297,7 +297,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { } if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) { - dTrace("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode); + dError("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode); code = TSDB_CODE_INVALID_TABLE_ID; goto _query_over; } @@ -312,31 +312,39 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { } if (!(pVnode->accessState & TSDB_VN_READ_ACCCESS)) { + dError("qmsg:%p,vid:%d access not allowed", pQueryMsg, pQueryMsg->vnode); code = TSDB_CODE_NO_READ_ACCESS; goto _query_over; } - - if (pQueryMsg->pSidExtInfo == 0) { - dTrace("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg); - code = TSDB_CODE_INVALID_QUERY_MSG; - goto _query_over; - } - + if (pVnode->meterList == NULL) { dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode); code = TSDB_CODE_NOT_ACTIVE_VNODE; goto _query_over; } + if (pQueryMsg->pSidExtInfo == 0) { + dError("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg); + code = TSDB_CODE_INVALID_QUERY_MSG; + goto _query_over; + } + pSids = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo; for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) { if (pSids[i]->sid >= pVnode->cfg.maxSessions || pSids[i]->sid < 0) { - dTrace("qmsg:%p sid:%d is out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0, - pVnode->cfg.maxSessions); - + dError("qmsg:%p sid:%d out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0, pVnode->cfg.maxSessions); code = TSDB_CODE_INVALID_TABLE_ID; goto _query_over; } + + SMeterObj* pMeterObj = pVnode->meterList[pSids[i]->sid]; + if (pMeterObj->uid != pSids[i]->uid || pMeterObj->sid != pSids[i]->sid) { // uid/sid not match, error in query msg + dError("qmsg:%p sid/uid mismatch, vid:%d sid:%d id:%s uid:%" ", in msg sid:%d, uid:%lld", pQueryMsg, + pQueryMsg->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterObj->uid, pSids[i]->sid, pSids[i]->uid); + + code = TSDB_CODE_TABLE_ID_MISMATCH; + goto _query_over; + } } // todo optimize for single table query process From c516665a2a22d6a2bccc9c4071888a85cd3e6bd5 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Dec 2019 13:52:22 +0800 Subject: [PATCH 3/8] refactor some codes. --- src/client/src/tscSQLParser.c | 2 +- src/client/src/tscSchemaUtil.c | 20 +++--- src/inc/tschemautil.h | 2 +- src/system/detail/inc/mgmtBalance.h | 2 +- src/system/detail/inc/vnode.h | 9 --- .../detail/inc/vnodeStatus.h} | 29 +++++--- src/system/detail/src/dnodeMgmt.c | 2 +- src/system/detail/src/mgmtDb.c | 2 +- src/system/detail/src/mgmtDnode.c | 3 +- src/system/detail/src/mgmtMeter.c | 64 +++++++++-------- src/system/detail/src/mgmtShell.c | 2 +- src/system/detail/src/mgmtSupertableQuery.c | 4 +- src/system/detail/src/mgmtVgroup.c | 2 +- src/system/detail/src/vnodeCache.c | 2 +- src/system/detail/src/vnodeCommit.c | 3 +- src/system/detail/src/vnodeFile.c | 3 +- src/system/detail/src/vnodeImport.c | 1 + src/system/detail/src/vnodeMeter.c | 14 ++-- src/system/detail/src/vnodeQueryImpl.c | 6 +- .../detail/src/vnodeStatus.c} | 32 ++++++--- src/system/detail/src/vnodeStore.c | 4 +- src/system/detail/src/vnodeStream.c | 4 +- src/system/detail/src/vnodeUtil.c | 69 +++++++++++-------- src/system/lite/src/mgmtBalance.spec.c | 2 +- src/system/lite/src/mgmtDnode.spec.c | 2 +- src/system/lite/src/mgmtDnodeInt.spec.c | 2 +- src/system/lite/src/vnodePeer.spec.c | 2 +- 27 files changed, 161 insertions(+), 128 deletions(-) rename src/{inc/tstatus.h => system/detail/inc/vnodeStatus.h} (69%) rename src/{util/src/tstatus.c => system/detail/src/vnodeStatus.c} (76%) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 9df50b0fa1..9f19034e92 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2336,7 +2336,7 @@ static int32_t getMeterIndex(SSQLToken* pTableToken, SSqlCmd* pCmd, SColumnIndex for (int32_t i = 0; i < pCmd->numOfTables; ++i) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i); - extractMeterName(pMeterMetaInfo->name, tableName); + extractTableName(pMeterMetaInfo->name, tableName); if (strncasecmp(tableName, pTableToken->z, pTableToken->n) == 0 && strlen(tableName) == pTableToken->n) { pIndex->tableIndex = i; diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 85ca3eb863..648c25657c 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -131,35 +131,39 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) { } // todo refactor -static FORCE_INLINE char* skipSegments(char* input, char delimiter, int32_t num) { +static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) { for (int32_t i = 0; i < num; ++i) { - while (*input != 0 && *input++ != delimiter) { + while (*input != 0 && *input++ != delim) { }; } return input; } -static FORCE_INLINE void copySegment(char* dst, char* src, char delimiter) { +static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) { + size_t len = 0; while (*src != delimiter && *src != 0) { *dst++ = *src++; + len++; } + + return len; } /** - * extract meter name from meterid, which the format of userid.dbname.metername + * extract table name from meterid, which the format of userid.dbname.metername * @param meterId * @return */ -void extractMeterName(char* meterId, char* name) { +void extractTableName(char* meterId, char* name) { char* r = skipSegments(meterId, TS_PATH_DELIMITER[0], 2); - copySegment(name, r, TS_PATH_DELIMITER[0]); + copy(name, r, TS_PATH_DELIMITER[0]); } SSQLToken extractDBName(char* meterId, char* name) { char* r = skipSegments(meterId, TS_PATH_DELIMITER[0], 1); - copySegment(name, r, TS_PATH_DELIMITER[0]); + size_t len = copy(name, r, TS_PATH_DELIMITER[0]); - SSQLToken token = {.z = name, .n = strlen(name), .type = TK_STRING}; + SSQLToken token = {.z = name, .n = len, .type = TK_STRING}; return token; } diff --git a/src/inc/tschemautil.h b/src/inc/tschemautil.h index 7706bcd3a4..0031b4fa25 100644 --- a/src/inc/tschemautil.h +++ b/src/inc/tschemautil.h @@ -53,7 +53,7 @@ char *tsGetTagsValue(SMeterMeta *pMeta); bool tsMeterMetaIdentical(SMeterMeta *p1, SMeterMeta *p2); -void extractMeterName(char *meterId, char *name); +void extractTableName(char *meterId, char *name); SSQLToken extractDBName(char *meterId, char *name); diff --git a/src/system/detail/inc/mgmtBalance.h b/src/system/detail/inc/mgmtBalance.h index 67bfd55db2..a97e794894 100644 --- a/src/system/detail/inc/mgmtBalance.h +++ b/src/system/detail/inc/mgmtBalance.h @@ -25,7 +25,7 @@ extern "C" { #include "dnodeSystem.h" #include "mgmt.h" #include "tglobalcfg.h" -#include "tstatus.h" +#include "vnodeStatus.h" #include "ttime.h" void mgmtCreateDnodeOrderList(); diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 3ffbb6ab55..435184463b 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -64,15 +64,6 @@ enum _sync_cmd { TSDB_SYNC_CMD_REMOVE, }; -enum _meter_state { - TSDB_METER_STATE_READY = 0x00, - TSDB_METER_STATE_INSERT = 0x01, - TSDB_METER_STATE_IMPORTING = 0x02, - TSDB_METER_STATE_UPDATING = 0x04, - TSDB_METER_STATE_DELETING = 0x10, - TSDB_METER_STATE_DELETED = 0x18, -}; - typedef struct { int64_t offset : 48; int64_t length : 16; diff --git a/src/inc/tstatus.h b/src/system/detail/inc/vnodeStatus.h similarity index 69% rename from src/inc/tstatus.h rename to src/system/detail/inc/vnodeStatus.h index 53b5cb9547..1a28d67e98 100644 --- a/src/inc/tstatus.h +++ b/src/system/detail/inc/vnodeStatus.h @@ -78,15 +78,26 @@ enum _TSDB_VN_STREAM_STATUS { TSDB_VN_STREAM_STATUS_START }; -const char* taosGetVgroupStatusStr(int vgroupStatus); -const char* taosGetDbStatusStr(int dbStatus); -const char* taosGetVnodeStatusStr(int vnodeStatus); -const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus); -const char* taosGetVnodeDropStatusStr(int dropping); -const char* taosGetDnodeStatusStr(int dnodeStatus); -const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus); -const char* taosGetVgroupLbStatusStr(int vglbStatus); -const char* taosGetVnodeStreamStatusStr(int vnodeStreamStatus); +enum TSDB_TABLE_STATUS { + TSDB_METER_STATE_READY = 0x00, + TSDB_METER_STATE_INSERTING = 0x01, + TSDB_METER_STATE_IMPORTING = 0x02, + TSDB_METER_STATE_UPDATING = 0x04, + TSDB_METER_STATE_DROPPING = 0x10, + TSDB_METER_STATE_DROPPED = 0x18, +}; + +const char* taosGetVgroupStatusStr(int32_t vgroupStatus); +const char* taosGetDbStatusStr(int32_t dbStatus); +const char* taosGetVnodeStatusStr(int32_t vnodeStatus); +const char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus); +const char* taosGetVnodeDropStatusStr(int32_t dropping); +const char* taosGetDnodeStatusStr(int32_t dnodeStatus); +const char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus); +const char* taosGetVgroupLbStatusStr(int32_t vglbStatus); +const char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus); + +const char* taosGetTableStatusStr(int32_t tableStatus); #ifdef __cplusplus } diff --git a/src/system/detail/src/dnodeMgmt.c b/src/system/detail/src/dnodeMgmt.c index a2496c6a42..9842e0dad6 100644 --- a/src/system/detail/src/dnodeMgmt.c +++ b/src/system/detail/src/dnodeMgmt.c @@ -26,7 +26,7 @@ #include "vnodeMgmt.h" #include "vnodeSystem.h" #include "vnodeUtil.h" -#include "tstatus.h" +#include "vnodeStatus.h" SMgmtObj mgmtObj; extern uint64_t tsCreatedTime; diff --git a/src/system/detail/src/mgmtDb.c b/src/system/detail/src/mgmtDb.c index 1ad41c00eb..c41db68e6b 100644 --- a/src/system/detail/src/mgmtDb.c +++ b/src/system/detail/src/mgmtDb.c @@ -20,7 +20,7 @@ #include "mgmtBalance.h" #include "mgmtUtil.h" #include "tschemautil.h" -#include "tstatus.h" +#include "vnodeStatus.h" void *dbSdb = NULL; int tsDbUpdateSize; diff --git a/src/system/detail/src/mgmtDnode.c b/src/system/detail/src/mgmtDnode.c index 753d8bdaa0..d7917f6d4f 100644 --- a/src/system/detail/src/mgmtDnode.c +++ b/src/system/detail/src/mgmtDnode.c @@ -20,8 +20,7 @@ #include "dnodeSystem.h" #include "mgmt.h" #include "tschemautil.h" -#include "tstatus.h" -#include "tstatus.h" +#include "vnodeStatus.h" bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType); int mgmtGetDnodesNum(); diff --git a/src/system/detail/src/mgmtMeter.c b/src/system/detail/src/mgmtMeter.c index be141f278d..4e4f4b61e3 100644 --- a/src/system/detail/src/mgmtMeter.c +++ b/src/system/detail/src/mgmtMeter.c @@ -27,7 +27,7 @@ #include "tsqlfunction.h" #include "ttime.h" #include "vnodeTagMgmt.h" -#include "tstatus.h" +#include "vnodeStatus.h" extern int64_t sdbVersion; @@ -984,6 +984,28 @@ SSchema *mgmtGetMeterSchema(STabObj *pMeter) { return (SSchema *)pMetric->schema; } +static int32_t mgmtSerializeTagValue(char* pMsg, STabObj* pMeter, int16_t* tagsId, int32_t numOfTags) { + int32_t offset = 0; + + for (int32_t j = 0; j < numOfTags; ++j) { + if (tagsId[j] == TSDB_TBNAME_COLUMN_INDEX) { // handle the table name tags + char name[TSDB_METER_NAME_LEN] = {0}; + extractTableName(pMeter->meterId, name); + + memcpy(pMsg + offset, name, TSDB_METER_NAME_LEN); + offset += TSDB_METER_NAME_LEN; + } else { + SSchema s = {0}; + char * tag = mgmtMeterGetTag(pMeter, tagsId[j], &s); + + memcpy(pMsg + offset, tag, (size_t)s.bytes); + offset += s.bytes; + } + } + + return offset; +} + /* * serialize SVnodeSidList to byte array */ @@ -996,7 +1018,6 @@ static char *mgmtBuildMetricMetaMsg(STabObj *pMeter, int32_t *ovgId, SVnodeSidLi * 1. the query msg may be larger than 64k, * 2. the following meters belong to different vnodes */ - (*pList) = (SVnodeSidList *)pMsg; (*pList)->numOfSids = 0; (*pList)->index = 0; @@ -1015,29 +1036,15 @@ static char *mgmtBuildMetricMetaMsg(STabObj *pMeter, int32_t *ovgId, SVnodeSidLi (*pList)->numOfSids++; SMeterSidExtInfo *pSMeterTagInfo = (SMeterSidExtInfo *)pMsg; - pSMeterTagInfo->sid = pMeter->gid.sid; + pSMeterTagInfo->sid = htonl(pMeter->gid.sid); + pSMeterTagInfo->uid = htobe64(pMeter->uid); + pMsg += sizeof(SMeterSidExtInfo); - int32_t offset = 0; - for (int32_t j = 0; j < numOfTags; ++j) { - if (tagsId[j] == -1) { - char name[TSDB_METER_NAME_LEN] = {0}; - extractMeterName(pMeter->meterId, name); - - memcpy(pMsg + offset, name, TSDB_METER_NAME_LEN); - offset += TSDB_METER_NAME_LEN; - } else { - SSchema s = {0}; - char * tag = mgmtMeterGetTag(pMeter, tagsId[j], &s); - - memcpy(pMsg + offset, tag, (size_t)s.bytes); - offset += s.bytes; - } - } - - pMsg += offset; + int32_t offset = mgmtSerializeTagValue(pMsg, pMeter, tagsId, numOfTags); assert(offset == tagLen); - + + pMsg += offset; return pMsg; } @@ -1209,12 +1216,9 @@ int mgmtRetrieveMetricMeta(void *thandle, char **pStart, SMetricMetaMsg *pMetric #endif if (ret == TSDB_CODE_SUCCESS) { + // todo opt performance for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) { ret = mgmtRetrieveMetersFromMetric(pMetricMetaMsg, i, &result[i]); - // todo opt performance - // if (result[i].num <= 0) {//no result - // } else if (result[i].num < 10) { - // } } } @@ -1283,7 +1287,7 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { memset(meterName, 0, tListLen(meterName)); // pattern compare for meter name - extractMeterName(pMeter->meterId, meterName); + extractTableName(pMeter->meterId, meterName); if (pShow->payloadLen > 0 && patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) @@ -1305,7 +1309,7 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; if (pMeter->pTagData) { - extractMeterName(pMeter->pTagData, pWrite); + extractTableName(pMeter->pTagData, pWrite); } cols++; @@ -1389,7 +1393,7 @@ int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn) pShow->pNode = (void *)pMetric->next; memset(metricName, 0, tListLen(metricName)); - extractMeterName(pMetric->meterId, metricName); + extractTableName(pMetric->meterId, metricName); if (pShow->payloadLen > 0 && patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) @@ -1398,7 +1402,7 @@ int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn) cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - extractMeterName(pMetric->meterId, pWrite); + extractTableName(pMetric->meterId, pWrite); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; diff --git a/src/system/detail/src/mgmtShell.c b/src/system/detail/src/mgmtShell.c index f6b2d7ba9b..ae7c7337a8 100644 --- a/src/system/detail/src/mgmtShell.c +++ b/src/system/detail/src/mgmtShell.c @@ -21,7 +21,7 @@ #include "mgmtProfile.h" #include "taosmsg.h" #include "tlog.h" -#include "tstatus.h" +#include "vnodeStatus.h" #define MAX_LEN_OF_METER_META (sizeof(SMultiMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN) diff --git a/src/system/detail/src/mgmtSupertableQuery.c b/src/system/detail/src/mgmtSupertableQuery.c index 10a64408ce..1aabe2feda 100644 --- a/src/system/detail/src/mgmtSupertableQuery.c +++ b/src/system/detail/src/mgmtSupertableQuery.c @@ -196,7 +196,7 @@ static bool mgmtTablenameFilterCallback(tSkipListNode* pNode, void* param) { // pattern compare for meter name STabObj* pMeterObj = (STabObj*)pNode->pData; - extractMeterName(pMeterObj->meterId, name); + extractTableName(pMeterObj->meterId, name); return patternMatch(pSupporter->pattern, name, TSDB_METER_ID_LEN, &pSupporter->info) == TSDB_PATTERN_MATCH; } @@ -786,7 +786,7 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer // todo refactor!!!!! static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len, char* param) { if (offset == TSDB_TBNAME_COLUMN_INDEX) { - extractMeterName(pMeter->meterId, param); + extractTableName(pMeter->meterId, param); } else { char* tags = pMeter->pTagData + offset + TSDB_METER_ID_LEN; // tag start position memcpy(param, tags, len); // make sure the value is null-terminated string diff --git a/src/system/detail/src/mgmtVgroup.c b/src/system/detail/src/mgmtVgroup.c index b2273ea87a..6a8b5a7983 100644 --- a/src/system/detail/src/mgmtVgroup.c +++ b/src/system/detail/src/mgmtVgroup.c @@ -19,7 +19,7 @@ #include "mgmt.h" #include "tschemautil.h" #include "tlog.h" -#include "tstatus.h" +#include "vnodeStatus.h" void * vgSdb = NULL; int tsVgUpdateSize; diff --git a/src/system/detail/src/vnodeCache.c b/src/system/detail/src/vnodeCache.c index 30dd8f7375..36bf872109 100644 --- a/src/system/detail/src/vnodeCache.c +++ b/src/system/detail/src/vnodeCache.c @@ -20,7 +20,7 @@ #include "vnode.h" #include "vnodeCache.h" #include "vnodeUtil.h" -#include "tstatus.h" +#include "vnodeStatus.h" void vnodeSearchPointInCache(SMeterObj *pObj, SQuery *pQuery); void vnodeProcessCommitTimer(void *param, void *tmrId); diff --git a/src/system/detail/src/vnodeCommit.c b/src/system/detail/src/vnodeCommit.c index 57bb52eb23..b5c9f80745 100644 --- a/src/system/detail/src/vnodeCommit.c +++ b/src/system/detail/src/vnodeCommit.c @@ -19,6 +19,7 @@ #include "tsdb.h" #include "vnode.h" #include "vnodeUtil.h" +#include "vnodeStatus.h" typedef struct { int sversion; @@ -165,7 +166,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) { continue; } - if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) { + if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) { dWarn("vid:%d sid:%d id:%s, meter is dropped, ignore data in commit log, contLen:%d action:%d", vnode, head.sid, head.contLen, head.action); continue; diff --git a/src/system/detail/src/vnodeFile.c b/src/system/detail/src/vnodeFile.c index ed56f72134..7597659e53 100644 --- a/src/system/detail/src/vnodeFile.c +++ b/src/system/detail/src/vnodeFile.c @@ -21,6 +21,7 @@ #include "vnode.h" #include "vnodeFile.h" #include "vnodeUtil.h" +#include "vnodeStatus.h" #define FILE_QUERY_NEW_BLOCK -5 // a special negative number @@ -611,7 +612,7 @@ _again: } // meter is going to be deleted, abort - if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) { + if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) { dWarn("vid:%d sid:%d is dropped, ignore this meter", vnode, sid); continue; } diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 83d40c84a6..6bf543e470 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -18,6 +18,7 @@ #include "vnode.h" #include "vnodeUtil.h" +#include "vnodeStatus.h" extern void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId); extern int vnodeReadColumnToMem(int fd, SCompBlock *pBlock, SField **fields, int col, char *data, int dataSize, diff --git a/src/system/detail/src/vnodeMeter.c b/src/system/detail/src/vnodeMeter.c index 7860a886c5..7cb4870eb2 100644 --- a/src/system/detail/src/vnodeMeter.c +++ b/src/system/detail/src/vnodeMeter.c @@ -24,7 +24,7 @@ #include "vnodeMgmt.h" #include "vnodeShell.h" #include "vnodeUtil.h" -#include "tstatus.h" +#include "vnodeStatus.h" #define VALID_TIMESTAMP(key, curKey, prec) (((key) >= 0) && ((key) <= ((curKey) + 36500 * tsMsPerDay[prec]))) @@ -520,7 +520,7 @@ int vnodeRemoveMeterObj(int vnode, int sid) { } // after remove this meter, change its state to DELETED - pObj->state = TSDB_METER_STATE_DELETED; + pObj->state = TSDB_METER_STATE_DROPPED; pObj->timeStamp = taosGetTimestampMs(); vnodeList[vnode].lastRemove = pObj->timeStamp; @@ -612,12 +612,12 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; } - if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_INSERT)) != TSDB_CODE_SUCCESS) { + if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_INSERTING)) != TSDB_CODE_SUCCESS) { goto _over; } for (i = 0; i < numOfPoints; ++i) { // meter will be dropped, abort current insertion - if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) { + if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) { dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId, pObj->state); @@ -660,7 +660,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi pthread_mutex_unlock(&(pVnode->vmutex)); - vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); + vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERTING); _over: dTrace("vid:%d sid:%d id:%s, %d out of %d points are inserted, lastKey:%ld source:%d, vnode total storage: %ld", @@ -726,7 +726,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) { } SMeterObj *pObj = pVnode->meterList[pNew->sid]; - if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) { + if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) { dTrace("vid:%d sid:%d id:%s, meter is deleted, abort update schema", pNew->vnode, pNew->sid, pNew->meterId); free(pNew->schema); free(pNew); @@ -734,7 +734,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) { } int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_UPDATING); - if (state >= TSDB_METER_STATE_DELETING) { + if (state >= TSDB_METER_STATE_DROPPING) { dError("vid:%d sid:%d id:%s, meter is deleted, failed to update, state:%d", pObj->vnode, pObj->sid, pObj->meterId, state); return; diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 045f79fa75..a18dfe4705 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -31,6 +31,7 @@ #include "vnodeDataFilterFunc.h" #include "vnodeFile.h" #include "vnodeQueryImpl.h" +#include "vnodeStatus.h" enum { TS_JOIN_TS_EQUAL = 0, @@ -47,8 +48,7 @@ enum { #define IS_DISK_DATA_BLOCK(q) ((q)->fileId >= 0) // static int32_t copyDataFromMMapBuffer(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t -// offset, -// int32_t size); +// offset, int32_t size); static int32_t readDataFromDiskFile(int fd, SQInfo *pQInfo, SQueryFilesInfo *pQueryFile, char *buf, uint64_t offset, int32_t size); @@ -2304,7 +2304,7 @@ bool isQueryKilled(SQuery *pQuery) { * if it will be deleted soon, stop current query ASAP. */ SMeterObj *pMeterObj = pQInfo->pObj; - if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) { + if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DROPPING)) { pQInfo->killed = 1; return true; } diff --git a/src/util/src/tstatus.c b/src/system/detail/src/vnodeStatus.c similarity index 76% rename from src/util/src/tstatus.c rename to src/system/detail/src/vnodeStatus.c index 1ab007715d..4756496f40 100644 --- a/src/util/src/tstatus.c +++ b/src/system/detail/src/vnodeStatus.c @@ -15,9 +15,9 @@ #include "taosmsg.h" #include "tsdb.h" -#include "tstatus.h" +#include "vnodeStatus.h" -const char* taosGetVgroupStatusStr(int vgroupStatus) { +const char* taosGetVgroupStatusStr(int32_t vgroupStatus) { switch (vgroupStatus) { case TSDB_VG_STATUS_READY: return "ready"; case TSDB_VG_STATUS_IN_PROGRESS: return "inprogress"; @@ -28,7 +28,7 @@ const char* taosGetVgroupStatusStr(int vgroupStatus) { } } -const char* taosGetDbStatusStr(int dbStatus) { +const char* taosGetDbStatusStr(int32_t dbStatus) { switch (dbStatus) { case TSDB_DB_STATUS_READY: return "ready"; case TSDB_DB_STATUS_DROPPING: return "dropping"; @@ -37,7 +37,7 @@ const char* taosGetDbStatusStr(int dbStatus) { } } -const char* taosGetVnodeStatusStr(int vnodeStatus) { +const char* taosGetVnodeStatusStr(int32_t vnodeStatus) { switch (vnodeStatus) { case TSDB_VN_STATUS_OFFLINE: return "offline"; case TSDB_VN_STATUS_CREATING: return "creating"; @@ -50,7 +50,7 @@ const char* taosGetVnodeStatusStr(int vnodeStatus) { } } -const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus) { +const char* taosGetVnodeSyncStatusStr(int32_t vnodeSyncStatus) { switch (vnodeSyncStatus) { case TSDB_VN_SYNC_STATUS_INIT: return "init"; case TSDB_VN_SYNC_STATUS_SYNCING: return "syncing"; @@ -60,7 +60,7 @@ const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus) { } } -const char* taosGetVnodeDropStatusStr(int dropping) { +const char* taosGetVnodeDropStatusStr(int32_t dropping) { switch (dropping) { case TSDB_VN_DROP_STATUS_READY: return "ready"; case TSDB_VN_DROP_STATUS_DROPPING: return "dropping"; @@ -68,7 +68,7 @@ const char* taosGetVnodeDropStatusStr(int dropping) { } } -const char* taosGetDnodeStatusStr(int dnodeStatus) { +const char* taosGetDnodeStatusStr(int32_t dnodeStatus) { switch (dnodeStatus) { case TSDB_DN_STATUS_OFFLINE: return "offline"; case TSDB_DN_STATUS_READY: return "ready"; @@ -76,7 +76,7 @@ const char* taosGetDnodeStatusStr(int dnodeStatus) { } } -const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus) { +const char* taosGetDnodeLbStatusStr(int32_t dnodeBalanceStatus) { switch (dnodeBalanceStatus) { case TSDB_DN_LB_STATUS_BALANCED: return "balanced"; case TSDB_DN_LB_STATUS_BALANCING: return "balancing"; @@ -86,7 +86,7 @@ const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus) { } } -const char* taosGetVgroupLbStatusStr(int vglbStatus) { +const char* taosGetVgroupLbStatusStr(int32_t vglbStatus) { switch (vglbStatus) { case TSDB_VG_LB_STATUS_READY: return "ready"; case TSDB_VG_LB_STATUS_UPDATE: return "updating"; @@ -94,10 +94,22 @@ const char* taosGetVgroupLbStatusStr(int vglbStatus) { } } -const char* taosGetVnodeStreamStatusStr(int vnodeStreamStatus) { +const char* taosGetVnodeStreamStatusStr(int32_t vnodeStreamStatus) { switch (vnodeStreamStatus) { case TSDB_VN_STREAM_STATUS_START: return "start"; case TSDB_VN_STREAM_STATUS_STOP: return "stop"; default: return "undefined"; } } + +const char* taosGetTableStatusStr(int32_t tableStatus) { + switch(tableStatus) { + case TSDB_METER_STATE_INSERTING: return "inserting"; + case TSDB_METER_STATE_IMPORTING:return "importing"; + case TSDB_METER_STATE_UPDATING: return "updating"; + case TSDB_METER_STATE_DROPPING: return "deleting"; + case TSDB_METER_STATE_DROPPED: return "dropped"; + case TSDB_METER_STATE_READY: return "ready"; + default:return "undefined"; + } +} diff --git a/src/system/detail/src/vnodeStore.c b/src/system/detail/src/vnodeStore.c index 13f64ecf68..360216e964 100644 --- a/src/system/detail/src/vnodeStore.c +++ b/src/system/detail/src/vnodeStore.c @@ -22,7 +22,7 @@ #include "vnode.h" #include "vnodeStore.h" #include "vnodeUtil.h" -#include "tstatus.h" +#include "vnodeStatus.h" int tsMaxVnode = -1; int tsOpenVnodes = 0; @@ -118,7 +118,7 @@ static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) { } else { // set the meter is to be deleted SMeterObj* pObj = pVnode->meterList[sid]; if (pObj != NULL) { - pObj->state = TSDB_METER_STATE_DELETED; + pObj->state = TSDB_METER_STATE_DROPPED; } } } diff --git a/src/system/detail/src/vnodeStream.c b/src/system/detail/src/vnodeStream.c index 6b5f82a687..7ee20a2e59 100644 --- a/src/system/detail/src/vnodeStream.c +++ b/src/system/detail/src/vnodeStream.c @@ -17,7 +17,7 @@ #include "taosmsg.h" #include "vnode.h" #include "vnodeUtil.h" -#include "tstatus.h" +#include "vnodeStatus.h" /* static TAOS *dbConn = NULL; */ void vnodeCloseStreamCallback(void *param); @@ -86,7 +86,7 @@ void vnodeOpenStreams(void *param, void *tmrId) { for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) { pObj = pVnode->meterList[sid]; - if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) continue; + if (pObj == NULL || pObj->sqlLen == 0 || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) continue; dTrace("vid:%d sid:%d id:%s, open stream:%s", pObj->vnode, sid, pObj->meterId, pObj->pSql); diff --git a/src/system/detail/src/vnodeUtil.c b/src/system/detail/src/vnodeUtil.c index f8a4d3efb0..943bec4250 100644 --- a/src/system/detail/src/vnodeUtil.c +++ b/src/system/detail/src/vnodeUtil.c @@ -22,6 +22,7 @@ #include "vnode.h" #include "vnodeDataFilterFunc.h" #include "vnodeUtil.h" +#include "vnodeStatus.h" int vnodeCheckFileIntegrity(FILE* fp) { /* @@ -547,30 +548,38 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) { SMeterObj* pMeter = pVnode->meterList[pSids[i]->sid]; - if (pMeter == NULL || (pMeter->state > TSDB_METER_STATE_INSERT)) { - if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DELETING)) { - code = TSDB_CODE_NOT_ACTIVE_TABLE; - dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid); - vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid); - } else {//update or import - code = TSDB_CODE_ACTION_IN_PROGRESS; - dTrace("qmsg:%p, vid:%d sid:%d id:%s, it is in state:%d, wait!", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid, - pMeter->meterId, pMeter->state); - } - } else { - /* - * vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can - * check if the numOfQueries is 0 or not. - */ - pMeterObjList[(*numOfInc)++] = pMeter; - atomic_fetch_add_32(&pMeter->numOfQueries, 1); + if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DROPPING)) { + code = TSDB_CODE_NOT_ACTIVE_TABLE; + dError("qmsg:%p, vid:%d sid:%d, not there or will be dropped", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid); + + vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid); + continue; + } else if (pMeter->uid != pSids[i]->uid || pMeter->sid != pSids[i]->sid) { + code = TSDB_CODE_TABLE_ID_MISMATCH; + dError("qmsg:%p, vid:%d sid:%d id:%s uid:%lld, id mismatch. sid:%d uid:%lld in msg", pQueryMsg, + pQueryMsg->vnode, pMeter->sid, pMeter->meterId, pMeter->uid, pSids[i]->sid, pSids[i]->uid); + + vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid); + continue; + } else if (pMeter->state > TSDB_METER_STATE_INSERTING) { //update or import + code = TSDB_CODE_ACTION_IN_PROGRESS; + dTrace("qmsg:%p, vid:%d sid:%d id:%s, it is in state:%s, wait!", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid, + pMeter->meterId, taosGetTableStatusStr(pMeter->state)); + continue; + } + + /* + * vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can + * check if the numOfQueries is 0 or not. + */ + pMeterObjList[(*numOfInc)++] = pMeter; + atomic_fetch_add_32(&pMeter->numOfQueries, 1); - // output for meter more than one query executed - if (pMeter->numOfQueries > 1) { - dTrace("qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid, - pMeter->meterId, pMeter->numOfQueries); - num++; - } + // output for meter more than one query executed + if (pMeter->numOfQueries > 1) { + dTrace("qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid, + pMeter->meterId, pMeter->numOfQueries); + num++; } } @@ -652,7 +661,7 @@ void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state) { bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state) { if (state == TSDB_METER_STATE_READY) { return pMeterObj->state == TSDB_METER_STATE_READY; - } else if (state == TSDB_METER_STATE_DELETING) { + } else if (state == TSDB_METER_STATE_DROPPING) { return pMeterObj->state >= state; } else { return (((pMeterObj->state) & state) == state); @@ -664,7 +673,7 @@ void vnodeSetMeterDeleting(SMeterObj* pMeterObj) { return; } - pMeterObj->state |= TSDB_METER_STATE_DELETING; + pMeterObj->state |= TSDB_METER_STATE_DROPPING; } int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) { @@ -672,7 +681,7 @@ int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) { int32_t state = vnodeSetMeterState(pObj, st); if (state != TSDB_METER_STATE_READY) {//return to denote import is not performed - if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) { + if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPING)) { dTrace("vid:%d sid:%d id:%s, meter is deleted, state:%d", pObj->vnode, pObj->sid, pObj->meterId, pObj->state); code = TSDB_CODE_NOT_ACTIVE_TABLE; @@ -690,17 +699,17 @@ int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) { bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) { SMeterObj* pObj = pVnode->meterList[sid]; - if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETED)) { + if (pObj == NULL || vnodeIsMeterState(pObj, TSDB_METER_STATE_DROPPED)) { return true; } - int32_t prev = vnodeSetMeterState(pObj, TSDB_METER_STATE_DELETING); + int32_t prev = vnodeSetMeterState(pObj, TSDB_METER_STATE_DROPPING); /* * if the meter is not in ready/deleting state, it must be in insert/import/update, * set the deleting state and wait the procedure to be completed */ - if (prev != TSDB_METER_STATE_READY && prev < TSDB_METER_STATE_DELETING) { + if (prev != TSDB_METER_STATE_READY && prev < TSDB_METER_STATE_DROPPING) { vnodeSetMeterDeleting(pObj); dWarn("vid:%d sid:%d id:%s, can not be deleted, state:%d, wait", pObj->vnode, pObj->sid, pObj->meterId, prev); @@ -710,7 +719,7 @@ bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) { bool ready = true; /* - * the query will be stopped ASAP, since the state of meter is set to TSDB_METER_STATE_DELETING, + * the query will be stopped ASAP, since the state of meter is set to TSDB_METER_STATE_DROPPING, * and new query will abort since the meter is deleted. */ pthread_mutex_lock(&pVnode->vmutex); diff --git a/src/system/lite/src/mgmtBalance.spec.c b/src/system/lite/src/mgmtBalance.spec.c index 109f36af18..f55bada0a2 100644 --- a/src/system/lite/src/mgmtBalance.spec.c +++ b/src/system/lite/src/mgmtBalance.spec.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "mgmtBalance.h" -#include "tstatus.h" +#include "vnodeStatus.h" void mgmtStartBalanceTimer(int64_t mseconds) {} diff --git a/src/system/lite/src/mgmtDnode.spec.c b/src/system/lite/src/mgmtDnode.spec.c index dc7dd7d472..6db80aac45 100644 --- a/src/system/lite/src/mgmtDnode.spec.c +++ b/src/system/lite/src/mgmtDnode.spec.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "mgmt.h" -#include "tstatus.h" +#include "vnodeStatus.h" SDnodeObj dnodeObj; extern uint32_t tsRebootTime; diff --git a/src/system/lite/src/mgmtDnodeInt.spec.c b/src/system/lite/src/mgmtDnodeInt.spec.c index a914d630c2..734fa630c5 100644 --- a/src/system/lite/src/mgmtDnodeInt.spec.c +++ b/src/system/lite/src/mgmtDnodeInt.spec.c @@ -23,7 +23,7 @@ #include "tutil.h" #include "vnode.h" #include "tsystem.h" -#include "tstatus.h" +#include "vnodeStatus.h" extern void *dmQhandle; void * mgmtStatusTimer = NULL; diff --git a/src/system/lite/src/vnodePeer.spec.c b/src/system/lite/src/vnodePeer.spec.c index d7da8b66f4..34400d4051 100644 --- a/src/system/lite/src/vnodePeer.spec.c +++ b/src/system/lite/src/vnodePeer.spec.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "vnode.h" -#include "tstatus.h" +#include "vnodeStatus.h" int vnodeInitPeer(int numOfThreads) { return 0; } From 93b52b4b4bf1622b5e5f97e919247a897ef2a130 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Dec 2019 13:54:35 +0800 Subject: [PATCH 4/8] refactor codes. --- src/system/detail/src/vnodeShell.c | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 030bd0cdf4..662c26c5ad 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -336,15 +336,6 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { code = TSDB_CODE_INVALID_TABLE_ID; goto _query_over; } - - SMeterObj* pMeterObj = pVnode->meterList[pSids[i]->sid]; - if (pMeterObj->uid != pSids[i]->uid || pMeterObj->sid != pSids[i]->sid) { // uid/sid not match, error in query msg - dError("qmsg:%p sid/uid mismatch, vid:%d sid:%d id:%s uid:%" ", in msg sid:%d, uid:%lld", pQueryMsg, - pQueryMsg->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterObj->uid, pSids[i]->sid, pSids[i]->uid); - - code = TSDB_CODE_TABLE_ID_MISMATCH; - goto _query_over; - } } // todo optimize for single table query process From d9a34ffeb9f40767ca21291232e6aff41469b142 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Dec 2019 14:11:59 +0800 Subject: [PATCH 5/8] do not change the error code for submit/select query, and handle the table id mismatch problem. --- src/client/src/tscServer.c | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5b25947dc8..d4da72b654 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -477,25 +477,17 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { if (code == 0) return pSql; msg = NULL; } else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID || - rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || - rspCode == TSDB_CODE_NETWORK_UNAVAIL) { + rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || rspCode == TSDB_CODE_INVALID_VNODE_ID || + rspCode == TSDB_CODE_TABLE_ID_MISMATCH || rspCode == TSDB_CODE_NETWORK_UNAVAIL) { #else if (rspCode == TSDB_CODE_NOT_ACTIVE_TABLE || rspCode == TSDB_CODE_INVALID_TABLE_ID || - rspCode == TSDB_CODE_INVALID_VNODE_ID || rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || - rspCode == TSDB_CODE_NETWORK_UNAVAIL) { + rspCode == TSDB_CODE_NOT_ACTIVE_VNODE || rspCode == TSDB_CODE_INVALID_VNODE_ID || + rspCode == TSDB_CODE_TABLE_ID_MISMATCH || rspCode == TSDB_CODE_NETWORK_UNAVAIL) { #endif pSql->thandle = NULL; taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user); - if ((pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) && - (rspCode == TSDB_CODE_INVALID_TABLE_ID || rspCode == TSDB_CODE_INVALID_VNODE_ID)) { - /* - * In case of the insert/select operations, the invalid table(vnode) id means - * the submit/query msg is invalid, renew meter meta will not help to fix this problem, - * so return the invalid_query_msg to client directly. - */ - code = TSDB_CODE_INVALID_QUERY_MSG; - } else if (pCmd->command == TSDB_SQL_CONNECT) { + if (pCmd->command == TSDB_SQL_CONNECT) { code = TSDB_CODE_NETWORK_UNAVAIL; } else if (pCmd->command == TSDB_SQL_HB) { code = TSDB_CODE_NOT_READY; @@ -2331,7 +2323,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql) { size = tscEstimateCreateTableMsgLength(pSql); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { tscError("%p failed to malloc for create table msg", pSql); - free(tmpData); + free(tmpData); return -1; } From abfac9ab09de6fed1fbc971872dead0741d24c55 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Dec 2019 14:13:07 +0800 Subject: [PATCH 6/8] dump the number of tasks in the queue every 30sec. --- src/inc/tsched.h | 2 ++ src/util/src/tsched.c | 40 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/inc/tsched.h b/src/inc/tsched.h index dffd7a298a..827ecbbb42 100644 --- a/src/inc/tsched.h +++ b/src/inc/tsched.h @@ -32,6 +32,8 @@ typedef struct _sched_msg { void *taosInitScheduler(int queueSize, int numOfThreads, const char *label); +void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl); + int taosScheduleTask(void *qhandle, SSchedMsg *pMsg); void taosCleanUpScheduler(void *param); diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index bd49c670f6..4eefd2ad95 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -16,6 +16,9 @@ #include "os.h" #include "tlog.h" #include "tsched.h" +#include "ttimer.h" + +#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. typedef struct { char label[16]; @@ -28,10 +31,13 @@ typedef struct { int numOfThreads; pthread_t * qthread; SSchedMsg * queue; + + void* pTmrCtrl; + void* pTimer; } SSchedQueue; -void *taosProcessSchedQueue(void *param); -void taosCleanUpScheduler(void *param); +static void *taosProcessSchedQueue(void *param); +static void taosDumpSchedulerStatus(void *qhandle, void *tmrId); void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) { pthread_attr_t attr; @@ -96,6 +102,17 @@ _error: return NULL; } +void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) { + SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label); + + if (tmrCtrl != NULL && pSched != NULL) { + pSched->pTmrCtrl = tmrCtrl; + taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); + } + + return pSched; +} + void *taosProcessSchedQueue(void *param) { SSchedMsg msg; SSchedQueue *pSched = (SSchedQueue *)param; @@ -173,8 +190,27 @@ void taosCleanUpScheduler(void *param) { tsem_destroy(&pSched->emptySem); tsem_destroy(&pSched->fullSem); pthread_mutex_destroy(&pSched->queueMutex); + + if (pSched->pTimer) { + taosTmrStopA(&pSched->pTimer); + } free(pSched->queue); free(pSched->qthread); free(pSched); // fix memory leak } + +// for debug purpose, dump the scheduler status every 1min. +void taosDumpSchedulerStatus(void *qhandle, void *tmrId) { + SSchedQueue *pSched = (SSchedQueue *)qhandle; + if (pSched == NULL || pSched->pTimer == NULL || pSched->pTimer != tmrId) { + return; + } + + int32_t size = ((pSched->emptySlot - pSched->fullSlot) + pSched->queueSize) % pSched->queueSize; + if (size > 0) { + pTrace("scheduler:%s, current tasks in queue:%d, task thread:%d", pSched->label, size, pSched->numOfThreads); + } + + taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); +} From 60f98e31ce59a838d60fe5d13a5cc4543c396ef9 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Dec 2019 14:18:00 +0800 Subject: [PATCH 7/8] add the log for query queue initializing. --- src/system/detail/src/vnodeSystem.c | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/system/detail/src/vnodeSystem.c b/src/system/detail/src/vnodeSystem.c index 631e258b25..b23050ab03 100644 --- a/src/system/detail/src/vnodeSystem.c +++ b/src/system/detail/src/vnodeSystem.c @@ -36,8 +36,14 @@ void vnodeCleanUpSystem() { bool vnodeInitQueryHandle() { int numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore; - if (numOfThreads < 1) numOfThreads = 1; - queryQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "query"); + if (numOfThreads < 1) { + numOfThreads = 1; + } + + int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode; + dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize,numOfThreads); + + queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl); return true; } @@ -52,15 +58,15 @@ bool vnodeInitTmrCtl() { int vnodeInitSystem() { - if (!vnodeInitQueryHandle()) { - dError("failed to init query qhandle, exit"); - return -1; - } - if (!vnodeInitTmrCtl()) { dError("failed to init timer, exit"); return -1; } + + if (!vnodeInitQueryHandle()) { + dError("failed to init query qhandle, exit"); + return -1; + } if (vnodeInitStore() < 0) { dError("failed to init vnode storage"); From 7d493b3dec5c9426b1ebb82e0c51628420657826 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 17 Dec 2019 15:13:29 +0800 Subject: [PATCH 8/8] fix bugs in issue #934. [tbase-1354] --- src/client/src/tscSQLParser.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 9f19034e92..fb0009300e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1957,7 +1957,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, int32_t colIdx, tSQLExprItem* pItem } SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByNameEx(&pParamElem->pNode->colInfo, pCmd, &index) != TSDB_CODE_SUCCESS) { + if ((getColumnIndexByNameEx(&pParamElem->pNode->colInfo, pCmd, &index) != TSDB_CODE_SUCCESS) || + index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { return invalidSqlErrMsg(pCmd, msg3); } @@ -1966,7 +1967,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, int32_t colIdx, tSQLExprItem* pItem SSchema* pSchema = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, index.columnIndex); int16_t colType = pSchema->type; - if (colType == TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) { + if (colType <= TSDB_DATA_TYPE_BOOL || colType >= TSDB_DATA_TYPE_BINARY) { return invalidSqlErrMsg(pCmd, msg1); } @@ -5468,15 +5469,16 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd) { int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { const char* msg1 = "functions/columns not allowed in group by query"; + const char* msg2 = "projection query on columns not allowed"; const char* msg3 = "group by not allowed on projection query"; - const char* msg5 = "retrieve tags not compatible with group by or interval query"; + const char* msg4 = "retrieve tags not compatible with group by or interval query"; SSqlCmd* pCmd = &pSql->cmd; // only retrieve tags, group by is not supportted if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) { if (pCmd->groupbyExpr.numOfGroupCols > 0 || pCmd->nAggTimeInterval > 0) { - return invalidSqlErrMsg(pCmd, msg5); + return invalidSqlErrMsg(pCmd, msg4); } else { return TSDB_CODE_SUCCESS; } @@ -5509,7 +5511,7 @@ int32_t doFunctionsCompatibleCheck(SSqlObj* pSql) { } if (!qualified) { - return TSDB_CODE_INVALID_SQL; + return invalidSqlErrMsg(pCmd, msg2); } }