support child table having query
This commit is contained in:
parent
22882196c3
commit
a5b4ae6945
|
@ -1235,7 +1235,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
|
||||
bool doFilterFieldData(SQueryInfo* pQueryInfo, char *input, tFilePage* pOutput, SExprFilter* pFieldFilters, int16_t type, bool* notSkipped) {
|
||||
bool doFilterFieldData(char *input, SExprFilter* pFieldFilters, int16_t type, bool* notSkipped) {
|
||||
bool qualified = false;
|
||||
|
||||
for(int32_t k = 0; k < pFieldFilters->pFilters->numOfFilters; ++k) {
|
||||
|
@ -1293,7 +1293,7 @@ int32_t doHavingFilter(SQueryInfo* pQueryInfo, tFilePage* pOutput, bool* notSkip
|
|||
|
||||
char* pInput = pOutput->data + pOutput->num* pFieldFilters->pSqlExpr->offset;
|
||||
|
||||
doFilterFieldData(pQueryInfo, pInput, pOutput, pFieldFilters, type, notSkipped);
|
||||
doFilterFieldData(pInput, pFieldFilters, type, notSkipped);
|
||||
if (*notSkipped == false) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -189,6 +189,8 @@ typedef struct SQuery {
|
|||
bool stabledev; // super table stddev query
|
||||
int32_t interBufSize; // intermediate buffer sizse
|
||||
|
||||
int32_t havingNum; // having expr number
|
||||
|
||||
SOrderVal order;
|
||||
int16_t numOfCols;
|
||||
int16_t numOfTags;
|
||||
|
@ -284,6 +286,7 @@ enum OPERATOR_TYPE_E {
|
|||
OP_Fill = 13,
|
||||
OP_MultiTableAggregate = 14,
|
||||
OP_MultiTableTimeInterval = 15,
|
||||
OP_Having = 16,
|
||||
};
|
||||
|
||||
typedef struct SOperatorInfo {
|
||||
|
@ -401,6 +404,11 @@ typedef struct SOffsetOperatorInfo {
|
|||
int64_t offset;
|
||||
} SOffsetOperatorInfo;
|
||||
|
||||
typedef struct SHavingOperatorInfo {
|
||||
SArray* fp;
|
||||
} SHavingOperatorInfo;
|
||||
|
||||
|
||||
typedef struct SFillOperatorInfo {
|
||||
SFillInfo *pFillInfo;
|
||||
SSDataBlock *pRes;
|
||||
|
|
|
@ -181,6 +181,7 @@ static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntime
|
|||
static SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
static SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
static SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
|
||||
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
||||
|
@ -1819,6 +1820,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
}
|
||||
|
||||
if (pQuery->havingNum > 0) {
|
||||
pRuntimeEnv->proot = createHavingOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
|
||||
}
|
||||
|
||||
if (pQuery->limit.offset > 0) {
|
||||
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
|
||||
}
|
||||
|
@ -4653,6 +4658,111 @@ static SSDataBlock* doOffset(void* param) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
bool doFilterData(SColumnInfoData* p, int32_t rid, SColumnFilterElem *filterElem, __filter_func_t fp) {
|
||||
char* input = p->pData + p->info.bytes * rid;
|
||||
bool isnull = isNull(input, p->info.type);
|
||||
if (isnull) {
|
||||
return (fp == isNullOperator) ? true : false;
|
||||
} else {
|
||||
if (fp == notNullOperator) {
|
||||
return true;
|
||||
} else if (fp == isNullOperator) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (fp(filterElem, input, input, p->info.type)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
void doHavingImpl(SOperatorInfo *pOperator, SSDataBlock *pBlock) {
|
||||
SHavingOperatorInfo* pInfo = pOperator->info;
|
||||
int32_t f = 0;
|
||||
int32_t allQualified = 1;
|
||||
int32_t exprQualified = 0;
|
||||
|
||||
for (int32_t r = 0; r < pBlock->info.rows; ++r) {
|
||||
allQualified = 1;
|
||||
|
||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
SExprInfo* pExprInfo = &(pOperator->pExpr[i]);
|
||||
if (pExprInfo->pFilter == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SArray* es = taosArrayGetP(pInfo->fp, i);
|
||||
assert(es);
|
||||
|
||||
size_t fpNum = taosArrayGetSize(es);
|
||||
|
||||
exprQualified = 0;
|
||||
for (int32_t m = 0; m < fpNum; ++m) {
|
||||
__filter_func_t fp = taosArrayGetP(es, m);
|
||||
|
||||
assert(fp);
|
||||
|
||||
//SColIndex* colIdx = &pExprInfo->base.colInfo;
|
||||
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
SColumnFilterElem filterElem = {.filterInfo = *pExprInfo->pFilter};
|
||||
|
||||
if (doFilterData(p, r, &filterElem, fp)) {
|
||||
exprQualified = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (exprQualified == 0) {
|
||||
allQualified = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (allQualified == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
memmove(pColInfoData->pData + f * bytes, pColInfoData->pData + bytes * r, bytes);
|
||||
}
|
||||
|
||||
++f;
|
||||
}
|
||||
|
||||
pBlock->info.rows = f;
|
||||
}
|
||||
|
||||
static SSDataBlock* doHaving(void* param) {
|
||||
SOperatorInfo *pOperator = (SOperatorInfo *)param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
while (1) {
|
||||
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
doHavingImpl(pOperator, pBlock);
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static SSDataBlock* doIntervalAgg(void* param) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -5003,6 +5113,13 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
}
|
||||
|
||||
static void destroyHavingOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SHavingOperatorInfo* pInfo = (SHavingOperatorInfo*) param;
|
||||
if (pInfo->fp) {
|
||||
taosArrayDestroy(pInfo->fp);
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
|
||||
|
||||
|
@ -5059,6 +5176,81 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
|
||||
int32_t initFilterFp(SExprInfo* pExpr, int32_t numOfOutput, SArray** fps) {
|
||||
__filter_func_t fp = NULL;
|
||||
|
||||
*fps = taosArrayInit(numOfOutput, sizeof(SArray*));
|
||||
if (*fps == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SExprInfo* pExprInfo = &(pExpr[i]);
|
||||
SColIndex* colIdx = &pExprInfo->base.colInfo;
|
||||
|
||||
if (pExprInfo->pFilter == NULL || !TSDB_COL_IS_NORMAL_COL(colIdx->flag)) {
|
||||
taosArrayPush(*fps, &fp);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t filterNum = pExprInfo->base.filterNum;
|
||||
SColumnFilterInfo *filterInfo = pExprInfo->pFilter;
|
||||
|
||||
SArray* es = taosArrayInit(filterNum, sizeof(__filter_func_t));
|
||||
|
||||
for (int32_t j = 0; j < filterNum; ++j) {
|
||||
int32_t lower = filterInfo->lowerRelOptr;
|
||||
int32_t upper = filterInfo->upperRelOptr;
|
||||
if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
|
||||
qError("invalid rel optr");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
__filter_func_t ffp = getFilterOperator(lower, upper);
|
||||
if (ffp == NULL) {
|
||||
qError("invalid filter info");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
taosArrayPush(es, &ffp);
|
||||
|
||||
filterInfo += 1;
|
||||
}
|
||||
|
||||
taosArrayPush(*fps, &es);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SHavingOperatorInfo* pInfo = calloc(1, sizeof(SHavingOperatorInfo));
|
||||
|
||||
initFilterFp(pExpr, numOfOutput, &pInfo->fp);
|
||||
|
||||
assert(pInfo->fp);
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
pOperator->name = "HavingOperator";
|
||||
pOperator->operatorType = OP_Having;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->upstream = upstream;
|
||||
pOperator->exec = doHaving;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->cleanup = destroyHavingOperatorInfo;
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
|
||||
|
||||
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
|
||||
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
|
||||
pInfo->limit = pRuntimeEnv->pQuery->limit.limit;
|
||||
|
@ -5856,8 +6048,8 @@ int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int
|
|||
memcpy(*dst, src, sizeof(*src) * filterNum);
|
||||
|
||||
for (int32_t i = 0; i < filterNum; i++) {
|
||||
if (dst[i]->filterstr && dst[i]->len > 0) {
|
||||
void *pz = calloc(1, dst[i]->len + 1);
|
||||
if ((*dst)[i].filterstr && dst[i]->len > 0) {
|
||||
void *pz = calloc(1, (*dst)[i].len + 1);
|
||||
|
||||
if (pz == NULL) {
|
||||
if (i == 0) {
|
||||
|
@ -5871,7 +6063,7 @@ int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int
|
|||
|
||||
memcpy(pz, (void *)src->pz, src->len + 1);
|
||||
|
||||
dst[i]->pz = (int64_t)pz;
|
||||
(*dst)[i].pz = (int64_t)pz;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6288,6 +6480,10 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
|
|||
if (TSDB_COL_IS_TAG(pExprs[col].base.colInfo.flag)) {
|
||||
pQuery->tagLen += pExprs[col].bytes;
|
||||
}
|
||||
|
||||
if (pExprs[col].pFilter) {
|
||||
++pQuery->havingNum;
|
||||
}
|
||||
}
|
||||
|
||||
doUpdateExprColumnIndex(pQuery);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue