feat:add new logic for new tag format

This commit is contained in:
wangmm0220 2022-05-31 17:49:33 +08:00
parent 5b6f9bdb9c
commit 816cea07f9
14 changed files with 587 additions and 542 deletions

View File

@ -59,14 +59,11 @@ void tTSRowBuilderReset(STSRowBuilder *pBuilder);
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData); int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData);
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow); int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STagVal
static FORCE_INLINE void tTagValPush(SArray *pTagArray, void *key, int8_t type, uint8_t *pData, uint32_t nData,
bool isJson);
// STag // STag
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag); void tTagFree(STag *pTag);
bool tTagGet(const STag *pTag, STagVal *pTagVal); bool tTagGet(const STag *pTag, STagVal *pTagVal);
char* tTagValToData(const STagVal *pTagVal, bool isJson);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(const STag *pTag, SArray **ppArray); int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
@ -130,16 +127,7 @@ struct STagVal {
}; };
int8_t type; int8_t type;
union { union {
int8_t i8;
uint8_t u8;
int16_t i16;
uint16_t u16;
int32_t i32;
uint32_t u32;
int64_t i64; int64_t i64;
uint64_t u64;
float f;
double d;
struct { struct {
uint32_t nData; uint32_t nData;
uint8_t *pData; uint8_t *pData;
@ -147,24 +135,9 @@ struct STagVal {
}; };
}; };
static FORCE_INLINE void tTagValPush(SArray *pTagArray, void *key, int8_t type, uint8_t *pData, uint32_t nData,
bool isJson) {
STagVal tagVal = {0};
if (isJson) {
tagVal.pKey = (char *)key;
} else {
tagVal.cid = *(int16_t *)key;
}
tagVal.type = type;
tagVal.pData = pData;
tagVal.nData = nData;
taosArrayPush(pTagArray, &tagVal);
}
#pragma pack(push, 1) #pragma pack(push, 1)
#define TD_TAG_JSON ((int8_t)0x1) #define TD_TAG_JSON ((int8_t)0x80) // distinguish JSON string and JSON value with the highest bit
#define TD_TAG_LARGE ((int8_t)0x2) #define TD_TAG_LARGE ((int8_t)0x40)
struct STag { struct STag {
int8_t flags; int8_t flags;
int16_t len; int16_t len;
@ -424,14 +397,3 @@ int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToM
#endif /*_TD_COMMON_DATA_FORMAT_H_*/ #endif /*_TD_COMMON_DATA_FORMAT_H_*/
// SKVRowBuilder;
// int32_t tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
// void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
// void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
// SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
// static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t colId, const void *value, int32_t tlen)
// #ifdef JSON_TAG_REFACTOR
// TODO: JSON_TAG_TODO

View File

@ -287,7 +287,7 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
} SSchema; } SSchema;
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0) #define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL))) #define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON) #define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)

View File

@ -958,30 +958,11 @@ static char* parseTagDatatoJson(void* p) {
char tagJsonKey[256] = {0}; char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) { for (int j = 0; j < nCols; ++j) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j); STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
// JSON_TAG_REFACTOR
pTagVal->nData;
pTagVal->pData; // for varChar, len not included
// TODO: adapt below code;
char* val = (char*)pTagVal->pData;
// TODO: adapt below code;
if (j == 0) {
if (*val == TSDB_DATA_TYPE_NULL) {
string = taosMemoryCalloc(1, 8);
sprintf(string, "%s", TSDB_DATA_NULL_STR_L);
goto end;
}
continue;
}
// json key encode by binary // json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey)); memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val)); memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
// json value // json value
val += varDataTLen(val); char type = pTagVal->type;
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
if (type == TSDB_DATA_TYPE_NULL) { if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull(); cJSON* value = cJSON_CreateNull();
if (value == NULL) { if (value == NULL) {
@ -990,11 +971,11 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_NCHAR) { } else if (type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL; cJSON* value = NULL;
if (varDataLen(realData) > 0) { if (pTagVal->nData > 0) {
char* tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1); char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue); int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
if (length < 0) { if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val); tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, pTagVal->pData);
taosMemoryFree(tagJsonValue); taosMemoryFree(tagJsonValue);
goto end; goto end;
} }
@ -1003,7 +984,7 @@ static char* parseTagDatatoJson(void* p) {
if (value == NULL) { if (value == NULL) {
goto end; goto end;
} }
} else if (varDataLen(realData) == 0) { } else if (pTagVal->nData == 0) {
value = cJSON_CreateString(""); value = cJSON_CreateString("");
} else { } else {
ASSERT(0); ASSERT(0);
@ -1011,22 +992,14 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_DOUBLE) { } else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData); double jsonVd = *(double*)(&pTagVal->i64);
cJSON* value = cJSON_CreateNumber(jsonVd); cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL) { if (value == NULL) {
goto end; goto end;
} }
cJSON_AddItemToObject(json, tagJsonKey, value); cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_BOOL) { } else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData); char jsonVd = *(char*)(&pTagVal->i64);
cJSON* value = cJSON_CreateBool(jsonVd); cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL) { if (value == NULL) {
goto end; goto end;
@ -1091,7 +1064,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (jsonInnerType == TSDB_DATA_TYPE_NULL) { if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) { } else if (jsonInnerType == TD_TAG_JSON) {
char* jsonString = parseTagDatatoJson(jsonInnerData); char* jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString); STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString); taosMemoryFree(jsonString);
@ -1110,10 +1083,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
double jsonVd = *(double*)(jsonInnerData); double jsonVd = *(double*)(jsonInnerData);
sprintf(varDataVal(dst), "%.9lf", jsonVd); sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(jsonInnerData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) { } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false"); sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));

View File

@ -116,22 +116,23 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t type = pColumnInfoData->info.type; int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = varDataTLen(pData); int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) { if (type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) { if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = DOUBLE_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES; dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_JSON) { } else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = ((STag*)(pData + CHAR_BYTES))->len; dataLen = varDataTLen(pData + CHAR_BYTES) + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = DOUBLE_BYTES + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES + CHAR_BYTES;
} else if (*pData == TD_TAG_JSON) { // json string
dataLen = ((STag*)(pData))->len;
} else { } else {
ASSERT(0); ASSERT(0);
} }
dataLen += CHAR_BYTES; }else {
dataLen = varDataTLen(pData);
} }
SVarColAttr* pAttr = &pColumnInfoData->varmeta; SVarColAttr* pAttr = &pColumnInfoData->varmeta;

View File

@ -123,7 +123,7 @@ int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal
ASSERT(iCol != 0); ASSERT(iCol != 0);
ASSERT(pTColumn->colId != 0); ASSERT(pTColumn->colId != 0);
ASSERT(pRow->flags & 0xf != 0); ASSERT((pRow->flags & 0xf) != 0);
switch (pRow->flags & 0xf) { switch (pRow->flags & 0xf) {
case TSROW_HAS_NONE: case TSROW_HAS_NONE:
*pColVal = ColValNONE; *pColVal = ColValNONE;
@ -432,7 +432,6 @@ static void setBitMap(uint8_t *p, STSchema *pTSchema, uint8_t flags) {
} }
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) { int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
int32_t nDataTP, nDataKV; int32_t nDataTP, nDataKV;
uint32_t flags;
STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf; STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
int32_t nCols = pBuilder->pTSchema->numOfCols; int32_t nCols = pBuilder->pTSchema->numOfCols;
@ -446,7 +445,7 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
pBuilder->row.flags |= TSROW_HAS_NONE; pBuilder->row.flags |= TSROW_HAS_NONE;
} }
ASSERT(pBuilder->row.flags & 0xf != 0); ASSERT((pBuilder->row.flags & 0xf) != 0);
*(ppRow) = &pBuilder->row; *(ppRow) = &pBuilder->row;
switch (pBuilder->row.flags & 0xf) { switch (pBuilder->row.flags & 0xf) {
case TSROW_HAS_NONE: case TSROW_HAS_NONE:
@ -476,7 +475,7 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
if (nDataKV < nDataTP) { if (nDataKV < nDataTP) {
// generate KV row // generate KV row
ASSERT(pBuilder->row.flags & 0xf != TSROW_HAS_VAL); ASSERT((pBuilder->row.flags & 0xf) != TSROW_HAS_VAL);
pBuilder->row.flags |= TSROW_KV_ROW; pBuilder->row.flags |= TSROW_KV_ROW;
pBuilder->row.nData = nDataKV; pBuilder->row.nData = nDataKV;
@ -492,12 +491,12 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
pBuilder->row.nData = nDataTP; pBuilder->row.nData = nDataTP;
uint8_t *p; uint8_t *p;
uint8_t flags = pBuilder->row.flags & 0xf; uint8_t flags = (pBuilder->row.flags & 0xf);
if (flags == TSROW_HAS_VAL) { if (flags == TSROW_HAS_VAL) {
pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2; pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2;
} else { } else {
if (flags == TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE) { if (flags == (TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE)) {
pBuilder->row.pData = pBuilder->pTPBuf; pBuilder->row.pData = pBuilder->pTPBuf;
} else { } else {
pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1; pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1;
@ -620,7 +619,11 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) {
} }
printf("%s:%d loop[%d-%d] offset=%d\n", __func__, __LINE__, (int32_t)pTag->nTag, (int32_t)n, (int32_t)offset); printf("%s:%d loop[%d-%d] offset=%d\n", __func__, __LINE__, (int32_t)pTag->nTag, (int32_t)n, (int32_t)offset);
tGetTagVal(p + offset, &tagVal, isJson); tGetTagVal(p + offset, &tagVal, isJson);
debugPrintTagVal(tagVal.type, tagVal.pData, tagVal.nData, __func__, __LINE__); if(IS_VAR_DATA_TYPE(tagVal.type)){
debugPrintTagVal(tagVal.type, tagVal.pData, tagVal.nData, __func__, __LINE__);
}else{
debugPrintTagVal(tagVal.type, &tagVal.i64, tDataTypes[tagVal.type].bytes, __func__, __LINE__);
}
} }
printf("\n"); printf("\n");
} }
@ -643,44 +646,8 @@ static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
n += tPutBinary(p ? p + n : p, pTagVal->pData, pTagVal->nData); n += tPutBinary(p ? p + n : p, pTagVal->pData, pTagVal->nData);
} else { } else {
p = p ? p + n : p; p = p ? p + n : p;
switch (pTagVal->type) { n += tDataTypes[pTagVal->type].bytes;
case TSDB_DATA_TYPE_BOOL: if(p) memcpy(p, &(pTagVal->i64), tDataTypes[pTagVal->type].bytes);
n += tPutI8(p, pTagVal->i8 ? 1 : 0);
break;
case TSDB_DATA_TYPE_TINYINT:
n += tPutI8(p, pTagVal->i8);
break;
case TSDB_DATA_TYPE_SMALLINT:
n += tPutI16(p, pTagVal->i16);
break;
case TSDB_DATA_TYPE_INT:
n += tPutI32(p, pTagVal->i32);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
n += tPutI64(p, pTagVal->i64);
break;
case TSDB_DATA_TYPE_FLOAT:
n += tPutFloat(p, pTagVal->f);
break;
case TSDB_DATA_TYPE_DOUBLE:
n += tPutDouble(p, pTagVal->d);
break;
case TSDB_DATA_TYPE_UTINYINT:
n += tPutU8(p, pTagVal->u8);
break;
case TSDB_DATA_TYPE_USMALLINT:
n += tPutU16(p, pTagVal->u16);
break;
case TSDB_DATA_TYPE_UINT:
n += tPutU32(p, pTagVal->u32);
break;
case TSDB_DATA_TYPE_UBIGINT:
n += tPutU64(p, pTagVal->u64);
break;
default:
ASSERT(0);
}
} }
return n; return n;
@ -702,42 +669,8 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
if (IS_VAR_DATA_TYPE(pTagVal->type)) { if (IS_VAR_DATA_TYPE(pTagVal->type)) {
n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData); n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData);
} else { } else {
switch (pTagVal->type) { n += tDataTypes[pTagVal->type].bytes;
case TSDB_DATA_TYPE_BOOL: memcpy(&(pTagVal->i64), p + n, tDataTypes[pTagVal->type].bytes);
case TSDB_DATA_TYPE_TINYINT:
n += tGetI8(p + n, &pTagVal->i8);
break;
case TSDB_DATA_TYPE_SMALLINT:
n += tGetI16(p, &pTagVal->i16);
break;
case TSDB_DATA_TYPE_INT:
n += tGetI32(p, &pTagVal->i32);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
n += tGetI64(p, &pTagVal->i64);
break;
case TSDB_DATA_TYPE_FLOAT:
n += tGetFloat(p, &pTagVal->f);
break;
case TSDB_DATA_TYPE_DOUBLE:
n += tGetDouble(p, &pTagVal->d);
break;
case TSDB_DATA_TYPE_UTINYINT:
n += tGetU8(p, &pTagVal->u8);
break;
case TSDB_DATA_TYPE_USMALLINT:
n += tGetU16(p, &pTagVal->u16);
break;
case TSDB_DATA_TYPE_UINT:
n += tGetU32(p, &pTagVal->u32);
break;
case TSDB_DATA_TYPE_UBIGINT:
n += tGetU64(p, &pTagVal->u64);
break;
default:
ASSERT(0);
}
} }
return n; return n;
@ -814,6 +747,26 @@ void tTagFree(STag *pTag) {
if (pTag) taosMemoryFree(pTag); if (pTag) taosMemoryFree(pTag);
} }
char *tTagValToData(const STagVal *value, bool isJson){
if(!value) return NULL;
char *data = NULL;
int8_t typeBytes = 0;
if (isJson) {
typeBytes = CHAR_BYTES;
}
if(IS_VAR_DATA_TYPE(value->type)){
data = taosMemoryCalloc(1, typeBytes + VARSTR_HEADER_SIZE + value->nData);
if(data == NULL) return NULL;
if(isJson) *data = value->type;
varDataLen(data + typeBytes) = value->nData;
memcpy(varDataVal(data + typeBytes), value->pData, value->nData);
}else{
data = ((char*)&(value->i64)) - typeBytes; // json with type
}
return data;
}
bool tTagGet(const STag *pTag, STagVal *pTagVal) { bool tTagGet(const STag *pTag, STagVal *pTagVal) {
int16_t lidx = 0; int16_t lidx = 0;
int16_t ridx = pTag->nTag - 1; int16_t ridx = pTag->nTag - 1;

View File

@ -78,7 +78,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderClear(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader);
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReadNext(SMetaReader *pReader); int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid); const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *tagVal);
#if 1 // refact APIs below (TODO) #if 1 // refact APIs below (TODO)
typedef SVCreateTbReq STbCfg; typedef SVCreateTbReq STbCfg;

