feat: the table merge scan operator supports order by ts and pk
This commit is contained in:
parent
8c5ec3205c
commit
fdff529860
|
@ -56,10 +56,11 @@ typedef struct SMsortComparParam {
|
||||||
bool cmpGroupId;
|
bool cmpGroupId;
|
||||||
|
|
||||||
int32_t sortType;
|
int32_t sortType;
|
||||||
// the following field to speed up when sortType == SORT_BLOCK_TS_MERGE
|
// the following fields to speed up sorting when sortType == SORT_BLOCK_TS_MERGE
|
||||||
int32_t tsSlotId;
|
int32_t tsSlotId;
|
||||||
int32_t order;
|
int32_t tsOrder;
|
||||||
__compar_fn_t cmpFn;
|
__compar_fn_t cmpTsFn;
|
||||||
|
void* pPkOrder; // SBlockOrderInfo*
|
||||||
} SMsortComparParam;
|
} SMsortComparParam;
|
||||||
|
|
||||||
typedef struct SSortHandle SSortHandle;
|
typedef struct SSortHandle SSortHandle;
|
||||||
|
|
|
@ -1341,6 +1341,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
c.srcSlotId = pColNode->slotId;
|
c.srcSlotId = pColNode->slotId;
|
||||||
c.dstSlotId = pNode->slotId;
|
c.dstSlotId = pNode->slotId;
|
||||||
c.isPk = pColNode->isPk;
|
c.isPk = pColNode->isPk;
|
||||||
|
c.dataType = pColNode->node.resType;
|
||||||
taosArrayPush(pList, &c);
|
taosArrayPush(pList, &c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3985,34 +3985,37 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
|
||||||
|
|
||||||
|
|
||||||
SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) {
|
SArray* generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order) {
|
||||||
|
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||||
|
SBlockOrderInfo biTs = {0};
|
||||||
|
SBlockOrderInfo biPk = {0};
|
||||||
|
|
||||||
int32_t tsTargetSlotId = 0;
|
int32_t tsTargetSlotId = 0;
|
||||||
int32_t pkTargetSlotId = -1;
|
int32_t pkTargetSlotId = -1;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
|
||||||
SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
|
SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
|
||||||
if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
tsTargetSlotId = colInfo->dstSlotId;
|
tsTargetSlotId = colInfo->dstSlotId;
|
||||||
}
|
|
||||||
if (colInfo->isPk) {
|
|
||||||
pkTargetSlotId = colInfo->dstSlotId;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
|
||||||
SBlockOrderInfo biTs = {0};
|
|
||||||
biTs.order = order;
|
biTs.order = order;
|
||||||
biTs.slotId = tsTargetSlotId;
|
biTs.slotId = tsTargetSlotId;
|
||||||
biTs.nullFirst = NULL_ORDER_FIRST;
|
biTs.nullFirst = (order == TSDB_ORDER_ASC);
|
||||||
taosArrayPush(pList, &biTs);
|
biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||||
|
}
|
||||||
if (pkTargetSlotId != -1) {
|
//TODO: order by just ts
|
||||||
SBlockOrderInfo biPk = {0};
|
if (colInfo->isPk) {
|
||||||
|
pkTargetSlotId = colInfo->dstSlotId;
|
||||||
biPk.order = order;
|
biPk.order = order;
|
||||||
biPk.slotId = pkTargetSlotId;
|
biPk.slotId = pkTargetSlotId;
|
||||||
biPk.nullFirst = NULL_ORDER_FIRST;
|
biPk.nullFirst = (order == TSDB_ORDER_ASC);
|
||||||
taosArrayPush(pList, &biPk);
|
biPk.compFn = getKeyComparFunc(colInfo->dataType.type, order);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return pList;
|
taosArrayPush(pSortInfo, &biTs);
|
||||||
|
if (pkTargetSlotId != -1) {
|
||||||
|
taosArrayPush(pSortInfo, &biPk);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pSortInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
|
||||||
|
|
|
@ -108,8 +108,8 @@ struct SSortHandle {
|
||||||
int32_t extRowsPageSize;
|
int32_t extRowsPageSize;
|
||||||
int32_t extRowsMemSize;
|
int32_t extRowsMemSize;
|
||||||
int32_t srcTsSlotId;
|
int32_t srcTsSlotId;
|
||||||
SBlockOrderInfo extRowsOrderInfo;
|
SArray* aExtRowsOrders;
|
||||||
|
bool bSortPk;
|
||||||
void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
|
void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
|
||||||
void* mergeLimitReachedParam;
|
void* mergeLimitReachedParam;
|
||||||
};
|
};
|
||||||
|
@ -261,10 +261,17 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
|
||||||
pSortHandle->cmpParam.cmpGroupId = false;
|
pSortHandle->cmpParam.cmpGroupId = false;
|
||||||
pSortHandle->cmpParam.sortType = type;
|
pSortHandle->cmpParam.sortType = type;
|
||||||
if (type == SORT_BLOCK_TS_MERGE) {
|
if (type == SORT_BLOCK_TS_MERGE) {
|
||||||
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pSortInfo, 0);
|
SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0);
|
||||||
pSortHandle->cmpParam.tsSlotId = pOrder->slotId;
|
pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId;
|
||||||
pSortHandle->cmpParam.order = pOrder->order;
|
pSortHandle->cmpParam.tsOrder = pTsOrder->order;
|
||||||
pSortHandle->cmpParam.cmpFn = (pOrder->order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc;
|
pSortHandle->cmpParam.cmpTsFn = pTsOrder->compFn;
|
||||||
|
if (taosArrayGetSize(pSortHandle->pSortInfo) == 2) {
|
||||||
|
pSortHandle->cmpParam.pPkOrder = taosArrayGet(pSortHandle->pSortInfo, 1);
|
||||||
|
pSortHandle->bSortPk = true;
|
||||||
|
} else {
|
||||||
|
pSortHandle->cmpParam.pPkOrder = NULL;
|
||||||
|
pSortHandle->bSortPk = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tsortSetComparFp(pSortHandle, msortComparFn);
|
tsortSetComparFp(pSortHandle, msortComparFn);
|
||||||
|
|
||||||
|
@ -347,6 +354,8 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
|
||||||
destroySortMemFile(pSortHandle);
|
destroySortMemFile(pSortHandle);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pSortHandle->pSortInfo);
|
taosArrayDestroy(pSortHandle->pSortInfo);
|
||||||
|
taosArrayDestroy(pSortHandle->aExtRowsOrders);
|
||||||
|
pSortHandle->aExtRowsOrders = NULL;
|
||||||
taosMemoryFreeClear(pSortHandle);
|
taosMemoryFreeClear(pSortHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -624,6 +633,56 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa
|
||||||
return (pHandle->pDataBlock->info.rows > 0) ? pHandle->pDataBlock : NULL;
|
return (pHandle->pDataBlock->info.rows > 0) ? pHandle->pDataBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: improve this function performance
|
||||||
|
|
||||||
|
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
|
||||||
|
int32_t leftRowIndex, int32_t rightRowIndex, SBlockOrderInfo* pOrder) {
|
||||||
|
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
|
||||||
|
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
|
||||||
|
|
||||||
|
bool isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);
|
||||||
|
if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
|
||||||
|
bool leftNull = false;
|
||||||
|
if (pLeftColInfoData->hasNull) {
|
||||||
|
if (pLeftBlock->pBlockAgg == NULL) {
|
||||||
|
leftNull = colDataIsNull_t(pLeftColInfoData, leftRowIndex, isVarType);
|
||||||
|
} else {
|
||||||
|
leftNull =
|
||||||
|
colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, leftRowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool rightNull = false;
|
||||||
|
if (pRightColInfoData->hasNull) {
|
||||||
|
if (pRightBlock->pBlockAgg == NULL) {
|
||||||
|
rightNull = colDataIsNull_t(pRightColInfoData, rightRowIndex, isVarType);
|
||||||
|
} else {
|
||||||
|
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, rightRowIndex,
|
||||||
|
pRightBlock->pBlockAgg[pOrder->slotId]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (leftNull && rightNull) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rightNull) {
|
||||||
|
return pOrder->nullFirst ? 1 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (leftNull) {
|
||||||
|
return pOrder->nullFirst ? -1 : 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *left1, *right1;
|
||||||
|
left1 = colDataGetData(pLeftColInfoData, leftRowIndex);
|
||||||
|
right1 = colDataGetData(pRightColInfoData, rightRowIndex);
|
||||||
|
__compar_fn_t fn = pOrder->compFn;
|
||||||
|
int ret = fn(left1, right1);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
|
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
int32_t pLeftIdx = *(int32_t*)pLeft;
|
int32_t pLeftIdx = *(int32_t*)pLeft;
|
||||||
int32_t pRightIdx = *(int32_t*)pRight;
|
int32_t pRightIdx = *(int32_t*)pRight;
|
||||||
|
@ -654,12 +713,16 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
|
if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
|
||||||
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
|
SColumnInfoData* pLeftTsCol = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
|
||||||
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
|
SColumnInfoData* pRightTsCol = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
|
||||||
int64_t* left1 = (int64_t*)(pLeftColInfoData->pData) + pLeftSource->src.rowIndex;
|
int64_t* leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex;
|
||||||
int64_t* right1 = (int64_t*)(pRightColInfoData->pData) + pRightSource->src.rowIndex;
|
int64_t* rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex;
|
||||||
|
|
||||||
int ret = pParam->cmpFn(left1, right1);
|
int ret = pParam->cmpTsFn(leftTs, rightTs);
|
||||||
|
if (ret == 0 && pParam->pPkOrder) {
|
||||||
|
ret = tsortComparBlockCell(pLeftBlock, pRightBlock,
|
||||||
|
pLeftSource->src.rowIndex, pRightSource->src.rowIndex, (SBlockOrderInfo*)pParam->pPkOrder);
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
} else {
|
} else {
|
||||||
bool isVarType;
|
bool isVarType;
|
||||||
|
@ -1208,7 +1271,8 @@ static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource
|
||||||
saveBlockRowToExtRowsMemFile(pHandle, pSource, *rowIndex, &pageId, &offset, &length);
|
saveBlockRowToExtRowsMemFile(pHandle, pSource, *rowIndex, &pageId, &offset, &length);
|
||||||
|
|
||||||
SSDataBlock* pBlock = pHandle->pDataBlock;
|
SSDataBlock* pBlock = pHandle->pDataBlock;
|
||||||
SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, pHandle->extRowsOrderInfo.slotId);
|
SBlockOrderInfo* extRowsTsOrder = taosArrayGet(pHandle->aExtRowsOrders, 0);
|
||||||
|
SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, extRowsTsOrder->slotId);
|
||||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
|
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
char* pData = colDataGetData(pSrcTsCol, *rowIndex);
|
char* pData = colDataGetData(pSrcTsCol, *rowIndex);
|
||||||
colDataSetVal(pTsCol, pBlock->info.rows, pData, false);
|
colDataSetVal(pTsCol, pBlock->info.rows, pData, false);
|
||||||
|
@ -1222,11 +1286,25 @@ static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource
|
||||||
SColumnInfoData* pLengthCol = taosArrayGet(pBlock->pDataBlock, 3);
|
SColumnInfoData* pLengthCol = taosArrayGet(pBlock->pDataBlock, 3);
|
||||||
colDataSetInt32(pLengthCol, pBlock->info.rows, &length);
|
colDataSetInt32(pLengthCol, pBlock->info.rows, &length);
|
||||||
|
|
||||||
|
if (pHandle->bSortPk) {
|
||||||
|
SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
|
||||||
|
SColumnInfoData* pSrcPkCol = taosArrayGet(pSource->pDataBlock, extRowsPkOrder->slotId);
|
||||||
|
SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, 4);
|
||||||
|
if (colDataIsNull_s(pSrcPkCol, *rowIndex)) {
|
||||||
|
colDataSetNULL(pPkCol, pBlock->info.rows);
|
||||||
|
} else {
|
||||||
|
char* pPkData = colDataGetData(pSrcPkCol, *rowIndex);
|
||||||
|
colDataSetVal(pPkCol, pBlock->info.rows, pPkData, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
*rowIndex += 1;
|
*rowIndex += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void initRowIdSort(SSortHandle* pHandle) {
|
static void initRowIdSort(SSortHandle* pHandle) {
|
||||||
|
SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL;
|
||||||
|
SColumnInfoData* extPkCol = (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL;
|
||||||
|
|
||||||
SSDataBlock* pSortInput = createDataBlock();
|
SSDataBlock* pSortInput = createDataBlock();
|
||||||
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
|
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
|
||||||
|
@ -1237,7 +1315,10 @@ static void initRowIdSort(SSortHandle* pHandle) {
|
||||||
blockDataAppendColInfo(pSortInput, &offsetCol);
|
blockDataAppendColInfo(pSortInput, &offsetCol);
|
||||||
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
|
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
|
||||||
blockDataAppendColInfo(pSortInput, &lengthCol);
|
blockDataAppendColInfo(pSortInput, &lengthCol);
|
||||||
|
if (pHandle->bSortPk) {
|
||||||
|
SColumnInfoData pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
|
||||||
|
blockDataAppendColInfo(pSortInput, &pkCol);
|
||||||
|
}
|
||||||
blockDataDestroy(pHandle->pDataBlock);
|
blockDataDestroy(pHandle->pDataBlock);
|
||||||
pHandle->pDataBlock = pSortInput;
|
pHandle->pDataBlock = pSortInput;
|
||||||
|
|
||||||
|
@ -1246,15 +1327,24 @@ static void initRowIdSort(SSortHandle* pHandle) {
|
||||||
pHandle->pageSize = 256 * 1024; // 256k
|
pHandle->pageSize = 256 * 1024; // 256k
|
||||||
pHandle->numOfPages = 256;
|
pHandle->numOfPages = 256;
|
||||||
|
|
||||||
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
|
||||||
SBlockOrderInfo bi = {0};
|
|
||||||
bi.order = pOrder->order;
|
|
||||||
bi.slotId = 0;
|
|
||||||
bi.nullFirst = NULL_ORDER_FIRST;
|
|
||||||
|
|
||||||
SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||||
taosArrayPush(aOrder, &bi);
|
|
||||||
|
|
||||||
|
SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
||||||
|
SBlockOrderInfo biTs = {0};
|
||||||
|
biTs.order = pTsOrder->order;
|
||||||
|
biTs.slotId = 0;
|
||||||
|
biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC);
|
||||||
|
biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order);
|
||||||
|
taosArrayPush(aOrder, &biTs);
|
||||||
|
|
||||||
|
if (pHandle->bSortPk) {
|
||||||
|
SBlockOrderInfo biPk = {0};
|
||||||
|
biPk.order = pkOrder->order;
|
||||||
|
biPk.slotId = 4;
|
||||||
|
biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC);
|
||||||
|
biPk.compFn = getKeyComparFunc(extPkCol->info.type, biPk.order);
|
||||||
|
taosArrayPush(aOrder, &biPk);
|
||||||
|
}
|
||||||
taosArrayDestroy(pHandle->pSortInfo);
|
taosArrayDestroy(pHandle->pSortInfo);
|
||||||
pHandle->pSortInfo = aOrder;
|
pHandle->pSortInfo = aOrder;
|
||||||
return;
|
return;
|
||||||
|
@ -1263,8 +1353,7 @@ static void initRowIdSort(SSortHandle* pHandle) {
|
||||||
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
|
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
|
||||||
pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t);
|
pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t);
|
||||||
pHandle->extRowsMemSize = extRowsMemSize;
|
pHandle->extRowsMemSize = extRowsMemSize;
|
||||||
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
pHandle->aExtRowsOrders = taosArrayDup(pHandle->pSortInfo, NULL);
|
||||||
pHandle->extRowsOrderInfo = *pOrder;
|
|
||||||
initRowIdSort(pHandle);
|
initRowIdSort(pHandle);
|
||||||
if (!osTempSpaceAvailable()) {
|
if (!osTempSpaceAvailable()) {
|
||||||
terrno = TSDB_CODE_NO_DISKSPACE;
|
terrno = TSDB_CODE_NO_DISKSPACE;
|
||||||
|
@ -1279,7 +1368,10 @@ int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
|
||||||
typedef struct SBlkMergeSupport {
|
typedef struct SBlkMergeSupport {
|
||||||
int64_t** aTs;
|
int64_t** aTs;
|
||||||
int32_t* aRowIdx;
|
int32_t* aRowIdx;
|
||||||
int32_t order;
|
int32_t tsOrder;
|
||||||
|
|
||||||
|
SBlockOrderInfo* pPkOrder;
|
||||||
|
SSDataBlock** aBlks;
|
||||||
} SBlkMergeSupport;
|
} SBlkMergeSupport;
|
||||||
|
|
||||||
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
|
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
|
@ -1297,12 +1389,36 @@ static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* par
|
||||||
int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
|
int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
|
||||||
|
|
||||||
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
|
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
|
||||||
if (pSup->order == TSDB_ORDER_DESC) {
|
if (pSup->tsOrder == TSDB_ORDER_DESC) {
|
||||||
ret = -1 * ret;
|
ret = -1 * ret;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t blockCompareTsPkFn(const void* pLeft, const void* pRight, void* param) {
|
||||||
|
int32_t left = *(int32_t*)pLeft;
|
||||||
|
int32_t right = *(int32_t*)pRight;
|
||||||
|
|
||||||
|
SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
|
||||||
|
if (pSup->aRowIdx[left] == -1) {
|
||||||
|
return 1;
|
||||||
|
} else if (pSup->aRowIdx[right] == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
|
||||||
|
int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
|
||||||
|
|
||||||
|
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
|
||||||
|
if (pSup->tsOrder == TSDB_ORDER_DESC) {
|
||||||
|
ret = -1 * ret;
|
||||||
|
}
|
||||||
|
if (ret == 0 && pSup->pPkOrder) {
|
||||||
|
ret = tsortComparBlockCell(pSup->aBlks[left], pSup->aBlks[right], pSup->aRowIdx[left], pSup->aRowIdx[right], pSup->pPkOrder);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
|
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
|
||||||
int32_t pageId = -1;
|
int32_t pageId = -1;
|
||||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||||
|
@ -1358,18 +1474,27 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
blockDataCleanup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
int32_t numBlks = taosArrayGetSize(aBlk);
|
int32_t numBlks = taosArrayGetSize(aBlk);
|
||||||
|
|
||||||
SBlockOrderInfo* pOrigBlockOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo;
|
SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ?
|
||||||
SBlockOrderInfo* pHandleBlockOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
|
||||||
SBlkMergeSupport sup;
|
SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
|
||||||
|
SBlkMergeSupport sup = {0};
|
||||||
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
|
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
|
||||||
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
|
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
|
||||||
sup.order = pOrigBlockOrder->order;
|
sup.tsOrder = pOrigBlockTsOrder->order;
|
||||||
|
sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*));
|
||||||
for (int i = 0; i < numBlks; ++i) {
|
for (int i = 0; i < numBlks; ++i) {
|
||||||
SSDataBlock* blk = taosArrayGetP(aBlk, i);
|
SSDataBlock* blk = taosArrayGetP(aBlk, i);
|
||||||
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockOrder->slotId);
|
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId);
|
||||||
sup.aTs[i] = (int64_t*)col->pData;
|
sup.aTs[i] = (int64_t*)col->pData;
|
||||||
sup.aRowIdx[i] = 0;
|
sup.aRowIdx[i] = 0;
|
||||||
|
sup.aBlks[i] = blk;
|
||||||
}
|
}
|
||||||
|
SBlockOrderInfo* pOrigBlockPkOrder = NULL;
|
||||||
|
if (pHandle->bSortPk) {
|
||||||
|
pOrigBlockPkOrder = (!pHandle->bSortByRowId) ?
|
||||||
|
taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
|
||||||
|
}
|
||||||
|
sup.pPkOrder = pOrigBlockPkOrder;
|
||||||
|
|
||||||
int32_t totalRows = 0;
|
int32_t totalRows = 0;
|
||||||
for (int i = 0; i < numBlks; ++i) {
|
for (int i = 0; i < numBlks; ++i) {
|
||||||
|
@ -1378,7 +1503,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
}
|
}
|
||||||
|
|
||||||
SMultiwayMergeTreeInfo* pTree = NULL;
|
SMultiwayMergeTreeInfo* pTree = NULL;
|
||||||
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
__merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;
|
||||||
|
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
taosMemoryFree(sup.aRowIdx);
|
taosMemoryFree(sup.aRowIdx);
|
||||||
taosMemoryFree(sup.aTs);
|
taosMemoryFree(sup.aTs);
|
||||||
|
@ -1390,8 +1516,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
int32_t nMergedRows = 0;
|
int32_t nMergedRows = 0;
|
||||||
bool mergeLimitReached = false;
|
bool mergeLimitReached = false;
|
||||||
size_t blkPgSz = pgHeaderSz;
|
size_t blkPgSz = pgHeaderSz;
|
||||||
int64_t lastPageBufTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
||||||
int64_t currTs = (pHandleBlockOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
int64_t currTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
|
||||||
while (nRows < totalRows) {
|
while (nRows < totalRows) {
|
||||||
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
||||||
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
||||||
|
@ -1400,7 +1526,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
|
int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
|
||||||
|
|
||||||
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
||||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId);
|
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
|
||||||
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
||||||
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1418,8 +1544,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
|
|
||||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
mergeLimitReached = true;
|
mergeLimitReached = true;
|
||||||
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) ||
|
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
|
||||||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) {
|
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
|
||||||
pHandle->currMergeLimitTs = lastPageBufTs;
|
pHandle->currMergeLimitTs = lastPageBufTs;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -1444,7 +1570,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
}
|
}
|
||||||
if (pHandle->pDataBlock->info.rows > 0) {
|
if (pHandle->pDataBlock->info.rows > 0) {
|
||||||
if (!mergeLimitReached) {
|
if (!mergeLimitReached) {
|
||||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockOrder->slotId);
|
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
|
||||||
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
|
||||||
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1457,8 +1583,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
nMergedRows += pHandle->pDataBlock->info.rows;
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
mergeLimitReached = true;
|
mergeLimitReached = true;
|
||||||
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_ASC) ||
|
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
|
||||||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockOrder->order == TSDB_ORDER_DESC)) {
|
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
|
||||||
pHandle->currMergeLimitTs = lastPageBufTs;
|
pHandle->currMergeLimitTs = lastPageBufTs;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1531,8 +1657,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
|
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
|
||||||
int32_t szSort = 0;
|
int32_t szSort = 0;
|
||||||
|
|
||||||
SBlockOrderInfo* pOrigOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : &pHandle->extRowsOrderInfo;
|
SBlockOrderInfo* pOrigTsOrder = (!pHandle->bSortByRowId) ?
|
||||||
if (pOrigOrder->order == TSDB_ORDER_ASC) {
|
taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
|
||||||
|
if (pOrigTsOrder->order == TSDB_ORDER_ASC) {
|
||||||
pHandle->currMergeLimitTs = INT64_MAX;
|
pHandle->currMergeLimitTs = INT64_MAX;
|
||||||
} else {
|
} else {
|
||||||
pHandle->currMergeLimitTs = INT64_MIN;
|
pHandle->currMergeLimitTs = INT64_MIN;
|
||||||
|
@ -1554,10 +1681,10 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlk != NULL) {
|
if (pBlk != NULL) {
|
||||||
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId);
|
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
|
||||||
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
int64_t firstRowTs = *(int64_t*)tsCol->pData;
|
||||||
if ((pOrigOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
|
||||||
(pOrigOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
(pOrigTsOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) {
|
||||||
if (bExtractedBlock) {
|
if (bExtractedBlock) {
|
||||||
blockDataDestroy(pBlk);
|
blockDataDestroy(pBlk);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue