batch create table

This commit is contained in:
Hongze Cheng 2021-12-31 08:30:40 +00:00
parent 54af08bc98
commit 1c57b5e746
4 changed files with 82 additions and 23 deletions

View File

@ -1285,8 +1285,10 @@ typedef struct {
int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq); int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq);
int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq); int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq);
int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq); int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq);
typedef struct SVCreateTbRsp { typedef struct SVCreateTbRsp {
} SVCreateTbRsp; } SVCreateTbRsp;

View File

@ -98,7 +98,7 @@ int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) {
return 0; return 0;
} }
int tSerializeSVCreateTbReq(void **buf, const SVCreateTbReq *pReq) { int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver); tlen += taosEncodeFixedU64(buf, pReq->ver);
@ -193,6 +193,33 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
return buf; return buf;
} }
int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray));
for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i);
tlen += tSerializeSVCreateTbReq(buf, pCreateTbReq);
}
return tlen;
}
void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
uint32_t nsize = 0;
buf = taosDecodeFixedU64(buf, &pReq->ver);
buf = taosDecodeFixedU32(buf, &nsize);
for (size_t i = 0; i < nsize; i++) {
SVCreateTbReq req;
buf = tDeserializeSVCreateTbReq(buf, &req);
taosArrayPush(pReq->pArray, &req);
}
return buf;
}
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static int tmsgStartEncode(SMsgEncoder *pME) { static int tmsgStartEncode(SMsgEncoder *pME) {
struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode)); struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode));

View File

@ -27,7 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
} }
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg * pMsg; SRpcMsg *pMsg;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
@ -50,8 +50,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
} }
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq; SVCreateTbReq vCreateTbReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen); SVCreateTbBatchReq vCreateTbBatchReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) { if (ptr == NULL) {
// TODO: handle error // TODO: handle error
} }
@ -68,7 +69,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_CREATE_STB: case TDMT_VND_CREATE_STB:
case TDMT_VND_CREATE_TABLE:
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO: handle error // TODO: handle error
@ -76,6 +76,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: maybe need to clear the requst struct // TODO: maybe need to clear the requst struct
break; break;
case TDMT_VND_CREATE_TABLE:
tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
// TODO: handle error
}
}
case TDMT_VND_DROP_STB: case TDMT_VND_DROP_STB:
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { // if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {

View File

@ -35,7 +35,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
SArray* array = NULL; SArray* array = NULL;
SName name = {0}; SName name = {0};
tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db)); tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db));
char dbFname[TSDB_DB_FNAME_LEN] = {0}; char dbFname[TSDB_DB_FNAME_LEN] = {0};
@ -48,7 +48,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
pEpSet->numOfEps = info->numOfEps; pEpSet->numOfEps = info->numOfEps;
pEpSet->inUse = info->inUse; pEpSet->inUse = info->inUse;
for(int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
pEpSet->port[i] = info->epAddr[i].port; pEpSet->port[i] = info->epAddr[i].port;
} }
@ -190,7 +190,7 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
val = htonl(pCreate->numOfVgroups); val = htonl(pCreate->numOfVgroups);
if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) { if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) {
snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val, snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -468,7 +468,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema}; SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema};
SToken* pItem = taosArrayGet(pValList, i); SToken* pItem = taosArrayGet(pValList, i);
code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf); code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -481,7 +481,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
if (row == NULL) { if (row == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
tdSortKVRowByColIdx(row); tdSortKVRowByColIdx(row);
@ -501,15 +501,15 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.suid = pSuperTableMeta->suid;
req.ctbCfg.pTag = row; req.ctbCfg.pTag = row;
// pEpSet->inUse = info.inUse; // pEpSet->inUse = info.inUse;
// pEpSet->numOfEps = info.numOfEps; // pEpSet->numOfEps = info.numOfEps;
// for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { // for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
// pEpSet->port[i] = info.epAddr[i].port; // pEpSet->port[i] = info.epAddr[i].port;
// tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); // tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
// } // }
// ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId); // ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId);
// ((SMsgHead*)(*pOutput))->contLen = htonl(serLen); // ((SMsgHead*)(*pOutput))->contLen = htonl(serLen);
SVgroupTablesBatch *pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
if (pTableBatch == NULL) { if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0}; SVgroupTablesBatch tBatch = {0};
tBatch.info = info; tBatch.info = info;
@ -518,14 +518,35 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
taosArrayPush(tBatch.req.pArray, &req); taosArrayPush(tBatch.req.pArray, &req);
taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch)); taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup } else { // add to the correct vgroup
assert(info.vgId == pTableBatch->info.vgId); assert(info.vgId == pTableBatch->info.vgId);
taosArrayPush(pTableBatch->req.pArray, &req); taosArrayPush(pTableBatch->req.pArray, &req);
} }
} }
// TODO: serialize and // TODO: serialize and
void *pBuf = NULL; SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
SVgroupTablesBatch** ppTbBatch = NULL;
do {
ppTbBatch = taosHashIterate(pVgroupHashmap, ppTbBatch);
if (ppTbBatch == NULL) break;
SVgroupTablesBatch* pTbBatch = *ppTbBatch;
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
taosArrayPush(pBufArray, &buf);
} while (true);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -634,7 +655,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_SHOW: { case TSDB_SQL_SHOW: {
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf); code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf);
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW; pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW;
break; break;
} }