fix table meta query problem
This commit is contained in:
parent
36437b65ce
commit
d9cd1a7a0f
|
@ -38,7 +38,7 @@ typedef struct SMetaCfg {
|
||||||
} SMetaCfg;
|
} SMetaCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t nCols;
|
uint32_t nCols;
|
||||||
SSchema *pSchema;
|
SSchema *pSchema;
|
||||||
} SSchemaWrapper;
|
} SSchemaWrapper;
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
int32_t sver;
|
int32_t sver;
|
||||||
|
int32_t padding;
|
||||||
} SSchemaKey;
|
} SSchemaKey;
|
||||||
|
|
||||||
struct SMetaDB {
|
struct SMetaDB {
|
||||||
|
@ -55,6 +56,8 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *
|
||||||
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
|
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
|
||||||
static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
|
static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
|
||||||
static void metaClearTbCfg(STbCfg *pTbCfg);
|
static void metaClearTbCfg(STbCfg *pTbCfg);
|
||||||
|
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
|
||||||
|
static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
|
||||||
|
|
||||||
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
|
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
|
||||||
|
|
||||||
|
@ -169,18 +172,13 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
||||||
pBuf = buf;
|
pBuf = buf;
|
||||||
memset(&key, 0, sizeof(key));
|
memset(&key, 0, sizeof(key));
|
||||||
memset(&value, 0, sizeof(key));
|
memset(&value, 0, sizeof(key));
|
||||||
SSchemaKey schemaKey = {uid, 0 /*TODO*/};
|
SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0};
|
||||||
|
|
||||||
key.data = &schemaKey;
|
key.data = &schemaKey;
|
||||||
key.size = sizeof(schemaKey);
|
key.size = sizeof(schemaKey);
|
||||||
|
|
||||||
taosEncodeFixedU32(&pBuf, ncols);
|
SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema};
|
||||||
for (size_t i = 0; i < ncols; i++) {
|
metaEncodeSchema(&pBuf, &sw);
|
||||||
taosEncodeFixedI8(&pBuf, pSchema[i].type);
|
|
||||||
taosEncodeFixedI32(&pBuf, pSchema[i].colId);
|
|
||||||
taosEncodeFixedI32(&pBuf, pSchema[i].bytes);
|
|
||||||
taosEncodeString(&pBuf, pSchema[i].name);
|
|
||||||
}
|
|
||||||
|
|
||||||
value.data = buf;
|
value.data = buf;
|
||||||
value.size = POINTER_DISTANCE(pBuf, buf);
|
value.size = POINTER_DISTANCE(pBuf, buf);
|
||||||
|
@ -197,6 +195,38 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ------------------------ STATIC METHODS ------------------------ */
|
/* ------------------------ STATIC METHODS ------------------------ */
|
||||||
|
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
|
||||||
|
int tlen = 0;
|
||||||
|
SSchema *pSchema;
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedU32(buf, pSW->nCols);
|
||||||
|
for (int i = 0; i < pSW->nCols; i++) {
|
||||||
|
pSchema = pSW->pSchema + i;
|
||||||
|
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pSchema->colId);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
|
||||||
|
tlen += taosEncodeString(buf, pSchema->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
|
||||||
|
SSchema *pSchema;
|
||||||
|
|
||||||
|
buf = taosDecodeFixedU32(buf, &pSW->nCols);
|
||||||
|
pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * pSW->nCols);
|
||||||
|
for (int i = 0; i < pSW->nCols; i++) {
|
||||||
|
pSchema = pSW->pSchema + i;
|
||||||
|
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pSchema->colId);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
||||||
|
buf = taosDecodeStringTo(buf, pSchema->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
static SMetaDB *metaNewDB() {
|
static SMetaDB *metaNewDB() {
|
||||||
SMetaDB *pDB = NULL;
|
SMetaDB *pDB = NULL;
|
||||||
pDB = (SMetaDB *)calloc(1, sizeof(*pDB));
|
pDB = (SMetaDB *)calloc(1, sizeof(*pDB));
|
||||||
|
@ -376,15 +406,8 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
|
||||||
tsize += taosEncodeFixedU8(buf, pTbCfg->type);
|
tsize += taosEncodeFixedU8(buf, pTbCfg->type);
|
||||||
|
|
||||||
if (pTbCfg->type == META_SUPER_TABLE) {
|
if (pTbCfg->type == META_SUPER_TABLE) {
|
||||||
tsize += taosEncodeVariantU32(buf, pTbCfg->stbCfg.nTagCols);
|
SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema};
|
||||||
for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) {
|
tsize += metaEncodeSchema(buf, &sw);
|
||||||
tsize += taosEncodeFixedI8(buf, pTbCfg->stbCfg.pTagSchema[i].type);
|
|
||||||
tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pTagSchema[i].colId);
|
|
||||||
tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pTagSchema[i].bytes);
|
|
||||||
tsize += taosEncodeString(buf, pTbCfg->stbCfg.pTagSchema[i].name);
|
|
||||||
}
|
|
||||||
|
|
||||||
// tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema);
|
|
||||||
} else if (pTbCfg->type == META_CHILD_TABLE) {
|
} else if (pTbCfg->type == META_CHILD_TABLE) {
|
||||||
tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
|
tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
|
||||||
tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
|
tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
|
||||||
|
@ -403,14 +426,10 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
|
||||||
buf = taosDecodeFixedU8(buf, &(pTbCfg->type));
|
buf = taosDecodeFixedU8(buf, &(pTbCfg->type));
|
||||||
|
|
||||||
if (pTbCfg->type == META_SUPER_TABLE) {
|
if (pTbCfg->type == META_SUPER_TABLE) {
|
||||||
buf = taosDecodeVariantU32(buf, &(pTbCfg->stbCfg.nTagCols));
|
SSchemaWrapper sw;
|
||||||
pTbCfg->stbCfg.pTagSchema = (SSchema *)malloc(sizeof(SSchema) * pTbCfg->stbCfg.nTagCols);
|
buf = metaDecodeSchema(buf, &sw);
|
||||||
for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) {
|
pTbCfg->stbCfg.nTagCols = sw.nCols;
|
||||||
buf = taosDecodeFixedI8(buf, &(pTbCfg->stbCfg.pTagSchema[i].type));
|
pTbCfg->stbCfg.pTagSchema = sw.pSchema;
|
||||||
buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pTagSchema[i].colId);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pTagSchema[i].bytes);
|
|
||||||
buf = taosDecodeStringTo(buf, pTbCfg->stbCfg.pTagSchema[i].name);
|
|
||||||
}
|
|
||||||
} else if (pTbCfg->type == META_CHILD_TABLE) {
|
} else if (pTbCfg->type == META_CHILD_TABLE) {
|
||||||
buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
|
buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
|
||||||
buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
|
buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
|
||||||
|
@ -496,7 +515,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
|
||||||
int ret;
|
int ret;
|
||||||
void * pBuf;
|
void * pBuf;
|
||||||
SSchema * pSchema;
|
SSchema * pSchema;
|
||||||
SSchemaKey schemaKey = {uid, sver};
|
SSchemaKey schemaKey = {uid, sver, 0};
|
||||||
DBT key = {0};
|
DBT key = {0};
|
||||||
DBT value = {0};
|
DBT value = {0};
|
||||||
|
|
||||||
|
@ -507,38 +526,14 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
|
||||||
// Query
|
// Query
|
||||||
ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0);
|
ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
printf("failed to query schema DB since %s================\n", db_strerror(ret));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode the schema
|
// Decode the schema
|
||||||
pBuf = value.data;
|
pBuf = value.data;
|
||||||
taosDecodeFixedI32(&pBuf, &nCols);
|
pSW = malloc(sizeof(*pSW));
|
||||||
if (isinline) {
|
metaDecodeSchema(pBuf, pSW);
|
||||||
pSW = (SSchemaWrapper *)malloc(sizeof(*pSW) + sizeof(SSchema) * nCols);
|
|
||||||
if (pSW == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pSW->pSchema = POINTER_SHIFT(pSW, sizeof(*pSW));
|
|
||||||
} else {
|
|
||||||
pSW = (SSchemaWrapper *)malloc(sizeof(*pSW));
|
|
||||||
if (pSW == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * nCols);
|
|
||||||
if (pSW->pSchema == NULL) {
|
|
||||||
free(pSW);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < nCols; i++) {
|
|
||||||
pSchema = pSW->pSchema + i;
|
|
||||||
taosDecodeFixedI8(&pBuf, &(pSchema->type));
|
|
||||||
taosDecodeFixedI32(&pBuf, &(pSchema->colId));
|
|
||||||
taosDecodeFixedI32(&pBuf, &(pSchema->bytes));
|
|
||||||
taosDecodeStringTo(&pBuf, pSchema->name);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pSW;
|
return pSW;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue