[td-5881]<feature>: Sort the result according to any single column in the outer query result is allowed.
This commit is contained in:
parent
f415a994fe
commit
799c642cec
|
@ -29,15 +29,16 @@ extern "C" {
|
|||
#include "tsched.h"
|
||||
#include "tsclient.h"
|
||||
|
||||
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
|
||||
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
|
||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
|
||||
|
||||
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
|
||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
|
||||
|
||||
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
|
||||
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
|
||||
|
||||
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
|
||||
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
|
||||
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
|
||||
|
||||
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
|
||||
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
|
||||
|
||||
#pragma pack(push,1)
|
||||
|
|
|
@ -224,6 +224,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
|||
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
|
||||
|
@ -1622,12 +1623,12 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
}
|
||||
|
||||
startPos = pSDataBlock->info.rows - 1;
|
||||
|
||||
|
||||
// window start(end) key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
|
||||
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
|
||||
}
|
||||
|
||||
|
||||
break;
|
||||
}
|
||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||
|
@ -2213,6 +2214,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_StateWindow: {
|
||||
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
|
||||
|
@ -2229,24 +2231,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
|
||||
case OP_Filter: { // todo refactor
|
||||
int32_t numOfFilterCols = 0;
|
||||
// if (pQueryAttr->numOfFilterCols > 0) {
|
||||
// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
|
||||
// pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols);
|
||||
// } else {
|
||||
if (pQueryAttr->stableQuery) {
|
||||
SColumnInfo* pColInfo =
|
||||
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
|
||||
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
|
||||
} else {
|
||||
SColumnInfo* pColInfo =
|
||||
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
|
||||
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
|
||||
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
|
||||
}
|
||||
// }
|
||||
if (pQueryAttr->stableQuery) {
|
||||
SColumnInfo* pColInfo =
|
||||
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
|
||||
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
|
||||
} else {
|
||||
SColumnInfo* pColInfo =
|
||||
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
|
||||
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
|
||||
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
|
||||
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2258,11 +2256,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
|
||||
case OP_MultiwayMergeSort: {
|
||||
bool groupMix = true;
|
||||
if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
|
||||
if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
|
||||
groupMix = false;
|
||||
}
|
||||
|
||||
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
|
||||
4096, merger, groupMix); // TODO hack it
|
||||
4096, merger, groupMix); // TODO hack it
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2283,6 +2282,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
break;
|
||||
}
|
||||
|
||||
case OP_Order: {
|
||||
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
assert(0);
|
||||
}
|
||||
|
@ -3092,7 +3096,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
// check if this data block is required to load
|
||||
if ((*status) != BLK_DATA_ALL_NEEDED) {
|
||||
bool needFilter = true;
|
||||
|
||||
|
||||
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
|
||||
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
|
||||
|
@ -5404,6 +5408,108 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
|
||||
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
|
||||
|
||||
int32_t numOfCols = pSrc->info.numOfCols;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
|
||||
|
||||
int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
|
||||
char* tmp = realloc(pCol2->pData, newSize);
|
||||
if (tmp != NULL) {
|
||||
pCol2->pData = tmp;
|
||||
int32_t offset = pCol2->info.bytes * pDest->info.rows;
|
||||
memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes);
|
||||
} else {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
pDest->info.rows += pSrc->info.rows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSort(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOrderOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while(1) {
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
|
||||
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
// start to flush data into disk and try do multiway merge sort
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo handle error
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
|
||||
void** pCols = calloc(numOfCols, POINTER_BYTES);
|
||||
SSchema* pSchema = calloc(numOfCols, sizeof(SSchema));
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
|
||||
pCols[i] = p1->pData;
|
||||
pSchema[i].colId = p1->info.colId;
|
||||
pSchema[i].bytes = p1->info.bytes;
|
||||
pSchema[i].type = (uint8_t) p1->info.type;
|
||||
}
|
||||
|
||||
__compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order);
|
||||
taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
|
||||
|
||||
tfree(pCols);
|
||||
tfree(pSchema);
|
||||
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
|
||||
|
||||
{
|
||||
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
|
||||
pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColumnInfoData col = {{0}};
|
||||
col.info.bytes = pExpr->base.resBytes;
|
||||
col.info.colId = pExpr->base.resColId;
|
||||
col.info.type = pExpr->base.resType;
|
||||
taosArrayPush(pDataBlock->pDataBlock, &col);
|
||||
}
|
||||
|
||||
pDataBlock->info.numOfCols = numOfOutput;
|
||||
pInfo->pDataBlock = pDataBlock;
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "InMemoryOrder";
|
||||
pOperator->operatorType = OP_Order;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->exec = doSort;
|
||||
pOperator->cleanup = destroyOrderOperatorInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
|
||||
appendUpstream(pOperator, upstream);
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
|
||||
return pTableScanInfo->order;
|
||||
}
|
||||
|
@ -6404,6 +6510,11 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
|
||||
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
|
||||
}
|
||||
|
||||
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*) param;
|
||||
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
|
||||
|
@ -6752,7 +6863,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
|
|||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
|
||||
pOperator->exec = doFill;
|
||||
pOperator->cleanup = destroySFillOperatorInfo;
|
||||
|
||||
|
|
Loading…
Reference in New Issue