View File

@ -564,9 +564,17 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) {
#endif #endif
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid) { const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
ASSERT(pEntry->type == TSDB_CHILD_TABLE); ASSERT(pEntry->type == TSDB_CHILD_TABLE);
STagVal tagVal = {.cid = cid}; STag *tag = (STag *)pEntry->ctbEntry.pTags;
tTagGet((const STag *)pEntry->ctbEntry.pTags, &tagVal); tTagGet(tag, val);
return tagVal.pData;
if(val->type == TSDB_DATA_TYPE_NULL){
return NULL;
}
if (type == TSDB_DATA_TYPE_JSON){
return tag;
}else{
return val;
}
} }

View File

@ -573,15 +573,20 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
for (int32_t i = 0; i < pTagSchema->nCols; i++) { for (int32_t i = 0; i < pTagSchema->nCols; i++) {
SSchema *pCol = &pTagSchema->pSchema[i]; SSchema *pCol = &pTagSchema->pSchema[i];
if (iCol == i) { if (iCol == i) {
tTagValPush(pTagArray, &pCol->colId, pCol->type, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal, false); STagVal val = {0};
val.type = pCol->type;
val.cid = pCol->colId;
if (IS_VAR_DATA_TYPE(pCol->type)) {
val.pData = pAlterTbReq->pTagVal;
val.nData = pAlterTbReq->nTagVal;
}else{
memcpy(&val.i64, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
}
taosArrayPush(pTagArray, &val);
} else { } else {
STagVal tagVal = {.cid = pCol->colId}; STagVal val = {0};
if (tTagGet(pOldTag, &tagVal) && tagVal.pData) { if (tTagGet(pOldTag, &val)) {
if (IS_VAR_DATA_TYPE(pCol->type)) { taosArrayPush(pTagArray, &val);
tTagValPush(pTagArray, &pCol->colId, pCol->type, varDataVal(tagVal.pData), varDataLen(tagVal.pData), false);
} else {
tTagValPush(pTagArray, &pCol->colId, pCol->type, tagVal.pData, pCol->bytes, false);
}
} }
} }
} }

View File

@ -303,29 +303,22 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
if (fmIsScanPseudoColumnFunc(functionId)) { if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId); setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags } else { // these are tags
const char* p = NULL; STagVal tagVal = {0};
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { tagVal.cid = pExpr->base.pParam[0].pCol->colId;
const STag* tmp = (const STag*)mr.me.ctbEntry.pTags; const char *p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal);
char* data = taosMemoryCalloc(tmp->len + 1, 1); char *data = NULL;
if (data == NULL) { if(pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL){
metaReaderClear(&mr); data = tTagValToData((const STagVal *)p, false);
qError("doTagScan calloc error:%d", tmp->len + 1); }else {
return; data = (char*)p;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data + 1, tmp, tmp->len);
p = data;
} else {
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
} }
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL)); colDataAppend(pColInfoData, i, data, (data == NULL));
} }
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { if(pColInfoData->info.type != TSDB_DATA_TYPE_JSON && IS_VAR_DATA_TYPE(((const STagVal *)p)->type) && data){
taosMemoryFree((void*)p); taosMemoryFree(data);
} }
} }
} }
@ -1632,22 +1625,20 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STR_TO_VARSTR(str, mr.me.name); STR_TO_VARSTR(str, mr.me.name);
colDataAppend(pDst, count, str, false); colDataAppend(pDst, count, str, false);
} else { // it is a tag value } else { // it is a tag value
if (pDst->info.type == TSDB_DATA_TYPE_JSON) { STagVal val = {0};
const STag* tmp = (const STag*)mr.me.ctbEntry.pTags; val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
// TODO opt perf by realloc memory const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val);
char* data = taosMemoryCalloc(tmp->len + 1, 1);
if (data == NULL) {
qError("%s failed to malloc memory, size:%d", GET_TASKID(pTaskInfo), tmp->len + 1);
longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
*data = TSDB_DATA_TYPE_JSON; char *data = NULL;
memcpy(data + 1, tmp, tmp->len); if(pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL){
colDataAppend(pDst, count, data, false); data = tTagValToData((const STagVal *)p, false);
}else {
data = (char*)p;
}
colDataAppend(pDst, count, data, (data == NULL));
if(pDst->info.type != TSDB_DATA_TYPE_JSON && IS_VAR_DATA_TYPE(((const STagVal *)p)->type) && data){
taosMemoryFree(data); taosMemoryFree(data);
} else {
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
colDataAppend(pDst, count, p, (p == NULL));
} }
} }
} }

View File

@ -55,9 +55,7 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta);
int32_t getNumOfTags(const STableMeta* pTableMeta); int32_t getNumOfTags(const STableMeta* pTableMeta);
STableComInfo getTableInfo(const STableMeta* pTableMeta); STableComInfo getTableInfo(const STableMeta* pTableMeta);
STableMeta* tableMetaDup(const STableMeta* pTableMeta); STableMeta* tableMetaDup(const STableMeta* pTableMeta);
#ifdef JSON_TAG_REFACTOR int32_t parseJsontoTagData(const char* json, SArray* pTagVals, SMsgBuf* errMsg);
int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* errMsg, int16_t startColId);
#endif
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);

View File

