Merge branch '3.0' into fix/3_liaohj
This commit is contained in:
commit
d0da988b7d
|
@ -55,6 +55,7 @@ typedef struct SLogicNode {
|
||||||
EGroupAction groupAction;
|
EGroupAction groupAction;
|
||||||
EOrder inputTsOrder;
|
EOrder inputTsOrder;
|
||||||
EOrder outputTsOrder;
|
EOrder outputTsOrder;
|
||||||
|
bool forceCreateNonBlockingOptr; // true if the operator can use non-blocking(pipeline) mode
|
||||||
} SLogicNode;
|
} SLogicNode;
|
||||||
|
|
||||||
typedef enum EScanType {
|
typedef enum EScanType {
|
||||||
|
@ -105,6 +106,7 @@ typedef struct SScanLogicNode {
|
||||||
bool hasNormalCols; // neither tag column nor primary key tag column
|
bool hasNormalCols; // neither tag column nor primary key tag column
|
||||||
bool sortPrimaryKey;
|
bool sortPrimaryKey;
|
||||||
bool igLastNull;
|
bool igLastNull;
|
||||||
|
bool groupOrderScan;
|
||||||
} SScanLogicNode;
|
} SScanLogicNode;
|
||||||
|
|
||||||
typedef struct SJoinLogicNode {
|
typedef struct SJoinLogicNode {
|
||||||
|
@ -316,6 +318,7 @@ typedef struct SPhysiNode {
|
||||||
struct SPhysiNode* pParent;
|
struct SPhysiNode* pParent;
|
||||||
SNode* pLimit;
|
SNode* pLimit;
|
||||||
SNode* pSlimit;
|
SNode* pSlimit;
|
||||||
|
bool forceCreateNonBlockingOptr;
|
||||||
} SPhysiNode;
|
} SPhysiNode;
|
||||||
|
|
||||||
typedef struct SScanPhysiNode {
|
typedef struct SScanPhysiNode {
|
||||||
|
@ -326,6 +329,7 @@ typedef struct SScanPhysiNode {
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
int8_t tableType;
|
int8_t tableType;
|
||||||
SName tableName;
|
SName tableName;
|
||||||
|
bool groupOrderScan;
|
||||||
} SScanPhysiNode;
|
} SScanPhysiNode;
|
||||||
|
|
||||||
typedef SScanPhysiNode STagScanPhysiNode;
|
typedef SScanPhysiNode STagScanPhysiNode;
|
||||||
|
|
|
@ -632,7 +632,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
|
||||||
pStart += colSize;
|
pStart += colSize;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
memcpy(pStart, pCol->pData, dataSize);
|
if (dataSize != 0) {
|
||||||
|
// ubsan reports error if pCol->pData==NULL && dataSize==0
|
||||||
|
memcpy(pStart, pCol->pData, dataSize);
|
||||||
|
}
|
||||||
pStart += dataSize;
|
pStart += dataSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -684,8 +687,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (colLength != 0) {
|
||||||
memcpy(pCol->pData, pStart, colLength);
|
// ubsan reports error if colLength==0 && pCol->pData == 0
|
||||||
|
memcpy(pCol->pData, pStart, colLength);
|
||||||
|
}
|
||||||
pStart += colLength;
|
pStart += colLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, vo
|
||||||
|
|
||||||
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
||||||
void mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst);
|
void mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst);
|
||||||
|
void mndExtractShortDbNameFromDbFullName(const char *stbFullName, char *dst);
|
||||||
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
||||||
|
|
||||||
const char *mndGetStbStr(const char *src);
|
const char *mndGetStbStr(const char *src);
|
||||||
|
|
|
@ -2498,12 +2498,14 @@ static int32_t mndProcessTableCfgReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 == strcmp(cfgReq.dbFName, TSDB_INFORMATION_SCHEMA_DB)) {
|
char dbName[TSDB_DB_NAME_LEN] = {0};
|
||||||
|
mndExtractShortDbNameFromDbFullName(cfgReq.dbFName, dbName);
|
||||||
|
if (0 == strcmp(dbName, TSDB_INFORMATION_SCHEMA_DB)) {
|
||||||
mInfo("information_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
|
mInfo("information_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
|
||||||
if (mndBuildInsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp) != 0) {
|
if (mndBuildInsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
} else if (0 == strcmp(cfgReq.dbFName, TSDB_PERFORMANCE_SCHEMA_DB)) {
|
} else if (0 == strcmp(dbName, TSDB_PERFORMANCE_SCHEMA_DB)) {
|
||||||
mInfo("performance_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
|
mInfo("performance_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
|
||||||
if (mndBuildPerfsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp) != 0) {
|
if (mndBuildPerfsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -2672,6 +2674,13 @@ void mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst) {
|
||||||
tNameGetDbName(&name, dst);
|
tNameGetDbName(&name, dst);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mndExtractShortDbNameFromDbFullName(const char *stbFullName, char *dst) {
|
||||||
|
SName name = {0};
|
||||||
|
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
|
||||||
|
tNameGetDbName(&name, dst);
|
||||||
|
}
|
||||||
|
|
||||||
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
|
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
|
||||||
int32_t pos = -1;
|
int32_t pos = -1;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
|
|
|
@ -2090,7 +2090,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_CACHE_NHIT_INC(CTG_CI_TBL_CFG, 1);
|
CTG_CACHE_NHIT_INC(CTG_CI_TBL_CFG, 1);
|
||||||
|
|
||||||
if (pCtx->tbType <= 0) {
|
if (pCtx->tbType <= 0) {
|
||||||
CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
|
CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
|
||||||
if (pCtx->tbType <= 0) {
|
if (pCtx->tbType <= 0) {
|
||||||
|
@ -2102,7 +2102,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_SUPER_TABLE == pCtx->tbType) {
|
if (TSDB_SUPER_TABLE == pCtx->tbType || TSDB_SYSTEM_TABLE == pCtx->tbType) {
|
||||||
CTG_ERR_JRET(ctgGetTableCfgFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
|
CTG_ERR_JRET(ctgGetTableCfgFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
|
||||||
} else {
|
} else {
|
||||||
if (NULL == pCtx->pVgInfo) {
|
if (NULL == pCtx->pVgInfo) {
|
||||||
|
|
|
@ -232,19 +232,20 @@ typedef struct STableMergeScanInfo {
|
||||||
int32_t tableEndIndex;
|
int32_t tableEndIndex;
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SArray* queryConds; // array of queryTableDataCond
|
|
||||||
STableScanBase base;
|
STableScanBase base;
|
||||||
int32_t bufPageSize;
|
int32_t bufPageSize;
|
||||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||||
SArray* pSortInfo;
|
SArray* pSortInfo;
|
||||||
SSortHandle* pSortHandle;
|
SSortHandle* pSortHandle;
|
||||||
SSDataBlock* pSortInputBlock;
|
SSDataBlock* pSortInputBlock;
|
||||||
|
SSDataBlock* pReaderBlock;
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
SArray* sortSourceParams;
|
SArray* sortSourceParams;
|
||||||
SLimitInfo limitInfo;
|
SLimitInfo limitInfo;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
SScanInfo scanInfo;
|
SScanInfo scanInfo;
|
||||||
int32_t scanTimes;
|
int32_t scanTimes;
|
||||||
|
int32_t readIdx;
|
||||||
SSDataBlock* pResBlock;
|
SSDataBlock* pResBlock;
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
SSortExecInfo sortExecInfo;
|
SSortExecInfo sortExecInfo;
|
||||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
||||||
enum {
|
enum {
|
||||||
SORT_MULTISOURCE_MERGE = 0x1,
|
SORT_MULTISOURCE_MERGE = 0x1,
|
||||||
SORT_SINGLESOURCE_SORT = 0x2,
|
SORT_SINGLESOURCE_SORT = 0x2,
|
||||||
|
SORT_BLOCK_TS_MERGE = 0x3
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SMultiMergeSource {
|
typedef struct SMultiMergeSource {
|
||||||
|
@ -53,6 +54,12 @@ typedef struct SMsortComparParam {
|
||||||
int32_t numOfSources;
|
int32_t numOfSources;
|
||||||
SArray* orderInfo; // SArray<SBlockOrderInfo>
|
SArray* orderInfo; // SArray<SBlockOrderInfo>
|
||||||
bool cmpGroupId;
|
bool cmpGroupId;
|
||||||
|
|
||||||
|
int32_t sortType;
|
||||||
|
// the following field to speed up when sortType == SORT_BLOCK_TS_MERGE
|
||||||
|
int32_t tsSlotId;
|
||||||
|
int32_t order;
|
||||||
|
__compar_fn_t cmpFn;
|
||||||
} SMsortComparParam;
|
} SMsortComparParam;
|
||||||
|
|
||||||
typedef struct SSortHandle SSortHandle;
|
typedef struct SSortHandle SSortHandle;
|
||||||
|
@ -70,8 +77,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
|
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
|
||||||
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
|
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
|
||||||
uint32_t sortBufSize);
|
uint32_t pqSortBufSize);
|
||||||
|
|
||||||
void tsortSetForceUsePQSort(SSortHandle* pHandle);
|
void tsortSetForceUsePQSort(SSortHandle* pHandle);
|
||||||
|
|
||||||
|
@ -110,6 +117,10 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc
|
||||||
*/
|
*/
|
||||||
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
|
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit);
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -45,6 +45,8 @@ typedef struct SAggOperatorInfo {
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
SExprSupp scalarExprSup;
|
SExprSupp scalarExprSup;
|
||||||
bool groupKeyOptimized;
|
bool groupKeyOptimized;
|
||||||
|
bool hasValidBlock;
|
||||||
|
SSDataBlock* pNewGroupBlock;
|
||||||
} SAggOperatorInfo;
|
} SAggOperatorInfo;
|
||||||
|
|
||||||
static void destroyAggOperatorInfo(void* param);
|
static void destroyAggOperatorInfo(void* param);
|
||||||
|
@ -53,7 +55,6 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
|
||||||
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
|
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
|
||||||
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
|
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
|
||||||
|
|
||||||
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator);
|
|
||||||
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
|
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
|
||||||
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator);
|
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator);
|
||||||
|
|
||||||
|
@ -111,9 +112,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
||||||
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
|
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
|
||||||
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
|
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
|
||||||
pTaskInfo);
|
!pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResult, NULL, destroyAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL);
|
optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
|
@ -153,28 +154,42 @@ void destroyAggOperatorInfo(void* param) {
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a blocking operator
|
/**
|
||||||
int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
* @brief get blocks from downstream and fill results into groupedRes after aggragation
|
||||||
if (OPTR_IS_OPENED(pOperator)) {
|
* @retval false if no more groups
|
||||||
return TSDB_CODE_SUCCESS;
|
* @retval true if there could have new groups coming
|
||||||
}
|
* @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
|
||||||
|
* if false, fill results of ONE GROUP
|
||||||
|
* */
|
||||||
|
static bool nextGroupedResult(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||||
|
|
||||||
|
if (pOperator->blocking && pAggInfo->hasValidBlock) return false;
|
||||||
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t order = pAggInfo->binfo.inputTsOrder;
|
int32_t order = pAggInfo->binfo.inputTsOrder;
|
||||||
bool hasValidBlock = false;
|
SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
|
||||||
|
|
||||||
|
if (pBlock) {
|
||||||
|
pAggInfo->pNewGroupBlock = NULL;
|
||||||
|
tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
|
||||||
|
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
|
||||||
|
setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
|
||||||
|
code = doAggregateImpl(pOperator, pSup->pCtx);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
bool blockAllocated = false;
|
bool blockAllocated = false;
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
if (!hasValidBlock) {
|
if (!pAggInfo->hasValidBlock) {
|
||||||
createDataBlockForEmptyInput(pOperator, &pBlock);
|
createDataBlockForEmptyInput(pOperator, &pBlock);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
|
@ -184,7 +199,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
hasValidBlock = true;
|
pAggInfo->hasValidBlock = true;
|
||||||
pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
|
pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||||
|
@ -196,7 +211,11 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// if non-blocking mode and new group arrived, save the block and break
|
||||||
|
if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
|
||||||
|
pAggInfo->pNewGroupBlock = pBlock;
|
||||||
|
break;
|
||||||
|
}
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
|
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
|
||||||
setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
|
setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
|
||||||
|
@ -215,10 +234,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
||||||
OPTR_SET_OPENED(pOperator);
|
return pBlock != NULL;
|
||||||
|
|
||||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
|
||||||
return pTaskInfo->code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
||||||
|
@ -230,26 +246,25 @@ SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
bool hasNewGroups = false;
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
do {
|
||||||
setOperatorCompleted(pOperator);
|
hasNewGroups = nextGroupedResult(pOperator);
|
||||||
return NULL;
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
}
|
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
while (1) {
|
||||||
while (1) {
|
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
|
||||||
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
|
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
|
||||||
|
|
||||||
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
|
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
|
||||||
setOperatorCompleted(pOperator);
|
if (!hasNewGroups) setOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} while (pInfo->pRes->info.rows == 0 && hasNewGroups);
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t rows = blockDataGetNumOfRows(pInfo->pRes);
|
size_t rows = blockDataGetNumOfRows(pInfo->pRes);
|
||||||
pOperator->resultInfo.totalRows += rows;
|
pOperator->resultInfo.totalRows += rows;
|
||||||
|
|
|
@ -127,6 +127,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
|
||||||
if (pGroupResInfo->pRows != NULL) {
|
if (pGroupResInfo->pRows != NULL) {
|
||||||
taosArrayDestroy(pGroupResInfo->pRows);
|
taosArrayDestroy(pGroupResInfo->pRows);
|
||||||
}
|
}
|
||||||
|
if (pGroupResInfo->pBuf) {
|
||||||
|
taosMemoryFree(pGroupResInfo->pBuf);
|
||||||
|
pGroupResInfo->pBuf = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// extract the result rows information from the hash map
|
// extract the result rows information from the hash map
|
||||||
int32_t size = tSimpleHashGetSize(pHashmap);
|
int32_t size = tSimpleHashGetSize(pHashmap);
|
||||||
|
@ -2104,6 +2108,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
||||||
if (groupSort && groupByTbname) {
|
if (groupSort && groupByTbname) {
|
||||||
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
|
||||||
pTableListInfo->numOfOuputGroups = numOfTables;
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
|
} else if (groupByTbname && pScanNode->groupOrderScan){
|
||||||
|
pTableListInfo->numOfOuputGroups = numOfTables;
|
||||||
} else {
|
} else {
|
||||||
pTableListInfo->numOfOuputGroups = 1;
|
pTableListInfo->numOfOuputGroups = 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,8 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// clang-format off
|
|
||||||
|
|
||||||
#include "executorInt.h"
|
#include "executorInt.h"
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
|
@ -55,8 +53,7 @@ typedef struct STableMergeScanSortSourceParam {
|
||||||
SOperatorInfo* pOperator;
|
SOperatorInfo* pOperator;
|
||||||
int32_t readerIdx;
|
int32_t readerIdx;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
SSDataBlock* inputBlock;
|
STsdbReader* reader;
|
||||||
STsdbReader* dataReader;
|
|
||||||
} STableMergeScanSortSourceParam;
|
} STableMergeScanSortSourceParam;
|
||||||
|
|
||||||
typedef struct STableCountScanOperatorInfo {
|
typedef struct STableCountScanOperatorInfo {
|
||||||
|
@ -2734,32 +2731,17 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
int32_t readIdx = source->readerIdx;
|
SSDataBlock* pBlock = pInfo->pReaderBlock;
|
||||||
SSDataBlock* pBlock = source->inputBlock;
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
|
|
||||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
|
||||||
if (NULL == source->dataReader) {
|
|
||||||
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
|
|
||||||
if (code != 0) {
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->base.dataReader = source->dataReader;
|
|
||||||
STsdbReader* reader = pInfo->base.dataReader;
|
|
||||||
bool hasNext = false;
|
bool hasNext = false;
|
||||||
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
|
|
||||||
|
|
||||||
|
STsdbReader* reader = pInfo->base.dataReader;
|
||||||
while (true) {
|
while (true) {
|
||||||
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
|
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||||
pInfo->base.dataReader = NULL;
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2769,7 +2751,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
|
||||||
pInfo->base.dataReader = NULL;
|
|
||||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2779,12 +2760,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQueryCond->order == TSDB_ORDER_ASC) {
|
|
||||||
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
|
|
||||||
} else {
|
|
||||||
pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
|
||||||
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
|
@ -2806,16 +2781,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
qTrace("tsdb/read-table-data: %p, close reader", reader);
|
|
||||||
pInfo->base.dataReader = NULL;
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
pAPI->tsdReader.tsdReaderClose(source->dataReader);
|
|
||||||
source->dataReader = NULL;
|
|
||||||
pInfo->base.dataReader = NULL;
|
|
||||||
blockDataDestroy(source->inputBlock);
|
|
||||||
source->inputBlock = NULL;
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2851,6 +2819,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
|
||||||
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||||
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
{
|
{
|
||||||
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
||||||
|
@ -2867,53 +2837,29 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
int32_t tableStartIdx = pInfo->tableStartIndex;
|
int32_t tableStartIdx = pInfo->tableStartIndex;
|
||||||
int32_t tableEndIdx = pInfo->tableEndIndex;
|
int32_t tableEndIdx = pInfo->tableEndIndex;
|
||||||
|
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
|
||||||
|
|
||||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
|
||||||
// the additional one is reserved for merge result
|
|
||||||
// pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
|
||||||
int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize);
|
|
||||||
if (kWay >= 128) {
|
|
||||||
kWay = 128;
|
|
||||||
} else if (kWay <= 2) {
|
|
||||||
kWay = 2;
|
|
||||||
} else {
|
|
||||||
int i = 2;
|
|
||||||
while (i * 2 <= kWay) i = i * 2;
|
|
||||||
kWay = i;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||||
|
int64_t mergeLimit = -1;
|
||||||
|
if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) {
|
||||||
|
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
|
||||||
|
}
|
||||||
|
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
|
||||||
|
|
||||||
// one table has one data block
|
// one table has one data block
|
||||||
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
|
||||||
pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
STableMergeScanSortSourceParam param = {0};
|
||||||
STableMergeScanSortSourceParam param = {0};
|
param.pOperator = pOperator;
|
||||||
param.readerIdx = i;
|
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
|
||||||
param.pOperator = pOperator;
|
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
|
||||||
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
|
||||||
|
|
||||||
taosArrayPush(pInfo->sortSourceParams, ¶m);
|
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||||
|
ps->param = ¶m;
|
||||||
SQueryTableDataCond cond;
|
ps->onlyRef = true;
|
||||||
dumpQueryTableCond(&pInfo->base.cond, &cond);
|
tsortAddSource(pInfo->pSortHandle, ps);
|
||||||
taosArrayPush(pInfo->queryConds, &cond);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
|
||||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
|
||||||
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
|
||||||
ps->param = param;
|
|
||||||
ps->onlyRef = true;
|
|
||||||
tsortAddSource(pInfo->pSortHandle, ps);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||||
|
|
||||||
|
@ -2929,8 +2875,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
|
|
||||||
|
|
||||||
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
|
||||||
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
|
||||||
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
|
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
|
||||||
|
@ -2938,24 +2882,14 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
|
||||||
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
if (pInfo->base.dataReader != NULL) {
|
||||||
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
|
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
|
||||||
blockDataDestroy(param->inputBlock);
|
pInfo->base.dataReader = NULL;
|
||||||
pAPI->tsdReader.tsdReaderClose(param->dataReader);
|
|
||||||
param->dataReader = NULL;
|
|
||||||
}
|
}
|
||||||
taosArrayClear(pInfo->sortSourceParams);
|
|
||||||
|
|
||||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||||
pInfo->pSortHandle = NULL;
|
pInfo->pSortHandle = NULL;
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
|
|
||||||
SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
|
|
||||||
taosMemoryFree(cond->colList);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(pInfo->queryConds);
|
|
||||||
pInfo->queryConds = NULL;
|
|
||||||
|
|
||||||
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
resetLimitInfoForNextGroup(&pInfo->limitInfo);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2968,28 +2902,32 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
blockDataCleanup(pResBlock);
|
blockDataCleanup(pResBlock);
|
||||||
|
STupleHandle* pTupleHandle = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
while (1) {
|
||||||
if (pTupleHandle == NULL) {
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
break;
|
if (pTupleHandle == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
appendOneRowToDataBlock(pResBlock, pTupleHandle);
|
||||||
|
if (pResBlock->info.rows >= capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
appendOneRowToDataBlock(pResBlock, pTupleHandle);
|
if (tsortIsClosed(pHandle)) {
|
||||||
if (pResBlock->info.rows >= capacity) {
|
terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
break;
|
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
||||||
|
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
||||||
|
pInfo->limitInfo.numOfOutputRows);
|
||||||
|
if (pTupleHandle == NULL || limitReached || pResBlock->info.rows > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsortIsClosed(pHandle)) {
|
|
||||||
terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
|
||||||
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
|
||||||
pInfo->limitInfo.numOfOutputRows);
|
|
||||||
|
|
||||||
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3053,14 +2991,7 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
|
||||||
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
|
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
|
||||||
|
|
||||||
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);
|
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTable; i++) {
|
|
||||||
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
|
|
||||||
blockDataDestroy(p->inputBlock);
|
|
||||||
pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader);
|
|
||||||
p->dataReader = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
|
@ -3069,16 +3000,11 @@ void destroyTableMergeScanOperatorInfo(void* param) {
|
||||||
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
|
||||||
pTableScanInfo->pSortHandle = NULL;
|
pTableScanInfo->pSortHandle = NULL;
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
|
|
||||||
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
|
|
||||||
taosMemoryFree(pCond->colList);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->queryConds);
|
|
||||||
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
|
||||||
|
|
||||||
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
|
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
|
||||||
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
|
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
|
||||||
|
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
|
||||||
|
|
||||||
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
taosArrayDestroy(pTableScanInfo->pSortInfo);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
|
@ -3140,6 +3066,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo->base.scanFlag = MAIN_SCAN;
|
pInfo->base.scanFlag = MAIN_SCAN;
|
||||||
pInfo->base.readHandle = *readHandle;
|
pInfo->base.readHandle = *readHandle;
|
||||||
|
|
||||||
|
pInfo->readIdx = -1;
|
||||||
|
|
||||||
pInfo->base.limitInfo.limit.limit = -1;
|
pInfo->base.limitInfo.limit.limit = -1;
|
||||||
pInfo->base.limitInfo.slimit.limit = -1;
|
pInfo->base.limitInfo.slimit.limit = -1;
|
||||||
pInfo->base.pTableListInfo = pTableListInfo;
|
pInfo->base.pTableListInfo = pTableListInfo;
|
||||||
|
@ -3162,6 +3090,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
|
||||||
|
|
||||||
|
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
|
||||||
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
int32_t rowSize = pInfo->pResBlock->info.rowSize;
|
||||||
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
|
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
|
||||||
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
|
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
|
||||||
|
@ -3570,6 +3500,4 @@ static void destoryTableCountScanOperator(void* param) {
|
||||||
|
|
||||||
taosArrayDestroy(pTableCountScanInfo->stbUidList);
|
taosArrayDestroy(pTableCountScanInfo->stbUidList);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
// clang-format on
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "tpagedbuf.h"
|
#include "tpagedbuf.h"
|
||||||
#include "tsort.h"
|
#include "tsort.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "tsimplehash.h"
|
||||||
|
|
||||||
struct STupleHandle {
|
struct STupleHandle {
|
||||||
SSDataBlock* pBlock;
|
SSDataBlock* pBlock;
|
||||||
|
@ -42,13 +43,15 @@ struct SSortHandle {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
uint64_t totalElapsed;
|
uint64_t totalElapsed;
|
||||||
|
|
||||||
uint64_t maxRows;
|
uint64_t pqMaxRows;
|
||||||
uint32_t maxTupleLength;
|
uint32_t pqMaxTupleLength;
|
||||||
uint32_t sortBufSize;
|
uint32_t pqSortBufSize;
|
||||||
bool forceUsePQSort;
|
bool forceUsePQSort;
|
||||||
BoundedQueue* pBoundedQueue;
|
BoundedQueue* pBoundedQueue;
|
||||||
uint32_t tmpRowIdx;
|
uint32_t tmpRowIdx;
|
||||||
|
|
||||||
|
int64_t mergeLimit;
|
||||||
|
|
||||||
int32_t sourceId;
|
int32_t sourceId;
|
||||||
SSDataBlock* pDataBlock;
|
SSDataBlock* pDataBlock;
|
||||||
SMsortComparParam cmpParam;
|
SMsortComparParam cmpParam;
|
||||||
|
@ -173,8 +176,8 @@ void destroyTuple(void* t) {
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
|
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
|
||||||
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
|
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
|
||||||
uint32_t sortBufSize) {
|
uint32_t pqSortBufSize) {
|
||||||
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
||||||
|
|
||||||
pSortHandle->type = type;
|
pSortHandle->type = type;
|
||||||
|
@ -183,10 +186,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
|
||||||
pSortHandle->pSortInfo = pSortInfo;
|
pSortHandle->pSortInfo = pSortInfo;
|
||||||
pSortHandle->loops = 0;
|
pSortHandle->loops = 0;
|
||||||
|
|
||||||
pSortHandle->maxTupleLength = maxTupleLength;
|
pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
|
||||||
if (maxRows != 0) {
|
if (pqMaxRows != 0) {
|
||||||
pSortHandle->sortBufSize = sortBufSize;
|
pSortHandle->pqSortBufSize = pqSortBufSize;
|
||||||
pSortHandle->maxRows = maxRows;
|
pSortHandle->pqMaxRows = pqMaxRows;
|
||||||
}
|
}
|
||||||
pSortHandle->forceUsePQSort = false;
|
pSortHandle->forceUsePQSort = false;
|
||||||
|
|
||||||
|
@ -194,10 +197,18 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
|
||||||
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSortHandle->mergeLimit = -1;
|
||||||
|
|
||||||
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
|
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
|
||||||
pSortHandle->cmpParam.orderInfo = pSortInfo;
|
pSortHandle->cmpParam.orderInfo = pSortInfo;
|
||||||
pSortHandle->cmpParam.cmpGroupId = false;
|
pSortHandle->cmpParam.cmpGroupId = false;
|
||||||
|
pSortHandle->cmpParam.sortType = type;
|
||||||
|
if (type == SORT_BLOCK_TS_MERGE) {
|
||||||
|
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pSortInfo, 0);
|
||||||
|
pSortHandle->cmpParam.tsSlotId = pOrder->slotId;
|
||||||
|
pSortHandle->cmpParam.order = pOrder->order;
|
||||||
|
pSortHandle->cmpParam.cmpFn = (pOrder->order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc;
|
||||||
|
}
|
||||||
tsortSetComparFp(pSortHandle, msortComparFn);
|
tsortSetComparFp(pSortHandle, msortComparFn);
|
||||||
|
|
||||||
if (idstr != NULL) {
|
if (idstr != NULL) {
|
||||||
|
@ -469,11 +480,14 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
|
||||||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||||
pSource->pageIndex++;
|
pSource->pageIndex++;
|
||||||
if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
|
if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
|
||||||
|
qDebug("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex);
|
||||||
(*numOfCompleted) += 1;
|
(*numOfCompleted) += 1;
|
||||||
pSource->src.rowIndex = -1;
|
pSource->src.rowIndex = -1;
|
||||||
pSource->pageIndex = -1;
|
pSource->pageIndex = -1;
|
||||||
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
|
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
|
||||||
} else {
|
} else {
|
||||||
|
if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex);
|
||||||
|
|
||||||
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
|
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
|
||||||
|
|
||||||
void* pPage = getBufPage(pHandle->pBuf, *pPgId);
|
void* pPage = getBufPage(pHandle->pBuf, *pPgId);
|
||||||
|
@ -486,7 +500,6 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
releaseBufPage(pHandle->pBuf, pPage);
|
releaseBufPage(pHandle->pBuf, pPage);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -497,6 +510,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
|
||||||
if (pSource->src.pBlock == NULL) {
|
if (pSource->src.pBlock == NULL) {
|
||||||
(*numOfCompleted) += 1;
|
(*numOfCompleted) += 1;
|
||||||
pSource->src.rowIndex = -1;
|
pSource->src.rowIndex = -1;
|
||||||
|
qDebug("adjust merge tree. %d source completed", *numOfCompleted);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -577,53 +591,63 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pInfo->size; ++i) {
|
if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
|
||||||
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
|
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
|
||||||
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
|
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
|
||||||
|
int64_t* left1 = (int64_t*)(pLeftColInfoData->pData) + pLeftSource->src.rowIndex;
|
||||||
|
int64_t* right1 = (int64_t*)(pRightColInfoData->pData) + pRightSource->src.rowIndex;
|
||||||
|
|
||||||
bool leftNull = false;
|
int ret = pParam->cmpFn(left1, right1);
|
||||||
if (pLeftColInfoData->hasNull) {
|
return ret;
|
||||||
if (pLeftBlock->pBlockAgg == NULL) {
|
} else {
|
||||||
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
|
for (int32_t i = 0; i < pInfo->size; ++i) {
|
||||||
} else {
|
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
|
||||||
leftNull =
|
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
|
||||||
colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
|
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
|
||||||
|
|
||||||
|
bool leftNull = false;
|
||||||
|
if (pLeftColInfoData->hasNull) {
|
||||||
|
if (pLeftBlock->pBlockAgg == NULL) {
|
||||||
|
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
|
||||||
|
} else {
|
||||||
|
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
|
||||||
|
pLeftBlock->pBlockAgg[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
|
bool rightNull = false;
|
||||||
bool rightNull = false;
|
if (pRightColInfoData->hasNull) {
|
||||||
if (pRightColInfoData->hasNull) {
|
if (pRightBlock->pBlockAgg == NULL) {
|
||||||
if (pRightBlock->pBlockAgg == NULL) {
|
rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
|
||||||
rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
|
} else {
|
||||||
} else {
|
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
|
||||||
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
|
pRightBlock->pBlockAgg[i]);
|
||||||
pRightBlock->pBlockAgg[i]);
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (leftNull && rightNull) {
|
if (leftNull && rightNull) {
|
||||||
continue; // continue to next slot
|
continue; // continue to next slot
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rightNull) {
|
if (rightNull) {
|
||||||
return pOrder->nullFirst ? 1 : -1;
|
return pOrder->nullFirst ? 1 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (leftNull) {
|
if (leftNull) {
|
||||||
return pOrder->nullFirst ? -1 : 1;
|
return pOrder->nullFirst ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
|
void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
|
||||||
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
|
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
|
||||||
|
|
||||||
__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
|
__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
|
||||||
|
|
||||||
int ret = fn(left1, right1);
|
int ret = fn(left1, right1);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
return ret;
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -668,6 +692,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
|
|
||||||
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
|
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
|
||||||
for (int32_t i = 0; i < sortGroup; ++i) {
|
for (int32_t i = 0; i < sortGroup; ++i) {
|
||||||
|
qDebug("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources);
|
||||||
pHandle->sourceId += 1;
|
pHandle->sourceId += 1;
|
||||||
|
|
||||||
int32_t end = (i + 1) * numOfInputSources - 1;
|
int32_t end = (i + 1) * numOfInputSources - 1;
|
||||||
|
@ -690,13 +715,15 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int nMergedRows = 0;
|
||||||
|
|
||||||
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
||||||
while (1) {
|
while (1) {
|
||||||
if (tsortIsClosed(pHandle)) {
|
if (tsortIsClosed(pHandle)) {
|
||||||
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
|
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
|
@ -720,8 +747,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
|
|
||||||
setBufPageDirty(pPage, true);
|
setBufPageDirty(pPage, true);
|
||||||
releaseBufPage(pHandle->pBuf, pPage);
|
releaseBufPage(pHandle->pBuf, pPage);
|
||||||
|
nMergedRows += pDataBlock->info.rows;
|
||||||
|
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sortComparCleanup(&pHandle->cmpParam);
|
sortComparCleanup(&pHandle->cmpParam);
|
||||||
|
@ -769,39 +800,285 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
|
||||||
return pgSize;
|
return pgSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createInitialSources(SSortHandle* pHandle) {
|
static int32_t createPageBuf(SSortHandle* pHandle) {
|
||||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
if (pHandle->pBuf == NULL) {
|
||||||
|
if (!osTempSpaceAvailable()) {
|
||||||
|
terrno = TSDB_CODE_NO_DISKSPACE;
|
||||||
|
qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
|
||||||
|
"tableBlocksBuf", tsTempDir);
|
||||||
|
dBufSetPrintInfo(pHandle->pBuf);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct SBlkMergeSupport {
|
||||||
|
int64_t** aTs;
|
||||||
|
int32_t* aRowIdx;
|
||||||
|
int32_t order;
|
||||||
|
} SBlkMergeSupport;
|
||||||
|
|
||||||
|
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
|
int32_t left = *(int32_t*)pLeft;
|
||||||
|
int32_t right = *(int32_t*)pRight;
|
||||||
|
|
||||||
|
SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
|
||||||
|
if (pSup->aRowIdx[left] == -1) {
|
||||||
|
return 1;
|
||||||
|
} else if (pSup->aRowIdx[right] == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
|
||||||
|
int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
|
||||||
|
|
||||||
|
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
|
||||||
|
if (pSup->order == TSDB_ORDER_DESC) {
|
||||||
|
ret = -1 * ret;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
|
||||||
|
int32_t pageId = -1;
|
||||||
|
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||||
|
taosArrayPush(aPgId, &pageId);
|
||||||
|
|
||||||
|
int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
|
||||||
|
ASSERT(size <= getBufPageSize(pHandle->pBuf));
|
||||||
|
|
||||||
|
blockDataToBuf(pPage, blk);
|
||||||
|
|
||||||
|
setBufPageDirty(pPage, true);
|
||||||
|
releaseBufPage(pHandle->pBuf, pPage);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) {
|
||||||
|
int sz = 0;
|
||||||
|
int numCols = taosArrayGetSize(blk->pDataBlock);
|
||||||
|
if (!blk->info.hasVarCol) {
|
||||||
|
sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0);
|
||||||
|
sz += blockDataGetRowSize(blk);
|
||||||
|
} else {
|
||||||
|
for (int32_t i = 0; i < numCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i);
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
if (pColInfoData->varmeta.offset[row] != -1) {
|
||||||
|
char* p = colDataGetData(pColInfoData, row);
|
||||||
|
sz += varDataTLen(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
sz += sizeof(pColInfoData->varmeta.offset[0]);
|
||||||
|
} else {
|
||||||
|
sz += pColInfoData->info.bytes;
|
||||||
|
|
||||||
|
if (((rowIdxInPage) & 0x07) == 0) {
|
||||||
|
sz += 1; // bitmap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sz;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
|
||||||
|
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
|
||||||
|
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
|
||||||
|
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
|
||||||
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
|
int32_t numBlks = taosArrayGetSize(aBlk);
|
||||||
|
|
||||||
|
SBlkMergeSupport sup;
|
||||||
|
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
|
||||||
|
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
|
||||||
|
sup.order = order->order;
|
||||||
|
for (int i = 0; i < numBlks; ++i) {
|
||||||
|
SSDataBlock* blk = taosArrayGetP(aBlk, i);
|
||||||
|
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId);
|
||||||
|
sup.aTs[i] = (int64_t*)col->pData;
|
||||||
|
sup.aRowIdx[i] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t totalRows = 0;
|
||||||
|
for (int i = 0; i < numBlks; ++i) {
|
||||||
|
SSDataBlock* blk = taosArrayGetP(aBlk, i);
|
||||||
|
totalRows += blk->info.rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
|
||||||
|
|
||||||
|
SMultiwayMergeTreeInfo* pTree = NULL;
|
||||||
|
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
||||||
|
int32_t nRows = 0;
|
||||||
|
int32_t nMergedRows = 0;
|
||||||
|
bool mergeLimitReached = false;
|
||||||
|
size_t blkPgSz = pgHeaderSz;
|
||||||
|
|
||||||
|
while (nRows < totalRows) {
|
||||||
|
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
||||||
|
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
||||||
|
int32_t minRow = sup.aRowIdx[minIdx];
|
||||||
|
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
|
||||||
|
|
||||||
|
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
||||||
|
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
|
|
||||||
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
|
blkPgSz = pgHeaderSz;
|
||||||
|
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
|
||||||
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
|
mergeLimitReached = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
|
||||||
|
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
||||||
|
blkPgSz += bufInc;
|
||||||
|
|
||||||
|
++nRows;
|
||||||
|
|
||||||
|
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
|
||||||
|
sup.aRowIdx[minIdx] = -1;
|
||||||
|
} else {
|
||||||
|
++sup.aRowIdx[minIdx];
|
||||||
|
}
|
||||||
|
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
|
||||||
|
}
|
||||||
|
if (pHandle->pDataBlock->info.rows > 0) {
|
||||||
|
if (!mergeLimitReached) {
|
||||||
|
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
|
}
|
||||||
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
|
}
|
||||||
|
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
|
||||||
|
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
|
||||||
|
|
||||||
|
taosMemoryFree(sup.aRowIdx);
|
||||||
|
taosMemoryFree(sup.aTs);
|
||||||
|
|
||||||
|
tMergeTreeDestroy(&pTree);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
|
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
||||||
|
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
|
||||||
|
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
|
||||||
|
|
||||||
|
size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
createPageBuf(pHandle);
|
||||||
|
|
||||||
|
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
|
||||||
|
int32_t szSort = 0;
|
||||||
|
|
||||||
|
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
|
||||||
|
while (1) {
|
||||||
|
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
|
||||||
|
|
||||||
|
if (pBlk != NULL) {
|
||||||
|
szSort += blockDataGetSize(pBlk);
|
||||||
|
|
||||||
|
void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
|
||||||
|
if (ppBlk != NULL) {
|
||||||
|
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
|
||||||
|
blockDataMerge(tBlk, pBlk);
|
||||||
|
} else {
|
||||||
|
SSDataBlock* tBlk = createOneDataBlock(pBlk, true);
|
||||||
|
tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
|
||||||
|
taosArrayPush(aBlkSort, &tBlk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
|
||||||
|
tSimpleHashClear(mUidBlk);
|
||||||
|
|
||||||
|
int64_t p = taosGetTimestampUs();
|
||||||
|
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
|
||||||
|
int64_t el = taosGetTimestampUs() - p;
|
||||||
|
pHandle->sortElapsed += el;
|
||||||
|
|
||||||
|
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
|
||||||
|
blockDataDestroy(taosArrayGetP(aBlkSort, i));
|
||||||
|
}
|
||||||
|
taosArrayClear(aBlkSort);
|
||||||
|
szSort = 0;
|
||||||
|
qDebug("source %zu created", taosArrayGetSize(aExtSrc));
|
||||||
|
}
|
||||||
|
if (pBlk == NULL) {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
tSimpleHashCleanup(mUidBlk);
|
||||||
|
taosArrayDestroy(aBlkSort);
|
||||||
|
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||||
|
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
|
||||||
|
taosArrayDestroy(aExtSrc);
|
||||||
|
|
||||||
|
pHandle->type = SORT_SINGLESOURCE_SORT;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
|
||||||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||||
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
SSortSource* source = *pSource;
|
||||||
SSortSource* source = *pSource;
|
*pSource = NULL;
|
||||||
*pSource = NULL;
|
|
||||||
|
|
||||||
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pHandle->pDataBlock == NULL) {
|
||||||
|
uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);
|
||||||
|
|
||||||
|
// todo, number of pages are set according to the total available sort buffer
|
||||||
|
pHandle->numOfPages = 1024;
|
||||||
|
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pHandle->beforeFp != NULL) {
|
||||||
|
pHandle->beforeFp(pBlock, pHandle->param);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (source->param && !source->onlyRef) {
|
||||||
|
taosMemoryFree(source->param);
|
||||||
}
|
}
|
||||||
|
if (!source->onlyRef && source->src.pBlock) {
|
||||||
if (pHandle->pDataBlock == NULL) {
|
blockDataDestroy(source->src.pBlock);
|
||||||
uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
source->src.pBlock = NULL;
|
||||||
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);
|
|
||||||
|
|
||||||
// todo, number of pages are set according to the total available sort buffer
|
|
||||||
pHandle->numOfPages = 1024;
|
|
||||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
|
||||||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(source);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (pHandle->beforeFp != NULL) {
|
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||||
pHandle->beforeFp(pBlock, pHandle->param);
|
if (size > sortBufSize) {
|
||||||
}
|
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||||
|
int64_t p = taosGetTimestampUs();
|
||||||
code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != 0) {
|
||||||
if (source->param && !source->onlyRef) {
|
if (source->param && !source->onlyRef) {
|
||||||
taosMemoryFree(source->param);
|
taosMemoryFree(source->param);
|
||||||
}
|
}
|
||||||
|
@ -809,74 +1086,67 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
blockDataDestroy(source->src.pBlock);
|
blockDataDestroy(source->src.pBlock);
|
||||||
source->src.pBlock = NULL;
|
source->src.pBlock = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(source);
|
taosMemoryFree(source);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
|
||||||
if (size > sortBufSize) {
|
|
||||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
|
||||||
int64_t p = taosGetTimestampUs();
|
|
||||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
|
||||||
if (code != 0) {
|
|
||||||
if (source->param && !source->onlyRef) {
|
|
||||||
taosMemoryFree(source->param);
|
|
||||||
}
|
|
||||||
if (!source->onlyRef && source->src.pBlock) {
|
|
||||||
blockDataDestroy(source->src.pBlock);
|
|
||||||
source->src.pBlock = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(source);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - p;
|
|
||||||
pHandle->sortElapsed += el;
|
|
||||||
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
|
|
||||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (source->param && !source->onlyRef) {
|
|
||||||
taosMemoryFree(source->param);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(source);
|
|
||||||
|
|
||||||
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
|
|
||||||
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
|
||||||
|
|
||||||
// Perform the in-memory sort and then flush data in the buffer into disk.
|
|
||||||
int64_t p = taosGetTimestampUs();
|
|
||||||
|
|
||||||
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
|
||||||
if (code != 0) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
|
|
||||||
int64_t el = taosGetTimestampUs() - p;
|
int64_t el = taosGetTimestampUs() - p;
|
||||||
pHandle->sortElapsed += el;
|
pHandle->sortElapsed += el;
|
||||||
|
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||||
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
|
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||||
if (size <= sortBufSize && pHandle->pBuf == NULL) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pHandle->cmpParam.numOfSources = 1;
|
return code;
|
||||||
pHandle->inMemSort = true;
|
|
||||||
|
|
||||||
pHandle->loops = 1;
|
|
||||||
pHandle->tupleHandle.rowIndex = -1;
|
|
||||||
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (source->param && !source->onlyRef) {
|
||||||
|
taosMemoryFree(source->param);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(source);
|
||||||
|
|
||||||
|
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
|
||||||
|
size_t size = blockDataGetSize(pHandle->pDataBlock);
|
||||||
|
|
||||||
|
// Perform the in-memory sort and then flush data in the buffer into disk.
|
||||||
|
int64_t p = taosGetTimestampUs();
|
||||||
|
|
||||||
|
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
|
||||||
|
int64_t el = taosGetTimestampUs() - p;
|
||||||
|
pHandle->sortElapsed += el;
|
||||||
|
|
||||||
|
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
|
||||||
|
if (size <= sortBufSize && pHandle->pBuf == NULL) {
|
||||||
|
pHandle->cmpParam.numOfSources = 1;
|
||||||
|
pHandle->inMemSort = true;
|
||||||
|
|
||||||
|
pHandle->loops = 1;
|
||||||
|
pHandle->tupleHandle.rowIndex = -1;
|
||||||
|
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
code = doAddToBuf(pHandle->pDataBlock, pHandle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||||
|
code = createBlocksQuickSortInitialSources(pHandle);
|
||||||
|
} else if (pHandle->type == SORT_BLOCK_TS_MERGE) {
|
||||||
|
code = createBlocksMergeSortInitialSources(pHandle);
|
||||||
|
}
|
||||||
|
qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -923,6 +1193,10 @@ void tsortSetClosed(SSortHandle* pHandle) {
|
||||||
atomic_store_8(&pHandle->closed, 2);
|
atomic_store_8(&pHandle->closed, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) {
|
||||||
|
pHandle->mergeLimit = mergeLimit;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
|
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
|
||||||
void* param) {
|
void* param) {
|
||||||
pHandle->fetchfp = fetchFp;
|
pHandle->fetchfp = fetchFp;
|
||||||
|
@ -1002,8 +1276,8 @@ void tsortSetForceUsePQSort(SSortHandle* pHandle) {
|
||||||
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
|
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
|
||||||
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
|
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
|
||||||
if (tsortIsForceUsePQSort(pHandle)) return true;
|
if (tsortIsForceUsePQSort(pHandle)) return true;
|
||||||
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
|
uint64_t maxRowsFitInMemory = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*));
|
||||||
return maxRowsFitInMemory > pHandle->maxRows;
|
return maxRowsFitInMemory > pHandle->pqMaxRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tsortPQCompFn(void* a, void* b, void* param) {
|
static bool tsortPQCompFn(void* a, void* b, void* param) {
|
||||||
|
@ -1049,7 +1323,7 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
|
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
|
||||||
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle);
|
pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle);
|
||||||
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
|
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tsortSetComparFp(pHandle, tupleComparFn);
|
tsortSetComparFp(pHandle, tupleComparFn);
|
||||||
|
|
||||||
|
|
|
@ -361,6 +361,7 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
|
||||||
COPY_SCALAR_FIELD(groupAction);
|
COPY_SCALAR_FIELD(groupAction);
|
||||||
COPY_SCALAR_FIELD(inputTsOrder);
|
COPY_SCALAR_FIELD(inputTsOrder);
|
||||||
COPY_SCALAR_FIELD(outputTsOrder);
|
COPY_SCALAR_FIELD(outputTsOrder);
|
||||||
|
COPY_SCALAR_FIELD(forceCreateNonBlockingOptr);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,6 +398,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
||||||
CLONE_NODE_LIST_FIELD(pTags);
|
CLONE_NODE_LIST_FIELD(pTags);
|
||||||
CLONE_NODE_FIELD(pSubtable);
|
CLONE_NODE_FIELD(pSubtable);
|
||||||
COPY_SCALAR_FIELD(igLastNull);
|
COPY_SCALAR_FIELD(igLastNull);
|
||||||
|
COPY_SCALAR_FIELD(groupOrderScan);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -545,6 +547,7 @@ static int32_t physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) {
|
||||||
CLONE_NODE_LIST_FIELD(pChildren);
|
CLONE_NODE_LIST_FIELD(pChildren);
|
||||||
COPY_SCALAR_FIELD(inputTsOrder);
|
COPY_SCALAR_FIELD(inputTsOrder);
|
||||||
COPY_SCALAR_FIELD(outputTsOrder);
|
COPY_SCALAR_FIELD(outputTsOrder);
|
||||||
|
COPY_SCALAR_FIELD(forceCreateNonBlockingOptr);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -556,6 +559,7 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) {
|
||||||
COPY_SCALAR_FIELD(suid);
|
COPY_SCALAR_FIELD(suid);
|
||||||
COPY_SCALAR_FIELD(tableType);
|
COPY_SCALAR_FIELD(tableType);
|
||||||
COPY_OBJECT_FIELD(tableName, sizeof(SName));
|
COPY_OBJECT_FIELD(tableName, sizeof(SName));
|
||||||
|
COPY_SCALAR_FIELD(groupOrderScan);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1559,6 +1559,7 @@ static const char* jkScanPhysiPlanTableId = "TableId";
|
||||||
static const char* jkScanPhysiPlanSTableId = "STableId";
|
static const char* jkScanPhysiPlanSTableId = "STableId";
|
||||||
static const char* jkScanPhysiPlanTableType = "TableType";
|
static const char* jkScanPhysiPlanTableType = "TableType";
|
||||||
static const char* jkScanPhysiPlanTableName = "TableName";
|
static const char* jkScanPhysiPlanTableName = "TableName";
|
||||||
|
static const char* jkScanPhysiPlanGroupOrderScan = "GroupOrderScan";
|
||||||
|
|
||||||
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
|
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
|
||||||
|
@ -1582,6 +1583,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
|
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkScanPhysiPlanGroupOrderScan, pNode->groupOrderScan);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1608,6 +1612,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
|
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkScanPhysiPlanGroupOrderScan, &pNode->groupOrderScan);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1853,7 +1853,8 @@ enum {
|
||||||
PHY_NODE_CODE_LIMIT,
|
PHY_NODE_CODE_LIMIT,
|
||||||
PHY_NODE_CODE_SLIMIT,
|
PHY_NODE_CODE_SLIMIT,
|
||||||
PHY_NODE_CODE_INPUT_TS_ORDER,
|
PHY_NODE_CODE_INPUT_TS_ORDER,
|
||||||
PHY_NODE_CODE_OUTPUT_TS_ORDER
|
PHY_NODE_CODE_OUTPUT_TS_ORDER,
|
||||||
|
PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
|
@ -1878,6 +1879,9 @@ static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder);
|
code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeBool(pEncoder, PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR, pNode->forceCreateNonBlockingOptr);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1910,6 +1914,8 @@ static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_NODE_CODE_OUTPUT_TS_ORDER:
|
case PHY_NODE_CODE_OUTPUT_TS_ORDER:
|
||||||
code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder));
|
code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder));
|
||||||
break;
|
break;
|
||||||
|
case PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR:
|
||||||
|
code = tlvDecodeBool(pTlv, &pNode->forceCreateNonBlockingOptr);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1925,7 +1931,8 @@ enum {
|
||||||
PHY_SCAN_CODE_BASE_UID,
|
PHY_SCAN_CODE_BASE_UID,
|
||||||
PHY_SCAN_CODE_BASE_SUID,
|
PHY_SCAN_CODE_BASE_SUID,
|
||||||
PHY_SCAN_CODE_BASE_TABLE_TYPE,
|
PHY_SCAN_CODE_BASE_TABLE_TYPE,
|
||||||
PHY_SCAN_CODE_BASE_TABLE_NAME
|
PHY_SCAN_CODE_BASE_TABLE_NAME,
|
||||||
|
PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
|
@ -1950,6 +1957,9 @@ static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName);
|
code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeBool(pEncoder, PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN, pNode->groupOrderScan);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1982,6 +1992,9 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_SCAN_CODE_BASE_TABLE_NAME:
|
case PHY_SCAN_CODE_BASE_TABLE_NAME:
|
||||||
code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName);
|
code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName);
|
||||||
break;
|
break;
|
||||||
|
case PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN:
|
||||||
|
code = tlvDecodeBool(pTlv, &pNode->groupOrderScan);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,9 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
|
||||||
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
|
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
|
||||||
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
|
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
|
||||||
|
|
||||||
|
bool isPartTableAgg(SAggLogicNode* pAgg);
|
||||||
|
bool isPartTableWinodw(SWindowLogicNode* pWindow);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -363,6 +363,18 @@ static void scanPathOptSetScanOrder(EScanOrder scanOrder, SScanLogicNode* pScan)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void scanPathOptSetGroupOrderScan(SScanLogicNode* pScan) {
|
||||||
|
if (pScan->tableType != TSDB_SUPER_TABLE) return;
|
||||||
|
|
||||||
|
if (pScan->node.pParent && nodeType(pScan->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) {
|
||||||
|
SAggLogicNode* pAgg = (SAggLogicNode*)pScan->node.pParent;
|
||||||
|
bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit);
|
||||||
|
if (withSlimit && isPartTableAgg(pAgg)) {
|
||||||
|
pScan->groupOrderScan = pAgg->node.forceCreateNonBlockingOptr = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
SOsdInfo info = {.scanOrder = SCAN_ORDER_ASC};
|
SOsdInfo info = {.scanOrder = SCAN_ORDER_ASC};
|
||||||
int32_t code = scanPathOptMatch(pCxt, pLogicSubplan->pNode, &info);
|
int32_t code = scanPathOptMatch(pCxt, pLogicSubplan->pNode, &info);
|
||||||
|
@ -371,6 +383,7 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
||||||
if (!pCxt->pPlanCxt->streamQuery) {
|
if (!pCxt->pPlanCxt->streamQuery) {
|
||||||
scanPathOptSetScanOrder(info.scanOrder, info.pScan);
|
scanPathOptSetScanOrder(info.scanOrder, info.pScan);
|
||||||
}
|
}
|
||||||
|
scanPathOptSetGroupOrderScan(info.pScan);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
|
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
|
||||||
info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs);
|
info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs);
|
||||||
|
@ -1675,6 +1688,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_AGG == pNode->pParent->type) {
|
if (QUERY_NODE_LOGIC_PLAN_AGG == pNode->pParent->type) {
|
||||||
SAggLogicNode* pParent = (SAggLogicNode*)(pNode->pParent);
|
SAggLogicNode* pParent = (SAggLogicNode*)(pNode->pParent);
|
||||||
|
scanPathOptSetGroupOrderScan(pScan);
|
||||||
pParent->hasGroupKeyOptimized = true;
|
pParent->hasGroupKeyOptimized = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -447,6 +447,7 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS
|
||||||
pScanPhysiNode->uid = pScanLogicNode->tableId;
|
pScanPhysiNode->uid = pScanLogicNode->tableId;
|
||||||
pScanPhysiNode->suid = pScanLogicNode->stableId;
|
pScanPhysiNode->suid = pScanLogicNode->stableId;
|
||||||
pScanPhysiNode->tableType = pScanLogicNode->tableType;
|
pScanPhysiNode->tableType = pScanLogicNode->tableType;
|
||||||
|
pScanPhysiNode->groupOrderScan = pScanLogicNode->groupOrderScan;
|
||||||
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
|
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
|
||||||
if (NULL != pScanLogicNode->pTagCond) {
|
if (NULL != pScanLogicNode->pTagCond) {
|
||||||
pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond);
|
pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond);
|
||||||
|
@ -880,6 +881,7 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||||
|
|
||||||
pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
|
pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
|
||||||
pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
|
pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
|
||||||
|
pAgg->node.forceCreateNonBlockingOptr = pAggLogicNode->node.forceCreateNonBlockingOptr;
|
||||||
|
|
||||||
SNodeList* pPrecalcExprs = NULL;
|
SNodeList* pPrecalcExprs = NULL;
|
||||||
SNodeList* pGroupKeys = NULL;
|
SNodeList* pGroupKeys = NULL;
|
||||||
|
|
|
@ -306,54 +306,6 @@ static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
|
||||||
return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType;
|
return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
|
|
||||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
|
||||||
return ((SScanLogicNode*)pNode)->pGroupTags;
|
|
||||||
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
|
||||||
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
|
|
||||||
} else {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool stbSplHasPartTbname(SNodeList* pPartKeys) {
|
|
||||||
if (NULL == pPartKeys) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
SNode* pPartKey = NULL;
|
|
||||||
FOREACH(pPartKey, pPartKeys) {
|
|
||||||
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
|
|
||||||
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
|
|
||||||
}
|
|
||||||
if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
|
|
||||||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool stbSplNotSystemScan(SLogicNode* pNode) {
|
|
||||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
|
||||||
return SCAN_TYPE_SYSTEM_TABLE != ((SScanLogicNode*)pNode)->scanType;
|
|
||||||
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
|
||||||
return stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
|
|
||||||
} else {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) {
|
|
||||||
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (NULL != pAgg->pGroupKeys) {
|
|
||||||
return stbSplHasPartTbname(pAgg->pGroupKeys) &&
|
|
||||||
stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
|
|
||||||
}
|
|
||||||
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||||
switch (nodeType(pNode)) {
|
switch (nodeType(pNode)) {
|
||||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||||
|
@ -364,7 +316,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||||
return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
|
return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
|
return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
|
||||||
stbSplIsPartTableAgg((SAggLogicNode*)pNode)) &&
|
isPartTableAgg((SAggLogicNode*)pNode)) &&
|
||||||
stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode);
|
stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||||
return stbSplNeedSplitWindow(streamQuery, pNode);
|
return stbSplNeedSplitWindow(streamQuery, pNode);
|
||||||
|
@ -778,10 +730,6 @@ static int32_t stbSplSplitEvent(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
|
|
||||||
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
|
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
|
||||||
case WINDOW_TYPE_INTERVAL:
|
case WINDOW_TYPE_INTERVAL:
|
||||||
|
@ -834,7 +782,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
if (stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
|
if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
|
||||||
return stbSplSplitWindowForPartTable(pCxt, pInfo);
|
return stbSplSplitWindowForPartTable(pCxt, pInfo);
|
||||||
} else {
|
} else {
|
||||||
return stbSplSplitWindowForCrossTable(pCxt, pInfo);
|
return stbSplSplitWindowForCrossTable(pCxt, pInfo);
|
||||||
|
@ -920,7 +868,7 @@ static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplit
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
if (stbSplIsPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {
|
if (isPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {
|
||||||
return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
|
return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
|
||||||
}
|
}
|
||||||
return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
|
return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
|
||||||
|
|
|
@ -321,3 +321,57 @@ int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requir
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool stbNotSystemScan(SLogicNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||||
|
return SCAN_TYPE_SYSTEM_TABLE != ((SScanLogicNode*)pNode)->scanType;
|
||||||
|
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||||
|
return stbNotSystemScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool stbHasPartTbname(SNodeList* pPartKeys) {
|
||||||
|
if (NULL == pPartKeys) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SNode* pPartKey = NULL;
|
||||||
|
FOREACH(pPartKey, pPartKeys) {
|
||||||
|
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
|
||||||
|
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
|
||||||
|
}
|
||||||
|
if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
|
||||||
|
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||||
|
return ((SScanLogicNode*)pNode)->pGroupTags;
|
||||||
|
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||||
|
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isPartTableAgg(SAggLogicNode* pAgg) {
|
||||||
|
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (NULL != pAgg->pGroupKeys) {
|
||||||
|
return stbHasPartTbname(pAgg->pGroupKeys) &&
|
||||||
|
stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
|
||||||
|
}
|
||||||
|
return stbHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isPartTableWinodw(SWindowLogicNode* pWindow) {
|
||||||
|
return stbHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -95,6 +95,23 @@ class TDTestCase:
|
||||||
tdSql.checkEqual(f'{db}',tdSql.queryResult[0][0])
|
tdSql.checkEqual(f'{db}',tdSql.queryResult[0][0])
|
||||||
tdSql.checkEqual(f'CREATE DATABASE `{db}`',tdSql.queryResult[0][1])
|
tdSql.checkEqual(f'CREATE DATABASE `{db}`',tdSql.queryResult[0][1])
|
||||||
|
|
||||||
|
def show_create_systb_sql(self):
|
||||||
|
for param in self.ins_param_list:
|
||||||
|
tdSql.query(f'show create table information_schema.ins_{param}')
|
||||||
|
tdSql.checkEqual(f'ins_{param}',tdSql.queryResult[0][0])
|
||||||
|
|
||||||
|
tdSql.execute(f'use information_schema')
|
||||||
|
tdSql.query(f'show create table ins_{param}')
|
||||||
|
tdSql.checkEqual(f'ins_{param}',tdSql.queryResult[0][0])
|
||||||
|
|
||||||
|
for param in self.perf_param_list:
|
||||||
|
tdSql.query(f'show create table performance_schema.perf_{param}')
|
||||||
|
tdSql.checkEqual(f'perf_{param}',tdSql.queryResult[0][0])
|
||||||
|
|
||||||
|
tdSql.execute(f'use performance_schema')
|
||||||
|
tdSql.query(f'show create table perf_{param}')
|
||||||
|
tdSql.checkEqual(f'perf_{param}',tdSql.queryResult[0][0])
|
||||||
|
|
||||||
def show_create_sql(self):
|
def show_create_sql(self):
|
||||||
create_db_sql = self.set_create_database_sql(self.db_param)
|
create_db_sql = self.set_create_database_sql(self.db_param)
|
||||||
print(create_db_sql)
|
print(create_db_sql)
|
||||||
|
@ -200,6 +217,7 @@ class TDTestCase:
|
||||||
self.perf_check()
|
self.perf_check()
|
||||||
self.show_create_sql()
|
self.show_create_sql()
|
||||||
self.show_create_sysdb_sql()
|
self.show_create_sysdb_sql()
|
||||||
|
self.show_create_systb_sql()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
|
@ -202,7 +202,7 @@ class TDTestCase:
|
||||||
if retCode != "TAOS_OK":
|
if retCode != "TAOS_OK":
|
||||||
tdLog.exit("taos -s fail")
|
tdLog.exit("taos -s fail")
|
||||||
|
|
||||||
tdSql.query("select count(*) from stb group by tg1")
|
tdSql.query("select count(*) from stb group by tg1 order by count(*) desc")
|
||||||
tdSql.checkData(0, 0, 2)
|
tdSql.checkData(0, 0, 2)
|
||||||
tdSql.checkData(1, 0, 1)
|
tdSql.checkData(1, 0, 1)
|
||||||
|
|
||||||
|
|
|
@ -79,6 +79,11 @@ class TDTestCase:
|
||||||
tdSql.query("select count(*) from (select * from meters order by ts desc)")
|
tdSql.query("select count(*) from (select * from meters order by ts desc)")
|
||||||
tdSql.checkData(0, 0, allCnt)
|
tdSql.checkData(0, 0, allCnt)
|
||||||
|
|
||||||
|
rowCnt = tdSql.query("select tbname, count(*) from meters partition by tbname slimit 11")
|
||||||
|
if rowCnt != 10:
|
||||||
|
tdLog.exit("partition by tbname should return 10 rows of table data which is " + str(rowCnt))
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
binPath = self.getPath()
|
binPath = self.getPath()
|
||||||
|
|
Loading…
Reference in New Issue