fix(query): sort according to the generated column data in order by operator.
This commit is contained in:
parent
e4f0a0fc67
commit
4e90982c22
|
@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
|
||||||
|
|
||||||
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
|
||||||
int32_t mapIndex = i;
|
int32_t mapIndex = i;
|
||||||
if (pIndexMap) {
|
// if (pIndexMap) {
|
||||||
mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
|
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
|
||||||
}
|
// }
|
||||||
|
|
||||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
|
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
|
||||||
|
@ -491,9 +491,14 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
||||||
|
|
||||||
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
|
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
|
||||||
bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
|
bool isNull = false;
|
||||||
char* p = colDataGetData(pColData, j);
|
if (pBlock->pBlockAgg == NULL) {
|
||||||
|
isNull = colDataIsNull_s(pColData, pBlock->info.rows);
|
||||||
|
} else {
|
||||||
|
isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* p = colDataGetData(pColData, j);
|
||||||
colDataAppend(pDstCol, j - startIndex, p, isNull);
|
colDataAppend(pDstCol, j - startIndex, p, isNull);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -578,9 +578,8 @@ typedef struct SSortOperatorInfo {
|
||||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||||
SArray* pSortInfo;
|
SArray* pSortInfo;
|
||||||
SSortHandle* pSortHandle;
|
SSortHandle* pSortHandle;
|
||||||
SArray* inputSlotMap; // for index map from table scan output
|
SArray* pColMatchInfo; // for index map from table scan output
|
||||||
int32_t bufPageSize;
|
int32_t bufPageSize;
|
||||||
// int32_t numOfRowsInRes;
|
|
||||||
|
|
||||||
// TODO extact struct
|
// TODO extact struct
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
|
@ -645,7 +644,7 @@ void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||||
|
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity);
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
|
||||||
SSDataBlock* loadNextDataBlock(void* param);
|
SSDataBlock* loadNextDataBlock(void* param);
|
||||||
|
|
||||||
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||||
|
|
|
@ -117,18 +117,25 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle);
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param pHandle
|
* @param pHandle
|
||||||
* @param colIndex
|
* @param colId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex);
|
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param pHandle
|
* @param pHandle
|
||||||
* @param colIndex
|
* @param colId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex);
|
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param pSortHandle
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -3520,7 +3520,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
@ -4701,7 +4701,7 @@ static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
||||||
|
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
static SArray* createIndexMap(SNodeList* pNodeList);
|
static SArray* createIndexMap(SNodeList* pNodeList);
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||||
|
@ -4870,16 +4870,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
|
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
||||||
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
|
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = NULL;
|
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||||
if (pSortPhyNode->pExprs != NULL) {
|
|
||||||
pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
|
||||||
}
|
|
||||||
|
|
||||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, slotMap, pTaskInfo);
|
int32_t numOfOutputCols = 0;
|
||||||
|
SArray* pColList =
|
||||||
|
extractColMatchInfo(pSortPhyNode->pTargets, pSortPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
||||||
|
|
||||||
|
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
@ -5037,7 +5037,7 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget) {
|
SArray* createSortInfo(SNodeList* pNodeList) {
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
|
||||||
if (pList == NULL) {
|
if (pList == NULL) {
|
||||||
|
@ -5052,22 +5052,7 @@ SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget) {
|
||||||
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
|
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
|
||||||
|
|
||||||
SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
|
SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
|
||||||
|
bi.slotId = pColNode->slotId;
|
||||||
bool found = false;
|
|
||||||
for (int32_t j = 0; j < LIST_LENGTH(pNodeListTarget); ++j) {
|
|
||||||
STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeListTarget, j);
|
|
||||||
|
|
||||||
SColumnNode* pColNodeT = (SColumnNode*)pTarget->pExpr;
|
|
||||||
if (pColNode->slotId == pColNodeT->slotId) { // to find slotId in PhysiSort OutputDataBlockDesc
|
|
||||||
bi.slotId = pTarget->slotId;
|
|
||||||
found = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!found) {
|
|
||||||
qError("sort slot id does not found");
|
|
||||||
}
|
|
||||||
taosArrayPush(pList, &bi);
|
taosArrayPush(pList, &bi);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator);
|
||||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
|
|
||||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SArray* pIndexMap, SExecTaskInfo* pTaskInfo) {
|
SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
int32_t rowSize = pResBlock->info.rowSize;
|
int32_t rowSize = pResBlock->info.rowSize;
|
||||||
|
@ -20,17 +20,19 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 1024);
|
initResultSizeInfo(pOperator, 1024);
|
||||||
pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
|
|
||||||
|
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
|
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
pInfo->inputSlotMap = pIndexMap;
|
pInfo->pColMatchInfo= pColMatchColInfo;
|
||||||
pOperator->name = "SortOperator";
|
pOperator->name = "SortOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
|
// lazy evaluation for the following parameter since the input datablock is not known till now.
|
||||||
|
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
|
||||||
|
// pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
|
||||||
|
@ -45,14 +47,12 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO merge aggregate super table
|
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
||||||
bool isNull = tsortIsNullVal(pTupleHandle, i);
|
bool isNull = tsortIsNullVal(pTupleHandle, i);
|
||||||
if (isNull) {
|
if (isNull) {
|
||||||
colDataAppend(pColInfo, pBlock->info.rows, NULL, true);
|
colDataAppendNULL(pColInfo, pBlock->info.rows);
|
||||||
} else {
|
} else {
|
||||||
char* pData = tsortGetValue(pTupleHandle, i);
|
char* pData = tsortGetValue(pTupleHandle, i);
|
||||||
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
|
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
|
||||||
|
@ -62,11 +62,12 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo) {
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
blockDataEnsureCapacity(pDataBlock, capacity);
|
ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols);
|
||||||
|
|
||||||
blockDataEnsureCapacity(pDataBlock, capacity);
|
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||||
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
@ -74,12 +75,32 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
if (pDataBlock->info.rows >= capacity) {
|
if (p->info.rows >= capacity) {
|
||||||
return pDataBlock;
|
return pDataBlock;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (p->info.rows > 0) {
|
||||||
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
|
|
||||||
|
for(int32_t j = 0; j < p->info.numOfCols; ++j) {
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, j);
|
||||||
|
if (pSrc->info.colId == pmInfo->colId) {
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
||||||
|
colDataAssign(pDst, pSrc, p->info.rows);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pDataBlock->info.rows = p->info.rows;
|
||||||
|
pDataBlock->info.capacity = p->info.rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,16 +127,16 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
||||||
SSortOperatorInfo* pInfo = pOperator->info;
|
SSortOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT,
|
// int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, pTaskInfo->id.str);
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT,
|
||||||
|
-1, -1, NULL, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
|
||||||
|
|
||||||
|
|
||||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||||
ps->param = pOperator->pDownstream[0];
|
ps->param = pOperator->pDownstream[0];
|
||||||
tsortAddSource(pInfo->pSortHandle, ps);
|
tsortAddSource(pInfo->pSortHandle, ps);
|
||||||
|
@ -127,7 +148,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
@ -135,5 +156,5 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->pSortInfo);
|
taosArrayDestroy(pInfo->pSortInfo);
|
||||||
taosArrayDestroy(pInfo->inputSlotMap);
|
taosArrayDestroy(pInfo->pColMatchInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,25 +64,8 @@ struct SSortHandle {
|
||||||
|
|
||||||
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
|
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
|
||||||
|
|
||||||
static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
|
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
return createOneDataBlock(pSortHandle->pDataBlock, false);
|
||||||
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
|
||||||
pBlock->info.numOfCols = numOfCols;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData colInfo = {0};
|
|
||||||
|
|
||||||
colInfo.info.type = pSchema[i].type;
|
|
||||||
colInfo.info.bytes = pSchema[i].bytes;
|
|
||||||
colInfo.info.colId = pSchema[i].colId;
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(colInfo.info.type)) {
|
|
||||||
pBlock->info.hasVarCol = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return pBlock;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -98,7 +81,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
|
||||||
pSortHandle->numOfPages = numOfPages;
|
pSortHandle->numOfPages = numOfPages;
|
||||||
pSortHandle->pSortInfo = pSortInfo;
|
pSortHandle->pSortInfo = pSortInfo;
|
||||||
pSortHandle->pIndexMap = pIndexMap;
|
pSortHandle->pIndexMap = pIndexMap;
|
||||||
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
|
||||||
|
if (pBlock != NULL) {
|
||||||
|
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||||
|
}
|
||||||
|
|
||||||
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
|
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
|
||||||
pSortHandle->cmpParam.orderInfo = pSortInfo;
|
pSortHandle->cmpParam.orderInfo = pSortInfo;
|
||||||
|
@ -530,6 +516,17 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
||||||
|
|
||||||
if (pHandle->pDataBlock == NULL) {
|
if (pHandle->pDataBlock == NULL) {
|
||||||
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
|
||||||
|
|
||||||
|
// calculate the buffer pages according to the total available buffers.
|
||||||
|
int32_t rowSize = blockDataGetRowSize(pBlock);
|
||||||
|
if (rowSize * 4 > 4096) {
|
||||||
|
pHandle->pageSize = rowSize * 4;
|
||||||
|
} else {
|
||||||
|
pHandle->pageSize = 4096;
|
||||||
|
}
|
||||||
|
// todo!!
|
||||||
|
pHandle->numOfPages = 1024;
|
||||||
|
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
// perform the scalar function calculation before apply the sort
|
// perform the scalar function calculation before apply the sort
|
||||||
|
@ -538,7 +535,6 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo relocate the columns
|
// todo relocate the columns
|
||||||
|
|
||||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
|
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -689,7 +685,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
|
||||||
|
|
||||||
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
|
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
|
||||||
SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
|
SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
|
||||||
return colDataIsNull(pColInfoSrc, 0, pVHandle->rowIndex, NULL);
|
return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
|
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
|
||||||
|
|
|
@ -1023,8 +1023,7 @@ static void vectorMathMultiplyHelper(SColumnInfoData* pLeftCol, SColumnInfoData*
|
||||||
colDataAppendNULL(pOutputCol, i);
|
colDataAppendNULL(pOutputCol, i);
|
||||||
continue; // TODO set null or ignore
|
continue; // TODO set null or ignore
|
||||||
}
|
}
|
||||||
*output = getVectorDoubleValueFnLeft(LEFT_COL, i)
|
*output = getVectorDoubleValueFnLeft(LEFT_COL, i) * getVectorDoubleValueFnRight(RIGHT_COL, 0);
|
||||||
* getVectorDoubleValueFnRight(RIGHT_COL, 0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1050,8 +1049,7 @@ void vectorMathMultiply(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam
|
||||||
colDataAppendNULL(pOutputCol, i);
|
colDataAppendNULL(pOutputCol, i);
|
||||||
continue; // TODO set null or ignore
|
continue; // TODO set null or ignore
|
||||||
}
|
}
|
||||||
*output = getVectorDoubleValueFnLeft(LEFT_COL, i)
|
*output = getVectorDoubleValueFnLeft(LEFT_COL, i) * getVectorDoubleValueFnRight(RIGHT_COL, i);
|
||||||
* getVectorDoubleValueFnRight(RIGHT_COL, i);
|
|
||||||
}
|
}
|
||||||
} else if (pLeft->numOfRows == 1) {
|
} else if (pLeft->numOfRows == 1) {
|
||||||
vectorMathMultiplyHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i);
|
vectorMathMultiplyHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i);
|
||||||
|
|
Loading…
Reference in New Issue