refactor: vnode
This commit is contained in:
parent
0d89d93787
commit
9fc55aa7f4
|
@ -1444,10 +1444,9 @@ typedef struct SVCreateTbReq {
|
||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
col_id_t nCols;
|
int16_t nCols;
|
||||||
col_id_t nBSmaCols;
|
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
col_id_t nTagCols;
|
int16_t nTagCols;
|
||||||
SSchema* pTagSchema;
|
SSchema* pTagSchema;
|
||||||
SRSmaParam* pRSmaParam;
|
SRSmaParam* pRSmaParam;
|
||||||
} stbCfg;
|
} stbCfg;
|
||||||
|
@ -1456,14 +1455,15 @@ typedef struct SVCreateTbReq {
|
||||||
SKVRow pTag;
|
SKVRow pTag;
|
||||||
} ctbCfg;
|
} ctbCfg;
|
||||||
struct {
|
struct {
|
||||||
col_id_t nCols;
|
int16_t nCols;
|
||||||
col_id_t nBSmaCols;
|
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
SRSmaParam* pRSmaParam;
|
|
||||||
} ntbCfg;
|
} ntbCfg;
|
||||||
};
|
};
|
||||||
} SVCreateTbReq, SVUpdateTbReq;
|
} SVCreateTbReq, SVUpdateTbReq;
|
||||||
|
|
||||||
|
int tEncodeSVCreateTbReq(SCoder* pCoder, const SVCreateTbReq* pReq);
|
||||||
|
int tDecodeSVCreateTbReq(SCoder* pCoder, SVCreateTbReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
} SVCreateTbRsp, SVUpdateTbRsp;
|
} SVCreateTbRsp, SVUpdateTbRsp;
|
||||||
|
|
|
@ -394,6 +394,80 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tEncodeSVCreateTbReq(SCoder *pCoder, const SVCreateTbReq *pReq) {
|
||||||
|
#if 0
|
||||||
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
|
||||||
|
if (tEncodeU32v(pCoder, pReq->ttl) < 0) return -1;
|
||||||
|
if (tEncodeU32v(pCoder, pReq->keep) < 0) return -1;
|
||||||
|
if (tEncodeI8(pCoder, pReq->type) < 0) return -1;
|
||||||
|
|
||||||
|
if (pReq->type == TSDB_SUPER_TABLE) {
|
||||||
|
if (tEncodeI64(pCoder, pReq->stbCfg.suid) < 0) return -1;
|
||||||
|
if (tEncodeI16v(pCoder, pReq->stbCfg.nCols) < 0) return -1;
|
||||||
|
for (int i = 0; i < pReq->stbCfg.nCols; i++) {
|
||||||
|
if (tEncodeSSchema(pCoder, pReq->stbCfg.pSchema + i) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tEncodeI16v(pCoder, pReq->stbCfg.nTagCols) < 0) return -1;
|
||||||
|
for (int i = 0; i < pReq->stbCfg.nTagCols; i++) {
|
||||||
|
if (tEncodeSSchema(pCoder, pReq->stbCfg.pTagSchema + i) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (pReq->type == TSDB_CHILD_TABLE) {
|
||||||
|
if (tEncodeI64(pCoder, pReq->ctbCfg.suid) < 0) return -1;
|
||||||
|
// TODO: encode SKVRow
|
||||||
|
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
||||||
|
if (tEncodeI16v(pCoder, pReq->ntbCfg.nCols) < 0) return -1;
|
||||||
|
for (int i = 0; i < pReq->ntbCfg.nCols; i++) {
|
||||||
|
if (tEncodeSSchema(pCoder, pReq->stbCfg.pSchema + i) < 0) return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(pCoder);
|
||||||
|
#endif
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) {
|
||||||
|
#if 0
|
||||||
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
|
||||||
|
if (tDecodeU32v(pCoder, &pReq->ttl) < 0) return -1;
|
||||||
|
if (tDecodeU32v(pCoder, &pReq->keep) < 0) return -1;
|
||||||
|
if (tDecodeI8(pCoder, &pReq->type) < 0) return -1;
|
||||||
|
|
||||||
|
if (pReq->type == TSDB_SUPER_TABLE) {
|
||||||
|
if (tDecodeI64(pCoder, &pReq->stbCfg.suid) < 0) return -1;
|
||||||
|
if (tDecodeI16v(pCoder, &pReq->stbCfg.nCols) < 0) return -1;
|
||||||
|
for (int i = 0; i < pReq->stbCfg.nCols; i++) {
|
||||||
|
if (tDecodeSSchema(pCoder, &pReq->stbCfg.pSchema + i) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tDecodeI16v(pCoder, pReq->stbCfg.nTagCols) < 0) return -1;
|
||||||
|
for (int i = 0; i < pReq->stbCfg.nTagCols; i++) {
|
||||||
|
if (tDecodeSSchema(pCoder, pReq->stbCfg.pTagSchema + i) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (pReq->type == TSDB_CHILD_TABLE) {
|
||||||
|
if (tDecodeI64(pCoder, pReq->ctbCfg.suid) < 0) return -1;
|
||||||
|
// TODO: decode SKVRow
|
||||||
|
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
||||||
|
if (tDecodeI16v(pCoder, pReq->ntbCfg.nCols) < 0) return -1;
|
||||||
|
for (int i = 0; i < pReq->ntbCfg.nCols; i++) {
|
||||||
|
if (tDecodeSSchema(pCoder, pReq->stbCfg.pSchema + i) < 0) return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(pCoder);
|
||||||
|
#endif
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
@ -406,7 +480,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
case TD_SUPER_TABLE:
|
case TD_SUPER_TABLE:
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid);
|
tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid);
|
||||||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols);
|
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols);
|
||||||
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
|
|
||||||
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
|
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
|
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags);
|
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags);
|
||||||
|
@ -438,7 +511,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
break;
|
break;
|
||||||
case TD_NORMAL_TABLE:
|
case TD_NORMAL_TABLE:
|
||||||
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nCols);
|
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nCols);
|
||||||
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols);
|
|
||||||
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
|
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type);
|
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type);
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].flags);
|
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].flags);
|
||||||
|
@ -446,15 +518,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
|
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
|
||||||
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
|
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
|
||||||
}
|
}
|
||||||
if (pReq->rollup && pReq->ntbCfg.pRSmaParam) {
|
|
||||||
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
|
||||||
tlen += taosEncodeBinary(buf, (const void *)¶m->xFilesFactor, sizeof(param->xFilesFactor));
|
|
||||||
tlen += taosEncodeFixedI32(buf, param->delay);
|
|
||||||
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
|
|
||||||
for (int8_t i = 0; i < param->nFuncIds; ++i) {
|
|
||||||
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -473,7 +536,6 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
case TD_SUPER_TABLE:
|
case TD_SUPER_TABLE:
|
||||||
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
|
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
|
||||||
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
|
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
|
||||||
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
|
|
||||||
pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
|
pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
|
||||||
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
|
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
|
||||||
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
|
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
|
||||||
|
@ -515,7 +577,6 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
break;
|
break;
|
||||||
case TD_NORMAL_TABLE:
|
case TD_NORMAL_TABLE:
|
||||||
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.nCols);
|
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.nCols);
|
||||||
buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols));
|
|
||||||
pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema));
|
pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema));
|
||||||
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
|
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
|
||||||
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type);
|
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type);
|
||||||
|
@ -524,23 +585,6 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
|
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
|
||||||
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
|
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
|
||||||
}
|
}
|
||||||
if (pReq->rollup) {
|
|
||||||
pReq->ntbCfg.pRSmaParam = (SRSmaParam *)taosMemoryMalloc(sizeof(SRSmaParam));
|
|
||||||
SRSmaParam *param = pReq->ntbCfg.pRSmaParam;
|
|
||||||
buf = taosDecodeBinaryTo(buf, (void *)¶m->xFilesFactor, sizeof(param->xFilesFactor));
|
|
||||||
buf = taosDecodeFixedI32(buf, ¶m->delay);
|
|
||||||
buf = taosDecodeFixedI8(buf, ¶m->nFuncIds);
|
|
||||||
if (param->nFuncIds > 0) {
|
|
||||||
param->pFuncIds = (func_id_t *)taosMemoryMalloc(param->nFuncIds * sizeof(func_id_t));
|
|
||||||
for (int8_t i = 0; i < param->nFuncIds; ++i) {
|
|
||||||
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
param->pFuncIds = NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pReq->ntbCfg.pRSmaParam = NULL;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -357,7 +357,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
req.stbCfg.nCols = pStb->numOfColumns;
|
req.stbCfg.nCols = pStb->numOfColumns;
|
||||||
req.stbCfg.nTagCols = pStb->numOfTags;
|
req.stbCfg.nTagCols = pStb->numOfTags;
|
||||||
req.stbCfg.pTagSchema = pStb->pTags;
|
req.stbCfg.pTagSchema = pStb->pTags;
|
||||||
req.stbCfg.nBSmaCols = pStb->numOfSmas;
|
|
||||||
req.stbCfg.pSchema = (SSchema *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
|
req.stbCfg.pSchema = (SSchema *)taosMemoryCalloc(pStb->numOfColumns, sizeof(SSchema));
|
||||||
if (req.stbCfg.pSchema == NULL) {
|
if (req.stbCfg.pSchema == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -253,10 +253,6 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
||||||
taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
|
taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
|
||||||
} else {
|
} else {
|
||||||
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
|
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
|
||||||
if (pCreateTbReq->ntbCfg.pRSmaParam) {
|
|
||||||
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam->pFuncIds);
|
|
||||||
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2756,6 +2756,7 @@ static int32_t buildSmaParam(STableOptions* pOptions, SVCreateTbReq* pReq) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
pReq->ntbCfg.pRSmaParam = taosMemoryCalloc(1, sizeof(SRSmaParam));
|
pReq->ntbCfg.pRSmaParam = taosMemoryCalloc(1, sizeof(SRSmaParam));
|
||||||
if (NULL == pReq->ntbCfg.pRSmaParam) {
|
if (NULL == pReq->ntbCfg.pRSmaParam) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -2770,6 +2771,7 @@ static int32_t buildSmaParam(STableOptions* pOptions, SVCreateTbReq* pReq) {
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
SNode* pFunc = NULL;
|
SNode* pFunc = NULL;
|
||||||
FOREACH(pFunc, pOptions->pFuncs) { pReq->ntbCfg.pRSmaParam->pFuncIds[index++] = ((SFunctionNode*)pFunc)->funcId; }
|
FOREACH(pFunc, pOptions->pFuncs) { pReq->ntbCfg.pRSmaParam->pFuncIds[index++] = ((SFunctionNode*)pFunc)->funcId; }
|
||||||
|
#endif
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue