Merge pull request #24998 from taosdata/szhou/tms/region-block

feat: tms region block
This commit is contained in:
dapan1121 2024-03-13 09:12:35 +08:00 committed by GitHub
commit b96cfab82f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 603 additions and 39 deletions

View File

@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
hint:
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT | PARTITION_FIRST | SMALLDATA_TS_SORT
select_list:
select_expr [, select_expr] ...
@ -94,6 +94,7 @@ The list of currently supported Hints is as follows:
| SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list |
| PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list |
| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp |
| SMALLDATA_TS_SORT| None | When sorting the supertable rows by timestamp, if the length of query columns >= 256, and there are relatively few rows, this hint can improve performance. | Sorting the supertable rows by timestamp |
For example:
@ -102,6 +103,7 @@ SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
SELECT /*+ SMALLDATA_TS_SORT() */ * from stable1 order by ts;
```
## Lists

View File

@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list
hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */
hint:
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT
BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARTITION_FIRST | PARA_TABLES_SORT | SMALLDATA_TS_SORT
select_list:
select_expr [, select_expr] ...
@ -94,6 +94,8 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适
| SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 |
| PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 |
| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 |
| SMALLDATA_TS_SORT| 无 | 超级表的数据按时间戳排序时, 查询列长度大于等于256, 但是行数不多, 使用这个提示, 可以提高性能 | 超级表的数据按时间戳排序时 |
举例:
```sql
@ -101,6 +103,7 @@ SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0
SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1;
SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1;
SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts;
SELECT /*+ SMALLDATA_TS_SORT() */ * from stable1 order by ts;
```
## 列表

View File

@ -380,7 +380,7 @@
#define TK_SORT_FOR_GROUP 608
#define TK_PARTITION_FIRST 609
#define TK_PARA_TABLES_SORT 610
#define TK_SMALLDATA_TS_SORT 611
#define TK_NK_NIL 65535

View File

@ -122,6 +122,7 @@ typedef struct SScanLogicNode {
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
SArray* pFuncTypes; // for last, last_row
bool paraTablesSort; // for table merge scan
bool smallDataTsSort; // disable row id sort for table merge scan
} SScanLogicNode;
typedef struct SJoinLogicNode {
@ -445,6 +446,7 @@ typedef struct STableScanPhysiNode {
bool filesetDelimited;
bool needCountEmptyTable;
bool paraTablesSort;
bool smallDataTsSort;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;

View File

@ -128,7 +128,8 @@ typedef enum EHintOption {
HINT_BATCH_SCAN,
HINT_SORT_FOR_GROUP,
HINT_PARTITION_FIRST,
HINT_PARA_TABLES_SORT
HINT_PARA_TABLES_SORT,
HINT_SMALLDATA_TS_SORT,
} EHintOption;
typedef struct SHintNode {

View File

@ -119,6 +119,13 @@ int32_t taosSetFileHandlesLimit();
int32_t taosLinkFile(char *src, char *dst);
FILE* taosOpenCFile(const char* filename, const char* mode);
int taosSeekCFile(FILE* file, int64_t offset, int whence);
size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream );
size_t taosWriteToCFile(const void* ptr, size_t size, size_t nitems, FILE* stream);
int taosCloseCFile(FILE *);
int taosSetAutoDelFile(char* path);
bool lastErrorIsFileNotExist();
#ifdef __cplusplus

View File

@ -352,6 +352,8 @@ typedef struct STableMergeScanInfo {
SSDataBlock* nextDurationBlocks[2];
bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx;
bool bSortRowId;
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
} STableMergeScanInfo;

View File

@ -194,6 +194,9 @@ void tsortSetClosed(SSortHandle* pHandle);
void tsortSetSingleTableMerge(SSortHandle* pHandle);
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsSize);
void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle);
/**
* @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen
* @param [in] pSortCols cols to comp and build

View File

@ -3871,6 +3871,7 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
@ -3957,7 +3958,9 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
}
@ -4009,9 +4012,16 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
if (pInfo->bSortRowId && numOfTable != 1) {
int32_t memSize = 512 * 1024 * 1024;
code = tsortSetSortByRowId(pInfo->pSortHandle, memSize);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
@ -4048,6 +4058,7 @@ void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
tsortDestroySortHandle(pInfo->pSortHandle);
pInfo->pSortHandle = NULL;
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
@ -4132,8 +4143,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(pResBlock, pTupleHandle);
tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle);
if (pResBlock->info.rows >= capacity) {
break;
}
@ -4200,7 +4210,10 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
} else {
if (pInfo->bNewFilesetEvent) {
stopDurationForGroupTableMergeScan(pOperator);
startDurationForGroupTableMergeScan(pOperator);
code = startDurationForGroupTableMergeScan(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
} else {
// Data of this group are all dumped, let's try the next group
stopGroupTableMergeScan(pOperator);
@ -4331,10 +4344,15 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
pInfo->mSkipTables = NULL;
}
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
if (!hasLimit && blockDataGetRowSize(pInfo->pResBlock) >= 256 && !pTableScanNode->smallDataTsSort) {
pInfo->bSortRowId = true;
} else {
pInfo->bSortRowId = false;
}
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
@ -4343,6 +4361,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t rowSize = pInfo->pResBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
//start one reader variable

View File

@ -32,6 +32,32 @@ struct STupleHandle {
int32_t rowIndex;
};
typedef struct SSortMemFileRegion {
int64_t fileOffset;
int32_t regionSize;
int32_t bufRegOffset;
int32_t bufLen;
char* buf;
} SSortMemFileRegion;
typedef struct SSortMemFile {
char* writeBuf;
int32_t writeBufSize;
int64_t writeFileOffset;
int32_t currRegionId;
int32_t currRegionOffset;
bool bRegionDirty;
SArray* aFileRegions;
int32_t cacheSize;
int32_t blockSize;
FILE* pTdFile;
char memFilePath[PATH_MAX];
} SSortMemFile;
struct SSortHandle {
int32_t type;
int32_t pageSize;
@ -76,10 +102,21 @@ struct SSortHandle {
bool (*abortCheckFn)(void* param);
void* abortCheckParam;
bool bSortByRowId;
SSortMemFile* pExtRowsMemFile;
int32_t extRowBytes;
int32_t extRowsPageSize;
int32_t extRowsMemSize;
int32_t srcTsSlotId;
SBlockOrderInfo extRowsOrderInfo;
void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
void* mergeLimitReachedParam;
};
static int32_t destroySortMemFile(SSortHandle* pHandle);
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
char** ppRow, bool* pFreeRow);
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
pHandle->singleTableMerge = true;
}
@ -189,6 +226,7 @@ void destroyTuple(void* t) {
}
}
/**
*
* @param type
@ -202,7 +240,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->type = type;
pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages;
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pSortInfo = taosArrayDup(pSortInfo, NULL);
pSortHandle->loops = 0;
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
@ -305,6 +343,10 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
taosArrayDestroy(pSortHandle->pOrderedSource);
if (pSortHandle->pExtRowsMemFile != NULL) {
destroySortMemFile(pSortHandle);
}
taosArrayDestroy(pSortHandle->pSortInfo);
taosMemoryFreeClear(pSortHandle);
}
@ -851,6 +893,389 @@ static int32_t createPageBuf(SSortHandle* pHandle) {
return 0;
}
void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
if (pHandle->bSortByRowId) {
int32_t regionId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2);
int32_t length = *(int32_t*)tsortGetValue(pTupleHandle, 3);
char* buf = NULL;
bool bFreeRow = false;
getRowBufFromExtMemFile(pHandle, regionId, offset, length, &buf, &bFreeRow);
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* isNull = (char*)buf;
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
if (!isNull[i]) {
colDataSetVal(pColInfo, pBlock->info.rows, pStart, false);
if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(pStart);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
pStart += varDataTLen(pStart);
} else {
int32_t bytes = pColInfo->info.bytes;
pStart += bytes;
}
} else {
colDataSetNULL(pColInfo, pBlock->info.rows);
}
}
if (bFreeRow) {
taosMemoryFree(buf);
}
if (*(int32_t*)pStart != pStart - buf) {
qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart,
(int32_t)(pStart - buf));
};
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
pBlock->info.rows += 1;
} else {
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
bool isNull = tsortIsNullVal(pTupleHandle, i);
if (isNull) {
colDataSetNULL(pColInfo, pBlock->info.rows);
} else {
char* pData = tsortGetValue(pTupleHandle, i);
if (pData != NULL) {
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
}
}
}
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
pBlock->info.rows += 1;
}
}
static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* isNull = (char*)buf;
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (colDataIsNull_s(pCol, rowIdx)) {
isNull[i] = 1;
continue;
}
isNull[i] = 0;
char* pData = colDataGetData(pCol, rowIdx);
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
if (pCol->pData) {
int32_t dataLen = getJsonValueLen(pData);
memcpy(pStart, pData, dataLen);
pStart += dataLen;
} else {
// the column that is pre-allocated has no data and has offset
*pStart = 0;
pStart += 1;
}
} else if (IS_VAR_DATA_TYPE(pCol->info.type)) {
if (pCol->pData) {
varDataCopy(pStart, pData);
pStart += varDataTLen(pData);
} else {
// the column that is pre-allocated has no data and has offset
*(VarDataLenT*)(pStart) = 0;
pStart += VARSTR_HEADER_SIZE;
}
} else {
int32_t bytes = pCol->info.bytes;
memcpy(pStart, pData, bytes);
pStart += bytes;
}
}
*(int32_t*)pStart = (char*)pStart - (char*)buf;
pStart += sizeof(int32_t);
return (int32_t)(pStart - (char*)buf);
}
static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen,
char** ppRow, bool* pFreeRow) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, regionId);
if (pRegion->buf == NULL) {
pRegion->bufRegOffset = 0;
pRegion->buf = taosMemoryMalloc(pMemFile->blockSize);
if (pRegion->buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
pRegion->bufLen = readBytes;
}
ASSERT(pRegion->bufRegOffset <= tupleOffset);
if (pRegion->bufRegOffset + pRegion->bufLen >= tupleOffset + rowLen) {
*pFreeRow = false;
*ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;
} else {
*ppRow = taosMemoryMalloc(rowLen);
if (*ppRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset);
memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock);
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
taosMemoryFreeClear(*ppRow);
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock);
*pFreeRow = true;
pRegion->bufRegOffset += pRegion->bufLen;
pRegion->bufLen = readBytes;
}
return TSDB_CODE_SUCCESS;
}
static int32_t createSortMemFile(SSortHandle* pHandle) {
if (pHandle->pExtRowsMemFile != NULL) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
if (pMemFile == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
pMemFile->pTdFile = taosOpenCFile(pMemFile->memFilePath, "w+");
if (pMemFile->pTdFile == NULL) {
code = terrno = TAOS_SYSTEM_ERROR(errno);
}
}
if (code == TSDB_CODE_SUCCESS) {
taosSetAutoDelFile(pMemFile->memFilePath);
pMemFile->currRegionId = -1;
pMemFile->currRegionOffset = -1;
pMemFile->writeBufSize = 4 * 1024 * 1024;
pMemFile->writeFileOffset = -1;
pMemFile->bRegionDirty = false;
pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize);
if (pMemFile->writeBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (code == TSDB_CODE_SUCCESS) {
pMemFile->cacheSize = pHandle->extRowsMemSize;
pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion));
if (pMemFile->aFileRegions == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (code == TSDB_CODE_SUCCESS) {
pHandle->pExtRowsMemFile = pMemFile;
} else {
if (pMemFile) {
if (pMemFile->aFileRegions) taosMemoryFreeClear(pMemFile->aFileRegions);
if (pMemFile->writeBuf) taosMemoryFreeClear(pMemFile->writeBuf);
if (pMemFile->pTdFile) {
taosCloseCFile(pMemFile->pTdFile);
pMemFile->pTdFile = NULL;
}
taosMemoryFreeClear(pMemFile);
}
}
return code;
}
static int32_t destroySortMemFile(SSortHandle* pHandle) {
if (pHandle->pExtRowsMemFile == NULL) return TSDB_CODE_SUCCESS;
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
for (int32_t i = 0; i < taosArrayGetSize(pMemFile->aFileRegions); ++i) {
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i);
taosMemoryFree(pRegion->buf);
}
taosArrayDestroy(pMemFile->aFileRegions);
pMemFile->aFileRegions = NULL;
taosMemoryFree(pMemFile->writeBuf);
pMemFile->writeBuf = NULL;
taosCloseCFile(pMemFile->pTdFile);
pMemFile->pTdFile = NULL;
taosRemoveFile(pMemFile->memFilePath);
taosMemoryFree(pMemFile);
pHandle->pExtRowsMemFile = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t tsortOpenRegion(SSortHandle* pHandle) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
if (pMemFile->currRegionId == -1) {
SSortMemFileRegion region = {0};
region.fileOffset = 0;
region.bufRegOffset = 0;
taosArrayPush(pMemFile->aFileRegions, &region);
pMemFile->currRegionId = 0;
pMemFile->currRegionOffset = 0;
pMemFile->writeFileOffset = 0;
} else {
SSortMemFileRegion regionNew = {0};
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
regionNew.fileOffset = pRegion->fileOffset + pRegion->regionSize;
regionNew.bufRegOffset = 0;
taosArrayPush(pMemFile->aFileRegions, &regionNew);
++pMemFile->currRegionId;
pMemFile->currRegionOffset = 0;
pMemFile->writeFileOffset = regionNew.fileOffset;
}
return TSDB_CODE_SUCCESS;
}
static int32_t tsortCloseRegion(SSortHandle* pHandle) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
pRegion->regionSize = pMemFile->currRegionOffset;
int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset);
if (writeBytes > 0) {
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
pMemFile->bRegionDirty = false;
}
return TSDB_CODE_SUCCESS;
}
static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
size_t numRegions = taosArrayGetSize(pMemFile->aFileRegions);
ASSERT(numRegions == (pMemFile->currRegionId + 1));
if (numRegions == 0) return TSDB_CODE_SUCCESS;
int32_t blockReadBytes = (pMemFile->cacheSize / numRegions + 4095) & ~4095;
pMemFile->blockSize = blockReadBytes;
for (int32_t i = 0; i < numRegions; ++i) {
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i);
pRegion->bufRegOffset = 0;
}
taosMemoryFree(pMemFile->writeBuf);
pMemFile->writeBuf = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
{
if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) {
int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset;
}
}
*pRegionId = pMemFile->currRegionId;
*pOffset = pMemFile->currRegionOffset;
int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset);
*pLength = blockLen;
pMemFile->currRegionOffset += blockLen;
pMemFile->bRegionDirty = true;
return TSDB_CODE_SUCCESS;
}
static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) {
int32_t pageId = -1;
int32_t offset = -1;
int32_t length = -1;
saveBlockRowToExtRowsMemFile(pHandle, pSource, *rowIndex, &pageId, &offset, &length);
SSDataBlock* pBlock = pHandle->pDataBlock;
SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, pHandle->extRowsOrderInfo.slotId);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
char* pData = colDataGetData(pSrcTsCol, *rowIndex);
colDataSetVal(pTsCol, pBlock->info.rows, pData, false);
SColumnInfoData* pRegionIdCol = taosArrayGet(pBlock->pDataBlock, 1);
colDataSetInt32(pRegionIdCol, pBlock->info.rows, &pageId);
SColumnInfoData* pOffsetCol = taosArrayGet(pBlock->pDataBlock, 2);
colDataSetInt32(pOffsetCol, pBlock->info.rows, &offset);
SColumnInfoData* pLengthCol = taosArrayGet(pBlock->pDataBlock, 3);
colDataSetInt32(pLengthCol, pBlock->info.rows, &length);
pBlock->info.rows += 1;
*rowIndex += 1;
}
static void initRowIdSort(SSortHandle* pHandle) {
SSDataBlock* pSortInput = createDataBlock();
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
blockDataAppendColInfo(pSortInput, &tsCol);
SColumnInfoData regionIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pSortInput, &regionIdCol);
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &offsetCol);
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
blockDataAppendColInfo(pSortInput, &lengthCol);
blockDataDestroy(pHandle->pDataBlock);
pHandle->pDataBlock = pSortInput;
int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock);
size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
pHandle->pageSize = 256 * 1024; // 256k
pHandle->numOfPages = 256;
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlockOrderInfo bi = {0};
bi.order = pOrder->order;
bi.slotId = 0;
bi.nullFirst = NULL_ORDER_FIRST;
SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(aOrder, &bi);
taosArrayDestroy(pHandle->pSortInfo);
pHandle->pSortInfo = aOrder;
return;
}
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t);
pHandle->extRowsMemSize = extRowsMemSize;
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
pHandle->extRowsOrderInfo = *pOrder;
initRowIdSort(pHandle);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE;
qError("create sort mem file failed since %s, tempDir:%s", terrstr(), tsTempDir);
return terrno;
}
int32_t code = createSortMemFile(pHandle);
pHandle->bSortByRowId = true;
return code;
}
typedef struct SBlkMergeSupport {
int64_t** aTs;
int32_t* aRowIdx;
@ -925,7 +1350,7 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx
return sz;
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
int32_t code = TSDB_CODE_SUCCESS;
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
@ -933,13 +1358,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk);
SBlockOrderInfo* pOrigBlockOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo;
SBlockOrderInfo* pHandleBlockOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlkMergeSupport sup;
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.order = order->order;
sup.order = pOrigBlockOrder->order;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockOrder->slotId);
sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0;
}
@ -963,16 +1390,17 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
int64_t lastPageBufTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (order->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t lastPageBufTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx];
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) {
@ -985,19 +1413,24 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
bufInc = getPageBufIncForRow(incBlock, minRow, 0);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
break;
}
}
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
if (!pHandle->bSortByRowId) {
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
} else {
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
}
blkPgSz += bufInc;
++nRows;
@ -1011,7 +1444,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
}
if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, order->slotId);
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) {
@ -1024,14 +1457,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
nMergedRows += pHandle->pDataBlock->info.rows;
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && order->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && order->order == TSDB_ORDER_DESC)) {
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
}
}
blockDataCleanup(pHandle->pDataBlock);
}
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
@ -1083,11 +1517,10 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize;
size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
int32_t code = createPageBuf(pHandle);
if (code != TSDB_CODE_SUCCESS) {
@ -1098,7 +1531,8 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
int32_t szSort = 0;
if (pOrder->order == TSDB_ORDER_ASC) {
SBlockOrderInfo* pOrigOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo;
if (pOrigOrder->order == TSDB_ORDER_ASC) {
pHandle->currMergeLimitTs = INT64_MAX;
} else {
pHandle->currMergeLimitTs = INT64_MIN;
@ -1110,7 +1544,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
int64_t p = taosGetTimestampUs();
bool bExtractedBlock = false;
bool bSkipBlock = false;
if (pBlk != NULL && pHandle->mergeLimit > 0) {
@ -1121,13 +1554,13 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
if ((pOrigOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
(pOrigOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
if (bExtractedBlock) {
blockDataDestroy(pBlk);
}
}
continue;
}
}
@ -1150,7 +1583,13 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
tSimpleHashClear(mUidBlk);
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
int64_t p = taosGetTimestampUs();
if (pHandle->bSortByRowId) {
tsortOpenRegion(pHandle);
}
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
if (code != TSDB_CODE_SUCCESS) {
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
@ -1158,7 +1597,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
taosArrayClear(aBlkSort);
break;
}
if (pHandle->bSortByRowId) {
tsortCloseRegion(pHandle);
}
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
@ -1195,6 +1636,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
taosArrayDestroy(aExtSrc);
tSimpleHashCleanup(mTableNumRows);
if (pHandle->bSortByRowId) {
tsortFinalizeRegions(pHandle);
}
pHandle->type = SORT_SINGLESOURCE_SORT;
return code;
}

View File

@ -457,6 +457,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(isCountByTag);
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
COPY_SCALAR_FIELD(paraTablesSort);
COPY_SCALAR_FIELD(smallDataTsSort);
return TSDB_CODE_SUCCESS;
}
@ -690,6 +691,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD(filesetDelimited);
COPY_SCALAR_FIELD(needCountEmptyTable);
COPY_SCALAR_FIELD(paraTablesSort);
COPY_SCALAR_FIELD(smallDataTsSort);
return TSDB_CODE_SUCCESS;
}

View File

@ -699,6 +699,7 @@ static const char* jkScanLogicPlanGroupTags = "GroupTags";
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
static const char* jkScanLogicPlanParaTablesSort = "ParaTablesSort";
static const char* jkScanLogicPlanSmallDataTsSort = "SmallDataTsSort";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
@ -749,6 +750,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanParaTablesSort, pNode->paraTablesSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanSmallDataTsSort, pNode->paraTablesSort);
}
return code;
}
@ -800,7 +804,10 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->paraTablesSort);
code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->smallDataTsSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanSmallDataTsSort, &pNode->smallDataTsSort);
}
return code;
}
@ -1896,6 +1903,7 @@ static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
static const char* jkTableScanPhysiPlanParaTablesSort = "ParaTablesSort";
static const char* jkTableScanPhysiPlanSmallDataTsSort = "SmallDataTsSort";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
@ -1973,6 +1981,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanParaTablesSort, pNode->paraTablesSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanSmallDataTsSort, pNode->smallDataTsSort);
}
return code;
}
@ -2052,6 +2063,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanParaTablesSort, &pNode->paraTablesSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanSmallDataTsSort, &pNode->smallDataTsSort);
}
return code;
}

View File

@ -2188,6 +2188,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->paraTablesSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->smallDataTsSort);
}
return code;
}
@ -2275,6 +2278,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->paraTablesSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->smallDataTsSort);
}
return code;
}

View File

@ -404,6 +404,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
case HINT_PARA_TABLES_SORT:
if (paramNum > 0 || hasHint(*ppHintList, HINT_PARA_TABLES_SORT)) return true;
break;
case HINT_SMALLDATA_TS_SORT:
if (paramNum > 0 || hasHint(*ppHintList, HINT_SMALLDATA_TS_SORT)) return true;
break;
default:
return true;
}
@ -490,6 +493,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
}
opt = HINT_PARA_TABLES_SORT;
break;
case TK_SMALLDATA_TS_SORT:
lastComma = false;
if (0 != opt || inParamList) {
quit = true;
break;
}
opt = HINT_SMALLDATA_TS_SORT;
break;
case TK_NK_LP:
lastComma = false;
if (0 == opt || inParamList) {

View File

@ -213,6 +213,7 @@ static SKeyword keywordTable[] = {
{"SLIDING", TK_SLIDING},
{"SLIMIT", TK_SLIMIT},
{"SMA", TK_SMA},
{"SMALLDATA_TS_SORT", TK_SMALLDATA_TS_SORT},
{"SMALLINT", TK_SMALLINT},
{"SNODE", TK_SNODE},
{"SNODES", TK_SNODES},

View File

@ -47,7 +47,8 @@ int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan);
bool getBatchScanOptionFromHint(SNodeList* pList);
bool getSortForGroupOptHint(SNodeList* pList);
bool getparaTablesSortOptHint(SNodeList* pList);
bool getParaTablesSortOptHint(SNodeList* pList);
bool getSmallDataTsSortOptHint(SNodeList* pList);
bool getOptHint(SNodeList* pList, EHintOption hint);
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);

View File

@ -502,7 +502,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
} else {
nodesDestroyNode((SNode*)pScan);
}
pScan->paraTablesSort = getparaTablesSortOptHint(pSelect->pHint);
pScan->paraTablesSort = getParaTablesSortOptHint(pSelect->pHint);
pScan->smallDataTsSort = getSmallDataTsSortOptHint(pSelect->pHint);
pCxt->hasScan = true;
return code;

View File

@ -658,6 +658,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;
pTableScan->smallDataTsSort = pScanLogicNode->smallDataTsSort;
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code) {

View File

@ -466,7 +466,7 @@ bool getOptHint(SNodeList* pList, EHintOption hint) {
return false;
}
bool getparaTablesSortOptHint(SNodeList* pList) {
bool getParaTablesSortOptHint(SNodeList* pList) {
if (!pList) return false;
SNode* pNode;
FOREACH(pNode, pList) {
@ -478,6 +478,18 @@ bool getparaTablesSortOptHint(SNodeList* pList) {
return false;
}
bool getSmallDataTsSortOptHint(SNodeList* pList) {
if (!pList) return false;
SNode* pNode;
FOREACH(pNode, pList) {
SHintNode* pHint = (SHintNode*)pNode;
if (pHint->option == HINT_SMALLDATA_TS_SORT) {
return true;
}
}
return false;
}
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SLogicNode* pCurr = (SLogicNode*)pNode;

View File

@ -1404,3 +1404,35 @@ int32_t taosLinkFile(char *src, char *dst) {
#endif
return 0;
}
FILE* taosOpenCFile(const char* filename, const char* mode) {
return fopen(filename, mode);
}
int taosSeekCFile(FILE* file, int64_t offset, int whence) {
#ifdef WINDOWS
return _fseeki64(file, offset, whence);
#else
return fseeko(file, offset, whence);
#endif
}
size_t taosReadFromCFile(void *buffer, size_t size, size_t count, FILE *stream ) {
return fread(buffer, size, count, stream);
}
size_t taosWriteToCFile(const void* ptr, size_t size, size_t nitems, FILE* stream) {
return fwrite(ptr, size, nitems, stream);
}
int taosCloseCFile(FILE *f) {
return fclose(f);
}
int taosSetAutoDelFile(char* path) {
#ifdef WINDOWS
return SetFileAttributes(path, FILE_ATTRIBUTE_TEMPORARY);
#else
return unlink(path);
#endif
}