feat: sma code refactor
This commit is contained in:
parent
069ce051f0
commit
9e7b43c836
|
@ -3586,12 +3586,11 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
|
||||||
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
|
|
||||||
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
|
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
|
||||||
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
|
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
|
||||||
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
|
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
|
||||||
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
|
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
|
||||||
if (tDecodeCStr(pCoder, (const char **)&pSma->indexName) < 0) return -1;
|
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
|
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
|
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
|
||||||
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
|
||||||
|
@ -3645,7 +3644,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
|
||||||
if (tStartDecode(pCoder) < 0) return -1;
|
if (tStartDecode(pCoder) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
if (tDecodeI64(pCoder, &pReq->indexUid) < 0) return -1;
|
||||||
if (tDecodeCStr(pCoder, (const char**)&pReq->indexName) < 0) return -1;
|
if (tDecodeCStrTo(pCoder, pReq->indexName) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(pCoder);
|
tEndDecode(pCoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -192,7 +192,7 @@ struct SMetaEntry {
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
} ntbEntry;
|
} ntbEntry;
|
||||||
struct {
|
struct {
|
||||||
STSmaWrapper tsma;
|
STSma *tsma;
|
||||||
} smaEntry;
|
} smaEntry;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
|
@ -73,6 +73,7 @@ struct SMeta {
|
||||||
TDB* pCtbIdx;
|
TDB* pCtbIdx;
|
||||||
TDB* pTagIdx;
|
TDB* pTagIdx;
|
||||||
TDB* pTtlIdx;
|
TDB* pTtlIdx;
|
||||||
|
TDB* pSmaIdx;
|
||||||
SMetaIdx* pIdx;
|
SMetaIdx* pIdx;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||||
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
||||||
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
|
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
|
||||||
} else if (pME->type == TSDB_TSMA_TABLE) {
|
} else if (pME->type == TSDB_TSMA_TABLE) {
|
||||||
if (tEncodeTSmaWrapper(pCoder, &pME->smaEntry.tsma) < 0) return -1;
|
if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,9 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||||
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
||||||
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
|
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
|
||||||
} else {
|
} else if (pME->type == TSDB_TSMA_TABLE) {
|
||||||
|
if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
|
||||||
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,6 +105,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// open pSmaIdx
|
||||||
|
ret = tdbDbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
|
||||||
|
if (ret < 0) {
|
||||||
|
metaError("vgId:%d failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
// open index
|
// open index
|
||||||
if (metaOpenIdx(pMeta) < 0) {
|
if (metaOpenIdx(pMeta) < 0) {
|
||||||
metaError("vgId:%d failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
|
metaError("vgId:%d failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
@ -118,6 +125,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||||
|
if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx);
|
||||||
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
|
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
|
||||||
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
|
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
|
||||||
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
|
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
|
||||||
|
@ -134,6 +142,7 @@ _err:
|
||||||
int metaClose(SMeta *pMeta) {
|
int metaClose(SMeta *pMeta) {
|
||||||
if (pMeta) {
|
if (pMeta) {
|
||||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||||
|
if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx);
|
||||||
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
|
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
|
||||||
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
|
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
|
||||||
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
|
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
|
||||||
|
|
|
@ -23,8 +23,6 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
|
||||||
// The table uid should exists and be super table or normal table.
|
// The table uid should exists and be super table or normal table.
|
||||||
// Check other cfg value
|
// Check other cfg value
|
||||||
|
|
||||||
// TODO: add atomicity
|
|
||||||
|
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
int kLen = 0;
|
int kLen = 0;
|
||||||
int vLen = 0;
|
int vLen = 0;
|
||||||
|
@ -55,7 +53,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
|
||||||
me.type = TSDB_TSMA_TABLE;
|
me.type = TSDB_TSMA_TABLE;
|
||||||
me.uid = pCfg->indexUid;
|
me.uid = pCfg->indexUid;
|
||||||
me.name = pCfg->indexName;
|
me.name = pCfg->indexName;
|
||||||
// me.smaEntry = xx;
|
me.smaEntry.tsma = pCfg;
|
||||||
|
|
||||||
if (metaHandleSmaEntry(pMeta, &me) < 0) goto _err;
|
if (metaHandleSmaEntry(pMeta, &me) < 0) goto _err;
|
||||||
|
|
||||||
|
@ -180,8 +178,15 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid};
|
||||||
|
|
||||||
|
return tdbDbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
|
||||||
|
}
|
||||||
|
|
||||||
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
|
@ -190,7 +195,9 @@ static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err;
|
if (metaSaveSmaToDB(pMeta, pME) < 0) goto _err;
|
||||||
|
|
||||||
// // update uid.idx
|
// // update uid.idx
|
||||||
// if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err;
|
if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err;
|
||||||
|
|
||||||
|
if (metaUpdateSmaIdx(pMeta, pME) < 0) goto _err;
|
||||||
|
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -670,12 +670,11 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
pRsp->code = terrno;
|
pRsp->code = terrno;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
#if 0
|
|
||||||
if (metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
|
if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) {
|
||||||
pRsp->code = terrno;
|
pRsp->code = terrno;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
tDecoderClear(&coder);
|
tDecoderClear(&coder);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue