update add/del tag index
This commit is contained in:
parent
b1241e5e0d
commit
7e720269e2
|
@ -1611,6 +1611,43 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetAlterStbRedoActions2(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb,
|
||||||
|
void *alterOriData, int32_t alterOriDataLen) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
void *pIter = NULL;
|
||||||
|
int32_t contLen;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (!mndVgroupInDb(pVgroup, pDb->uid)) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, alterOriData, alterOriDataLen);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
action.pCont = pReq;
|
||||||
|
action.contLen = contLen;
|
||||||
|
action.msgType = TDMT_VND_CREATE_INDEX;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
|
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
|
||||||
taosRLockLatch(&pStb->lock);
|
taosRLockLatch(&pStb->lock);
|
||||||
|
|
||||||
|
@ -2639,6 +2676,11 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
|
||||||
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
|
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
|
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
|
||||||
|
if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen) != 0) goto _OVER;
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
|
@ -128,6 +128,10 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
|
||||||
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
||||||
int metaAlterCache(SMeta* pMeta, int32_t nPage);
|
int metaAlterCache(SMeta* pMeta, int32_t nPage);
|
||||||
|
|
||||||
|
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
|
}
|
||||||
|
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
|
|
||||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||||
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
|
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
|
||||||
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
|
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
|
||||||
|
|
|
@ -389,6 +389,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
tdbTbcClose(pUidIdxc);
|
tdbTbcClose(pUidIdxc);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
|
// impl later
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
|
// impl later
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
|
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMetaRsp **pMetaRsp) {
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
|
|
|
@ -1427,8 +1427,30 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
// impl later
|
SVCreateStbReq req = {0};
|
||||||
return TSDB_CODE_SUCCESS;
|
SDecoder dc = {0};
|
||||||
|
|
||||||
|
pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
|
||||||
|
pRsp->code = TSDB_CODE_SUCCESS;
|
||||||
|
pRsp->pCont = NULL;
|
||||||
|
pRsp->contLen = 0;
|
||||||
|
|
||||||
|
tDecoderInit(&dc, pReq, len);
|
||||||
|
// decode req
|
||||||
|
if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (metaAddIndexToSTable(pVnode->pMeta, version, &req) < 0) {
|
||||||
|
pRsp->code = terrno;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
return 0;
|
||||||
|
_err:
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
// impl later
|
// impl later
|
||||||
|
|
Loading…
Reference in New Issue