Merge pull request #24614 from taosdata/szhou/tms/port-3
feat: multi-readers and one merge tree
This commit is contained in:
commit
ba41231373
|
@ -379,6 +379,7 @@
|
||||||
#define TK_NO_BATCH_SCAN 607
|
#define TK_NO_BATCH_SCAN 607
|
||||||
#define TK_SORT_FOR_GROUP 608
|
#define TK_SORT_FOR_GROUP 608
|
||||||
#define TK_PARTITION_FIRST 609
|
#define TK_PARTITION_FIRST 609
|
||||||
|
#define TK_PARA_TABLES_SORT 610
|
||||||
|
|
||||||
|
|
||||||
#define TK_NK_NIL 65535
|
#define TK_NK_NIL 65535
|
||||||
|
|
|
@ -121,6 +121,7 @@ typedef struct SScanLogicNode {
|
||||||
bool filesetDelimited; // returned blocks delimited by fileset
|
bool filesetDelimited; // returned blocks delimited by fileset
|
||||||
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
|
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
|
||||||
SArray* pFuncTypes; // for last, last_row
|
SArray* pFuncTypes; // for last, last_row
|
||||||
|
bool paraTablesSort; // for table merge scan
|
||||||
} SScanLogicNode;
|
} SScanLogicNode;
|
||||||
|
|
||||||
typedef struct SJoinLogicNode {
|
typedef struct SJoinLogicNode {
|
||||||
|
@ -443,6 +444,7 @@ typedef struct STableScanPhysiNode {
|
||||||
int8_t igCheckUpdate;
|
int8_t igCheckUpdate;
|
||||||
bool filesetDelimited;
|
bool filesetDelimited;
|
||||||
bool needCountEmptyTable;
|
bool needCountEmptyTable;
|
||||||
|
bool paraTablesSort;
|
||||||
} STableScanPhysiNode;
|
} STableScanPhysiNode;
|
||||||
|
|
||||||
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
typedef STableScanPhysiNode STableSeqScanPhysiNode;
|
||||||
|
|
|
@ -128,6 +128,7 @@ typedef enum EHintOption {
|
||||||
HINT_BATCH_SCAN,
|
HINT_BATCH_SCAN,
|
||||||
HINT_SORT_FOR_GROUP,
|
HINT_SORT_FOR_GROUP,
|
||||||
HINT_PARTITION_FIRST,
|
HINT_PARTITION_FIRST,
|
||||||
|
HINT_PARA_TABLES_SORT
|
||||||
} EHintOption;
|
} EHintOption;
|
||||||
|
|
||||||
typedef struct SHintNode {
|
typedef struct SHintNode {
|
||||||
|
|
|
@ -631,7 +631,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
||||||
|
|
||||||
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
|
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
|
||||||
bool isNull = false;
|
bool isNull = false;
|
||||||
if (pBlock->pBlockAgg == NULL) {
|
if (pBlock->pBlockAgg == NULL) {
|
||||||
|
|
|
@ -283,6 +283,42 @@ typedef struct STableScanInfo {
|
||||||
bool needCountEmptyTable;
|
bool needCountEmptyTable;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
|
typedef enum ESubTableInputType {
|
||||||
|
SUB_TABLE_MEM_BLOCK,
|
||||||
|
SUB_TABLE_EXT_PAGES,
|
||||||
|
} ESubTableInputType;
|
||||||
|
|
||||||
|
typedef struct STmsSubTableInput {
|
||||||
|
STsdbReader* pReader;
|
||||||
|
SQueryTableDataCond tblCond;
|
||||||
|
STableKeyInfo* pKeyInfo;
|
||||||
|
bool bInMemReader;
|
||||||
|
ESubTableInputType type;
|
||||||
|
SSDataBlock* pReaderBlock;
|
||||||
|
|
||||||
|
SArray* aBlockPages;
|
||||||
|
SSDataBlock* pPageBlock;
|
||||||
|
int32_t pageIdx;
|
||||||
|
|
||||||
|
int32_t rowIdx;
|
||||||
|
int64_t* aTs;
|
||||||
|
} STmsSubTableInput;
|
||||||
|
|
||||||
|
typedef struct SBlockOrderInfo SBlockOrderInfo;
|
||||||
|
typedef struct STmsSubTablesMergeInfo {
|
||||||
|
SBlockOrderInfo* pOrderInfo;
|
||||||
|
|
||||||
|
int32_t numSubTables;
|
||||||
|
STmsSubTableInput* aInputs;
|
||||||
|
SMultiwayMergeTreeInfo* pTree;
|
||||||
|
int32_t numSubTablesCompleted;
|
||||||
|
|
||||||
|
int32_t numTableBlocksInMem;
|
||||||
|
SDiskbasedBuf* pBlocksBuf;
|
||||||
|
|
||||||
|
int32_t numInMemReaders;
|
||||||
|
} STmsSubTablesMergeInfo;
|
||||||
|
|
||||||
typedef struct STableMergeScanInfo {
|
typedef struct STableMergeScanInfo {
|
||||||
int32_t tableStartIndex;
|
int32_t tableStartIndex;
|
||||||
int32_t tableEndIndex;
|
int32_t tableEndIndex;
|
||||||
|
@ -296,7 +332,6 @@ typedef struct STableMergeScanInfo {
|
||||||
SSDataBlock* pSortInputBlock;
|
SSDataBlock* pSortInputBlock;
|
||||||
SSDataBlock* pReaderBlock;
|
SSDataBlock* pReaderBlock;
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
SArray* sortSourceParams;
|
|
||||||
SLimitInfo limitInfo;
|
SLimitInfo limitInfo;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
SScanInfo scanInfo;
|
SScanInfo scanInfo;
|
||||||
|
@ -317,6 +352,8 @@ typedef struct STableMergeScanInfo {
|
||||||
SSDataBlock* nextDurationBlocks[2];
|
SSDataBlock* nextDurationBlocks[2];
|
||||||
bool rtnNextDurationBlocks;
|
bool rtnNextDurationBlocks;
|
||||||
int32_t nextDurationBlocksIdx;
|
int32_t nextDurationBlocksIdx;
|
||||||
|
|
||||||
|
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
|
||||||
} STableMergeScanInfo;
|
} STableMergeScanInfo;
|
||||||
|
|
||||||
typedef struct STagScanFilterContext {
|
typedef struct STagScanFilterContext {
|
||||||
|
|
|
@ -3421,6 +3421,420 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// table merge scan operator
|
||||||
|
|
||||||
|
// table merge scan operator
|
||||||
|
// TODO: limit / duration optimization
|
||||||
|
// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish
|
||||||
|
// TODO: error processing, memory freeing
|
||||||
|
// TODO: add log for error and perf
|
||||||
|
// TODO: tsdb reader open/close dynamically
|
||||||
|
// TODO: blockdata deep cleanup
|
||||||
|
|
||||||
|
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
|
int32_t left = *(int32_t*)pLeft;
|
||||||
|
int32_t right = *(int32_t*)pRight;
|
||||||
|
STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param;
|
||||||
|
|
||||||
|
int32_t leftIdx = pInfo->aInputs[left].rowIdx;
|
||||||
|
int32_t rightIdx = pInfo->aInputs[right].rowIdx;
|
||||||
|
|
||||||
|
if (leftIdx == -1) {
|
||||||
|
return 1;
|
||||||
|
} else if (rightIdx == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx];
|
||||||
|
int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx];
|
||||||
|
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
|
||||||
|
if (pInfo->pOrderInfo->order == TSDB_ORDER_DESC) {
|
||||||
|
ret = -1 * ret;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
|
||||||
|
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
|
||||||
|
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
|
||||||
|
for (int i = 0; i < src->numOfCols; i++) {
|
||||||
|
dst->colList[i] = src->colList[i];
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pSubTableHasBlock) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
const SStorageAPI* pAPI= &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
|
blockDataCleanup(pInput->pReaderBlock);
|
||||||
|
if (!pInput->bInMemReader) {
|
||||||
|
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock,
|
||||||
|
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
||||||
|
if (code != 0) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->base.dataReader = pInput->pReader;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
bool hasNext = false;
|
||||||
|
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
|
||||||
|
if (code != 0) {
|
||||||
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
if (!hasNext || isTaskKilled(pTaskInfo)) {
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
*pSubTableHasBlock = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInput->tblCond.order == TSDB_ORDER_ASC) {
|
||||||
|
pInput->tblCond.twindows.skey = pInput->pReaderBlock->info.window.ekey + 1;
|
||||||
|
} else {
|
||||||
|
pInput->tblCond.twindows.ekey = pInput->pReaderBlock->info.window.skey - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t status = 0;
|
||||||
|
code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status);
|
||||||
|
if (code != 0) {
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
|
||||||
|
*pSubTableHasBlock = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pReaderBlock->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pSubTableHasBlock = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*pSubTableHasBlock) {
|
||||||
|
pInput->pReaderBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pReaderBlock->info.id.uid);
|
||||||
|
pOperator->resultInfo.totalRows += pInput->pReaderBlock->info.rows;
|
||||||
|
}
|
||||||
|
if (!pInput->bInMemReader || !*pSubTableHasBlock) {
|
||||||
|
pAPI->tsdReader.tsdReaderClose(pInput->pReader);
|
||||||
|
pInput->pReader = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||||
|
pInfo->bGroupProcessed = false;
|
||||||
|
|
||||||
|
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
|
int32_t i = pInfo->tableStartIndex + 1;
|
||||||
|
for (; i < numOfTables; ++i) {
|
||||||
|
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||||
|
if (tableKeyInfo->groupId != pInfo->groupId) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pInfo->tableEndIndex = i - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
|
STmsSubTableInput * pInput = pSubTblsInfo->aInputs + i;
|
||||||
|
if (pInput->rowIdx == -1) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
|
||||||
|
pInput->rowIdx = 0;
|
||||||
|
pInput->pageIdx = -1;
|
||||||
|
}
|
||||||
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
||||||
|
pInput->aTs = (int64_t*)col->pData;
|
||||||
|
}
|
||||||
|
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
||||||
|
setGroupStartEndIndex(pInfo);
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
|
||||||
|
if (pSubTblsInfo == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
|
||||||
|
pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
|
||||||
|
pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput));
|
||||||
|
if (pSubTblsInfo->aInputs == NULL) {
|
||||||
|
taosMemoryFree(pSubTblsInfo);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
int32_t bufPageSize = pInfo->bufPageSize;
|
||||||
|
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
|
||||||
|
int32_t code =
|
||||||
|
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFree(pSubTblsInfo->aInputs);
|
||||||
|
taosMemoryFree(pSubTblsInfo);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables;
|
||||||
|
pSubTblsInfo->numInMemReaders = pSubTblsInfo->numSubTables;
|
||||||
|
|
||||||
|
pInfo->pSubTablesMergeInfo = pSubTblsInfo;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInfo) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||||
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
||||||
|
pInput->type = SUB_TABLE_MEM_BLOCK;
|
||||||
|
dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond);
|
||||||
|
pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
|
||||||
|
pInput->pKeyInfo = keyInfo;
|
||||||
|
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i + 1 < pSubTblsInfo->numInMemReaders) {
|
||||||
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, keyInfo, 1, pInput->pReaderBlock,
|
||||||
|
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
|
||||||
|
pInput->bInMemReader = true;
|
||||||
|
} else {
|
||||||
|
pInput->pReader = NULL;
|
||||||
|
pInput->bInMemReader = false;
|
||||||
|
}
|
||||||
|
bool hasNext = true;
|
||||||
|
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
|
||||||
|
if (!hasNext) {
|
||||||
|
pInput->rowIdx = -1;
|
||||||
|
++pSubTblsInfo->numSubTablesCompleted;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
pInput->rowIdx = 0;
|
||||||
|
pInput->pageIdx = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
|
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
bool hasNext = true;
|
||||||
|
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
|
||||||
|
if (!hasNext) {
|
||||||
|
pInput->rowIdx = -1;
|
||||||
|
++pSubTblsInfo->numSubTablesCompleted;
|
||||||
|
} else {
|
||||||
|
pInput->rowIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
|
STableMergeScanInfo* pInfo = pOperatorInfo->info;
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
|
||||||
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
if (pInput->rowIdx < pInputBlock->info.rows - 1) {
|
||||||
|
++pInput->rowIdx;
|
||||||
|
} else if (pInput->rowIdx == pInputBlock->info.rows -1 ) {
|
||||||
|
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
|
||||||
|
adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
|
||||||
|
}
|
||||||
|
if (pInput->rowIdx != -1) {
|
||||||
|
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
|
||||||
|
pInput->aTs = (int64_t*)col->pData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
|
||||||
|
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
||||||
|
SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
|
||||||
|
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);
|
||||||
|
|
||||||
|
if (isNull) {
|
||||||
|
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
|
||||||
|
} else {
|
||||||
|
if (pSrcColInfo->pData != NULL) {
|
||||||
|
char* pData = colDataGetData(pSrcColInfo, pInput->rowIdx);
|
||||||
|
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pBlock->info.dataLoad = 1;
|
||||||
|
pBlock->info.scanFlag = pInputBlock->info.scanFlag;
|
||||||
|
pBlock->info.rows += 1;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||||
|
|
||||||
|
blockDataCleanup(pResBlock);
|
||||||
|
bool finished = false;
|
||||||
|
while (true) {
|
||||||
|
while (1) {
|
||||||
|
if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) {
|
||||||
|
finished = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
appendChosenRowToDataBlock(pSubTblsInfo, pResBlock);
|
||||||
|
adjustSubTableForNextRow(pOperator, pSubTblsInfo);
|
||||||
|
|
||||||
|
if (pResBlock->info.rows >= capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
||||||
|
if (finished || limitReached || pResBlock->info.rows > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
initSubTablesMergeInfo(pInfo);
|
||||||
|
|
||||||
|
initSubTableInputs(pOperator, pInfo);
|
||||||
|
|
||||||
|
openSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
|
||||||
|
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
|
||||||
|
if (pSubTblsInfo != NULL) {
|
||||||
|
tMergeTreeDestroy(&pSubTblsInfo->pTree);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
|
||||||
|
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
|
||||||
|
taosMemoryFree(pInput->tblCond.colList);
|
||||||
|
blockDataDestroy(pInput->pReaderBlock);
|
||||||
|
blockDataDestroy(pInput->pPageBlock);
|
||||||
|
taosArrayDestroy(pInput->aBlockPages);
|
||||||
|
pInfo->base.readerAPI.tsdReaderClose(pInput->pReader);
|
||||||
|
pInput->pReader = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
|
||||||
|
taosMemoryFree(pSubTblsInfo->aInputs);
|
||||||
|
|
||||||
|
taosMemoryFree(pSubTblsInfo);
|
||||||
|
pInfo->pSubTablesMergeInfo = NULL;
|
||||||
|
}
|
||||||
|
taosMemoryTrim(0);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
|
if (!pInfo->hasGroupId) {
|
||||||
|
pInfo->hasGroupId = true;
|
||||||
|
|
||||||
|
if (tableListSize == 0) {
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pInfo->tableStartIndex = 0;
|
||||||
|
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
|
||||||
|
startSubTablesTableMergeScan(pOperator);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = NULL;
|
||||||
|
while (pInfo->tableStartIndex < tableListSize) {
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||||
|
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
|
||||||
|
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
|
||||||
|
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
|
||||||
|
}
|
||||||
|
if (pBlock != NULL) {
|
||||||
|
pBlock->info.id.groupId = pInfo->groupId;
|
||||||
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
pInfo->bGroupProcessed = true;
|
||||||
|
return pBlock;
|
||||||
|
} else {
|
||||||
|
// Data of this group are all dumped, let's try the next group
|
||||||
|
stopSubTablesTableMergeScan(pInfo);
|
||||||
|
if (pInfo->tableEndIndex >= tableListSize - 1) {
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
||||||
|
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
|
||||||
|
startSubTablesTableMergeScan(pOperator);
|
||||||
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
|
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
|
||||||
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
|
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
|
||||||
if (pInfo->mSkipTables == NULL) {
|
if (pInfo->mSkipTables == NULL) {
|
||||||
|
@ -3575,15 +3989,6 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
|
|
||||||
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
|
|
||||||
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
|
|
||||||
for (int i = 0; i < src->numOfCols; i++) {
|
|
||||||
dst->colList[i] = src->colList[i];
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
||||||
STableMergeScanInfo* pTmsInfo = param;
|
STableMergeScanInfo* pTmsInfo = param;
|
||||||
if (type == TSD_READER_NOTIFY_DURATION_START) {
|
if (type == TSD_READER_NOTIFY_DURATION_START) {
|
||||||
|
@ -3671,8 +4076,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
int32_t tableStartIdx = pInfo->tableStartIndex;
|
int32_t tableStartIdx = pInfo->tableStartIndex;
|
||||||
int32_t tableEndIdx = pInfo->tableEndIndex;
|
int32_t tableEndIdx = pInfo->tableEndIndex;
|
||||||
|
|
||||||
tSimpleHashClear(pInfo->mTableNumRows);
|
|
||||||
|
|
||||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||||
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||||
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
|
||||||
|
@ -3823,10 +4226,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
void destroyTableMergeScanOperatorInfo(void* param) {
|
void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
||||||
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
|
|
||||||
|
|
||||||
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
|
|
||||||
|
|
||||||
|
// start one reader variable
|
||||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
|
|
||||||
|
@ -3837,18 +4238,22 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->sortSourceParams);
|
|
||||||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||||
pTableScanInfo->pSortHandle = NULL;
|
pTableScanInfo->pSortHandle = NULL;
|
||||||
taosHashCleanup(pTableScanInfo->mSkipTables);
|
taosHashCleanup(pTableScanInfo->mSkipTables);
|
||||||
pTableScanInfo->mSkipTables = NULL;
|
pTableScanInfo->mSkipTables = NULL;
|
||||||
|
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
|
||||||
|
// end one reader variable
|
||||||
|
|
||||||
|
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
|
||||||
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
||||||
|
|
||||||
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
|
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
|
||||||
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
|
|
||||||
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
|
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
||||||
|
|
||||||
|
stopSubTablesTableMergeScan(pTableScanInfo);
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3922,14 +4327,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
|
||||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
|
||||||
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
|
||||||
|
|
||||||
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
|
|
||||||
|
|
||||||
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
|
|
||||||
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
|
||||||
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
||||||
|
|
||||||
pInfo->mergeLimit = -1;
|
pInfo->mergeLimit = -1;
|
||||||
|
@ -3938,24 +4335,37 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
|
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
|
||||||
pInfo->mSkipTables = NULL;
|
pInfo->mSkipTables = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||||
|
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||||
|
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
|
||||||
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
|
||||||
|
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
|
||||||
|
|
||||||
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
||||||
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
|
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
|
||||||
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
|
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
|
||||||
|
|
||||||
|
//start one reader variable
|
||||||
|
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
|
||||||
if (!tsExperimental) {
|
if (!tsExperimental) {
|
||||||
pInfo->filesetDelimited = false;
|
pInfo->filesetDelimited = false;
|
||||||
} else {
|
} else {
|
||||||
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
|
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
|
||||||
}
|
}
|
||||||
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
|
// end one reader variable
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(
|
||||||
optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL);
|
optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL,
|
||||||
|
destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn,
|
||||||
|
NULL);
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
|
@ -456,6 +456,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
||||||
COPY_SCALAR_FIELD(filesetDelimited);
|
COPY_SCALAR_FIELD(filesetDelimited);
|
||||||
COPY_SCALAR_FIELD(isCountByTag);
|
COPY_SCALAR_FIELD(isCountByTag);
|
||||||
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
|
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
|
||||||
|
COPY_SCALAR_FIELD(paraTablesSort);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -688,6 +689,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
|
||||||
COPY_SCALAR_FIELD(igExpired);
|
COPY_SCALAR_FIELD(igExpired);
|
||||||
COPY_SCALAR_FIELD(filesetDelimited);
|
COPY_SCALAR_FIELD(filesetDelimited);
|
||||||
COPY_SCALAR_FIELD(needCountEmptyTable);
|
COPY_SCALAR_FIELD(needCountEmptyTable);
|
||||||
|
COPY_SCALAR_FIELD(paraTablesSort);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -698,6 +698,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||||
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
||||||
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
|
||||||
|
static const char* jkScanLogicPlanparaTablesSort = "paraTablesSort";
|
||||||
|
|
||||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||||
|
@ -745,6 +746,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
|
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -795,6 +799,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1888,6 +1895,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
|
||||||
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
|
||||||
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
|
||||||
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
|
||||||
|
static const char* jkTableScanPhysiPlanparaTablesSort = "paraTablesSort";
|
||||||
|
|
||||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||||
|
@ -1962,6 +1970,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
|
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanparaTablesSort, pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2038,6 +2049,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
|
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanparaTablesSort, &pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2185,6 +2185,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
|
code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeValueBool(pEncoder, pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2269,6 +2272,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
|
code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvDecodeValueBool(pDecoder, &pNode->paraTablesSort);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -401,6 +401,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
|
||||||
case HINT_PARTITION_FIRST:
|
case HINT_PARTITION_FIRST:
|
||||||
if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true;
|
if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true;
|
||||||
break;
|
break;
|
||||||
|
case HINT_PARA_TABLES_SORT:
|
||||||
|
if (paramNum > 0 || hasHint(*ppHintList, HINT_PARA_TABLES_SORT)) return true;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -479,6 +482,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
|
||||||
}
|
}
|
||||||
opt = HINT_PARTITION_FIRST;
|
opt = HINT_PARTITION_FIRST;
|
||||||
break;
|
break;
|
||||||
|
case TK_PARA_TABLES_SORT:
|
||||||
|
lastComma = false;
|
||||||
|
if (0 != opt || inParamList) {
|
||||||
|
quit = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
opt = HINT_PARA_TABLES_SORT;
|
||||||
|
break;
|
||||||
case TK_NK_LP:
|
case TK_NK_LP:
|
||||||
lastComma = false;
|
lastComma = false;
|
||||||
if (0 == opt || inParamList) {
|
if (0 == opt || inParamList) {
|
||||||
|
|
|
@ -173,6 +173,7 @@ static SKeyword keywordTable[] = {
|
||||||
{"OUTPUTTYPE", TK_OUTPUTTYPE},
|
{"OUTPUTTYPE", TK_OUTPUTTYPE},
|
||||||
{"PAGES", TK_PAGES},
|
{"PAGES", TK_PAGES},
|
||||||
{"PAGESIZE", TK_PAGESIZE},
|
{"PAGESIZE", TK_PAGESIZE},
|
||||||
|
{"PARA_TABLES_SORT", TK_PARA_TABLES_SORT},
|
||||||
{"PARTITION", TK_PARTITION},
|
{"PARTITION", TK_PARTITION},
|
||||||
{"PARTITION_FIRST", TK_PARTITION_FIRST},
|
{"PARTITION_FIRST", TK_PARTITION_FIRST},
|
||||||
{"PASS", TK_PASS},
|
{"PASS", TK_PASS},
|
||||||
|
|
|
@ -47,6 +47,7 @@ int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan);
|
||||||
|
|
||||||
bool getBatchScanOptionFromHint(SNodeList* pList);
|
bool getBatchScanOptionFromHint(SNodeList* pList);
|
||||||
bool getSortForGroupOptHint(SNodeList* pList);
|
bool getSortForGroupOptHint(SNodeList* pList);
|
||||||
|
bool getparaTablesSortOptHint(SNodeList* pList);
|
||||||
bool getOptHint(SNodeList* pList, EHintOption hint);
|
bool getOptHint(SNodeList* pList, EHintOption hint);
|
||||||
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
|
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
|
||||||
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
|
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
|
||||||
|
|
|
@ -501,7 +501,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
} else {
|
} else {
|
||||||
nodesDestroyNode((SNode*)pScan);
|
nodesDestroyNode((SNode*)pScan);
|
||||||
}
|
}
|
||||||
|
pScan->paraTablesSort = getparaTablesSortOptHint(pSelect->pHint);
|
||||||
pCxt->hasScan = true;
|
pCxt->hasScan = true;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -651,6 +651,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
||||||
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
|
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
|
||||||
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
|
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
|
||||||
pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
|
pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
|
||||||
|
pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;
|
||||||
|
|
||||||
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
|
@ -466,6 +466,18 @@ bool getOptHint(SNodeList* pList, EHintOption hint) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getparaTablesSortOptHint(SNodeList* pList) {
|
||||||
|
if (!pList) return false;
|
||||||
|
SNode* pNode;
|
||||||
|
FOREACH(pNode, pList) {
|
||||||
|
SHintNode* pHint = (SHintNode*)pNode;
|
||||||
|
if (pHint->option == HINT_PARA_TABLES_SORT) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
|
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SLogicNode* pCurr = (SLogicNode*)pNode;
|
SLogicNode* pCurr = (SLogicNode*)pNode;
|
||||||
|
|
|
@ -57,6 +57,8 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 2
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 2
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 4
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/para_tms.py
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/para_tms2.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
|
||||||
|
|
|
@ -656,7 +656,9 @@ if $data31 != 4 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c;
|
sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c;
|
||||||
|
print select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
|
||||||
sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
|
sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
|
||||||
|
print $rows
|
||||||
if $rows != 40 then
|
if $rows != 40 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue