fix: select tags - ctb cursor pause/unlock and lock/resume
This commit is contained in:
parent
27d7c659bc
commit
4a9391929c
|
@ -106,6 +106,8 @@ typedef struct SMCtbCursor {
|
|||
void *pVal;
|
||||
int kLen;
|
||||
int vLen;
|
||||
int8_t paused;
|
||||
int lock;
|
||||
} SMCtbCursor;
|
||||
|
||||
typedef struct SRowBuffPos {
|
||||
|
@ -295,7 +297,9 @@ int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool
|
|||
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
|
||||
*/
|
||||
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
|
||||
void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock);
|
||||
int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first);
|
||||
void (*pauseCtbCursor)(SMCtbCursor* pCtbCur);
|
||||
void (*closeCtbCursor)(SMCtbCursor *pCtbCur);
|
||||
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
|
||||
} SStoreMeta;
|
||||
|
||||
|
|
|
@ -168,7 +168,9 @@ int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
|||
|
||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
|
||||
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
|
||||
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first);
|
||||
void metaPauseCtbCursor(SMCtbCursor* pCtbCur);
|
||||
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
||||
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
|
||||
void metaCloseStbCursor(SMStbCursor* pStbCur);
|
||||
|
|
|
@ -423,40 +423,75 @@ SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) {
|
|||
|
||||
pCtbCur->pMeta = pMeta;
|
||||
pCtbCur->suid = uid;
|
||||
if (lock) {
|
||||
metaRLock(pMeta);
|
||||
}
|
||||
pCtbCur->lock = lock;
|
||||
pCtbCur->paused = 1;
|
||||
|
||||
ret = tdbTbcOpen(pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
|
||||
ret = metaResumeCtbCursor(pCtbCur, 1);
|
||||
if (ret < 0) {
|
||||
metaULock(pMeta);
|
||||
taosMemoryFree(pCtbCur);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// move to the suid
|
||||
ctbIdxKey.suid = uid;
|
||||
ctbIdxKey.uid = INT64_MIN;
|
||||
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
|
||||
if (c > 0) {
|
||||
tdbTbcMoveToNext(pCtbCur->pCur);
|
||||
}
|
||||
|
||||
return pCtbCur;
|
||||
}
|
||||
|
||||
void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) {
|
||||
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
|
||||
if (pCtbCur) {
|
||||
if (pCtbCur->pMeta && lock) metaULock(pCtbCur->pMeta);
|
||||
if (pCtbCur->pCur) {
|
||||
tdbTbcClose(pCtbCur->pCur);
|
||||
if (!pCtbCur->paused) {
|
||||
if (pCtbCur->pMeta && pCtbCur->lock) metaULock(pCtbCur->pMeta);
|
||||
if (pCtbCur->pCur) {
|
||||
tdbTbcClose(pCtbCur->pCur);
|
||||
}
|
||||
}
|
||||
tdbFree(pCtbCur->pKey);
|
||||
tdbFree(pCtbCur->pVal);
|
||||
}
|
||||
taosMemoryFree(pCtbCur);
|
||||
}
|
||||
|
||||
tdbFree(pCtbCur->pKey);
|
||||
tdbFree(pCtbCur->pVal);
|
||||
void metaPauseCtbCursor(SMCtbCursor* pCtbCur) {
|
||||
if (!pCtbCur->paused) {
|
||||
tdbTbcClose((TBC*)pCtbCur->pCur);
|
||||
if (pCtbCur->lock) {
|
||||
metaULock(pCtbCur->pMeta);
|
||||
}
|
||||
pCtbCur->paused = 1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first) {
|
||||
if (pCtbCur->paused) {
|
||||
pCtbCur->paused = 0;
|
||||
|
||||
if (pCtbCur->lock) {
|
||||
metaRLock(pCtbCur->pMeta);
|
||||
}
|
||||
int ret = 0;
|
||||
ret = tdbTbcOpen(pCtbCur->pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
|
||||
if (ret < 0) {
|
||||
metaCloseCtbCursor(pCtbCur);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosMemoryFree(pCtbCur);
|
||||
if (first) {
|
||||
SCtbIdxKey ctbIdxKey;
|
||||
// move to the suid
|
||||
ctbIdxKey.suid = pCtbCur->suid;
|
||||
ctbIdxKey.uid = INT64_MIN;
|
||||
int c = 0;
|
||||
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
|
||||
if (c > 0) {
|
||||
tdbTbcMoveToNext(pCtbCur->pCur);
|
||||
}
|
||||
} else {
|
||||
int c = 0;
|
||||
ret = tdbTbcMoveTo(pCtbCur->pCur, pCtbCur->pKey, pCtbCur->kLen, &c);
|
||||
if (c < 0) {
|
||||
tdbTbcMoveToPrev(pCtbCur->pCur);
|
||||
} else {
|
||||
tdbTbcMoveToNext(pCtbCur->pCur);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
|
||||
|
@ -1414,7 +1449,7 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
|
|||
}
|
||||
|
||||
taosHashCleanup(pSepecifiedUidMap);
|
||||
metaCloseCtbCursor(pCur, 1);
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -98,6 +98,8 @@ void initMetadataAPI(SStoreMeta* pMeta) {
|
|||
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
|
||||
|
||||
pMeta->openCtbCursor = metaOpenCtbCursor;
|
||||
pMeta->resumeCtbCursor = metaResumeCtbCursor;
|
||||
pMeta->pauseCtbCursor = metaPauseCtbCursor;
|
||||
pMeta->closeCtbCursor = metaCloseCtbCursor;
|
||||
pMeta->ctbCursorNext = metaCtbCursorNext;
|
||||
}
|
||||
|
|
|
@ -452,7 +452,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
|
|||
taosArrayPush(list, &info);
|
||||
}
|
||||
|
||||
metaCloseCtbCursor(pCur, 1);
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -473,7 +473,7 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
|
|||
taosArrayPush(list, &id);
|
||||
}
|
||||
|
||||
metaCloseCtbCursor(pCur, 1);
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -536,7 +536,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
|
|||
++(*num);
|
||||
}
|
||||
|
||||
metaCloseCtbCursor(pCur, 0);
|
||||
metaCloseCtbCursor(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -2863,12 +2863,14 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pInfo->pCtbCursor == NULL) {
|
||||
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
||||
} else {
|
||||
pAPI->metaFn.resumeCtbCursor(pInfo->pCtbCursor, 0);
|
||||
}
|
||||
|
||||
SArray* aUidTags = pInfo->aUidTags;
|
||||
SArray* aFilterIdxs = pInfo->aFilterIdxs;
|
||||
int32_t count = 0;
|
||||
|
||||
bool ctbCursorFinished = false;
|
||||
while (1) {
|
||||
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
|
||||
taosArrayClear(aFilterIdxs);
|
||||
|
@ -2878,6 +2880,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
|||
SMCtbCursor* pCur = pInfo->pCtbCursor;
|
||||
tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor);
|
||||
if (uid == 0) {
|
||||
ctbCursorFinished = true;
|
||||
break;
|
||||
}
|
||||
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
|
||||
|
@ -2906,7 +2909,15 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (count > 0) {
|
||||
pAPI->metaFn.pauseCtbCursor(pInfo->pCtbCursor);
|
||||
}
|
||||
if (count == 0 || ctbCursorFinished) {
|
||||
pAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
|
||||
pInfo->pCtbCursor = NULL;
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
pRes->info.rows = count;
|
||||
pOperator->resultInfo.totalRows += count;
|
||||
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
|
@ -2971,7 +2982,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
|
|||
static void destroyTagScanOperatorInfo(void* param) {
|
||||
STagScanInfo* pInfo = (STagScanInfo*)param;
|
||||
if (pInfo->pCtbCursor != NULL) {
|
||||
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
|
||||
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor);
|
||||
}
|
||||
taosHashCleanup(pInfo->filterCtx.colHash);
|
||||
taosArrayDestroy(pInfo->filterCtx.cInfoList);
|
||||
|
|
Loading…
Reference in New Issue