feat: add sort/group logic for json
This commit is contained in:
parent
14f5019c6e
commit
a2ebeda89c
|
@ -186,6 +186,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
|
||||||
*(double*)p = *(double*)v;
|
*(double*)p = *(double*)v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getJsonValueLen(const char *data);
|
||||||
|
|
||||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
|
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
|
||||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
|
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
|
||||||
const SColumnInfoData* pSource, uint32_t numOfRow2);
|
const SColumnInfoData* pSource, uint32_t numOfRow2);
|
||||||
|
|
|
@ -99,6 +99,24 @@ void colDataTrim(SColumnInfoData* pColumnInfoData) {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getJsonValueLen(const char *data) {
|
||||||
|
int32_t dataLen = 0;
|
||||||
|
if (*data == TSDB_DATA_TYPE_NULL) {
|
||||||
|
dataLen = CHAR_BYTES;
|
||||||
|
} else if (*data == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
dataLen = varDataTLen(data + CHAR_BYTES) + CHAR_BYTES;
|
||||||
|
} else if (*data == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
dataLen = DOUBLE_BYTES + CHAR_BYTES;
|
||||||
|
} else if (*data == TSDB_DATA_TYPE_BOOL) {
|
||||||
|
dataLen = CHAR_BYTES + CHAR_BYTES;
|
||||||
|
} else if (*data == TD_TAG_JSON) { // json string
|
||||||
|
dataLen = ((STag*)(data))->len;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
return dataLen;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull) {
|
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull) {
|
||||||
ASSERT(pColumnInfoData != NULL);
|
ASSERT(pColumnInfoData != NULL);
|
||||||
|
|
||||||
|
@ -118,19 +136,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
if (type == TSDB_DATA_TYPE_JSON) {
|
if (type == TSDB_DATA_TYPE_JSON) {
|
||||||
if (*pData == TSDB_DATA_TYPE_NULL) {
|
dataLen = getJsonValueLen(pData);
|
||||||
dataLen = CHAR_BYTES;
|
|
||||||
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
dataLen = varDataTLen(pData + CHAR_BYTES) + CHAR_BYTES;
|
|
||||||
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
dataLen = DOUBLE_BYTES + CHAR_BYTES;
|
|
||||||
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
|
|
||||||
dataLen = CHAR_BYTES + CHAR_BYTES;
|
|
||||||
} else if (*pData == TD_TAG_JSON) { // json string
|
|
||||||
dataLen = ((STag*)(pData))->len;
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}else {
|
}else {
|
||||||
dataLen = varDataTLen(pData);
|
dataLen = varDataTLen(pData);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,15 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
|
||||||
|
|
||||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
int32_t dataLen = getJsonValueLen(val);
|
||||||
|
|
||||||
|
if (memcmp(pkey->pData, val, dataLen) == 0){
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
int32_t len = varDataLen(val);
|
int32_t len = varDataLen(val);
|
||||||
if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) {
|
if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -129,7 +137,10 @@ void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock*
|
||||||
} else {
|
} else {
|
||||||
pkey->isNull = false;
|
pkey->isNull = false;
|
||||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
int32_t dataLen = getJsonValueLen(val);
|
||||||
|
memcpy(pkey->pData, val, dataLen);
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
memcpy(pkey->pData, val, varDataTLen(val));
|
memcpy(pkey->pData, val, varDataTLen(val));
|
||||||
ASSERT(varDataTLen(val) <= pkey->bytes);
|
ASSERT(varDataTLen(val) <= pkey->bytes);
|
||||||
} else {
|
} else {
|
||||||
|
@ -153,7 +164,11 @@ int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
||||||
}
|
}
|
||||||
|
|
||||||
isNull[i] = 0;
|
isNull[i] = 0;
|
||||||
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
int32_t dataLen = getJsonValueLen(pkey->pData);
|
||||||
|
memcpy(pStart, (pkey->pData), dataLen);
|
||||||
|
pStart += dataLen;
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
varDataCopy(pStart, pkey->pData);
|
varDataCopy(pStart, pkey->pData);
|
||||||
pStart += varDataTLen(pkey->pData);
|
pStart += varDataTLen(pkey->pData);
|
||||||
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
|
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
|
||||||
|
@ -178,7 +193,10 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
|
||||||
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
|
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
char* data = colDataGetData(pColInfoData, rowIndex);
|
char* data = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
int32_t dataLen = getJsonValueLen(data);
|
||||||
|
memcpy(dest, data, dataLen);
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
varDataCopy(dest, data);
|
varDataCopy(dest, data);
|
||||||
} else {
|
} else {
|
||||||
memcpy(dest, data, pColInfoData->info.bytes);
|
memcpy(dest, data, pColInfoData->info.bytes);
|
||||||
|
@ -447,6 +465,16 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
if (colDataIsNull_s(pColInfoData, j)) {
|
if (colDataIsNull_s(pColInfoData, j)) {
|
||||||
offset[(*rows)] = -1;
|
offset[(*rows)] = -1;
|
||||||
contentLen = 0;
|
contentLen = 0;
|
||||||
|
} else if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
|
||||||
|
offset[*rows] = (*columnLen);
|
||||||
|
char* src = colDataGetData(pColInfoData, j);
|
||||||
|
int32_t dataLen = getJsonValueLen(src);
|
||||||
|
|
||||||
|
memcpy(data + (*columnLen), src, dataLen);
|
||||||
|
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
|
||||||
|
ASSERT(v > 0);
|
||||||
|
|
||||||
|
contentLen = dataLen;
|
||||||
} else {
|
} else {
|
||||||
offset[*rows] = (*columnLen);
|
offset[*rows] = (*columnLen);
|
||||||
char* src = colDataGetData(pColInfoData, j);
|
char* src = colDataGetData(pColInfoData, j);
|
||||||
|
|
|
@ -140,7 +140,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
|
||||||
dataLen = 0;
|
dataLen = 0;
|
||||||
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
|
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
|
||||||
dataLen = varDataTLen(pData + CHAR_BYTES);
|
dataLen = varDataTLen(pData + CHAR_BYTES);
|
||||||
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
|
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
dataLen = LONG_BYTES;
|
dataLen = LONG_BYTES;
|
||||||
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
|
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
|
||||||
dataLen = CHAR_BYTES;
|
dataLen = CHAR_BYTES;
|
||||||
|
@ -457,10 +457,6 @@ static int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *sta
|
||||||
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t nParam = sifGetOperParamNum(node->opType);
|
int32_t nParam = sifGetOperParamNum(node->opType);
|
||||||
if (nParam <= 1) {
|
|
||||||
SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
SIFParam *params = NULL;
|
SIFParam *params = NULL;
|
||||||
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
||||||
|
|
||||||
|
@ -469,14 +465,12 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
|
|
||||||
sif_func_t operFn = sifNullFunc;
|
sif_func_t operFn = sifNullFunc;
|
||||||
code = sifGetOperFn(node->opType, &operFn, &output->status);
|
code = sifGetOperFn(node->opType, &operFn, &output->status);
|
||||||
if (ctx->noExec) {
|
if (!ctx->noExec) {
|
||||||
SIF_RET(code);
|
code = operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output);
|
||||||
} else {
|
|
||||||
return operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output);
|
|
||||||
}
|
}
|
||||||
_return:
|
|
||||||
taosMemoryFree(params);
|
taosMemoryFree(params);
|
||||||
SIF_RET(code);
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *output) {
|
static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
|
|
|
@ -227,6 +227,33 @@ int32_t compareJsonContainsKey(const void* pLeft, const void* pRight) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// string > number > bool > null
|
||||||
|
// ref: https://dev.mysql.com/doc/refman/8.0/en/json.html#json-comparison
|
||||||
|
int32_t compareJsonVal(const void *pLeft, const void *pRight) {
|
||||||
|
char leftType = *(char*)pLeft;
|
||||||
|
char rightType = *(char*)pRight;
|
||||||
|
if(leftType != rightType){
|
||||||
|
return leftType > rightType ? 1 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* realDataLeft = POINTER_SHIFT(pLeft, CHAR_BYTES);
|
||||||
|
char* realDataRight = POINTER_SHIFT(pRight, CHAR_BYTES);
|
||||||
|
if(leftType == TSDB_DATA_TYPE_BOOL) {
|
||||||
|
DEFAULT_COMP(GET_INT8_VAL(realDataLeft), GET_INT8_VAL(realDataRight));
|
||||||
|
}else if(leftType == TSDB_DATA_TYPE_DOUBLE){
|
||||||
|
DEFAULT_DOUBLE_COMP(GET_DOUBLE_VAL(realDataLeft), GET_DOUBLE_VAL(realDataRight));
|
||||||
|
}else if(leftType == TSDB_DATA_TYPE_NCHAR){
|
||||||
|
return compareLenPrefixedWStr(realDataLeft, realDataRight);
|
||||||
|
}else if(leftType == TSDB_DATA_TYPE_NULL) {
|
||||||
|
return 0;
|
||||||
|
}else{
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t compareJsonValDesc(const void *pLeft, const void *pRight) {
|
||||||
|
return compareJsonVal(pRight, pLeft);
|
||||||
|
}
|
||||||
/*
|
/*
|
||||||
* Compare two strings
|
* Compare two strings
|
||||||
* TSDB_MATCH: Match
|
* TSDB_MATCH: Match
|
||||||
|
@ -601,6 +628,8 @@ __compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) {
|
||||||
return (order == TSDB_ORDER_ASC) ? compareLenPrefixedStr : compareLenPrefixedStrDesc;
|
return (order == TSDB_ORDER_ASC) ? compareLenPrefixedStr : compareLenPrefixedStrDesc;
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
return (order == TSDB_ORDER_ASC) ? compareLenPrefixedWStr : compareLenPrefixedWStrDesc;
|
return (order == TSDB_ORDER_ASC) ? compareLenPrefixedWStr : compareLenPrefixedWStrDesc;
|
||||||
|
case TSDB_DATA_TYPE_JSON:
|
||||||
|
return (order == TSDB_ORDER_ASC) ? compareJsonVal : compareJsonValDesc;
|
||||||
default:
|
default:
|
||||||
return (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
|
return (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,12 +120,12 @@ class TDTestCase:
|
||||||
tdSql.error("select * from jsons1 where jtag contains 'location'='beijing'")
|
tdSql.error("select * from jsons1 where jtag contains 'location'='beijing'")
|
||||||
#
|
#
|
||||||
# # test function error
|
# # test function error
|
||||||
# tdSql.error("select avg(jtag->'tag1') from jsons1")
|
tdSql.error("select avg(jtag->'tag1') from jsons1")
|
||||||
# tdSql.error("select avg(jtag) from jsons1")
|
tdSql.error("select avg(jtag) from jsons1")
|
||||||
# tdSql.error("select min(jtag->'tag1') from jsons1")
|
tdSql.error("select min(jtag->'tag1') from jsons1")
|
||||||
# tdSql.error("select min(jtag) from jsons1")
|
tdSql.error("select min(jtag) from jsons1")
|
||||||
# tdSql.error("select ceil(jtag->'tag1') from jsons1")
|
tdSql.error("select ceil(jtag->'tag1') from jsons1")
|
||||||
# tdSql.error("select ceil(jtag) from jsons1")
|
tdSql.error("select ceil(jtag) from jsons1")
|
||||||
#
|
#
|
||||||
# # test select normal column
|
# # test select normal column
|
||||||
tdSql.query("select dataint from jsons1")
|
tdSql.query("select dataint from jsons1")
|
||||||
|
@ -176,7 +176,6 @@ class TDTestCase:
|
||||||
tdSql.checkColNameList(res, cname_list)
|
tdSql.checkColNameList(res, cname_list)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# # test where with json tag
|
# # test where with json tag
|
||||||
# tdSql.error("select * from jsons1_1 where jtag is not null")
|
# tdSql.error("select * from jsons1_1 where jtag is not null")
|
||||||
# tdSql.error("select * from jsons1 where jtag='{\"tag1\":11,\"tag2\":\"\"}'")
|
# tdSql.error("select * from jsons1 where jtag='{\"tag1\":11,\"tag2\":\"\"}'")
|
||||||
|
@ -313,8 +312,8 @@ class TDTestCase:
|
||||||
# tdSql.checkRows(2)
|
# tdSql.checkRows(2)
|
||||||
#
|
#
|
||||||
# # test with tbname/normal column
|
# # test with tbname/normal column
|
||||||
# tdSql.query("select * from jsons1 where tbname = 'jsons1_1'")
|
tdSql.query("select * from jsons1 where tbname = 'jsons1_1'")
|
||||||
# tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
# tdSql.query("select * from jsons1 where tbname = 'jsons1_1' and jtag contains 'tag3'")
|
# tdSql.query("select * from jsons1 where tbname = 'jsons1_1' and jtag contains 'tag3'")
|
||||||
# tdSql.checkRows(2)
|
# tdSql.checkRows(2)
|
||||||
# tdSql.query("select * from jsons1 where tbname = 'jsons1_1' and jtag contains 'tag3' and dataint=3")
|
# tdSql.query("select * from jsons1 where tbname = 'jsons1_1' and jtag contains 'tag3' and dataint=3")
|
||||||
|
@ -345,14 +344,14 @@ class TDTestCase:
|
||||||
# tdSql.checkRows(1)
|
# tdSql.checkRows(1)
|
||||||
#
|
#
|
||||||
# # test distinct
|
# # test distinct
|
||||||
# tdSql.execute("insert into jsons1_14 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":null}') values(1591062628000, 2, NULL, '你就会', 'dws')")
|
tdSql.execute("insert into jsons1_14 using jsons1 tags('{\"tag1\":\"收到货\",\"tag2\":\"\",\"tag3\":null}') values(1591062628000, 2, NULL, '你就会', 'dws')")
|
||||||
# tdSql.query("select distinct jtag->'tag1' from jsons1")
|
# tdSql.query("select distinct jtag->'tag1' from jsons1")
|
||||||
# tdSql.checkRows(8)
|
# tdSql.checkRows(8)
|
||||||
# tdSql.query("select distinct jtag from jsons1")
|
# tdSql.query("select distinct jtag from jsons1")
|
||||||
# tdSql.checkRows(9)
|
# tdSql.checkRows(9)
|
||||||
#
|
#
|
||||||
# #test dumplicate key with normal colomn
|
# #test dumplicate key with normal colomn
|
||||||
# tdSql.execute("INSERT INTO jsons1_15 using jsons1 tags('{\"tbname\":\"tt\",\"databool\":true,\"datastr\":\"是是是\"}') values(1591060828000, 4, false, 'jjsf', \"你就会\")")
|
tdSql.execute("INSERT INTO jsons1_15 using jsons1 tags('{\"tbname\":\"tt\",\"databool\":true,\"datastr\":\"是是是\"}') values(1591060828000, 4, false, 'jjsf', \"你就会\")")
|
||||||
# tdSql.query("select *,tbname,jtag from jsons1 where jtag->'datastr' match '是' and datastr match 'js'")
|
# tdSql.query("select *,tbname,jtag from jsons1 where jtag->'datastr' match '是' and datastr match 'js'")
|
||||||
# tdSql.checkRows(1)
|
# tdSql.checkRows(1)
|
||||||
# tdSql.query("select tbname,jtag->'tbname' from jsons1 where jtag->'tbname'='tt' and tbname='jsons1_14'")
|
# tdSql.query("select tbname,jtag->'tbname' from jsons1 where jtag->'tbname'='tt' and tbname='jsons1_14'")
|
||||||
|
|
Loading…
Reference in New Issue