[td-14493] support distinct.
This commit is contained in:
parent
7553344d9a
commit
50f0ab05ff
|
@ -110,11 +110,11 @@ typedef struct SFileBlockInfo {
|
||||||
#define FUNCTION_COV 38
|
#define FUNCTION_COV 38
|
||||||
|
|
||||||
typedef struct SResultRowEntryInfo {
|
typedef struct SResultRowEntryInfo {
|
||||||
int8_t hasResult:6; // result generated, not NULL value
|
// int8_t hasResult:6; // result generated, not NULL value
|
||||||
bool initialized:1; // output buffer has been initialized
|
bool initialized:1; // output buffer has been initialized
|
||||||
bool complete:1; // query has completed
|
bool complete:1; // query has completed
|
||||||
uint8_t isNullRes:1; // the result is null
|
uint8_t isNullRes:6; // the result is null
|
||||||
uint8_t numOfRes:7; // num of output result in current buffer
|
uint8_t numOfRes; // num of output result in current buffer
|
||||||
} SResultRowEntryInfo;
|
} SResultRowEntryInfo;
|
||||||
|
|
||||||
// determine the real data need to calculated the result
|
// determine the real data need to calculated the result
|
||||||
|
@ -157,7 +157,6 @@ typedef struct SResultDataInfo {
|
||||||
|
|
||||||
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
||||||
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
|
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo)))
|
||||||
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
|
|
||||||
|
|
||||||
typedef struct SInputColumnInfoData {
|
typedef struct SInputColumnInfoData {
|
||||||
int32_t totalRows; // total rows in current columnar data
|
int32_t totalRows; // total rows in current columnar data
|
||||||
|
|
|
@ -1008,7 +1008,6 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData*
|
||||||
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
|
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
|
||||||
pCtx[k].sfp.process(&tw, 1, &out);
|
pCtx[k].sfp.process(&tw, 1, &out);
|
||||||
pEntryInfo->numOfRes = 1;
|
pEntryInfo->numOfRes = 1;
|
||||||
pEntryInfo->hasResult = ',';
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3281,6 +3280,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SFilterInfo* filter = NULL;
|
SFilterInfo* filter = NULL;
|
||||||
|
|
||||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||||
|
|
||||||
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
|
SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock};
|
||||||
|
@ -3292,6 +3292,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
||||||
SSDataBlock* px = createOneDataBlock(pBlock);
|
SSDataBlock* px = createOneDataBlock(pBlock);
|
||||||
blockDataEnsureCapacity(px, pBlock->info.rows);
|
blockDataEnsureCapacity(px, pBlock->info.rows);
|
||||||
|
|
||||||
|
// todo extract method
|
||||||
int32_t numOfRow = 0;
|
int32_t numOfRow = 0;
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
||||||
|
@ -3303,7 +3304,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pSrc, j)) {
|
||||||
|
colDataAppendNULL(pDst, numOfRow);
|
||||||
|
} else {
|
||||||
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
|
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
|
||||||
|
}
|
||||||
numOfRow += 1;
|
numOfRow += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3521,7 +3526,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResI
|
||||||
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
|
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
|
||||||
|
|
||||||
char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
|
char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
colDataAppend(pColInfoData, nrows, in, pEntryInfo->numOfRes == 0);
|
colDataAppend(pColInfoData, nrows, in, pEntryInfo->isNullRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
releaseBufPage(pBuf, page);
|
releaseBufPage(pBuf, page);
|
||||||
|
|
|
@ -106,7 +106,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void recordGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
|
||||||
SColumnDataAgg* pColAgg = NULL;
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||||
|
@ -131,7 +131,7 @@ static void recordGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
|
static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) {
|
||||||
ASSERT(pKey != NULL);
|
ASSERT(pKey != NULL);
|
||||||
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
||||||
|
|
||||||
|
@ -170,11 +170,12 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
|
||||||
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
|
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
char* data = colDataGetData(pColInfoData, rowIndex);
|
char* data = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
|
||||||
// set result exists, todo refactor
|
|
||||||
memcpy(dest, data, pColInfoData->info.bytes);
|
memcpy(dest, data, pColInfoData->info.bytes);
|
||||||
pEntryInfo->hasResult = DATA_SET_FLAG;
|
} else { // it is a NULL value
|
||||||
pEntryInfo->numOfRes = 1;
|
pEntryInfo->isNullRes = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pEntryInfo->numOfRes = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -197,7 +198,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||||
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
|
||||||
if (!pInfo->isInit) {
|
if (!pInfo->isInit) {
|
||||||
recordGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||||
pInfo->isInit = true;
|
pInfo->isInit = true;
|
||||||
num++;
|
num++;
|
||||||
continue;
|
continue;
|
||||||
|
@ -209,13 +210,14 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The first row of a new block does not belongs to the previous existed group
|
||||||
if (!equal && j == 0) {
|
if (!equal && j == 0) {
|
||||||
num++;
|
num++;
|
||||||
recordGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||||
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
@ -226,12 +228,12 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
// assign the group keys or user input constant values if required
|
// assign the group keys or user input constant values if required
|
||||||
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex);
|
||||||
recordGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols);
|
||||||
num = 1;
|
num = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (num > 0) {
|
if (num > 0) {
|
||||||
/*int32_t ret = */ generatedHashKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
/*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals);
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
||||||
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
||||||
|
@ -294,8 +296,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity,
|
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
|
||||||
pInfo->binfo.rowCellInfoOffset);
|
|
||||||
doFilter(pInfo->pCondition, pRes);
|
doFilter(pInfo->pCondition, pRes);
|
||||||
|
|
||||||
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
||||||
|
|
|
@ -25,7 +25,6 @@
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
(_info)->numOfRes = (res); \
|
(_info)->numOfRes = (res); \
|
||||||
(_info)->hasResult = DATA_SET_FLAG; \
|
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
typedef struct SSumRes {
|
typedef struct SSumRes {
|
||||||
|
@ -715,7 +714,6 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_VAL(pResInfo, notNullElems, 1);
|
SET_VAL(pResInfo, notNullElems, 1);
|
||||||
pResInfo->hasResult = DATA_SET_FLAG;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO set the correct parameter.
|
// TODO set the correct parameter.
|
||||||
|
@ -775,9 +773,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, k);
|
// DO_UPDATE_TAG_COLUMNS(pCtx, k);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
pResInfo->hasResult = DATA_SET_FLAG;
|
|
||||||
pResInfo->complete = true;
|
pResInfo->complete = true;
|
||||||
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -815,8 +811,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
||||||
|
|
||||||
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||||
|
|
||||||
pResInfo->hasResult = DATA_SET_FLAG;
|
|
||||||
pResInfo->complete = true; // set query completed on this column
|
pResInfo->complete = true; // set query completed on this column
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
break;
|
break;
|
||||||
|
@ -830,10 +824,8 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||||
|
|
||||||
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) {
|
if (pResInfo->numOfRes == 0 || (*(TSKEY*)buf) < ts) {
|
||||||
pResInfo->hasResult = DATA_SET_FLAG;
|
|
||||||
memcpy(buf, data, pCtx->inputBytes);
|
memcpy(buf, data, pCtx->inputBytes);
|
||||||
|
|
||||||
*(TSKEY*)buf = ts;
|
*(TSKEY*)buf = ts;
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,7 +208,7 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock
|
||||||
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
|
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
|
|
||||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]);
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]);
|
||||||
if (!pResInfo->hasResult) {
|
if (pResInfo->numOfRes == 0) {
|
||||||
for(int32_t j = 0; j < pResInfo->numOfRes; ++j) {
|
for(int32_t j = 0; j < pResInfo->numOfRes; ++j) {
|
||||||
colDataAppend(pCol, j, NULL, true); // TODO add set null data api
|
colDataAppend(pCol, j, NULL, true); // TODO add set null data api
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue