[td-13039] support add group by keys.
This commit is contained in:
parent
21aaecd016
commit
311e65daa7
|
@ -1403,14 +1403,6 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t bytes = pColInfo->info.bytes;
|
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
|
|
||||||
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
|
|
||||||
} else {
|
|
||||||
pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
|
if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
|
||||||
if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
|
if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
|
||||||
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
|
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
|
||||||
|
|
|
@ -1715,10 +1715,31 @@ static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVa
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assign the group keys or user input constant values if required
|
||||||
|
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
|
||||||
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
if (pCtx[i].functionId == -1) {
|
||||||
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
|
||||||
|
if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) {
|
||||||
|
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
char* data = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
|
||||||
|
// set result exists, todo refactor
|
||||||
|
memcpy(dest, data, pColInfoData->info.bytes);
|
||||||
|
pEntryInfo->hasResult = DATA_SET_FLAG;
|
||||||
|
pEntryInfo->numOfRes = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
|
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
|
||||||
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
||||||
SGroupbyOperatorInfo *pInfo = pOperator->info;
|
SGroupbyOperatorInfo *pInfo = pOperator->info;
|
||||||
|
|
||||||
|
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
||||||
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
|
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
|
||||||
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
|
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
|
//qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
|
||||||
|
@ -1751,7 +1772,11 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
doApplyFunctions(pInfo->binfo.pCtx, &w, j - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
int32_t rowIndex = j - num;
|
||||||
|
doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||||
|
|
||||||
|
// assign the group keys or user input constant values if required
|
||||||
|
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||||
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
keepGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||||
num = 1;
|
num = 1;
|
||||||
}
|
}
|
||||||
|
@ -1764,7 +1789,9 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
doApplyFunctions(pInfo->binfo.pCtx, &w, pBlock->info.rows - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
int32_t rowIndex = pBlock->info.rows - num;
|
||||||
|
doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC);
|
||||||
|
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue