Merge branch 'feat/tag_refact' of https://github.com/taosdata/TDengine into feat/row_refact

This commit is contained in:
Hongze Cheng 2022-05-28 07:28:58 +00:00
commit 4ed29dbf18
3 changed files with 158 additions and 103 deletions

View File

@ -18,6 +18,7 @@
#include "os.h"
#include "talgo.h"
#include "tarray.h"
#include "tencode.h"
#include "ttypes.h"
#include "tutil.h"
@ -59,12 +60,12 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, u
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
// STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag);
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag);
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData);
void tTagGet(STag *pTag, STagVal *pTagVal);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(STag *pTag, SArray **ppArray);
// STRUCT =================
struct STColumn {
@ -118,7 +119,10 @@ struct SColVal {
};
struct STagVal {
int16_t cid;
union {
int16_t cid;
char *pKey;
};
int8_t type;
uint32_t nData;
uint8_t *pData;

View File

@ -642,6 +642,11 @@ static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* n
return n;
}
static FORCE_INLINE int32_t tPutCStr(uint8_t* p, char* pData) {
return tPutBinary(p, (uint8_t*)pData, strlen(pData) + 1);
}
static FORCE_INLINE int32_t tGetCStr(uint8_t* p, char** ppData) { return tGetBinary(p, (uint8_t**)ppData, NULL); }
#ifdef __cplusplus
}
#endif

View File

@ -31,16 +31,13 @@ typedef struct {
} STSKVRow;
#pragma pack(pop)
typedef struct STagIdx {
int16_t cid;
uint16_t offset;
} STagIdx;
#pragma pack(push, 1)
struct STag {
uint16_t len;
uint16_t nTag;
STagIdx idx[];
int8_t isJson;
int16_t len;
int16_t nTag;
int32_t ver;
int16_t idx[];
};
#pragma pack(pop)
@ -521,123 +518,150 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
return 0;
}
static FORCE_INLINE int tTagIdxCmprFn(const void *p1, const void *p2) {
STagIdx *pTagIdx1 = (STagIdx *)p1;
STagIdx *pTagIdx2 = (STagIdx *)p2;
if (pTagIdx1->cid < pTagIdx1->cid) {
static int tTagValCmprFn(const void *p1, const void *p2) {
if (((STagVal *)p1)->cid < ((STagVal *)p2)->cid) {
return -1;
} else if (pTagIdx1->cid > pTagIdx1->cid) {
} else if (((STagVal *)p1)->cid > ((STagVal *)p2)->cid) {
return 1;
}
ASSERT(0);
return 0;
}
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag) {
STagVal *pTagVal;
uint8_t *p;
int32_t n;
uint16_t tsize = sizeof(STag) + sizeof(STagIdx) * nTag;
static int tTagValJsonCmprFn(const void *p1, const void *p2) {
return strcmp(((STagVal *)p1)[0].pKey, ((STagVal *)p2)[0].pKey);
}
static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
int32_t n = 0;
// key
if (isJson) {
n += tPutCStr(p ? p + n : p, pTagVal->pKey);
} else {
n += tPutI16v(p ? p + n : p, pTagVal->cid);
}
// type
n += tPutI8(p ? p + n : p, pTagVal->type);
// value
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
n += tPutBinary(p ? p + n : p, pTagVal->pData, pTagVal->nData);
} else {
ASSERT(pTagVal->nData == TYPE_BYTES[pTagVal->type]);
if (p) memcpy(p + n, pTagVal->pData, pTagVal->nData);
n += pTagVal->nData;
}
return n;
}
static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
int32_t n = 0;
// key
if (isJson) {
n += tGetCStr(p + n, &pTagVal->pKey);
} else {
n += tGetI16v(p + n, &pTagVal->cid);
}
// type
n += tGetI8(p + n, &pTagVal->type);
// value
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData);
} else {
pTagVal->pData = p + n;
pTagVal->nData = TYPE_BYTES[pTagVal->type];
n += pTagVal->nData;
}
return n;
}
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
int32_t code = 0;
uint8_t *p = NULL;
int16_t n = 0;
int16_t nTag = taosArrayGetSize(pArray);
int32_t szTag = sizeof(STag) + sizeof(int16_t) * nTag;
// sort
if (isJson) {
qsort(pArray->pData, nTag, sizeof(STagVal), tTagValJsonCmprFn);
} else {
qsort(pArray->pData, nTag, sizeof(STagVal), tTagValCmprFn);
}
// get size
for (int16_t iTag = 0; iTag < nTag; iTag++) {
pTagVal = &pTagVals[iTag];
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
tsize += tPutBinary(NULL, pTagVal->pData, pTagVal->nData);
} else {
ASSERT(pTagVal->nData == TYPE_BYTES[pTagVal->type]);
tsize += pTagVal->nData;
}
szTag += tPutTagVal(NULL, (STagVal *)taosArrayGet(pArray, iTag), isJson);
}
(*ppTag) = (STag *)taosMemoryMalloc(tsize);
if (*ppTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
// TODO
// if (szTag >= 16 * 1024) {
// code = TSDB_CODE_IVLD_TAG;
// goto _err;
// }
// build tag
(*ppTag) = (STag *)taosMemoryMalloc(szTag);
if ((*ppTag) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
p = (uint8_t *)&((*ppTag)->idx[nTag]);
n = 0;
(*ppTag)->len = tsize;
(*ppTag)->isJson = isJson ? 1 : 0;
(*ppTag)->len = szTag;
(*ppTag)->nTag = nTag;
(*ppTag)->ver = version;
p = (uint8_t *)&(*ppTag)->idx[nTag];
n = 0;
for (int16_t iTag = 0; iTag < nTag; iTag++) {
pTagVal = &pTagVals[iTag];
(*ppTag)->idx[iTag].cid = pTagVal->cid;
(*ppTag)->idx[iTag].offset = n;
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
n += tPutBinary(p + n, pTagVal->pData, pTagVal->nData);
} else {
memcpy(p + n, pTagVal->pData, pTagVal->nData);
n += pTagVal->nData;
}
(*ppTag)->idx[iTag] = n;
n += tPutTagVal(p + n, (STagVal *)taosArrayGet(pArray, iTag), isJson);
}
qsort((*ppTag)->idx, (*ppTag)->nTag, sizeof(STagIdx), tTagIdxCmprFn);
return 0;
return code;
_err:
return code;
}
void tTagFree(STag *pTag) {
if (pTag) taosMemoryFree(pTag);
}
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag) {
STagVal *pTagVals;
int16_t nTags = 0;
SSchema *pColumn;
uint8_t *p;
uint32_t n;
void tTagGet(STag *pTag, STagVal *pTagVal) {
int16_t lidx = 0;
int16_t ridx = pTag->nTag - 1;
int16_t midx;
uint8_t *p = (uint8_t *)&pTag->idx[pTag->nTag];
STagVal tv;
int c;
pTagVals = (STagVal *)taosMemoryMalloc(sizeof(*pTagVals) * nCols);
if (pTagVals == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pTagVal->type = TSDB_DATA_TYPE_NULL;
pTagVal->pData = NULL;
pTagVal->nData = 0;
while (lidx <= ridx) {
midx = (lidx + ridx) / 2;
for (int32_t i = 0; i < nCols; i++) {
pColumn = &pSchema[i];
if (i == iCol) {
p = pData;
n = nData;
tGetTagVal(p + pTag->idx[midx], &tv, pTag->isJson);
if (pTag->isJson) {
c = tTagValJsonCmprFn(pTagVal, &tv);
} else {
tTagGet(pTag, pColumn->colId, pColumn->type, &p, &n);
c = tTagValCmprFn(pTagVal, &tv);
}
if (p == NULL) continue;
ASSERT(IS_VAR_DATA_TYPE(pColumn->type) || n == pColumn->bytes);
pTagVals[nTags].cid = pColumn->colId;
pTagVals[nTags].type = pColumn->type;
pTagVals[nTags].nData = n;
pTagVals[nTags].pData = p;
nTags++;
}
// create new tag
if (tTagNew(pTagVals, nTags, ppTag) < 0) {
taosMemoryFree(pTagVals);
return -1;
}
taosMemoryFree(pTagVals);
return 0;
}
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData) {
STagIdx *pTagIdx = bsearch(&((STagIdx){.cid = cid}), pTag->idx, pTag->nTag, sizeof(STagIdx), tTagIdxCmprFn);
if (pTagIdx == NULL) {
*ppData = NULL;
*nData = 0;
} else {
uint8_t *p = (uint8_t *)&pTag->idx[pTag->nTag] + pTagIdx->offset;
if (IS_VAR_DATA_TYPE(type)) {
tGetBinary(p, ppData, nData);
if (c < 0) {
ridx = midx - 1;
} else if (c > 0) {
lidx = midx + 1;
} else {
*ppData = p;
*nData = TYPE_BYTES[type];
pTagVal->type = tv.type;
pTagVal->nData = tv.nData;
pTagVal->pData = tv.pData;
break;
}
}
}
@ -648,6 +672,28 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag) {
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag) { return tDecodeBinary(pDecoder, (uint8_t **)ppTag, NULL); }
int32_t tTagToValArray(STag *pTag, SArray **ppArray) {
int32_t code = 0;
uint8_t *p = (uint8_t *)&pTag->idx[pTag->nTag];
STagVal tv;
(*ppArray) = taosArrayInit(pTag->nTag + 1, sizeof(STagVal));
if (*ppArray == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
for (int16_t iTag = 0; iTag < pTag->nTag; iTag++) {
tGetTagVal(p + pTag->idx[iTag], &tv, pTag->isJson);
taosArrayPush(*ppArray, &tv);
}
return code;
_err:
return code;
}
#if 1 // ===================================================================================================================
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {