commit
acf1d6313f
|
@ -48,6 +48,8 @@ static int tsdbEncodeTable(void **buf, STable *pTable);
|
||||||
static void * tsdbDecodeTable(void *buf, STable **pRTable);
|
static void * tsdbDecodeTable(void *buf, STable **pRTable);
|
||||||
static int tsdbGetTableEncodeSize(int8_t act, STable *pTable);
|
static int tsdbGetTableEncodeSize(int8_t act, STable *pTable);
|
||||||
static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable);
|
static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable);
|
||||||
|
static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
|
||||||
|
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
|
||||||
|
|
||||||
// ------------------ OUTER FUNCTIONS ------------------
|
// ------------------ OUTER FUNCTIONS ------------------
|
||||||
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
||||||
|
@ -117,7 +119,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
|
||||||
|
|
||||||
STable *pTable = tsdbGetTableByUid(pMeta, uid);
|
STable *pTable = tsdbGetTableByUid(pMeta, uid);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
tsdbError("vgId:%d failed to drop table since table not exists! tid:%d uid %" PRId64, REPO_ID(pRepo), tableId.tid,
|
tsdbError("vgId:%d failed to drop table since table not exists! tid:%d uid %" PRIu64, REPO_ID(pRepo), tableId.tid,
|
||||||
uid);
|
uid);
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -132,30 +134,26 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) {
|
// Write to KV store first
|
||||||
if (pTable->cqhandle) pRepo->appH.cqDropFunc(pTable->cqhandle);
|
if (tsdbRemoveTableFromStore(pRepo, pTable) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to drop table %s since %s", REPO_ID(pRepo), tbname, tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
// Remove table from Meta
|
||||||
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
|
if (tsdbRmTableFromMeta(pRepo, pTable) < 0) {
|
||||||
while (tSkipListIterNext(pIter)) {
|
tsdbError("vgId:%d failed to drop table %s since %s", REPO_ID(pRepo), tbname, tstrerror(terrno));
|
||||||
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
|
goto _err;
|
||||||
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
|
|
||||||
int tlen = tsdbGetTableEncodeSize(TSDB_DROP_META, tTable);
|
|
||||||
void *buf = tsdbAllocBytes(pRepo, tlen);
|
|
||||||
ASSERT(buf != NULL);
|
|
||||||
tsdbInsertTableAct(pRepo, TSDB_DROP_META, buf, tTable);
|
|
||||||
tsdbRemoveTableFromMeta(pRepo, tTable, false, true);
|
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(pIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbRemoveTableFromMeta(pRepo, pTable, true, true);
|
|
||||||
|
|
||||||
tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid);
|
tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid);
|
||||||
free(tbname);
|
free(tbname);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tfree(tbname);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes) {
|
void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes) {
|
||||||
|
@ -555,15 +553,16 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
void tsdbRefTable(STable *pTable) {
|
void tsdbRefTable(STable *pTable) {
|
||||||
int16_t ref = T_REF_INC(pTable);
|
int16_t ref = T_REF_INC(pTable);
|
||||||
tsdbTrace("ref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref);
|
UNUSED(ref);
|
||||||
|
// tsdbTrace("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbUnRefTable(STable *pTable) {
|
void tsdbUnRefTable(STable *pTable) {
|
||||||
int16_t ref = T_REF_DEC(pTable);
|
int16_t ref = T_REF_DEC(pTable);
|
||||||
tsdbTrace("unref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref);
|
tsdbTrace("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
||||||
|
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
tsdbTrace("destroy table:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid);
|
// tsdbTrace("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable));
|
||||||
|
|
||||||
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) {
|
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) {
|
||||||
tsdbUnRefTable(pTable->pSuper);
|
tsdbUnRefTable(pTable->pSuper);
|
||||||
|
@ -1164,8 +1163,16 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
|
static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
|
||||||
int tlen = sizeof(SListNode) + sizeof(SActObj);
|
int tlen = 0;
|
||||||
if (act == TSDB_UPDATE_META) tlen += (sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM));
|
if (act == TSDB_UPDATE_META) {
|
||||||
|
tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM);
|
||||||
|
} else {
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
|
tlen = (sizeof(SListNode) + sizeof(SActObj)) * (tSkipListGetSize(pTable->pIndex) + 1);
|
||||||
|
} else {
|
||||||
|
tlen = sizeof(SListNode) + sizeof(SActObj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
@ -1191,3 +1198,60 @@ static void *tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable
|
||||||
|
|
||||||
return pBuf;
|
return pBuf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) {
|
||||||
|
int tlen = tsdbGetTableEncodeSize(TSDB_DROP_META, pTable);
|
||||||
|
void *buf = tsdbAllocBytes(pRepo, tlen);
|
||||||
|
ASSERT(buf != NULL);
|
||||||
|
|
||||||
|
void *pBuf = buf;
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
|
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (tSkipListIterNext(pIter)) {
|
||||||
|
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
|
||||||
|
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
|
||||||
|
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, pTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
tSkipListDestroyIter(pIter);
|
||||||
|
}
|
||||||
|
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, pTable);
|
||||||
|
|
||||||
|
ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
|
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbWLockRepoMeta(pRepo);
|
||||||
|
|
||||||
|
while (tSkipListIterNext(pIter)) {
|
||||||
|
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
|
||||||
|
tsdbRemoveTableFromMeta(pRepo, tTable, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbRemoveTableFromMeta(pRepo, pTable, false, false);
|
||||||
|
|
||||||
|
tsdbUnlockRepoMeta(pRepo);
|
||||||
|
|
||||||
|
tSkipListDestroyIter(pIter);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if ((TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) && pTable->cqhandle) pRepo->appH.cqDropFunc(pTable->cqhandle);
|
||||||
|
tsdbRemoveTableFromMeta(pRepo, pTable, true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue