more refact meta
This commit is contained in:
parent
ad9fd5ec26
commit
07720702a6
|
@ -3568,8 +3568,8 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) {
|
||||||
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
|
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
|
||||||
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
|
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1;
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
|
||||||
// if (pReq->rollup) {
|
// if (pReq->rollup) {
|
||||||
// if (tDecodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1;
|
// if (tDecodeSRSmaParam(pCoder, pReq->pRSmaParam) < 0) return -1;
|
||||||
// }
|
// }
|
||||||
|
@ -3626,6 +3626,8 @@ int tEncodeSVCreateTbReq(SCoder *pCoder, const SVCreateTbReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) {
|
int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) {
|
||||||
|
uint64_t len;
|
||||||
|
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1;
|
||||||
|
@ -3637,7 +3639,7 @@ int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) {
|
||||||
|
|
||||||
if (pReq->type == TSDB_CHILD_TABLE) {
|
if (pReq->type == TSDB_CHILD_TABLE) {
|
||||||
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
|
||||||
if (tDecodeBinary(pCoder, &pReq->ctb.pTag, NULL) < 0) return -1;
|
if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1;
|
||||||
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
||||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schema) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schema) < 0) return -1;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -83,6 +83,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// build SMetaEntry
|
// build SMetaEntry
|
||||||
|
me.version = version;
|
||||||
me.type = pReq->type;
|
me.type = pReq->type;
|
||||||
me.uid = pReq->uid;
|
me.uid = pReq->uid;
|
||||||
me.name = pReq->name;
|
me.name = pReq->name;
|
||||||
|
|
|
@ -24,13 +24,15 @@ void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery);
|
||||||
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
STableInfoReq infoReq = {0};
|
STableInfoReq infoReq = {0};
|
||||||
STableMetaRsp metaRsp = {0};
|
STableMetaRsp metaRsp = {0};
|
||||||
SMetaEntryReader meReader1 = {0};
|
SMetaEntryReader mer1 = {0};
|
||||||
SMetaEntryReader meReader2 = {0};
|
SMetaEntryReader mer2 = {0};
|
||||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t rspLen = 0;
|
int32_t rspLen = 0;
|
||||||
void *pRsp = NULL;
|
void *pRsp = NULL;
|
||||||
|
SSchemaWrapper schema = {0};
|
||||||
|
SSchemaWrapper schemaTag = {0};
|
||||||
|
|
||||||
// decode req
|
// decode req
|
||||||
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
|
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
|
||||||
|
@ -38,9 +40,10 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metaRsp.dbId = pVnode->config.dbId;
|
||||||
strcpy(metaRsp.tbName, infoReq.tbName);
|
strcpy(metaRsp.tbName, infoReq.tbName);
|
||||||
memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
|
memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
|
||||||
metaRsp.dbId = pVnode->config.dbId;
|
|
||||||
sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
|
sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
|
||||||
code = vnodeValidateTableHash(pVnode, tableFName);
|
code = vnodeValidateTableHash(pVnode, tableFName);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -48,56 +51,44 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// query meta
|
// query meta
|
||||||
metaEntryReaderInit(&meReader1);
|
metaEntryReaderInit(&mer1);
|
||||||
|
|
||||||
if (metaGetTableEntryByName(pVnode->pMeta, &meReader1, infoReq.tbName) < 0) {
|
if (metaGetTableEntryByName(pVnode->pMeta, &mer1, infoReq.tbName) < 0) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (meReader1.me.type == TSDB_CHILD_TABLE) {
|
metaRsp.tableType = mer1.me.type;
|
||||||
metaEntryReaderInit(&meReader2);
|
|
||||||
if (metaGetTableEntryByUid(pVnode->pMeta, &meReader2, meReader1.me.ctbEntry.suid) < 0) goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// fill response
|
|
||||||
metaRsp.tableType = meReader1.me.type;
|
|
||||||
metaRsp.vgId = TD_VID(pVnode);
|
metaRsp.vgId = TD_VID(pVnode);
|
||||||
metaRsp.tuid = meReader1.me.uid;
|
metaRsp.tuid = mer1.me.uid;
|
||||||
if (meReader1.me.type == TSDB_SUPER_TABLE) {
|
|
||||||
strcpy(metaRsp.stbName, meReader1.me.name);
|
if (mer1.me.type == TSDB_SUPER_TABLE) {
|
||||||
metaRsp.numOfTags = meReader1.me.stbEntry.schemaTag.nCols;
|
schema = mer1.me.stbEntry.schema;
|
||||||
metaRsp.numOfColumns = meReader1.me.stbEntry.schema.nCols;
|
schemaTag = mer1.me.stbEntry.schemaTag;
|
||||||
metaRsp.suid = meReader1.me.uid;
|
metaRsp.suid = mer1.me.uid;
|
||||||
metaRsp.pSchemas = taosMemoryMalloc((metaRsp.numOfTags + metaRsp.numOfColumns) * sizeof(SSchema));
|
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
|
||||||
if (metaRsp.pSchemas == NULL) {
|
metaEntryReaderInit(&mer2);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
if (metaGetTableEntryByUid(pVnode->pMeta, &mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
||||||
goto _exit;
|
|
||||||
}
|
metaRsp.suid = mer2.me.uid;
|
||||||
memcpy(metaRsp.pSchemas, meReader1.me.stbEntry.schema.pSchema, sizeof(SSchema) * metaRsp.numOfColumns);
|
schema = mer2.me.stbEntry.schema;
|
||||||
memcpy(metaRsp.pSchemas + metaRsp.numOfColumns, meReader1.me.stbEntry.schemaTag.pSchema,
|
schemaTag = mer2.me.stbEntry.schemaTag;
|
||||||
sizeof(SSchema) * metaRsp.numOfTags);
|
} else if (mer1.me.type == TSDB_NORMAL_TABLE) {
|
||||||
} else if (meReader1.me.type == TSDB_CHILD_TABLE) {
|
schema = mer1.me.ntbEntry.schema;
|
||||||
strcpy(metaRsp.stbName, meReader2.me.name);
|
|
||||||
metaRsp.numOfTags = meReader2.me.stbEntry.schemaTag.nCols;
|
|
||||||
metaRsp.numOfColumns = meReader2.me.stbEntry.schema.nCols;
|
|
||||||
metaRsp.suid = meReader2.me.uid;
|
|
||||||
metaRsp.pSchemas = taosMemoryMalloc((metaRsp.numOfTags + metaRsp.numOfColumns) * sizeof(SSchema));
|
|
||||||
if (metaRsp.pSchemas == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
memcpy(metaRsp.pSchemas, meReader2.me.stbEntry.schema.pSchema, sizeof(SSchema) * metaRsp.numOfColumns);
|
|
||||||
memcpy(metaRsp.pSchemas + metaRsp.numOfColumns, meReader2.me.stbEntry.schemaTag.pSchema,
|
|
||||||
sizeof(SSchema) * metaRsp.numOfTags);
|
|
||||||
} else if (meReader1.me.type == TSDB_NORMAL_TABLE) {
|
|
||||||
metaRsp.numOfTags = 0;
|
|
||||||
metaRsp.numOfColumns = meReader1.me.ntbEntry.schema.nCols;
|
|
||||||
metaRsp.suid = 0;
|
|
||||||
metaRsp.pSchemas = meReader1.me.ntbEntry.schema.pSchema;
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metaRsp.numOfTags = schemaTag.nCols;
|
||||||
|
metaRsp.numOfColumns = schema.nCols;
|
||||||
|
metaRsp.precision = pVnode->config.tsdbCfg.precision;
|
||||||
|
metaRsp.sversion = schema.sver;
|
||||||
|
metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));
|
||||||
|
|
||||||
|
memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
|
||||||
|
if (schemaTag.nCols) {
|
||||||
|
memcpy(metaRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
|
||||||
|
}
|
||||||
|
|
||||||
// encode and send response
|
// encode and send response
|
||||||
rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
|
rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
|
||||||
if (rspLen < 0) {
|
if (rspLen < 0) {
|
||||||
|
@ -121,11 +112,9 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
tmsgSendRsp(&rpcMsg);
|
tmsgSendRsp(&rpcMsg);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (meReader1.me.type == TSDB_SUPER_TABLE || meReader1.me.type == TSDB_CHILD_TABLE) {
|
|
||||||
taosMemoryFree(metaRsp.pSchemas);
|
taosMemoryFree(metaRsp.pSchemas);
|
||||||
}
|
metaEntryReaderClear(&mer2);
|
||||||
metaEntryReaderClear(&meReader2);
|
metaEntryReaderClear(&mer1);
|
||||||
metaEntryReaderClear(&meReader1);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -309,6 +309,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
|
||||||
tEncodeSVCreateTbBatchRsp(&coder, &rsp);
|
tEncodeSVCreateTbBatchRsp(&coder, &rsp);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
taosArrayClear(rsp.pArray);
|
||||||
tCoderClear(&coder);
|
tCoderClear(&coder);
|
||||||
return rcode;
|
return rcode;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3351,7 +3351,6 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
|
||||||
req.ctb.pTag = row;
|
req.ctb.pTag = row;
|
||||||
|
|
||||||
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||||
#if 0
|
|
||||||
if (pTableBatch == NULL) {
|
if (pTableBatch == NULL) {
|
||||||
SVgroupTablesBatch tBatch = {0};
|
SVgroupTablesBatch tBatch = {0};
|
||||||
tBatch.info = *pVgInfo;
|
tBatch.info = *pVgInfo;
|
||||||
|
@ -3364,7 +3363,6 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
|
||||||
} else { // add to the correct vgroup
|
} else { // add to the correct vgroup
|
||||||
taosArrayPush(pTableBatch->req.pArray, &req);
|
taosArrayPush(pTableBatch->req.pArray, &req);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
|
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
|
||||||
|
|
Loading…
Reference in New Issue