This commit is contained in:
Hongze Cheng 2020-06-18 09:24:09 +00:00
parent 40b48c8980
commit f890114a41
1 changed files with 29 additions and 7 deletions

View File

@ -110,21 +110,38 @@ _err:
int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo == NULL) return -1;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
if (pMeta == NULL) return -1; uint64_t uid = tableId.uid;
STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid); STable *pTable = tsdbGetTableByUid(pMeta, uid);
if (pTable == NULL) { if (pTable == NULL) {
tsdbError("vgId:%d, failed to drop table since table not exists! tid:%d, uid:" PRId64, pRepo->config.tsdbId, tsdbError("vgId:%d failed to drop table since table not exists! tid:%d uid %" PRId64, REPO_ID(pRepo), tableId.tid,
tableId.tid, tableId.uid); uid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1; return -1;
} }
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) {
if (pTable->cqhandle) pRepo->appH.cqDropFunc(pTable->cqhandle);
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
int tlen = tsdbGetTableEncodeSize(TSDB_DROP_META, tTable);
void *buf = tsdbAllocBytes(pRepo, tlen);
ASSERT(buf != NULL);
tsdbInsertTableAct(pRepo, TSDB_DROP_META, buf, tTable);
tsdbRemoveTableFromMeta(pRepo, tTable, true);
}
}
tsdbRemoveTableFromMeta(pRepo, pTable, true);
tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name), tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name),
tableId.tid, tableId.uid); tableId.tid, tableId.uid);
tsdbRemoveTableFromMeta(pRepo, pTable, true);
return 0; return 0;
} }
@ -528,6 +545,9 @@ void tsdbRefTable(STable *pTable) { T_REF_INC(pTable); }
void tsdbUnRefTable(STable *pTable) { void tsdbUnRefTable(STable *pTable) {
if (T_REF_DEC(pTable) == 0) { if (T_REF_DEC(pTable) == 0) {
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) {
tsdbUnRefTable(pTable->pSuper);
}
tsdbFreeTable(pTable); tsdbFreeTable(pTable);
} }
} }
@ -814,6 +834,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
} }
if (rmFromIdx) tsdbUnlockRepoMeta(pRepo); if (rmFromIdx) tsdbUnlockRepoMeta(pRepo);
tsdbUnRefTable(pTable);
} }
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
@ -840,6 +861,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *)); memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *));
tSkipListPut(pSTable->pIndex, pNode); tSkipListPut(pSTable->pIndex, pNode);
T_REF_INC(pSTable);
return 0; return 0;
} }