fix: concurrent r/w meta
This commit is contained in:
parent
f57a652aa7
commit
ae509e874e
|
@ -19,9 +19,13 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
|
||||||
memset(pReader, 0, sizeof(*pReader));
|
memset(pReader, 0, sizeof(*pReader));
|
||||||
pReader->flags = flags;
|
pReader->flags = flags;
|
||||||
pReader->pMeta = pMeta;
|
pReader->pMeta = pMeta;
|
||||||
|
metaRLock(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
void metaReaderClear(SMetaReader *pReader) {
|
void metaReaderClear(SMetaReader *pReader) {
|
||||||
|
if (pReader->pMeta) {
|
||||||
|
metaULock(pReader->pMeta);
|
||||||
|
}
|
||||||
tDecoderClear(&pReader->coder);
|
tDecoderClear(&pReader->coder);
|
||||||
tdbFree(pReader->pBuf);
|
tdbFree(pReader->pBuf);
|
||||||
}
|
}
|
||||||
|
|
|
@ -439,29 +439,36 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
metaWLock(pMeta);
|
||||||
|
|
||||||
// save to table.db
|
// save to table.db
|
||||||
if (metaSaveToTbDb(pMeta, pME) < 0) return -1;
|
if (metaSaveToTbDb(pMeta, pME) < 0) goto _err;
|
||||||
|
|
||||||
// update uid.idx
|
// update uid.idx
|
||||||
if (metaUpdateUidIdx(pMeta, pME) < 0) return -1;
|
if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err;
|
||||||
|
|
||||||
// update name.idx
|
// update name.idx
|
||||||
if (metaUpdateNameIdx(pMeta, pME) < 0) return -1;
|
if (metaUpdateNameIdx(pMeta, pME) < 0) goto _err;
|
||||||
|
|
||||||
if (pME->type == TSDB_CHILD_TABLE) {
|
if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
// update ctb.idx
|
// update ctb.idx
|
||||||
if (metaUpdateCtbIdx(pMeta, pME) < 0) return -1;
|
if (metaUpdateCtbIdx(pMeta, pME) < 0) goto _err;
|
||||||
|
|
||||||
// update tag.idx
|
// update tag.idx
|
||||||
if (metaUpdateTagIdx(pMeta, pME) < 0) return -1;
|
if (metaUpdateTagIdx(pMeta, pME) < 0) goto _err;
|
||||||
} else {
|
} else {
|
||||||
// update schema.db
|
// update schema.db
|
||||||
if (metaSaveToSkmDb(pMeta, pME) < 0) return -1;
|
if (metaSaveToSkmDb(pMeta, pME) < 0) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pME->type != TSDB_SUPER_TABLE) {
|
if (pME->type != TSDB_SUPER_TABLE) {
|
||||||
if (metaUpdateTtlIdx(pMeta, pME) < 0) return -1;
|
if (metaUpdateTtlIdx(pMeta, pME) < 0) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metaULock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
metaULock(pMeta);
|
||||||
|
return -1;
|
||||||
}
|
}
|
|
@ -3870,6 +3870,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
|
||||||
|
|
||||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
||||||
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
|
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
|
||||||
|
metaReaderClear(&mr);
|
||||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||||
goto _error;
|
goto _error;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3880,6 +3881,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
|
||||||
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
|
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
|
||||||
reqId);
|
reqId);
|
||||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client
|
||||||
|
metaReaderClear(&mr);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue