fix iter and case

This commit is contained in:
yihaoDeng 2023-02-18 18:24:10 +08:00
parent 51755e9783
commit eafc9db1c1
5 changed files with 86 additions and 38 deletions

View File

@ -28,6 +28,12 @@ typedef struct SSIdx {
void *pIdx; void *pIdx;
} SSIdx; } SSIdx;
// retrieve sma index and tag index
typedef struct {
void *pSmaIter;
void *pIdxIter;
} SSmaAndTagIter;
int32_t mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx); int32_t mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -43,9 +43,9 @@ static int32_t mndProcessCreateIdxReq(SRpcMsg *pReq);
// static int32_t mndProcessDropIdxReq(SRpcMsg *pReq); // static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
static int32_t mndProcessGetIdxReq(SRpcMsg *pReq); static int32_t mndProcessGetIdxReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbIdxReq(SRpcMsg *pReq); static int32_t mndProcessGetTbIdxReq(SRpcMsg *pReq);
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); // static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter); // static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter);
static void mndDestroyIdxObj(SIdxObj *pIdxObj); static void mndDestroyIdxObj(SIdxObj *pIdxObj);
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb); static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb);
@ -496,10 +496,11 @@ int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, i
pDb = mndAcquireDb(pMnode, pShow->db); pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0; if (pDb == NULL) return 0;
} }
int invalid = -1; SSmaAndTagIter *pIter = pShow->pIter;
int invalid = -1;
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_IDX, pShow->pIter, (void **)&pIdx); pIter->pIdxIter = sdbFetch(pSdb, SDB_IDX, pIter->pIdxIter, (void **)&pIdx);
if (pShow->pIter == NULL) break; if (pIter->pIdxIter == NULL) break;
if (NULL != pDb && pIdx->dbUid != pDb->uid) { if (NULL != pDb && pIdx->dbUid != pDb->uid) {
sdbRelease(pSdb, pIdx); sdbRelease(pSdb, pIdx);
@ -559,10 +560,11 @@ int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, i
return numOfRows; return numOfRows;
} }
static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter) { // static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; // SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); //
} // sdbCancelFetch(pSdb, pIter);
//}
static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) { static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
// impl // impl
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -38,7 +38,7 @@ int mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx) {
if (type == SDB_SMA) { if (type == SDB_SMA) {
idx->type = SDB_SMA; idx->type = SDB_SMA;
idx->pIdx = pSma; idx->pIdx = pSma;
} else { // type == SDB_IDX } else {
mndReleaseSma(pMnode, pSma); mndReleaseSma(pMnode, pSma);
terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST; terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST;
return -1; return -1;

View File

@ -45,27 +45,13 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
static void mndDestroySmaObj(SSmaObj *pSmaObj); static void mndDestroySmaObj(SSmaObj *pSmaObj);
// retrieve sma index and tag index // sma and tag index comm func
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows); static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read); static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
return read;
}
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
int ret = mndProcessDropSmaReq(pReq);
if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) {
terrno = 0;
ret = mndProcessDropTagIdxReq(pReq);
}
return ret;
}
static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter) {
// TODO
}
int32_t mndInitSma(SMnode *pMnode) { int32_t mndInitSma(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
.sdbType = SDB_SMA, .sdbType = SDB_SMA,
@ -85,7 +71,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextIdx); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
@ -1244,10 +1230,10 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pDb = mndAcquireDb(pMnode, pShow->db); pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0; if (pDb == NULL) return 0;
} }
SSmaAndTagIter *pIter = pShow->pIter;
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_SMA, pShow->pIter, (void **)&pSma); pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
if (pShow->pIter == NULL) break; if (pIter->pSmaIter == NULL) break;
if (NULL != pDb && pSma->dbUid != pDb->uid) { if (NULL != pDb && pSma->dbUid != pDb->uid) {
sdbRelease(pSdb, pSma); sdbRelease(pSdb, pSma);
@ -1305,7 +1291,30 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
return numOfRows; return numOfRows;
} }
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) { // sma and tag index comm func
SSdb *pSdb = pMnode->pSdb; static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
sdbCancelFetch(pSdb, pIter); int ret = mndProcessDropSmaReq(pReq);
if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) {
terrno = 0;
ret = mndProcessDropTagIdxReq(pReq);
}
return ret;
}
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
if (pShow->pIter == NULL) {
pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
}
int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
if (read < rows) read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
return read;
}
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
SSmaAndTagIter *p = pIter;
if (p != NULL) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, p->pSmaIter);
sdbCancelFetch(pSdb, p->pIdxIter);
}
taosMemoryFree(p);
} }

View File

@ -48,19 +48,50 @@ while $i < $tbNum
endw endw
print --> create sma and tag index, global name conflict print ==== create sma and tag index, global name conflict
sql create sma index t2i on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m); sql create sma index t2i on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m);
sql_error create index t2i on $mtPrefix (t2) sql_error create index t2i on $mtPrefix (t2)
sql drop index t2i sql drop index t2i
#print --> create tagindex and sma index, global name conflict print ==== create tagindex and sma index, global name conflict
sql create index t2i on $mtPrefix (t2) sql create index t2i on $mtPrefix (t2)
sql_error create sma index t2i on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m); sql_error create sma index t2i on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m);
sql drop index t2i sql drop index t2i
print ===== iter sma and tag index
sql create index tagt2i on $mtPrefix (t2)
sql create sma index smat2i on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m);
sql select * from information_schema.ins_indexes
if $rows != 2 then
return -1
endi
sql drop index smat2i
$i = 0
$smaPre = sma3
while $i < 5000
$sma = $smaPre . $i
$i = $i + 1
sql create sma index $sma on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m);
endw
sql select * from information_schema.ins_indexes
if $rows != 5001 then
return -1
endi
print ==== test name conflict print ==== test name conflict
sql_error create index ti3 on $mtPrefix(t2) sql_error create index ti3 on $mtPrefix(t2)