Merge branch 'feat/tag_refact' of https://github.com/taosdata/TDengine into feat/row_refact
This commit is contained in:
commit
aa6da0b5ea
|
@ -59,13 +59,18 @@ void tTSRowBuilderReset(STSRowBuilder *pBuilder);
|
|||
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData);
|
||||
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
|
||||
|
||||
// STagVal
|
||||
static FORCE_INLINE void tTagValPush(SArray *pTagArray, void *key, int8_t type, uint8_t *pData, uint32_t nData,
|
||||
bool isJson);
|
||||
|
||||
// STag
|
||||
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
|
||||
void tTagFree(STag *pTag);
|
||||
void tTagGet(STag *pTag, STagVal *pTagVal);
|
||||
bool tTagGet(const STag *pTag, STagVal *pTagVal);
|
||||
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
|
||||
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
||||
int32_t tTagToValArray(STag *pTag, SArray **ppArray);
|
||||
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
||||
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln);
|
||||
|
||||
// STRUCT =================
|
||||
struct STColumn {
|
||||
|
@ -123,11 +128,52 @@ struct STagVal {
|
|||
int16_t cid;
|
||||
char *pKey;
|
||||
};
|
||||
int8_t type;
|
||||
uint32_t nData;
|
||||
uint8_t *pData;
|
||||
int8_t type;
|
||||
union {
|
||||
int8_t i8;
|
||||
uint8_t u8;
|
||||
int16_t i16;
|
||||
uint16_t u16;
|
||||
int32_t i32;
|
||||
uint32_t u32;
|
||||
int64_t i64;
|
||||
uint64_t u64;
|
||||
float f;
|
||||
double d;
|
||||
struct {
|
||||
uint32_t nData;
|
||||
uint8_t *pData;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
static FORCE_INLINE void tTagValPush(SArray *pTagArray, void *key, int8_t type, uint8_t *pData, uint32_t nData,
|
||||
bool isJson) {
|
||||
STagVal tagVal = {0};
|
||||
if (isJson) {
|
||||
tagVal.pKey = (char *)key;
|
||||
} else {
|
||||
tagVal.cid = *(int16_t *)key;
|
||||
}
|
||||
|
||||
tagVal.type = type;
|
||||
tagVal.pData = pData;
|
||||
tagVal.nData = nData;
|
||||
taosArrayPush(pTagArray, &tagVal);
|
||||
}
|
||||
|
||||
#pragma pack(push, 1)
|
||||
#define TD_TAG_JSON ((int8_t)0x1)
|
||||
#define TD_TAG_LARGE ((int8_t)0x2)
|
||||
struct STag {
|
||||
int8_t flags;
|
||||
int16_t len;
|
||||
int16_t nTag;
|
||||
int32_t ver;
|
||||
int8_t idx[];
|
||||
};
|
||||
#pragma pack(pop)
|
||||
|
||||
#if 1 //================================================================================================================================================
|
||||
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
|
||||
#define TD_SUPPORT_BITMAP
|
||||
|
@ -370,109 +416,6 @@ SDataCols *tdFreeDataCols(SDataCols *pCols);
|
|||
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update,
|
||||
TDRowVerT maxVer);
|
||||
|
||||
// ----------------- K-V data row structure
|
||||
/* |<-------------------------------------- len -------------------------------------------->|
|
||||
* |<----- header ----->|<--------------------------- body -------------------------------->|
|
||||
* +----------+----------+---------------------------------+---------------------------------+
|
||||
* | uint16_t | int16_t | | |
|
||||
* +----------+----------+---------------------------------+---------------------------------+
|
||||
* | len | ncols | cols index | data part |
|
||||
* +----------+----------+---------------------------------+---------------------------------+
|
||||
*/
|
||||
typedef void *SKVRow;
|
||||
|
||||
typedef struct {
|
||||
int16_t colId;
|
||||
uint16_t offset;
|
||||
} SColIdx;
|
||||
|
||||
#define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
|
||||
|
||||
#define kvRowLen(r) (*(uint16_t *)(r))
|
||||
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t)))
|
||||
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
|
||||
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
|
||||
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
|
||||
#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r))
|
||||
#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r))
|
||||
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
|
||||
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
|
||||
#define kvRowFree(r) taosMemoryFreeClear(r)
|
||||
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
|
||||
#define kvRowValLen(r) (kvRowLen(r) - TD_KV_ROW_HEAD_SIZE - sizeof(SColIdx) * kvRowNCols(r))
|
||||
#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r)))
|
||||
#define kvRowKey(r) tdGetKey(kvRowTKey(r))
|
||||
#define kvRowKeys(r) POINTER_SHIFT(r, *(uint16_t *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(int16_t)))
|
||||
#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r))
|
||||
|
||||
SKVRow tdKVRowDup(SKVRow row);
|
||||
int32_t tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
|
||||
int32_t tdEncodeKVRow(void **buf, SKVRow row);
|
||||
void *tdDecodeKVRow(void *buf, SKVRow *row);
|
||||
void tdSortKVRowByColIdx(SKVRow row);
|
||||
|
||||
static FORCE_INLINE int32_t comparTagId(const void *key1, const void *key2) {
|
||||
if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) {
|
||||
return 1;
|
||||
} else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) {
|
||||
return -1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void *tdGetKVRowValOfCol(const SKVRow row, int16_t colId) {
|
||||
void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
|
||||
if (ret == NULL) return NULL;
|
||||
return kvRowColVal(row, (SColIdx *)ret);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
|
||||
return taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
|
||||
}
|
||||
|
||||
// ----------------- K-V data row builder
|
||||
typedef struct {
|
||||
int16_t tCols;
|
||||
int16_t nCols;
|
||||
SColIdx *pColIdx;
|
||||
uint16_t alloc;
|
||||
uint16_t size;
|
||||
void *buf;
|
||||
} SKVRowBuilder;
|
||||
|
||||
int32_t tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
|
||||
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
|
||||
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
|
||||
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
|
||||
|
||||
static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t colId, const void *value, int32_t tlen) {
|
||||
if (pBuilder->nCols >= pBuilder->tCols) {
|
||||
pBuilder->tCols *= 2;
|
||||
SColIdx *pColIdx = (SColIdx *)taosMemoryRealloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
|
||||
if (pColIdx == NULL) return -1;
|
||||
pBuilder->pColIdx = pColIdx;
|
||||
}
|
||||
|
||||
pBuilder->pColIdx[pBuilder->nCols].colId = colId;
|
||||
pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size;
|
||||
|
||||
pBuilder->nCols++;
|
||||
|
||||
if (tlen > pBuilder->alloc - pBuilder->size) {
|
||||
while (tlen > pBuilder->alloc - pBuilder->size) {
|
||||
pBuilder->alloc *= 2;
|
||||
}
|
||||
void *buf = taosMemoryRealloc(pBuilder->buf, pBuilder->alloc);
|
||||
if (buf == NULL) return -1;
|
||||
pBuilder->buf = buf;
|
||||
}
|
||||
|
||||
memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
|
||||
pBuilder->size += tlen;
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -480,3 +423,15 @@ static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t co
|
|||
#endif
|
||||
|
||||
#endif /*_TD_COMMON_DATA_FORMAT_H_*/
|
||||
|
||||
// SKVRowBuilder;
|
||||
|
||||
// int32_t tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
|
||||
// void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
|
||||
// void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
|
||||
// SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
|
||||
|
||||
// static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t colId, const void *value, int32_t tlen)
|
||||
|
||||
// #ifdef JSON_TAG_REFACTOR
|
||||
// TODO: JSON_TAG_TODO
|
||||
|
|
|
@ -1747,6 +1747,15 @@ typedef struct SVCreateTbReq {
|
|||
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
|
||||
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
|
||||
|
||||
static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
|
||||
taosMemoryFreeClear(req->name);
|
||||
if (req->type == TSDB_CHILD_TABLE) {
|
||||
taosMemoryFreeClear(req->ctb.pTag);
|
||||
} else if (req->type == TSDB_NORMAL_TABLE) {
|
||||
taosMemoryFreeClear(req->ntb.schemaRow.pSchema);
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int32_t nReqs;
|
||||
union {
|
||||
|
|
|
@ -530,6 +530,24 @@ static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) {
|
|||
return sizeof(int64_t);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tPutFloat(uint8_t* p, float f) {
|
||||
union {
|
||||
uint32_t ui;
|
||||
float f;
|
||||
} v = {.f = f};
|
||||
|
||||
return tPutU32(p, v.ui);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tPutDouble(uint8_t* p, double d) {
|
||||
union {
|
||||
uint64_t ui;
|
||||
double d;
|
||||
} v = {.d = d};
|
||||
|
||||
return tPutU64(p, v.ui);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v); }
|
||||
|
||||
static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); }
|
||||
|
@ -619,6 +637,34 @@ static FORCE_INLINE int32_t tGetI64v(uint8_t* p, int64_t* v) {
|
|||
return n;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tGetFloat(uint8_t* p, float* f) {
|
||||
int32_t n = 0;
|
||||
|
||||
union {
|
||||
uint32_t ui;
|
||||
float f;
|
||||
} v;
|
||||
|
||||
n = tGetU32(p, &v.ui);
|
||||
|
||||
*f = v.f;
|
||||
return n;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tGetDouble(uint8_t* p, double* d) {
|
||||
int32_t n = 0;
|
||||
|
||||
union {
|
||||
uint64_t ui;
|
||||
double d;
|
||||
} v;
|
||||
|
||||
n = tGetU64(p, &v.ui);
|
||||
|
||||
*d = v.d;
|
||||
return n;
|
||||
}
|
||||
|
||||
// =====================
|
||||
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) {
|
||||
int n = 0;
|
||||
|
|
|
@ -983,11 +983,23 @@ static char* parseTagDatatoJson(void* p) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
int16_t nCols = kvRowNCols(p);
|
||||
SArray* pTagVals = NULL;
|
||||
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
int16_t nCols = taosArrayGetSize(pTagVals);
|
||||
char tagJsonKey[256] = {0};
|
||||
for (int j = 0; j < nCols; ++j) {
|
||||
SColIdx* pColIdx = kvRowColIdxAt(p, j);
|
||||
char* val = (char*)(kvRowColVal(p, pColIdx));
|
||||
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
|
||||
|
||||
// JSON_TAG_REFACTOR
|
||||
pTagVal->nData;
|
||||
pTagVal->pData; // for varChar, len not included
|
||||
// TODO: adapt below code;
|
||||
char* val = (char*)pTagVal->pData;
|
||||
// TODO: adapt below code;
|
||||
|
||||
if (j == 0) {
|
||||
if (*val == TSDB_DATA_TYPE_NULL) {
|
||||
string = taosMemoryCalloc(1, 8);
|
||||
|
|
|
@ -127,7 +127,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
|
|||
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
|
||||
dataLen = CHAR_BYTES;
|
||||
} else if (*pData == TSDB_DATA_TYPE_JSON) {
|
||||
dataLen = kvRowLen(pData + CHAR_BYTES);
|
||||
dataLen = ((STag*)(pData + CHAR_BYTES))->len;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -1634,6 +1634,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId) {
|
||||
SSubmitReq* ret = NULL;
|
||||
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||
if(!tagArray) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// cal size
|
||||
int32_t cap = sizeof(SSubmitReq);
|
||||
|
@ -1655,18 +1660,33 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = suid;
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
|
||||
ASSERT(0);
|
||||
|
||||
|
||||
STagVal tagVal = {.cid = 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.pData = (uint8_t*)&pDataBlock->info.groupId,
|
||||
.nData = sizeof(uint64_t)};
|
||||
STag* pTag = NULL;
|
||||
taosArrayClear(tagArray);
|
||||
taosArrayPush(tagArray, &tagVal);
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
if (!pTag) {
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
return NULL;
|
||||
}
|
||||
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
|
||||
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
createTbReq.ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||
if (code < 0) return NULL;
|
||||
taosMemoryFree(cname);
|
||||
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
|
||||
if (code < 0) {
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
|
||||
|
@ -1709,22 +1729,42 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = suid;
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
|
||||
ASSERT(0);
|
||||
STagVal tagVal = {.cid = 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.pData = (uint8_t*)&pDataBlock->info.groupId,
|
||||
.nData = sizeof(uint64_t)};
|
||||
taosArrayClear(tagArray);
|
||||
taosArrayPush(tagArray, &tagVal);
|
||||
STag* pTag = NULL;
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
if (!pTag) {
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
taosMemoryFreeClear(ret);
|
||||
return NULL;
|
||||
}
|
||||
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
|
||||
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
createTbReq.ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||
if (code < 0) return NULL;
|
||||
if (code < 0) {
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
taosMemoryFreeClear(ret);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, blockData, schemaLen);
|
||||
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
|
||||
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
|
||||
tEncoderClear(&encoder);
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
|
||||
if (code < 0) {
|
||||
taosArrayDestroy(tagArray);
|
||||
taosMemoryFreeClear(ret);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
blkHead->schemaLen = htonl(schemaLen);
|
||||
|
||||
|
@ -1759,5 +1799,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
}
|
||||
|
||||
ret->length = htonl(ret->length);
|
||||
taosArrayDestroy(tagArray);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
#include "tdatablock.h"
|
||||
#include "tlog.h"
|
||||
|
||||
static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson);
|
||||
|
||||
typedef struct SKVIdx {
|
||||
int32_t cid;
|
||||
int32_t offset;
|
||||
|
@ -31,18 +33,6 @@ typedef struct {
|
|||
} STSKVRow;
|
||||
#pragma pack(pop)
|
||||
|
||||
#pragma pack(push, 1)
|
||||
#define TD_TAG_JSON ((int8_t)0x1)
|
||||
#define TD_TAG_LARGE ((int8_t)0x2)
|
||||
struct STag {
|
||||
int8_t flags;
|
||||
int16_t len;
|
||||
int16_t nTag;
|
||||
int32_t ver;
|
||||
int8_t idx[];
|
||||
};
|
||||
#pragma pack(pop)
|
||||
|
||||
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)
|
||||
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
|
||||
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
|
||||
|
@ -532,6 +522,109 @@ static int tTagValCmprFn(const void *p1, const void *p2) {
|
|||
static int tTagValJsonCmprFn(const void *p1, const void *p2) {
|
||||
return strcmp(((STagVal *)p1)[0].pKey, ((STagVal *)p2)[0].pKey);
|
||||
}
|
||||
|
||||
static void debugPrintTagVal(int8_t type, const void *val, int32_t vlen, const char *tag, int32_t ln) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
char tmpVal[32] = {0};
|
||||
memcpy(tmpVal, val, 32);
|
||||
printf("%s:%d type:%d vlen:%d, val:\"%s\"\n", tag, ln, (int32_t)type, vlen, tmpVal);
|
||||
} break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%f\n", tag, ln, (int32_t)type, vlen, *(float *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
printf("%s:%d type:%d vlen:%d, val:%lf\n", tag, ln, (int32_t)type, vlen, *(double *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIu8 "\n", tag, ln, (int32_t)type, vlen, *(uint8_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIi8 "\n", tag, ln, (int32_t)type, vlen, *(int8_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIi16 "\n", tag, ln, (int32_t)type, vlen, *(int16_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIi32 "\n", tag, ln, (int32_t)type, vlen, *(int32_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIi64 "\n", tag, ln, (int32_t)type, vlen, *(int64_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIi64 "\n", tag, ln, (int32_t)type, vlen, *(int64_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIu8 "\n", tag, ln, (int32_t)type, vlen, *(uint8_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIu16 "\n", tag, ln, (int32_t)type, vlen, *(uint16_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIu32 "\n", tag, ln, (int32_t)type, vlen, *(uint32_t *)val);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
printf("%s:%d type:%d vlen:%d, val:%" PRIu64 "\n", tag, ln, (int32_t)type, vlen, *(uint64_t *)val);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// if (isLarge) {
|
||||
// p = (uint8_t *)&((int16_t *)pTag->idx)[pTag->nTag];
|
||||
// } else {
|
||||
// p = (uint8_t *)&pTag->idx[pTag->nTag];
|
||||
// }
|
||||
|
||||
// (*ppArray) = taosArrayInit(pTag->nTag + 1, sizeof(STagVal));
|
||||
// if (*ppArray == NULL) {
|
||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// goto _err;
|
||||
// }
|
||||
|
||||
// for (int16_t iTag = 0; iTag < pTag->nTag; iTag++) {
|
||||
// if (isLarge) {
|
||||
// offset = ((int16_t *)pTag->idx)[iTag];
|
||||
// } else {
|
||||
// offset = pTag->idx[iTag];
|
||||
// }
|
||||
|
||||
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) {
|
||||
int8_t isJson = pTag->flags & TD_TAG_JSON;
|
||||
int8_t isLarge = pTag->flags & TD_TAG_LARGE;
|
||||
uint8_t *p = NULL;
|
||||
int16_t offset = 0;
|
||||
|
||||
if (isLarge) {
|
||||
p = (uint8_t *)&((int16_t *)pTag->idx)[pTag->nTag];
|
||||
} else {
|
||||
p = (uint8_t *)&pTag->idx[pTag->nTag];
|
||||
}
|
||||
printf("%s:%d >>> STAG === %s:%s, len: %d, nTag: %d, sver:%d\n", tag, ln, isJson ? "json" : "normal",
|
||||
isLarge ? "large" : "small", (int32_t)pTag->len, (int32_t)pTag->nTag, pTag->ver);
|
||||
for (uint16_t n = 0; n < pTag->nTag; ++n) {
|
||||
if (isLarge) {
|
||||
offset = ((int16_t *)pTag->idx)[n];
|
||||
} else {
|
||||
offset = pTag->idx[n];
|
||||
}
|
||||
STagVal tagVal = {0};
|
||||
if (isJson) {
|
||||
tagVal.pKey = (char *)POINTER_SHIFT(p, offset);
|
||||
} else {
|
||||
tagVal.cid = *(int16_t *)POINTER_SHIFT(p, offset);
|
||||
}
|
||||
printf("%s:%d loop[%d-%d] offset=%d\n", __func__, __LINE__, (int32_t)pTag->nTag, (int32_t)n, (int32_t)offset);
|
||||
tGetTagVal(p + offset, &tagVal, isJson);
|
||||
debugPrintTagVal(tagVal.type, tagVal.pData, tagVal.nData, __func__, __LINE__);
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
|
||||
int32_t n = 0;
|
||||
|
||||
|
@ -549,9 +642,45 @@ static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
|
|||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||
n += tPutBinary(p ? p + n : p, pTagVal->pData, pTagVal->nData);
|
||||
} else {
|
||||
ASSERT(pTagVal->nData == TYPE_BYTES[pTagVal->type]);
|
||||
if (p) memcpy(p + n, pTagVal->pData, pTagVal->nData);
|
||||
n += pTagVal->nData;
|
||||
p = p ? p + n : p;
|
||||
switch (pTagVal->type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
n += tPutI8(p, pTagVal->i8 ? 1 : 0);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
n += tPutI8(p, pTagVal->i8);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
n += tPutI16(p, pTagVal->i16);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
n += tPutI32(p, pTagVal->i32);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
n += tPutI64(p, pTagVal->i64);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
n += tPutFloat(p, pTagVal->f);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
n += tPutDouble(p, pTagVal->d);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
n += tPutU8(p, pTagVal->u8);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
n += tPutU16(p, pTagVal->u16);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
n += tPutU32(p, pTagVal->u32);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
n += tPutU64(p, pTagVal->u64);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
return n;
|
||||
|
@ -573,9 +702,42 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
|
|||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||
n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData);
|
||||
} else {
|
||||
pTagVal->pData = p + n;
|
||||
pTagVal->nData = TYPE_BYTES[pTagVal->type];
|
||||
n += pTagVal->nData;
|
||||
switch (pTagVal->type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
n += tGetI8(p + n, &pTagVal->i8);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
n += tGetI16(p, &pTagVal->i16);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
n += tGetI32(p, &pTagVal->i32);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
n += tGetI64(p, &pTagVal->i64);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
n += tGetFloat(p, &pTagVal->f);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
n += tGetDouble(p, &pTagVal->d);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
n += tGetU8(p, &pTagVal->u8);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
n += tGetU16(p, &pTagVal->u16);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
n += tGetU32(p, &pTagVal->u32);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
n += tGetU64(p, &pTagVal->u64);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
return n;
|
||||
|
@ -609,7 +771,7 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
|
|||
ASSERT(szTag <= INT16_MAX);
|
||||
|
||||
// build tag
|
||||
(*ppTag) = (STag *)taosMemoryMalloc(szTag);
|
||||
(*ppTag) = (STag *)taosMemoryCalloc(szTag, 1);
|
||||
if ((*ppTag) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -640,6 +802,8 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
|
|||
n += tPutTagVal(p + n, (STagVal *)taosArrayGet(pArray, iTag), isJson);
|
||||
}
|
||||
|
||||
debugPrintSTag(*ppTag, __func__, __LINE__);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
@ -650,7 +814,7 @@ void tTagFree(STag *pTag) {
|
|||
if (pTag) taosMemoryFree(pTag);
|
||||
}
|
||||
|
||||
void tTagGet(STag *pTag, STagVal *pTagVal) {
|
||||
bool tTagGet(const STag *pTag, STagVal *pTagVal) {
|
||||
int16_t lidx = 0;
|
||||
int16_t ridx = pTag->nTag - 1;
|
||||
int16_t midx;
|
||||
|
@ -690,24 +854,34 @@ void tTagGet(STag *pTag, STagVal *pTagVal) {
|
|||
} else if (c > 0) {
|
||||
lidx = midx + 1;
|
||||
} else {
|
||||
pTagVal->type = tv.type;
|
||||
pTagVal->nData = tv.nData;
|
||||
pTagVal->pData = tv.pData;
|
||||
break;
|
||||
memcpy(pTagVal, &tv, sizeof(tv));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag) {
|
||||
return tEncodeBinary(pEncoder, (const uint8_t *)pTag, pTag->len);
|
||||
}
|
||||
|
||||
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag) { return tDecodeBinary(pDecoder, (uint8_t **)ppTag, NULL); }
|
||||
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag) {
|
||||
uint32_t len = 0;
|
||||
return tDecodeBinary(pDecoder, (uint8_t **)ppTag, &len);
|
||||
}
|
||||
|
||||
int32_t tTagToValArray(STag *pTag, SArray **ppArray) {
|
||||
int32_t tTagToValArray(const STag *pTag, SArray **ppArray) {
|
||||
int32_t code = 0;
|
||||
uint8_t *p = (uint8_t *)&pTag->idx[pTag->nTag];
|
||||
STagVal tv;
|
||||
uint8_t *p = NULL;
|
||||
STagVal tv = {0};
|
||||
int8_t isLarge = pTag->flags & TD_TAG_LARGE;
|
||||
int16_t offset = 0;
|
||||
|
||||
if (isLarge) {
|
||||
p = (uint8_t *)&((int16_t *)pTag->idx)[pTag->nTag];
|
||||
} else {
|
||||
p = (uint8_t *)&pTag->idx[pTag->nTag];
|
||||
}
|
||||
|
||||
(*ppArray) = taosArrayInit(pTag->nTag + 1, sizeof(STagVal));
|
||||
if (*ppArray == NULL) {
|
||||
|
@ -716,7 +890,12 @@ int32_t tTagToValArray(STag *pTag, SArray **ppArray) {
|
|||
}
|
||||
|
||||
for (int16_t iTag = 0; iTag < pTag->nTag; iTag++) {
|
||||
tGetTagVal(p + pTag->idx[iTag], &tv, pTag->flags & TD_TAG_JSON);
|
||||
if (isLarge) {
|
||||
offset = ((int16_t *)pTag->idx)[iTag];
|
||||
} else {
|
||||
offset = pTag->idx[iTag];
|
||||
}
|
||||
tGetTagVal(p + offset, &tv, pTag->flags & TD_TAG_JSON);
|
||||
taosArrayPush(*ppArray, &tv);
|
||||
}
|
||||
|
||||
|
@ -1052,162 +1231,4 @@ void tdResetDataCols(SDataCols *pCols) {
|
|||
}
|
||||
}
|
||||
|
||||
SKVRow tdKVRowDup(SKVRow row) {
|
||||
SKVRow trow = taosMemoryMalloc(kvRowLen(row));
|
||||
if (trow == NULL) return NULL;
|
||||
|
||||
kvRowCpy(trow, row);
|
||||
return trow;
|
||||
}
|
||||
|
||||
static int compareColIdx(const void *a, const void *b) {
|
||||
const SColIdx *x = (const SColIdx *)a;
|
||||
const SColIdx *y = (const SColIdx *)b;
|
||||
if (x->colId > y->colId) {
|
||||
return 1;
|
||||
}
|
||||
if (x->colId < y->colId) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tdSortKVRowByColIdx(SKVRow row) { qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx); }
|
||||
|
||||
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
|
||||
SColIdx *pColIdx = NULL;
|
||||
SKVRow row = *orow;
|
||||
SKVRow nrow = NULL;
|
||||
void *ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
|
||||
|
||||
if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) { // need to add a column value to the row
|
||||
int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
|
||||
int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff;
|
||||
int oRowCols = kvRowNCols(row);
|
||||
|
||||
ASSERT(diff > 0);
|
||||
nrow = taosMemoryMalloc(nRowLen);
|
||||
if (nrow == NULL) return -1;
|
||||
|
||||
kvRowSetLen(nrow, nRowLen);
|
||||
kvRowSetNCols(nrow, oRowCols + 1);
|
||||
|
||||
memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols);
|
||||
memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row));
|
||||
|
||||
pColIdx = kvRowColIdxAt(nrow, oRowCols);
|
||||
pColIdx->colId = colId;
|
||||
pColIdx->offset = kvRowValLen(row);
|
||||
|
||||
memcpy(kvRowColVal(nrow, pColIdx), value, diff); // copy new value
|
||||
|
||||
tdSortKVRowByColIdx(nrow);
|
||||
|
||||
*orow = nrow;
|
||||
taosMemoryFree(row);
|
||||
} else {
|
||||
ASSERT(((SColIdx *)ptr)->colId == colId);
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);
|
||||
|
||||
if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
|
||||
memcpy(pOldVal, value, varDataTLen(value));
|
||||
} else { // need to reallocate the memory
|
||||
int16_t nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal));
|
||||
ASSERT(nlen > 0);
|
||||
nrow = taosMemoryMalloc(nlen);
|
||||
if (nrow == NULL) return -1;
|
||||
|
||||
kvRowSetLen(nrow, nlen);
|
||||
kvRowSetNCols(nrow, kvRowNCols(row));
|
||||
|
||||
int zsize = sizeof(SColIdx) * kvRowNCols(row) + ((SColIdx *)ptr)->offset;
|
||||
memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize);
|
||||
memcpy(kvRowColVal(nrow, ((SColIdx *)ptr)), value, varDataTLen(value));
|
||||
// Copy left value part
|
||||
int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal);
|
||||
if (lsize > 0) {
|
||||
memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)),
|
||||
POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize);
|
||||
}
|
||||
|
||||
for (int i = 0; i < kvRowNCols(nrow); i++) {
|
||||
pColIdx = kvRowColIdxAt(nrow, i);
|
||||
|
||||
if (pColIdx->offset > ((SColIdx *)ptr)->offset) {
|
||||
pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value);
|
||||
}
|
||||
}
|
||||
|
||||
*orow = nrow;
|
||||
taosMemoryFree(row);
|
||||
}
|
||||
} else {
|
||||
memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tdEncodeKVRow(void **buf, SKVRow row) {
|
||||
// May change the encode purpose
|
||||
if (buf != NULL) {
|
||||
kvRowCpy(*buf, row);
|
||||
*buf = POINTER_SHIFT(*buf, kvRowLen(row));
|
||||
}
|
||||
|
||||
return kvRowLen(row);
|
||||
}
|
||||
|
||||
void *tdDecodeKVRow(void *buf, SKVRow *row) {
|
||||
*row = tdKVRowDup(buf);
|
||||
if (*row == NULL) return NULL;
|
||||
return POINTER_SHIFT(buf, kvRowLen(*row));
|
||||
}
|
||||
|
||||
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
|
||||
pBuilder->tCols = 128;
|
||||
pBuilder->nCols = 0;
|
||||
pBuilder->pColIdx = (SColIdx *)taosMemoryMalloc(sizeof(SColIdx) * pBuilder->tCols);
|
||||
if (pBuilder->pColIdx == NULL) return -1;
|
||||
pBuilder->alloc = 1024;
|
||||
pBuilder->size = 0;
|
||||
pBuilder->buf = taosMemoryMalloc(pBuilder->alloc);
|
||||
if (pBuilder->buf == NULL) {
|
||||
taosMemoryFree(pBuilder->pColIdx);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
|
||||
taosMemoryFreeClear(pBuilder->pColIdx);
|
||||
taosMemoryFreeClear(pBuilder->buf);
|
||||
}
|
||||
|
||||
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
|
||||
pBuilder->nCols = 0;
|
||||
pBuilder->size = 0;
|
||||
}
|
||||
|
||||
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
|
||||
int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
|
||||
// if (tlen == 0) return NULL; // nCols == 0 means no tags
|
||||
|
||||
tlen += TD_KV_ROW_HEAD_SIZE;
|
||||
|
||||
SKVRow row = taosMemoryMalloc(tlen);
|
||||
if (row == NULL) return NULL;
|
||||
|
||||
kvRowSetNCols(row, pBuilder->nCols);
|
||||
kvRowSetLen(row, tlen);
|
||||
|
||||
if (pBuilder->nCols > 0) {
|
||||
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
|
||||
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
|
||||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
#endif
|
|
@ -3846,7 +3846,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
|
|||
|
||||
if (pReq->type == TSDB_CHILD_TABLE) {
|
||||
if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1;
|
||||
if (tEncodeBinary(pCoder, pReq->ctb.pTag, kvRowLen(pReq->ctb.pTag)) < 0) return -1;
|
||||
if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1;
|
||||
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
|
||||
} else {
|
||||
|
@ -3858,8 +3858,6 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
|
|||
}
|
||||
|
||||
int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
|
||||
uint32_t len;
|
||||
|
||||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
|
||||
|
@ -3871,7 +3869,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
|
|||
|
||||
if (pReq->type == TSDB_CHILD_TABLE) {
|
||||
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
|
||||
if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1;
|
||||
if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1;
|
||||
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
|
||||
} else {
|
||||
|
|
|
@ -30,7 +30,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
|||
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
|
||||
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
|
||||
if (tEncodeBinary(pCoder, pME->ctbEntry.pTags, kvRowLen(pME->ctbEntry.pTags)) < 0) return -1;
|
||||
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
|
||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
|
||||
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
||||
|
@ -47,7 +47,6 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
|||
}
|
||||
|
||||
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||
uint32_t len;
|
||||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI64(pCoder, &pME->version) < 0) return -1;
|
||||
|
@ -62,7 +61,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
|||
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
|
||||
if (tDecodeBinary(pCoder, &pME->ctbEntry.pTags, &len) < 0) return -1; // (TODO)
|
||||
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
|
||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
||||
|
|
|
@ -575,5 +575,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) {
|
|||
|
||||
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid) {
|
||||
ASSERT(pEntry->type == TSDB_CHILD_TABLE);
|
||||
return tdGetKVRowValOfCol((const SKVRow)pEntry->ctbEntry.pTags, cid);
|
||||
STagVal tagVal = {.cid = cid};
|
||||
tTagGet((const STag *)pEntry->ctbEntry.pTags, &tagVal);
|
||||
return tagVal.pData;
|
||||
}
|
|
@ -31,9 +31,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
int vLen = 0;
|
||||
const void *pKey = NULL;
|
||||
const void *pVal = NULL;
|
||||
void * pBuf = NULL;
|
||||
void *pBuf = NULL;
|
||||
int32_t szBuf = 0;
|
||||
void * p = NULL;
|
||||
void *p = NULL;
|
||||
SMetaReader mr = {0};
|
||||
|
||||
// validate req
|
||||
|
@ -87,7 +87,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
|
|||
}
|
||||
|
||||
// drop all child tables
|
||||
TBC * pCtbIdxc = NULL;
|
||||
TBC *pCtbIdxc = NULL;
|
||||
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
|
||||
|
||||
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
|
||||
|
@ -142,8 +142,8 @@ _exit:
|
|||
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||
SMetaEntry oStbEntry = {0};
|
||||
SMetaEntry nStbEntry = {0};
|
||||
TBC * pUidIdxc = NULL;
|
||||
TBC * pTbDbc = NULL;
|
||||
TBC *pUidIdxc = NULL;
|
||||
TBC *pTbDbc = NULL;
|
||||
const void *pData;
|
||||
int nData;
|
||||
int64_t oversion;
|
||||
|
@ -262,7 +262,7 @@ _err:
|
|||
}
|
||||
|
||||
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
|
||||
void * pData = NULL;
|
||||
void *pData = NULL;
|
||||
int nData = 0;
|
||||
int rc = 0;
|
||||
tb_uid_t uid;
|
||||
|
@ -288,7 +288,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
|
|||
}
|
||||
|
||||
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||
void * pData = NULL;
|
||||
void *pData = NULL;
|
||||
int nData = 0;
|
||||
int rc = 0;
|
||||
int64_t version;
|
||||
|
@ -324,14 +324,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
|||
}
|
||||
|
||||
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
|
||||
void * pVal = NULL;
|
||||
void *pVal = NULL;
|
||||
int nVal = 0;
|
||||
const void * pData = NULL;
|
||||
const void *pData = NULL;
|
||||
int nData = 0;
|
||||
int ret = 0;
|
||||
tb_uid_t uid;
|
||||
int64_t oversion;
|
||||
SSchema * pColumn = NULL;
|
||||
SSchema *pColumn = NULL;
|
||||
SMetaEntry entry = {0};
|
||||
SSchemaWrapper *pSchema;
|
||||
int c;
|
||||
|
@ -479,7 +479,7 @@ _err:
|
|||
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
|
||||
SMetaEntry ctbEntry = {0};
|
||||
SMetaEntry stbEntry = {0};
|
||||
void * pVal = NULL;
|
||||
void *pVal = NULL;
|
||||
int nVal = 0;
|
||||
int ret;
|
||||
int c;
|
||||
|
@ -510,7 +510,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
|||
oversion = *(int64_t *)pData;
|
||||
|
||||
// search table.db
|
||||
TBC * pTbDbc = NULL;
|
||||
TBC *pTbDbc = NULL;
|
||||
SDecoder dc1 = {0};
|
||||
SDecoder dc2 = {0};
|
||||
|
||||
|
@ -534,7 +534,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
|||
metaDecodeEntry(&dc2, &stbEntry);
|
||||
|
||||
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
||||
SSchema * pColumn = NULL;
|
||||
SSchema *pColumn = NULL;
|
||||
int32_t iCol = 0;
|
||||
for (;;) {
|
||||
pColumn = NULL;
|
||||
|
@ -563,29 +563,34 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
|||
}
|
||||
memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
|
||||
} else {
|
||||
SKVRowBuilder kvrb = {0};
|
||||
const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags;
|
||||
SKVRow pNewTag = NULL;
|
||||
|
||||
tdInitKVRowBuilder(&kvrb);
|
||||
const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags;
|
||||
STag *pNewTag = NULL;
|
||||
SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
|
||||
if (!pTagArray) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
|
||||
SSchema *pCol = &pTagSchema->pSchema[i];
|
||||
if (iCol == i) {
|
||||
tdAddColToKVRow(&kvrb, pCol->colId, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
|
||||
tTagValPush(pTagArray, &pCol->colId, pCol->type, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal, false);
|
||||
} else {
|
||||
void *p = tdGetKVRowValOfCol(pOldTag, pCol->colId);
|
||||
if (p) {
|
||||
STagVal tagVal = {.cid = pCol->colId};
|
||||
if (tTagGet(pOldTag, &tagVal) && tagVal.pData) {
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
tdAddColToKVRow(&kvrb, pCol->colId, p, varDataTLen(p));
|
||||
tTagValPush(pTagArray, &pCol->colId, pCol->type, varDataVal(tagVal.pData), varDataLen(tagVal.pData), false);
|
||||
} else {
|
||||
tdAddColToKVRow(&kvrb, pCol->colId, p, pCol->bytes);
|
||||
tTagValPush(pTagArray, &pCol->colId, pCol->type, tagVal.pData, pCol->bytes, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ctbEntry.ctbEntry.pTags = tdGetKVRowFromBuilder(&kvrb);
|
||||
tdDestroyKVRowBuilder(&kvrb);
|
||||
if ((terrno = tTagNew(pTagArray, pTagSchema->version, false, &pNewTag)) < 0) {
|
||||
taosArrayDestroy(pTagArray);
|
||||
goto _err;
|
||||
}
|
||||
ctbEntry.ctbEntry.pTags = (uint8_t *)pNewTag;
|
||||
taosArrayDestroy(pTagArray);
|
||||
}
|
||||
|
||||
// save to table.db
|
||||
|
@ -639,8 +644,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
|
|||
|
||||
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
STbDbKey tbDbKey;
|
||||
void * pKey = NULL;
|
||||
void * pVal = NULL;
|
||||
void *pKey = NULL;
|
||||
void *pVal = NULL;
|
||||
int kLen = 0;
|
||||
int vLen = 0;
|
||||
SEncoder coder = {0};
|
||||
|
@ -721,17 +726,17 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
|||
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
|
||||
}
|
||||
|
||||
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
|
||||
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
|
||||
int32_t nTagData = 0;
|
||||
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type,
|
||||
tb_uid_t uid, STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
|
||||
// int32_t nTagData = 0;
|
||||
|
||||
if (pTagData) {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
nTagData = varDataTLen(pTagData);
|
||||
} else {
|
||||
nTagData = tDataTypes[type].bytes;
|
||||
}
|
||||
}
|
||||
// if (pTagData) {
|
||||
// if (IS_VAR_DATA_TYPE(type)) {
|
||||
// nTagData = varDataTLen(pTagData);
|
||||
// } else {
|
||||
// nTagData = tDataTypes[type].bytes;
|
||||
// }
|
||||
// }
|
||||
*nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t);
|
||||
|
||||
*ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
|
||||
|
@ -755,14 +760,15 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
|
|||
}
|
||||
|
||||
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
||||
void * pData = NULL;
|
||||
void *pData = NULL;
|
||||
int nData = 0;
|
||||
STbDbKey tbDbKey = {0};
|
||||
SMetaEntry stbEntry = {0};
|
||||
STagIdxKey * pTagIdxKey = NULL;
|
||||
STagIdxKey *pTagIdxKey = NULL;
|
||||
int32_t nTagIdxKey;
|
||||
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
|
||||
const void * pTagData = NULL; //
|
||||
const void *pTagData = NULL; //
|
||||
int32_t nTagData = 0;
|
||||
SDecoder dc = {0};
|
||||
|
||||
// get super table
|
||||
|
@ -775,7 +781,11 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
metaDecodeEntry(&dc, &stbEntry);
|
||||
|
||||
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
|
||||
pTagData = tdGetKVRowValOfCol((const SKVRow)pCtbEntry->ctbEntry.pTags, pTagColumn->colId);
|
||||
|
||||
STagVal tagVal = {.cid = pTagColumn->colId};
|
||||
tTagGet((const STag *)pCtbEntry->ctbEntry.pTags, &tagVal);
|
||||
pTagData = tagVal.pData;
|
||||
nTagData = (int32_t)tagVal.nData;
|
||||
|
||||
// update tag index
|
||||
#ifdef USE_INVERTED_INDEX
|
||||
|
@ -790,8 +800,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid);
|
||||
indexMultiTermDestroy(tmGroup);
|
||||
#else
|
||||
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, pTagColumn->type, pCtbEntry->uid,
|
||||
&pTagIdxKey, &nTagIdxKey) < 0) {
|
||||
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
|
||||
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
||||
return -1;
|
||||
}
|
||||
tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
|
||||
|
@ -804,7 +814,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
|
||||
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
SEncoder coder = {0};
|
||||
void * pVal = NULL;
|
||||
void *pVal = NULL;
|
||||
int vLen = 0;
|
||||
int rcode = 0;
|
||||
SSkmDbKey skmDbKey = {0};
|
||||
|
|
|
@ -313,17 +313,17 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_
|
|||
} else { // these are tags
|
||||
const char* p = NULL;
|
||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
const uint8_t* tmp = mr.me.ctbEntry.pTags;
|
||||
const STag* tmp = (const STag*)mr.me.ctbEntry.pTags;
|
||||
|
||||
char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
|
||||
char* data = taosMemoryCalloc(tmp->len + 1, 1);
|
||||
if (data == NULL) {
|
||||
metaReaderClear(&mr);
|
||||
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
|
||||
qError("doTagScan calloc error:%d", tmp->len + 1);
|
||||
return;
|
||||
}
|
||||
|
||||
*data = TSDB_DATA_TYPE_JSON;
|
||||
memcpy(data + 1, tmp, kvRowLen(tmp));
|
||||
memcpy(data + 1, tmp, tmp->len);
|
||||
p = data;
|
||||
} else {
|
||||
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
|
||||
|
@ -1584,16 +1584,16 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
|||
colDataAppend(pDst, count, str, false);
|
||||
} else { // it is a tag value
|
||||
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
const uint8_t* tmp = mr.me.ctbEntry.pTags;
|
||||
const STag* tmp = (const STag*)mr.me.ctbEntry.pTags;
|
||||
// TODO opt perf by realloc memory
|
||||
char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
|
||||
char* data = taosMemoryCalloc(tmp->len + 1, 1);
|
||||
if (data == NULL) {
|
||||
qError("%s failed to malloc memory, size:%d", GET_TASKID(pTaskInfo), kvRowLen(tmp) + 1);
|
||||
qError("%s failed to malloc memory, size:%d", GET_TASKID(pTaskInfo), tmp->len + 1);
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
*data = TSDB_DATA_TYPE_JSON;
|
||||
memcpy(data + 1, tmp, kvRowLen(tmp));
|
||||
memcpy(data + 1, tmp, tmp->len);
|
||||
colDataAppend(pDst, count, data, false);
|
||||
taosMemoryFree(data);
|
||||
} else {
|
||||
|
@ -1628,8 +1628,8 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
|
||||
SSDataBlock* pResBlock, SArray* pColMatchInfo,
|
||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
||||
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableListInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
|
|
@ -58,7 +58,9 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta);
|
|||
int32_t getNumOfTags(const STableMeta* pTableMeta);
|
||||
STableComInfo getTableInfo(const STableMeta* pTableMeta);
|
||||
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
|
||||
#ifdef JSON_TAG_REFACTOR
|
||||
int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* errMsg, int16_t startColId);
|
||||
#endif
|
||||
|
||||
int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen);
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ typedef struct SInsertParseContext {
|
|||
SMsgBuf msg; // input
|
||||
STableMeta* pTableMeta; // each table
|
||||
SParsedDataColInfo tags; // each table
|
||||
SKVRowBuilder tagsBuilder; // each table
|
||||
SArray* pTagVals; // each table
|
||||
SVCreateTbReq createTblReq; // each table
|
||||
SHashObj* pVgroupsHashObj; // global
|
||||
SHashObj* pTableBlockHashObj; // global
|
||||
|
@ -72,9 +72,10 @@ static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
|
|||
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
|
||||
|
||||
typedef struct SKvParam {
|
||||
SKVRowBuilder* builder;
|
||||
SSchema* schema;
|
||||
char buf[TSDB_MAX_TAGS_LEN];
|
||||
int16_t pos;
|
||||
SArray* pTagVals;
|
||||
SSchema* schema;
|
||||
char buf[TSDB_MAX_TAGS_LEN];
|
||||
} SKvParam;
|
||||
|
||||
typedef struct SMemParam {
|
||||
|
@ -212,7 +213,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
|
|||
return buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||
}
|
||||
|
||||
char tbname[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
char tbname[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
strncpy(tbname, p + 1, tbLen);
|
||||
/*tbLen = */ strdequote(tbname);
|
||||
|
||||
|
@ -619,14 +620,14 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
|
|||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
return func(pMsgBuf, pToken->z, pToken->n, param);
|
||||
}
|
||||
|
||||
#ifdef JSON_TAG_REFACTOR
|
||||
case TSDB_DATA_TYPE_JSON: {
|
||||
if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
|
||||
return buildSyntaxErrMsg(pMsgBuf, "json string too long than 4095", pToken->z);
|
||||
}
|
||||
return func(pMsgBuf, pToken->z, pToken->n, param);
|
||||
}
|
||||
|
||||
#endif
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||
int64_t tmpVal;
|
||||
if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -757,9 +758,11 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, voi
|
|||
int8_t type = pa->schema->type;
|
||||
int16_t colId = pa->schema->colId;
|
||||
|
||||
#ifdef JSON_TAG_REFACTOR
|
||||
if (TSDB_DATA_TYPE_JSON == type) {
|
||||
return parseJsontoTagData(value, pa->builder, pMsgBuf, colId);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (value == NULL) { // it is a null data
|
||||
// tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset,
|
||||
|
@ -768,49 +771,56 @@ static int32_t KvRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, voi
|
|||
}
|
||||
|
||||
if (TSDB_DATA_TYPE_BINARY == type) {
|
||||
STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len);
|
||||
tdAddColToKVRow(pa->builder, colId, pa->buf, varDataTLen(pa->buf));
|
||||
memcpy(pa->buf + pa->pos, value, len);
|
||||
tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), len, false);
|
||||
pa->pos += len;
|
||||
} else if (TSDB_DATA_TYPE_NCHAR == type) {
|
||||
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
|
||||
|
||||
ASSERT((pa->pos + pa->schema->bytes - VARSTR_HEADER_SIZE) <= TSDB_MAX_TAGS_LEN);
|
||||
|
||||
int32_t output = 0;
|
||||
if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
|
||||
if (!taosMbsToUcs4(value, len, (TdUcs4*)(pa->buf + pa->pos), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
|
||||
if (errno == E2BIG) {
|
||||
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
|
||||
}
|
||||
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
|
||||
return buildSyntaxErrMsg(pMsgBuf, buf, value);
|
||||
}
|
||||
|
||||
varDataSetLen(pa->buf, output);
|
||||
tdAddColToKVRow(pa->builder, colId, pa->buf, varDataTLen(pa->buf));
|
||||
tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), output, false);
|
||||
pa->pos += output;
|
||||
} else {
|
||||
tdAddColToKVRow(pa->builder, colId, value, TYPE_BYTES[type]);
|
||||
memcpy(pa->buf + pa->pos, value, TYPE_BYTES[type]);
|
||||
tTagValPush(pa->pTagVals, &colId, type, (uint8_t*)(pa->buf + pa->pos), TYPE_BYTES[type], false);
|
||||
pa->pos + TYPE_BYTES[type];
|
||||
}
|
||||
ASSERT(pa->pos <= TSDB_MAX_TAGS_LEN);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, SKVRow row, int64_t suid) {
|
||||
static int32_t buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid) {
|
||||
pTbReq->type = TD_CHILD_TABLE;
|
||||
pTbReq->name = strdup(tname);
|
||||
pTbReq->ctb.suid = suid;
|
||||
pTbReq->ctb.pTag = row;
|
||||
pTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// pSql -> tag1_value, ...)
|
||||
static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint8_t precision, const char* tName) {
|
||||
if (tdInitKVRowBuilder(&pCxt->tagsBuilder) < 0) {
|
||||
ASSERT(!pCxt->pTagVals);
|
||||
if (!(pCxt->pTagVals = taosArrayInit(pCxt->tags.numOfBound, sizeof(STagVal)))) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SKvParam param = {.builder = &pCxt->tagsBuilder};
|
||||
SKvParam param = {.pTagVals = pCxt->pTagVals, .pos = 0};
|
||||
SToken sToken;
|
||||
bool isParseBindParam = false;
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
// TODO: JSON_TAG_REFACTOR => here would have json tag?
|
||||
for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
|
||||
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
|
||||
|
||||
|
@ -837,13 +847,13 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
||||
if (NULL == row) {
|
||||
// TODO: JSON_TAG_REFACTOR (would be JSON tag or normal tag)
|
||||
STag* pTag = NULL;
|
||||
if (tTagNew(param.pTagVals, 1, false, &pTag) != 0) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "out of memory");
|
||||
}
|
||||
tdSortKVRowByColIdx(row);
|
||||
|
||||
return buildCreateTbReq(&pCxt->createTblReq, tName, row, pCxt->pTableMeta->suid);
|
||||
return buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid);
|
||||
}
|
||||
|
||||
static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
||||
|
@ -1062,7 +1072,7 @@ void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
|
|||
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
|
||||
taosMemoryFreeClear(pCxt->pTableMeta);
|
||||
destroyBoundColumnInfo(&pCxt->tags);
|
||||
tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
|
||||
taosArrayDestroy(pCxt->pTagVals);
|
||||
destroyCreateSubTbReq(&pCxt->createTblReq);
|
||||
}
|
||||
|
||||
|
@ -1082,9 +1092,9 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
|
|||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||
// [...];
|
||||
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||
int32_t tbNum = 0;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
bool autoCreateTbl = false;
|
||||
int32_t tbNum = 0;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
bool autoCreateTbl = false;
|
||||
|
||||
// for each table
|
||||
while (1) {
|
||||
|
@ -1186,8 +1196,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
|
||||
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
|
||||
pCxt->pTableBlockHashObj);
|
||||
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl,
|
||||
pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
|
||||
|
||||
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
|
||||
pCxt->pVgroupsHashObj = NULL;
|
||||
|
@ -1219,6 +1229,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
|
|||
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
|
||||
.pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
|
||||
.totalNum = 0,
|
||||
.pTagVals = NULL,
|
||||
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
|
||||
.pStmtCb = pContext->pStmtCb};
|
||||
|
||||
|
@ -1332,13 +1343,13 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
|
|||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
SKVRowBuilder tagBuilder;
|
||||
if (tdInitKVRowBuilder(&tagBuilder) < 0) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
||||
if (!pTagArray) {
|
||||
return buildInvalidOperationMsg(&pBuf, "out of memory");
|
||||
}
|
||||
|
||||
SSchema* pSchema = pDataBlock->pTableMeta->schema;
|
||||
SKvParam param = {.builder = &tagBuilder};
|
||||
SKvParam param = {.pTagVals = pTagArray, .pos = 0};
|
||||
|
||||
for (int c = 0; c < tags->numOfBound; ++c) {
|
||||
if (bind[c].is_null && bind[c].is_null[0]) {
|
||||
|
@ -1357,19 +1368,19 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
|
|||
CHECK_CODE(KvRowAppend(&pBuf, (char*)bind[c].buffer, colLen, ¶m));
|
||||
}
|
||||
|
||||
SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
|
||||
if (NULL == row) {
|
||||
tdDestroyKVRowBuilder(&tagBuilder);
|
||||
STag* pTag = NULL;
|
||||
|
||||
// TODO: JSON_TAG_REFACTOR (if is json or not)?
|
||||
if (0 != tTagNew(pTagArray, 1, false, &pTag)) {
|
||||
return buildInvalidOperationMsg(&pBuf, "out of memory");
|
||||
}
|
||||
tdSortKVRowByColIdx(row);
|
||||
|
||||
SVCreateTbReq tbReq = {0};
|
||||
CHECK_CODE(buildCreateTbReq(&tbReq, tName, row, suid));
|
||||
CHECK_CODE(buildCreateTbReq(&tbReq, tName, pTag, suid));
|
||||
CHECK_CODE(buildCreateTbMsg(pDataBlock, &tbReq));
|
||||
|
||||
destroyCreateSubTbReq(&tbReq);
|
||||
tdDestroyKVRowBuilder(&tagBuilder);
|
||||
taosArrayDestroy(pTagArray);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1601,7 +1612,6 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD** fields
|
|||
|
||||
typedef struct SmlExecTableHandle {
|
||||
SParsedDataColInfo tags; // each table
|
||||
SKVRowBuilder tagsBuilder; // each table
|
||||
SVCreateTbReq createTblReq; // each table
|
||||
} SmlExecTableHandle;
|
||||
|
||||
|
@ -1613,7 +1623,6 @@ typedef struct SmlExecHandle {
|
|||
|
||||
static void smlDestroyTableHandle(void* pHandle) {
|
||||
SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle;
|
||||
tdDestroyKVRowBuilder(&handle->tagsBuilder);
|
||||
destroyBoundColumnInfo(&handle->tags);
|
||||
destroyCreateSubTbReq(&handle->createTblReq);
|
||||
}
|
||||
|
@ -1689,13 +1698,23 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema,
|
||||
SKVRow* row, SMsgBuf* msg) {
|
||||
if (tdInitKVRowBuilder(tagsBuilder) < 0) {
|
||||
/**
|
||||
* @brief No json tag for schemaless
|
||||
*
|
||||
* @param cols
|
||||
* @param tags
|
||||
* @param pSchema
|
||||
* @param ppTag
|
||||
* @param msg
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SMsgBuf* msg) {
|
||||
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
||||
if (!pTagArray) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SKvParam param = {.builder = tagsBuilder};
|
||||
SKvParam param = {.pTagVals = pTagArray, .pos = 0};
|
||||
for (int i = 0; i < tags->numOfBound; ++i) {
|
||||
SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
|
||||
param.schema = pTagSchema;
|
||||
|
@ -1707,11 +1726,12 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
|
|||
}
|
||||
}
|
||||
|
||||
*row = tdGetKVRowFromBuilder(tagsBuilder);
|
||||
if (*row == NULL) {
|
||||
if (tTagNew(pTagArray, 1, false, ppTag) != 0) {
|
||||
taosArrayDestroy(pTagArray);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
tdSortKVRowByColIdx(*row);
|
||||
|
||||
taosArrayDestroy(pTagArray);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1728,14 +1748,13 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
|||
buildInvalidOperationMsg(&pBuf, "bound tags error");
|
||||
return ret;
|
||||
}
|
||||
SKVRow row = NULL;
|
||||
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema,
|
||||
&row, &pBuf);
|
||||
STag* pTag = NULL;
|
||||
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &pBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, row, pTableMeta->suid);
|
||||
buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid);
|
||||
|
||||
STableDataBlocks* pDataBlock = NULL;
|
||||
ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
||||
|
|
|
@ -4131,8 +4131,8 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt, SKVRow row,
|
||||
uint64_t suid, SVgroupInfo* pVgInfo) {
|
||||
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt,
|
||||
const STag* pTag, uint64_t suid, SVgroupInfo* pVgInfo) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
|
||||
strcpy(name.dbname, pStmt->dbName);
|
||||
|
@ -4142,7 +4142,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
|
|||
req.type = TD_CHILD_TABLE;
|
||||
req.name = strdup(pStmt->tableName);
|
||||
req.ctb.suid = suid;
|
||||
req.ctb.pTag = row;
|
||||
req.ctb.pTag = (uint8_t*)pTag;
|
||||
if (pStmt->ignoreExists) {
|
||||
req.flags |= TD_CREATE_IF_NOT_EXISTS;
|
||||
}
|
||||
|
@ -4162,23 +4162,25 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
|
||||
SKVRowBuilder* pBuilder) {
|
||||
if (pSchema->type == TSDB_DATA_TYPE_JSON) {
|
||||
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
|
||||
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
|
||||
}
|
||||
|
||||
return parseJsontoTagData(pVal->literal, pBuilder, &pCxt->msgBuf, pSchema->colId);
|
||||
// static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
|
||||
// SKVRowBuilder* pBuilder) {
|
||||
#ifdef JSON_TAG_REFACTOR
|
||||
if (pSchema->type == TSDB_DATA_TYPE_JSON) {
|
||||
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
|
||||
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
|
||||
}
|
||||
|
||||
if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
|
||||
tdAddColToKVRow(pBuilder, pSchema->colId, nodesGetValueFromNode(pVal),
|
||||
IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return parseJsontoTagData(pVal->literal, pBuilder, &pCxt->msgBuf, pSchema->colId);
|
||||
}
|
||||
#endif
|
||||
|
||||
// if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
|
||||
// tdAddColToKVRow(pBuilder, pSchema->colId, nodesGetValueFromNode(pVal),
|
||||
// IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
|
||||
// }
|
||||
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
|
||||
static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) {
|
||||
int32_t code = getFuncInfo(pCxt, pFunc);
|
||||
|
@ -4207,13 +4209,22 @@ static int32_t translateTagVal(STranslateContext* pCxt, uint8_t precision, SSche
|
|||
}
|
||||
|
||||
static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta,
|
||||
SKVRowBuilder* pBuilder) {
|
||||
STag** ppTag) {
|
||||
int32_t numOfTags = getNumOfTags(pSuperTableMeta);
|
||||
if (LIST_LENGTH(pStmt->pValsOfTags) != LIST_LENGTH(pStmt->pSpecificTags) ||
|
||||
numOfTags < LIST_LENGTH(pStmt->pValsOfTags)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
|
||||
}
|
||||
|
||||
SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal));
|
||||
char* pTagBuf = taosMemoryCalloc(1, TSDB_MAX_TAGS_LEN);
|
||||
if (!pTagArray || !pTagBuf) {
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(pTagBuf);
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY);
|
||||
}
|
||||
int32_t code = 0;
|
||||
int16_t nTags = 0, nBufPos = 0;
|
||||
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
|
||||
SNode * pTag, *pNode;
|
||||
FORBOTH(pTag, pStmt->pSpecificTags, pNode, pStmt->pValsOfTags) {
|
||||
|
@ -4226,10 +4237,12 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
|
|||
}
|
||||
}
|
||||
if (NULL == pSchema) {
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(pTagBuf);
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName);
|
||||
}
|
||||
SValueNode* pVal = NULL;
|
||||
int32_t code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pSchema, pNode, &pVal);
|
||||
code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pSchema, pNode, &pVal);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL == pVal) {
|
||||
pVal = (SValueNode*)pNode;
|
||||
|
@ -4237,29 +4250,72 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
|
|||
REPLACE_LIST2_NODE(pVal);
|
||||
}
|
||||
}
|
||||
#ifdef JSON_TAG_REFACTOR
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addValToKVRow(pCxt, pVal, pSchema, pBuilder);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
|
||||
// TODO: JSON_TAG_TODO: is copy is a must?
|
||||
void* nodeVal = nodesGetValueFromNode(pVal);
|
||||
if (IS_VAR_DATA_TYPE(pSchema->type)) {
|
||||
memcpy(pTagBuf + nBufPos, varDataVal(nodeVal), varDataLen(nodeVal));
|
||||
tTagValPush(pTagArray, &pSchema->colId, pSchema->type, (uint8_t*)pTagBuf + nBufPos, varDataLen(nodeVal), false);
|
||||
nBufPos += varDataLen(pVal->datum.p);
|
||||
} else {
|
||||
memcpy(pTagBuf + nBufPos, varDataVal(nodeVal), TYPE_BYTES[pSchema->type]);
|
||||
tTagValPush(pTagArray, &pSchema->colId, pSchema->type, (uint8_t*)pTagBuf + nBufPos, TYPE_BYTES[pSchema->type],
|
||||
false);
|
||||
nBufPos += TYPE_BYTES[pSchema->type];
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(pTagBuf);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: JSON_TAG_TODO: version
|
||||
code = tTagNew(pTagArray, 1, false, ppTag);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(pTagBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(pTagBuf);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta,
|
||||
SKVRowBuilder* pBuilder) {
|
||||
STag** ppTag) {
|
||||
if (getNumOfTags(pSuperTableMeta) != LIST_LENGTH(pStmt->pValsOfTags)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
|
||||
}
|
||||
|
||||
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
|
||||
SSchema* pTagSchemas = getTableTagSchema(pSuperTableMeta);
|
||||
SNode* pNode;
|
||||
int32_t code = 0;
|
||||
int32_t index = 0;
|
||||
SArray* pTagArray = taosArrayInit(LIST_LENGTH(pStmt->pValsOfTags), sizeof(STagVal));
|
||||
char* pTagBuf = taosMemoryCalloc(1, TSDB_MAX_TAGS_LEN);
|
||||
|
||||
const char* qTagBuf = pTagBuf;
|
||||
|
||||
if (!pTagArray || !pTagBuf) {
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(qTagBuf);
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
FOREACH(pNode, pStmt->pValsOfTags) {
|
||||
SValueNode* pVal = NULL;
|
||||
int32_t code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pTagSchema + index, pNode, &pVal);
|
||||
SSchema* pTagSchema = pTagSchemas + index;
|
||||
code = translateTagVal(pCxt, pSuperTableMeta->tableInfo.precision, pTagSchema, pNode, &pVal);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL == pVal) {
|
||||
pVal = (SValueNode*)pNode;
|
||||
|
@ -4267,14 +4323,46 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
|
|||
REPLACE_NODE(pVal);
|
||||
}
|
||||
}
|
||||
#ifdef JSON_TAG_REFACTOR
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addValToKVRow(pCxt, pVal, pTagSchema + index++, pBuilder);
|
||||
}
|
||||
#endif
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
|
||||
char* tmpVal = nodesGetValueFromNode(pVal);
|
||||
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
|
||||
memcpy(pTagBuf, varDataVal(tmpVal), varDataLen(tmpVal));
|
||||
tTagValPush(pTagArray, &pTagSchema->colId, pTagSchema->type, (uint8_t*)pTagBuf, varDataLen(tmpVal), false);
|
||||
pTagBuf += varDataLen(tmpVal);
|
||||
} else {
|
||||
memcpy(pTagBuf, tmpVal, TYPE_BYTES[pTagSchema->type]);
|
||||
tTagValPush(pTagArray, &pTagSchema->colId, pTagSchema->type, (uint8_t*)pTagBuf, TYPE_BYTES[pTagSchema->type],
|
||||
false);
|
||||
pTagBuf += TYPE_BYTES[pTagSchema->type];
|
||||
}
|
||||
}
|
||||
++index;
|
||||
}
|
||||
// TODO: buf is need to store the tags
|
||||
|
||||
// TODO: JSON_TAG_TODO remove below codes if code is 0 all the time.
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(qTagBuf);
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, code);
|
||||
}
|
||||
}
|
||||
// TODO: JSON_TAG_TODO: version?
|
||||
// TODO: JSON_TAG_REFACTOR: json or not
|
||||
if (0 != (code = tTagNew(pTagArray, 1, false, ppTag))) {
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(qTagBuf);
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, code);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosMemoryFreeClear(qTagBuf);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -4292,26 +4380,13 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
|
|||
code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
|
||||
}
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tdInitKVRowBuilder(&kvRowBuilder);
|
||||
}
|
||||
STag* pTag = NULL;
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL != pStmt->pSpecificTags) {
|
||||
code = buildKVRowForBindTags(pCxt, pStmt, pSuperTableMeta, &kvRowBuilder);
|
||||
code = buildKVRowForBindTags(pCxt, pStmt, pSuperTableMeta, &pTag);
|
||||
} else {
|
||||
code = buildKVRowForAllTags(pCxt, pStmt, pSuperTableMeta, &kvRowBuilder);
|
||||
}
|
||||
}
|
||||
|
||||
SKVRow row = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
row = tdGetKVRowFromBuilder(&kvRowBuilder);
|
||||
if (NULL == row) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tdSortKVRowByColIdx(row);
|
||||
code = buildKVRowForAllTags(pCxt, pStmt, pSuperTableMeta, &pTag);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4320,11 +4395,10 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
|
|||
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, row, pSuperTableMeta->uid, &info);
|
||||
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, pTag, pSuperTableMeta->uid, &info);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pSuperTableMeta);
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -4546,6 +4620,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
|
|||
|
||||
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
|
||||
if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||
#ifdef JSON_TAG_REFACTOR
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
int32_t code = tdInitKVRowBuilder(&kvRowBuilder);
|
||||
|
||||
|
@ -4571,6 +4646,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
|
|||
pReq->pTagVal = row;
|
||||
pStmt->pVal->datum.p = row; // for free
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
#endif
|
||||
} else {
|
||||
pReq->nTagVal = pStmt->pVal->node.resType.bytes;
|
||||
if (TSDB_DATA_TYPE_NCHAR == pStmt->pVal->node.resType.type) {
|
||||
|
|
|
@ -322,6 +322,7 @@ static bool isValidateTag(char* input) {
|
|||
return true;
|
||||
}
|
||||
|
||||
#ifdef JSON_TAG_REFACTOR
|
||||
int32_t parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* pMsgBuf, int16_t startColId) {
|
||||
// set json NULL data
|
||||
uint8_t jsonNULL = TSDB_DATA_TYPE_NULL;
|
||||
|
@ -442,6 +443,7 @@ end:
|
|||
cJSON_Delete(root);
|
||||
return retCode;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t userAuthToString(int32_t acctId, const char* pUser, const char* pDb, AUTH_TYPE type, char* pStr) {
|
||||
return sprintf(pStr, "%s*%d.%s*%d", pUser, acctId, pDb, type);
|
||||
|
|
|
@ -922,8 +922,13 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
char *getJsonValue(char *json, char *key){ //todo
|
||||
json++; // jump type
|
||||
char *getJsonValue(char *json, char *key) { // todo
|
||||
json++; // jump type
|
||||
|
||||
STagVal tagVal = {.pKey = key};
|
||||
tTagGet(((const STag *)json), &tagVal);
|
||||
return (char *)tagVal.pData;
|
||||
#if 0
|
||||
int16_t cols = kvRowNCols(json);
|
||||
for (int i = 0; i < cols; ++i) {
|
||||
SColIdx *pColIdx = kvRowColIdxAt(json, i);
|
||||
|
@ -939,6 +944,7 @@ char *getJsonValue(char *json, char *key){ //todo
|
|||
}
|
||||
}
|
||||
return NULL;
|
||||
#endif
|
||||
}
|
||||
|
||||
void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
|
||||
|
|
|
@ -1035,7 +1035,7 @@ void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){
|
|||
|
||||
SNode *pLeft = NULL, *pRight = NULL;
|
||||
scltMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, keyVar);
|
||||
scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, kvRowLen(json), 1, json);
|
||||
scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, ((STag*)json)->len, 1, json);
|
||||
scltMakeOpNode(opNode, OP_TYPE_JSON_GET_VALUE, TSDB_DATA_TYPE_JSON, pLeft, pRight);
|
||||
}
|
||||
|
||||
|
@ -1105,6 +1105,7 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do
|
|||
nodesDestroyNode(opNode);
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST(columnTest, json_column_arith_op) {
|
||||
scltInitLogFile();
|
||||
char *rightvTmp= "{\"k1\":4,\"k2\":\"hello\",\"k3\":null,\"k4\":true,\"k5\":5.44}";
|
||||
|
@ -1178,7 +1179,7 @@ TEST(columnTest, json_column_arith_op) {
|
|||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
taosMemoryFree(row);
|
||||
}
|
||||
|
||||
#endif
|
||||
void *prepareNchar(char* rightData){
|
||||
int32_t len = 0;
|
||||
int32_t inputLen = strlen(rightData);
|
||||
|
@ -1188,7 +1189,7 @@ void *prepareNchar(char* rightData){
|
|||
varDataSetLen(t, len);
|
||||
return t;
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST(columnTest, json_column_logic_op) {
|
||||
scltInitLogFile();
|
||||
char *rightvTmp= "{\"k1\":4,\"k2\":\"hello\",\"k3\":null,\"k4\":true,\"k5\":5.44,\"k6\":\"6.6hello\"}";
|
||||
|
@ -1308,6 +1309,7 @@ TEST(columnTest, json_column_logic_op) {
|
|||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
taosMemoryFree(row);
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST(columnTest, smallint_value_add_int_column) {
|
||||
scltInitLogFile();
|
||||
|
|
Loading…
Reference in New Issue