@ -54,7 +54,6 @@ typedef struct SInsertParseContext {
SMsgBuf msg; // input SMsgBuf msg; // input
STableMeta* pTableMeta; // each table STableMeta* pTableMeta; // each table
SParsedDataColInfo tags; // each table SParsedDataColInfo tags; // each table
SArray* pTagVals; // each table
SVCreateTbReq createTblReq; // each table SVCreateTbReq createTblReq; // each table
SHashObj* pVgroupsHashObj; // global SHashObj* pVgroupsHashObj; // global
SHashObj* pTableBlockHashObj; // global SHashObj* pTableBlockHashObj; // global
@ -425,7 +424,7 @@ static int parseTime(char** end, SToken* pToken, int16_t timePrec, int64_t* time
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) { static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER && if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT &&
@ -471,7 +470,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
uint64_t uv; uint64_t uv;
char* endptr = NULL; char* endptr = NULL;
int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf); int32_t code = checkAndTrimValue(pToken, tmpTokenBuf, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -620,14 +619,12 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
return func(pMsgBuf, pToken->z, pToken->n, param); return func(pMsgBuf, pToken->z, pToken->n, param);
} }
#ifdef JSON_TAG_REFACTOR
case TSDB_DATA_TYPE_JSON: { case TSDB_DATA_TYPE_JSON: {
if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
} }
return func(pMsgBuf, pToken->z, pToken->n, param); return func(pMsgBuf, pToken->z, pToken->n, param);
} }
#endif
case TSDB_DATA_TYPE_TIMESTAMP: { case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t tmpVal; int64_t tmpVal;
if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
@ -752,108 +749,287 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) { static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid) {
SKvParam* pa = (SKvParam*)param;
int8_t type = pa->schema->type;
int16_t colId = pa->schema->colId;
#ifdef JSON_TAG_REFACTOR
if (TSDB_DATA_TYPE_JSON == type) {
return parseJsontoTagData(value, pa->builder, pMsgBuf, colId);
}
#endif
if (value == NULL) { // it is a null data
// tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset,
// pa->colIdx);
return TSDB_CODE_SUCCESS;
}
if (TSDB_DATA_TYPE_BINARY == type) {
memcpy(pa->buf + pa->pos, value, len);
tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), len, false);
pa->pos += len;
} else if (TSDB_DATA_TYPE_NCHAR == type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
ASSERT((pa->pos + pa->schema->bytes - VARSTR_HEADER_SIZE) <= TSDB_MAX_TAGS_LEN);
int32_t output = 0;
if (!taosMbsToUcs4(value, len, (TdUcs4*)(pa->buf + pa->pos), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
if (errno == E2BIG) {
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
return buildSyntaxErrMsg(pMsgBuf, buf, value);
}
tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), output, false);
pa->pos += output;
} else {
memcpy(pa->buf + pa->pos, value, TYPE_BYTES[type]);
tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), TYPE_BYTES[type], false);
pa->pos + TYPE_BYTES[type];
}
ASSERT(pa->pos <= TSDB_MAX_TAGS_LEN);
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid) {
pTbReq->type = TD_CHILD_TABLE; pTbReq->type = TD_CHILD_TABLE;
pTbReq->name = strdup(tname); pTbReq->name = strdup(tname);
pTbReq->ctb.suid = suid; pTbReq->ctb.suid = suid;
pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.pTag = (uint8_t*)pTag;
return TSDB_CODE_SUCCESS; return;
}
static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema,
int16_t timePrec, char* tmpTokenBuf, STagVal *val, SMsgBuf* pMsgBuf) {
int64_t iv;
uint64_t uv;
char* endptr = NULL;
if (isNullStr(pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
}
return TSDB_CODE_SUCCESS;
}
val->cid = pSchema->colId;
val->type = pSchema->bytes;
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: {
if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
*(int8_t*)(&val->i64) = TRUE_VALUE;
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
*(int8_t*)(&val->i64) = FALSE_VALUE;
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_NK_INTEGER) {
*(int8_t*)(&val->i64) = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
} else if (pToken->type == TK_NK_FLOAT) {
*(int8_t*)(&val->i64) = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
}
*(int8_t*)(&val->i64) = iv;
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(uv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
}
*(uint8_t*)(&val->i64) = uv;
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
}
*(int16_t*)(&val->i64) = iv;
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(uv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
}
*(uint16_t*)(&val->i64) = uv;
break;
}
case TSDB_DATA_TYPE_INT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
}
*(int32_t*)(&val->i64) = iv;
break;
}
case TSDB_DATA_TYPE_UINT: {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(uv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
}
*(uint32_t*)(&val->i64) = uv;
break;
}
case TSDB_DATA_TYPE_BIGINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
}
val->i64 = iv;
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT(uv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
}
*(uint64_t*)(&val->i64) = uv;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
double dv;
if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
isnan(dv)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
}
*(float*)(&val->i64) = dv;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double dv;
if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
}
*(double*)(&val->i64) = dv;
break;
}
case TSDB_DATA_TYPE_BINARY: {
// Too long values will raise the invalid sql error message
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
}
val->pData = pToken->z;
val->nData = pToken->n;
break;
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t output = 0;
void *p = taosMemoryCalloc(1, pToken->n * TSDB_NCHAR_SIZE);
if(p == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)(p), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
if (errno == E2BIG) {
taosMemoryFree(p);
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
taosMemoryFree(p);
return buildSyntaxErrMsg(pMsgBuf, buf, pToken->z);
}
val->pData = p;
val->nData = output;
break;
}
case TSDB_DATA_TYPE_JSON: {
if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
}
//return func(pMsgBuf, pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_TIMESTAMP: {
if (parseTime(end, pToken, timePrec, &iv, pMsgBuf) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
}
val->i64 = iv;
break;
}
}
return TSDB_CODE_FAILED;
} }
// pSql -> tag1_value, ...) // pSql -> tag1_value, ...)
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) { static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
ASSERT(!pCxt->pTagVals); int32_t code = TSDB_CODE_SUCCESS;
if (!(pCxt->pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal)))) { SArray *pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal));
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SKvParam param = {.pTagVals = pCxt->pTagVals, .pos = 0};
SToken sToken; SToken sToken;
bool isParseBindParam = false; bool isParseBindParam = false;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
// TODO: JSON_TAG_REFACTOR => here would have json tag?
for (int i = 0; i < pCxt->tags.numOfBound; ++i) { for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken); NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
if (sToken.type == TK_NK_QUESTION) { if (sToken.type == TK_NK_QUESTION) {
isParseBindParam = true; isParseBindParam = true;
if (NULL == pCxt->pStmtCb) { if (NULL == pCxt->pStmtCb) {
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z); code = buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
goto end;
} }
continue; continue;
} }
if (isParseBindParam) { if (isParseBindParam) {
return buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values"); code = buildInvalidOperationMsg(&pCxt->msg, "no mix usage for ? and tag values");
goto end;
} }
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]]; SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]];
param.schema = pTagSchema; char *tmpTokenBuf = taosMemoryCalloc(1, sToken.n); // this can be optimize with parse column
CHECK_CODE( code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg);
parseValueToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg)); if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(tmpTokenBuf);
goto end;
}
if(pTagSchema->type == TSDB_DATA_TYPE_JSON){
if (sToken.n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
code = buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", sToken.z);
taosMemoryFree(tmpTokenBuf);
goto end;
}
code = parseJsontoTagData(sToken.z, pTagVals, &pCxt->msg);
taosMemoryFree(tmpTokenBuf);
if(code != TSDB_CODE_SUCCESS){
goto end;
}
}else{
STagVal val = {0};
code = parseTagToken(&pCxt->pSql, &sToken, pTagSchema, precision, tmpTokenBuf, &val, &pCxt->msg);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(tmpTokenBuf);
goto end;
}
if (pTagSchema->type != TSDB_DATA_TYPE_BINARY){
taosMemoryFree(tmpTokenBuf);
}
taosArrayPush(pTagVals, &val);
}
} }
if (isParseBindParam) { if (isParseBindParam) {
return TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
goto end;
} }
// TODO: JSON_TAG_REFACTOR (would be JSON tag or normal tag)
STag* pTag = NULL; STag* pTag = NULL;
if (tTagNew(param.pTagVals, 1, false, &pTag) != 0) { code = tTagNew(pTagVals, 1, false, &pTag);
return buildInvalidOperationMsg(&pCxt->msg, "out of memory"); if (code != TSDB_CODE_SUCCESS) {
goto end;
} }
return buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid); buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid);
end:
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal *p = (STagVal *)taosArrayGet(pTagVals, i);
if(IS_VAR_DATA_TYPE(p->type)){
taosMemoryFree(p->pData);
}
}
taosArrayDestroy(pTagVals);
return code;
} }
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
@ -1072,7 +1248,6 @@ void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) { static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
taosMemoryFreeClear(pCxt->pTableMeta); taosMemoryFreeClear(pCxt->pTableMeta);
destroyBoundColumnInfo(&pCxt->tags); destroyBoundColumnInfo(&pCxt->tags);
taosArrayDestroy(pCxt->pTagVals);
destroyCreateSubTbReq(&pCxt->createTblReq); destroyCreateSubTbReq(&pCxt->createTblReq);
} }
@ -1348,41 +1523,75 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
return buildInvalidOperationMsg(&pBuf, "out of memory"); return buildInvalidOperationMsg(&pBuf, "out of memory");
} }
int32_t code = TSDB_CODE_SUCCESS;
SSchema* pSchema = pDataBlock->pTableMeta->schema; SSchema* pSchema = pDataBlock->pTableMeta->schema;
SKvParam param = {.pTagVals = pTagArray, .pos = 0};
for (int c = 0; c < tags->numOfBound; ++c) { for (int c = 0; c < tags->numOfBound; ++c) {
if (bind[c].is_null && bind[c].is_null[0]) { if (bind[c].is_null && bind[c].is_null[0]) {
KvRowAppend(&pBuf, NULL, 0, &param);
continue; continue;
} }
SSchema* pTagSchema = &pSchema[tags->boundColumns[c]]; SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
param.schema = pTagSchema;
int32_t colLen = pTagSchema->bytes; int32_t colLen = pTagSchema->bytes;
if (IS_VAR_DATA_TYPE(pTagSchema->type)) { if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
colLen = bind[c].length[0]; colLen = bind[c].length[0];
} }
CHECK_CODE(KvRowAppend(&pBuf, (char*)bind[c].buffer, colLen, &param)); STagVal val = {0};
if(pTagSchema->type == TSDB_DATA_TYPE_BINARY){
val.pData = (uint8_t*)bind[c].buffer;
val.nData = colLen;
}else if(pTagSchema->type == TSDB_DATA_TYPE_NCHAR){
int32_t output = 0;
void *p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
if(p == NULL){
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
if (errno == E2BIG) {
taosMemoryFree(p);
code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
goto end;
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
taosMemoryFree(p);
code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
goto end;
}
val.pData = p;
val.nData = output;
}else{
memcpy(&val.i64, bind[c].buffer, colLen);
}
taosArrayPush(pTagArray, &val);
} }
STag* pTag = NULL; STag* pTag = NULL;
// TODO: JSON_TAG_REFACTOR (if is json or not)? // TODO: stmt support json
if (0 != tTagNew(pTagArray, 1, false, &pTag)) { if (0 != tTagNew(pTagArray, 1, false, &pTag)) {
return buildInvalidOperationMsg(&pBuf, "out of memory"); code = buildInvalidOperationMsg(&pBuf, "out of memory");
goto end;
} }
SVCreateTbReq tbReq = {0}; SVCreateTbReq tbReq = {0};
CHECK_CODE(buildCreateTbReq(&tbReq, tName, pTag, suid)); buildCreateTbReq(&tbReq, tName, pTag, suid);
CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq)); code = buildCreateTbMsg(pDataBlock, &tbReq);
destroyCreateSubTbReq(&tbReq); destroyCreateSubTbReq(&tbReq);
end:
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal *p = (STagVal *)taosArrayGet(pTagArray, i);
if(p->type == TSDB_DATA_TYPE_NCHAR){
taosMemoryFree(p->pData);
}
}
taosArrayDestroy(pTagArray); taosArrayDestroy(pTagArray);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
@ -1714,25 +1923,52 @@ static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* p
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SKvParam param = {.pTagVals = pTagArray, .pos = 0}; int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < tags->numOfBound; ++i) { for (int i = 0; i < tags->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[tags->boundColumns[i]]; SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
param.schema = pTagSchema;
SSmlKv* kv = taosArrayGetP(cols, i); SSmlKv* kv = taosArrayGetP(cols, i);
if (IS_VAR_DATA_TYPE(kv->type)) {
KvRowAppend(msg, kv->value, kv->length, &param); STagVal val = {0};
} else { if(pTagSchema->type == TSDB_DATA_TYPE_BINARY){
KvRowAppend(msg, &(kv->value), kv->length, &param); val.pData = (uint8_t *)kv->value;
val.nData = kv->length;
}else if(pTagSchema->type == TSDB_DATA_TYPE_NCHAR){
int32_t output = 0;
void *p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
if(p == NULL){
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
if (errno == E2BIG) {
taosMemoryFree(p);
code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
goto end;
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
taosMemoryFree(p);
code = buildSyntaxErrMsg(msg, buf, kv->value);
goto end;
}
val.pData = p;
val.nData = output;
}else{
memcpy(&val.i64, &(kv->value), kv->length);
}
taosArrayPush(pTagArray, &val);
}
code = tTagNew(pTagArray, 1, false, ppTag);
end:
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal *p = (STagVal *)taosArrayGet(pTagArray, i);
if(p->type == TSDB_DATA_TYPE_NCHAR){
taosMemoryFree(p->pData);
} }
} }
if (tTagNew(pTagArray, 1, false, ppTag) != 0) {
taosArrayDestroy(pTagArray);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayDestroy(pTagArray); taosArrayDestroy(pTagArray);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,

View File

@ -4144,26 +4144,6 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
} }
} }
// static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
// SKVRowBuilder* pBuilder) {
#ifdef JSON_TAG_REFACTOR
if (pSchema->type == TSDB_DATA_TYPE_JSON) {
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
}
return parseJsontoTagData(pVal->literal, pBuilder, &pCxt->msgBuf, pSchema->colId);
}
#endif
// if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
// tdAddColToKVRow(pBuilder, pSchema->colId, nodesGetValueFromNode(pVal),
// IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
// }
// return TSDB_CODE_SUCCESS;
// }
static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) { static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) {
int32_t code = getFuncInfo(pCxt, pFunc); int32_t code = getFuncInfo(pCxt, pFunc);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -4199,16 +4179,14 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
} }
SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal)); SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal));
char* pTagBuf = taosMemoryCalloc(1, TSDB_MAX_TAGS_LEN); if (!pTagArray) {
if (!pTagArray || !pTagBuf) {
taosArrayDestroy(pTagArray);
taosMemoryFreeClear(pTagBuf);
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY);
} }
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
int16_t nTags = 0, nBufPos = 0; int16_t nTags = 0, nBufPos = 0;
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta); SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
SNode * pTag, *pNode; SNode * pTag = NULL, *pNode = NULL;
bool isJson = false;
FORBOTH(pTag, pStmt->pSpecificTags, pNode, pStmt->pValsOfTags) { FORBOTH(pTag, pStmt->pSpecificTags, pNode, pStmt->pValsOfTags) {
SColumnNode* pCol = (SColumnNode*)pTag; SColumnNode* pCol = (SColumnNode*)pTag;
SSchema* pSchema = NULL; SSchema* pSchema = NULL;
@ -4219,57 +4197,56 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
} }
} }
if (NULL == pSchema) { if (NULL == pSchema) {
taosArrayDestroy(pTagArray); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName);
taosMemoryFreeClear(pTagBuf); goto end;
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName);
} }
SValueNode* pVal = NULL; SValueNode* pVal = NULL;
code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pSchema, pNode, &pVal); code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pSchema, pNode, &pVal);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pVal) {
pVal = (SValueNode*)pNode;
} else {
REPLACE_LIST2_NODE(pVal);
}
}
#ifdef JSON_TAG_REFACTOR
if (TSDB_CODE_SUCCESS == code) {
code = addValToKVRow(pCxt, pVal, pSchema, pBuilder);
}
#endif
if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
// TODO: JSON_TAG_TODO: is copy is a must?
void* nodeVal = nodesGetValueFromNode(pVal);
if (IS_VAR_DATA_TYPE(pSchema->type)) {
memcpy(pTagBuf + nBufPos, varDataVal(nodeVal), varDataLen(nodeVal));
tTagValPush(pTagArray, &pSchema->colId, pSchema->type, (uint8_t*)pTagBuf + nBufPos, varDataLen(nodeVal), false);
nBufPos += varDataLen(pVal->datum.p);
} else {
memcpy(pTagBuf + nBufPos, varDataVal(nodeVal), TYPE_BYTES[pSchema->type]);
tTagValPush(pTagArray, &pSchema->colId, pSchema->type, (uint8_t*)pTagBuf + nBufPos, TYPE_BYTES[pSchema->type],
false);
nBufPos += TYPE_BYTES[pSchema->type];
}
}
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
taosArrayDestroy(pTagArray); goto end;
taosMemoryFreeClear(pTagBuf); }
return code;
if (NULL == pVal) {
pVal = (SValueNode*)pNode;
} else {
REPLACE_LIST2_NODE(pVal);
}
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
code = buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
goto end;
}
isJson = true;
code = parseJsontoTagData(pVal->literal, pTagArray, &pCxt->msgBuf);
if(code != TSDB_CODE_SUCCESS){
goto end;
}
}else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
void* nodeVal = nodesGetValueFromNode(pVal);
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
val.pData = varDataVal(nodeVal);
val.nData = varDataLen(nodeVal);
} else {
memcpy(&val.i64, nodeVal, pTagSchema->bytes);
}
taosArrayPush(pTagArray, &val);
} }
} }
// TODO: JSON_TAG_TODO: version
code = tTagNew(pTagArray, 1, false, ppTag); code = tTagNew(pTagArray, 1, false, ppTag);
if (TSDB_CODE_SUCCESS != code) {
taosArrayDestroy(pTagArray);
taosMemoryFreeClear(pTagBuf);
return code;
}
end:
if(isJson){
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal *p = (STagVal *)taosArrayGet(pTagArray, i);
if(IS_VAR_DATA_TYPE(p->type)){
taosMemoryFree(p->pData);
}
}
}
taosArrayDestroy(pTagArray); taosArrayDestroy(pTagArray);
taosMemoryFreeClear(pTagBuf);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -4281,71 +4258,64 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
SSchema* pTagSchemas = getTableTagSchema(pSuperTableMeta); SSchema* pTagSchemas = getTableTagSchema(pSuperTableMeta);
SNode* pNode; SNode* pNode;
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0; int32_t index = 0;
SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal)); SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal));
char* pTagBuf = taosMemoryCalloc(1, TSDB_MAX_TAGS_LEN); if (!pTagArray) {
const char* qTagBuf = pTagBuf;
if (!pTagArray || !pTagBuf) {
taosArrayDestroy(pTagArray);
taosMemoryFreeClear(qTagBuf);
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY);
} }
bool isJson = false;
FOREACH(pNode, pStmt->pValsOfTags) { FOREACH(pNode, pStmt->pValsOfTags) {
SValueNode* pVal = NULL; SValueNode* pVal = NULL;
SSchema* pTagSchema = pTagSchemas + index; SSchema* pTagSchema = pTagSchemas + index;
code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pTagSchema, pNode, &pVal); code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pTagSchema, pNode, &pVal);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pVal) {
pVal = (SValueNode*)pNode;
} else {
REPLACE_NODE(pVal);
}
}
#ifdef JSON_TAG_REFACTOR
if (TSDB_CODE_SUCCESS == code) {
code = addValToKVRow(pCxt, pVal, pTagSchema + index++, pBuilder);
}
#endif
if (TSDB_CODE_SUCCESS == code) {
if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
char* tmpVal = nodesGetValueFromNode(pVal);
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
memcpy(pTagBuf, varDataVal(tmpVal), varDataLen(tmpVal));
tTagValPush(pTagArray, &pTagSchema->colId, pTagSchema->type, (uint8_t*)pTagBuf, varDataLen(tmpVal), false);
pTagBuf += varDataLen(tmpVal);
} else {
memcpy(pTagBuf, tmpVal, TYPE_BYTES[pTagSchema->type]);
tTagValPush(pTagArray, &pTagSchema->colId, pTagSchema->type, (uint8_t*)pTagBuf, TYPE_BYTES[pTagSchema->type],
false);
pTagBuf += TYPE_BYTES[pTagSchema->type];
}
}
++index;
}
// TODO: buf is need to store the tags
// TODO: JSON_TAG_TODO remove below codes if code is 0 all the time.
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
taosArrayDestroy(pTagArray); goto end;
taosMemoryFreeClear(qTagBuf);
return generateSyntaxErrMsg(&pCxt->msgBuf, code);
} }
if (NULL == pVal) {
pVal = (SValueNode*)pNode;
} else {
REPLACE_NODE(pVal);
}
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
code = buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
goto end;
}
isJson = true;
code = parseJsontoTagData(pVal->literal, pTagArray, &pCxt->msgBuf);
if(code != TSDB_CODE_SUCCESS){
goto end;
}
}else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
char* tmpVal = nodesGetValueFromNode(pVal);
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
val.pData = varDataVal(tmpVal);
val.nData = varDataLen(tmpVal);
} else {
memcpy(&val.i64, tmpVal, pTagSchema->bytes);
}
taosArrayPush(pTagArray, &val);
}
++index;
} }
// TODO: JSON_TAG_TODO: version? code = tTagNew(pTagArray, 1, false, ppTag);
// TODO: JSON_TAG_REFACTOR: json or not
if (0 != (code = tTagNew(pTagArray, 1, false, ppTag))) { end:
taosArrayDestroy(pTagArray); if(isJson){
taosMemoryFreeClear(qTagBuf); for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
return generateSyntaxErrMsg(&pCxt->msgBuf, code); STagVal *p = (STagVal *)taosArrayGet(pTagArray, i);
if(IS_VAR_DATA_TYPE(p->type)){
taosMemoryFree(p->pData);
}
}
} }
taosArrayDestroy(pTagArray); taosArrayDestroy(pTagArray);
taosMemoryFreeClear(qTagBuf); return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t checkCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt) { static int32_t checkCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt) {
@ -4602,39 +4572,42 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type); pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) { if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) {
#ifdef JSON_TAG_REFACTOR
SKVRowBuilder kvRowBuilder = {0};
int32_t code = tdInitKVRowBuilder(&kvRowBuilder);
if (TSDB_CODE_SUCCESS != code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pStmt->pVal->literal && if (pStmt->pVal->literal &&
strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal); return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal);
} }
SArray *pTagVals = taosArrayInit(1, sizeof(STagVal));
code = parseJsontoTagData(pStmt->pVal->literal, &kvRowBuilder, &pCxt->msgBuf, pSchema->colId); int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS != code) { STag* pTag = NULL;
do{
code = parseJsontoTagData(pStmt->pVal->literal, pTagVals, &pCxt->msgBuf);
if (TSDB_CODE_SUCCESS != code) {
break;
}
code = tTagNew(pTagVals, 1, false, &pTag);
}while(0);
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal *p = (STagVal *)taosArrayGet(pTagVals, i);
if(IS_VAR_DATA_TYPE(p->type)){
taosMemoryFree(p->pData);
}
}
taosArrayDestroy(pTagVals);
if (code != TSDB_CODE_SUCCESS){
return code; return code;
} }
pReq->nTagVal = pTag->len;
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); pReq->pTagVal = (uint8_t *)pTag;
if (NULL == row) { pStmt->pVal->datum.p = (char*)pTag; // for free
tdDestroyKVRowBuilder(&kvRowBuilder);
return TSDB_CODE_OUT_OF_MEMORY;
}
pReq->nTagVal = kvRowLen(row);
pReq->pTagVal = row;
pStmt->pVal->datum.p = row; // for free
tdDestroyKVRowBuilder(&kvRowBuilder);
#endif
} else { } else {
pReq->nTagVal = pStmt->pVal->node.resType.bytes; pReq->nTagVal = pStmt->pVal->node.resType.bytes;
if (TSDB_DATA_TYPE_NCHAR == pStmt->pVal->node.resType.type) {
pReq->nTagVal = pReq->nTagVal * TSDB_NCHAR_SIZE;
}
pReq->pTagVal = nodesGetValueFromNode(pStmt->pVal); pReq->pTagVal = nodesGetValueFromNode(pStmt->pVal);
// data and length are seperated for new tag format STagVal
if (IS_VAR_DATA_TYPE(pStmt->pVal->node.resType.type)) {
pReq->nTagVal = varDataLen(pReq->pTagVal);
pReq->pTagVal = varDataVal(pReq->pTagVal);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -328,13 +328,9 @@ static bool isValidateTag(char* input) {
return true; return true;
} }
#ifdef JSON_TAG_REFACTOR int32_t parseJsontoTagData(const char* json, SArray* pTagVals, SMsgBuf* pMsgBuf) {
int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* pMsgBuf, int16_t startColId) {
// set json NULL data // set json NULL data
uint8_t jsonNULL = TSDB_DATA_TYPE_NULL;
int32_t jsonIndex = startColId + 1;
if (!json || strtrim((char*)json) == 0 || strcasecmp(json, TSDB_DATA_NULL_STR_L) == 0) { if (!json || strtrim((char*)json) == 0 || strcasecmp(json, TSDB_DATA_NULL_STR_L) == 0) {
tdAddColToKVRow(kvRowBuilder, jsonIndex, &jsonNULL, CHAR_BYTES);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -349,13 +345,12 @@ int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBu
return buildSyntaxErrMsg(pMsgBuf, "json error invalide value", json); return buildSyntaxErrMsg(pMsgBuf, "json error invalide value", json);
} }
int32_t retCode = 0; int32_t retCode = TSDB_CODE_SUCCESS;
char* tagKV = NULL;
SHashObj* keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); SHashObj* keyHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
cJSON* item = cJSON_GetArrayItem(root, i); cJSON* item = cJSON_GetArrayItem(root, i);
if (!item) { if (!item) {
qError("json inner error:%d", i); uError("json inner error:%d", i);
retCode = buildSyntaxErrMsg(pMsgBuf, "json inner error", json); retCode = buildSyntaxErrMsg(pMsgBuf, "json inner error", json);
goto end; goto end;
} }
@ -367,89 +362,60 @@ int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBu
} }
size_t keyLen = strlen(jsonKey); size_t keyLen = strlen(jsonKey);
if (keyLen > TSDB_MAX_JSON_KEY_LEN) { if (keyLen > TSDB_MAX_JSON_KEY_LEN) {
qError("json key too long error"); uError("json key too long error");
retCode = buildSyntaxErrMsg(pMsgBuf, "json key too long, more than 256", jsonKey); retCode = buildSyntaxErrMsg(pMsgBuf, "json key too long, more than 256", jsonKey);
goto end; goto end;
} }
if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) { if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) {
continue; continue;
} }
// key: keyLen + VARSTR_HEADER_SIZE, value type: CHAR_BYTES, value reserved: DOUBLE_BYTES STagVal val = {0};
tagKV = taosMemoryCalloc(keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES, 1); val.pKey = jsonKey;
if (!tagKV) { taosHashPut(keyHash, jsonKey, keyLen, &keyLen, CHAR_BYTES); // add key to hash to remove dumplicate, value is useless
retCode = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
strncpy(varDataVal(tagKV), jsonKey, keyLen);
varDataSetLen(tagKV, keyLen);
if (taosHashGetSize(keyHash) == 0) {
uint8_t jsonNotNULL = TSDB_DATA_TYPE_JSON;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, &jsonNotNULL, CHAR_BYTES); // add json type
}
taosHashPut(keyHash, jsonKey, keyLen, &keyLen,
CHAR_BYTES); // add key to hash to remove dumplicate, value is useless
if (item->type == cJSON_String) { // add json value format: type|data if (item->type == cJSON_String) { // add json value format: type|data
char* jsonValue = item->valuestring; char* jsonValue = item->valuestring;
int32_t valLen = (int32_t)strlen(jsonValue); int32_t valLen = (int32_t)strlen(jsonValue);
int32_t totalLen = keyLen + VARSTR_HEADER_SIZE + valLen * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE + CHAR_BYTES; char* tmp = taosMemoryCalloc(1, valLen * TSDB_NCHAR_SIZE);
char* tmp = taosMemoryRealloc(tagKV, totalLen);
if (!tmp) { if (!tmp) {
retCode = TSDB_CODE_TSC_OUT_OF_MEMORY; retCode = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end; goto end;
} }
tagKV = tmp; val.type = TSDB_DATA_TYPE_NCHAR;
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); if (valLen > 0 && !taosMbsToUcs4(jsonValue, valLen, (TdUcs4*)tmp,
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
*valueType = TSDB_DATA_TYPE_NCHAR;
if (valLen > 0 && !taosMbsToUcs4(jsonValue, valLen, (TdUcs4*)varDataVal(valueData),
(int32_t)(valLen * TSDB_NCHAR_SIZE), &valLen)) { (int32_t)(valLen * TSDB_NCHAR_SIZE), &valLen)) {
qError("charset:%s to %s. val:%s, errno:%s, convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, jsonValue, uError("charset:%s to %s. val:%s, errno:%s, convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, jsonValue,
strerror(errno)); strerror(errno));
retCode = buildSyntaxErrMsg(pMsgBuf, "charset convert json error", jsonValue); retCode = buildSyntaxErrMsg(pMsgBuf, "charset convert json error", jsonValue);
goto end; goto end;
} }
val.nData = valLen;
varDataSetLen(valueData, valLen); val.pData = tmp;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, totalLen);
} else if (item->type == cJSON_Number) { } else if (item->type == cJSON_Number) {
if (!isfinite(item->valuedouble)) { if (!isfinite(item->valuedouble)) {
qError("json value is invalidate"); uError("json value is invalidate");
retCode = buildSyntaxErrMsg(pMsgBuf, "json value number is illegal", json); retCode = buildSyntaxErrMsg(pMsgBuf, "json value number is illegal", json);
goto end; goto end;
} }
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); val.type = TSDB_DATA_TYPE_DOUBLE;
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES); *((double*)&(val.i64)) = item->valuedouble;
*valueType = TSDB_DATA_TYPE_DOUBLE;
*((double*)valueData) = item->valuedouble;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES);
} else if (item->type == cJSON_True || item->type == cJSON_False) { } else if (item->type == cJSON_True || item->type == cJSON_False) {
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); val.type = TSDB_DATA_TYPE_BOOL;
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES); *((char*)&(val.i64)) = (char)(item->valueint);
*valueType = TSDB_DATA_TYPE_BOOL;
*valueData = (char)(item->valueint);
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + CHAR_BYTES);
} else if (item->type == cJSON_NULL) { } else if (item->type == cJSON_NULL) {
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); val.type = TSDB_DATA_TYPE_NULL;
*valueType = TSDB_DATA_TYPE_NULL;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
} else { } else {
retCode = buildSyntaxErrMsg(pMsgBuf, "invalidate json value", json); retCode = buildSyntaxErrMsg(pMsgBuf, "invalidate json value", json);
goto end; goto end;
} }
} taosArrayPush(pTagVals, &val);
if (taosHashGetSize(keyHash) == 0) { // set json NULL true
tdAddColToKVRow(kvRowBuilder, jsonIndex, &jsonNULL, CHAR_BYTES);
} }
end: end:
taosMemoryFree(tagKV);
taosHashCleanup(keyHash); taosHashCleanup(keyHash);
cJSON_Delete(root); cJSON_Delete(root);
return retCode; return retCode;
} }
#endif
static int32_t buildTableReq(SHashObj* pTablesHash, SArray** pTables) { static int32_t buildTableReq(SHashObj* pTablesHash, SArray** pTables) {
if (NULL != pTablesHash) { if (NULL != pTablesHash) {

View File

@ -922,29 +922,12 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
} }
} }
char *getJsonValue(char *json, char *key) { // todo STagVal *getJsonValue(char *json, STagVal *tagVal) {
json++; // jump type bool find = tTagGet(((const STag *)json), tagVal); // json value is null and not exist is different
if(!find){
STagVal tagVal = {.pKey = key}; return NULL;
tTagGet(((const STag *)json), &tagVal);
return (char *)tagVal.pData;
#if 0
int16_t cols = kvRowNCols(json);
for (int i = 0; i < cols; ++i) {
SColIdx *pColIdx = kvRowColIdxAt(json, i);
char *data = kvRowColVal(json, pColIdx);
if(i == 0){
if(*data == TSDB_DATA_TYPE_NULL) {
return NULL;
}
continue;
}
if(memcmp(key, data, varDataTLen(data)) == 0){
return data + varDataTLen(data);
}
} }
return NULL; return tagVal;
#endif
} }
void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
@ -963,13 +946,13 @@ void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO
continue; continue;
} }
char *pLeftData = colDataGetVarData(pLeft->columnData, i); char *pLeftData = colDataGetVarData(pLeft->columnData, i);
char *value = getJsonValue(pLeftData, pRightData); STagVal val = {.pKey = pRightData};
if (!value) { STagVal *value = getJsonValue(pLeftData, &val);
colDataSetNull_var(pOutputCol, i); char *data = tTagValToData(value, true);
pOutputCol->hasNull = true; colDataAppend(pOutputCol, i, data, data == NULL);
continue; if(value && IS_VAR_DATA_TYPE(value->type) && data){
taosMemoryFree(data)
} }
colDataAppend(pOutputCol, i, value, false);
} }
} }