refact more work
This commit is contained in:
parent
39ea124bb6
commit
c6106be5da
|
@ -342,9 +342,6 @@ struct STbData {
|
|||
tb_uid_t uid;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
int64_t minVersion;
|
||||
int64_t maxVersion;
|
||||
int32_t maxSkmVer;
|
||||
SDelData *pHead;
|
||||
SDelData *pTail;
|
||||
SMemSkipList sl;
|
||||
|
@ -358,8 +355,6 @@ struct SMemTable {
|
|||
volatile int32_t nRef;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
int64_t minVersion;
|
||||
int64_t maxVersion;
|
||||
int64_t nRow;
|
||||
int64_t nDel;
|
||||
struct {
|
||||
|
|
|
@ -430,17 +430,64 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
|||
|
||||
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
|
||||
int32_t code = 0;
|
||||
STSchema *pTSchema = NULL;
|
||||
SSkmDbKey skmDbKey = {.uid = suid ? suid : uid, .sver = sver};
|
||||
|
||||
void *pData = NULL;
|
||||
int nData = 0;
|
||||
SSkmDbKey skmDbKey;
|
||||
if (sver <= 0) {
|
||||
SMetaInfo info;
|
||||
if (metaGetInfo(pMeta, suid ? suid : uid, &info) == 0) {
|
||||
sver = info.skmVer;
|
||||
} else {
|
||||
TBC *pSkmDbC = NULL;
|
||||
int c;
|
||||
|
||||
// query
|
||||
skmDbKey.uid = suid ? suid : uid;
|
||||
skmDbKey.sver = INT32_MAX;
|
||||
|
||||
tdbTbcOpen(pMeta->pSkmDb, &pSkmDbC, NULL);
|
||||
metaRLock(pMeta);
|
||||
if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), &pData, &nData) < 0) {
|
||||
code = TSDB_CODE_NOT_FOUND;
|
||||
|
||||
if (tdbTbcMoveTo(pSkmDbC, &skmDbKey, sizeof(skmDbKey), &c) < 0) {
|
||||
metaULock(pMeta);
|
||||
goto _err;
|
||||
tdbTbcClose(pSkmDbC);
|
||||
code = TSDB_CODE_NOT_FOUND;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
ASSERT(c);
|
||||
|
||||
if (c < 0) {
|
||||
tdbTbcMoveToPrev(pSkmDbC);
|
||||
}
|
||||
|
||||
const void *pKey = NULL;
|
||||
int32_t nKey = 0;
|
||||
tdbTbcGet(pSkmDbC, &pKey, &nKey, NULL, NULL);
|
||||
|
||||
if (((SSkmDbKey *)pKey)->uid != uid) {
|
||||
metaULock(pMeta);
|
||||
tdbTbcClose(pSkmDbC);
|
||||
code = TSDB_CODE_NOT_FOUND;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
sver = ((SSkmDbKey *)pKey)->sver;
|
||||
|
||||
metaULock(pMeta);
|
||||
tdbTbcClose(pSkmDbC);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(sver > 0);
|
||||
|
||||
skmDbKey.uid = suid ? suid : uid;
|
||||
skmDbKey.sver = sver;
|
||||
metaRLock(pMeta);
|
||||
if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(SSkmDbKey), &pData, &nData) < 0) {
|
||||
metaULock(pMeta);
|
||||
code = TSDB_CODE_NOT_FOUND;
|
||||
goto _exit;
|
||||
}
|
||||
metaULock(pMeta);
|
||||
|
||||
|
@ -462,15 +509,13 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
|
|||
SSchema *pSchema = pSchemaWrapper->pSchema + i;
|
||||
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
|
||||
}
|
||||
pTSchema = tdGetSchemaFromBuilder(&sb);
|
||||
STSchema *pTSchema = tdGetSchemaFromBuilder(&sb);
|
||||
tdDestroyTSchemaBuilder(&sb);
|
||||
|
||||
*ppTSchema = pTSchema;
|
||||
taosMemoryFree(pSchemaWrapper->pSchema);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
*ppTSchema = NULL;
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -290,23 +290,19 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
|
||||
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pCommitter->skmTable.pTSchema) {
|
||||
if (pCommitter->skmTable.suid == suid) {
|
||||
if (suid == 0) {
|
||||
if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit;
|
||||
if (suid) {
|
||||
if (pCommitter->skmTable.suid == suid) goto _exit;
|
||||
} else {
|
||||
if (sver == pCommitter->skmTable.pTSchema->version) goto _exit;
|
||||
}
|
||||
}
|
||||
if (pCommitter->skmTable.uid == uid) goto _exit;
|
||||
}
|
||||
|
||||
pCommitter->skmTable.suid = suid;
|
||||
pCommitter->skmTable.uid = uid;
|
||||
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
|
||||
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmTable.pTSchema);
|
||||
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, -1, &pCommitter->skmTable.pTSchema);
|
||||
if (code) goto _exit;
|
||||
|
||||
_exit:
|
||||
|
@ -360,7 +356,7 @@ static int32_t tsdbCommitterNextLastRow(SCommitter *pCommitter) {
|
|||
int64_t suid = pBlockL->suid;
|
||||
int64_t uid = pBlockL->maxUid;
|
||||
|
||||
code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid, 1 /*TODO*/);
|
||||
code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid);
|
||||
if (code) goto _exit;
|
||||
|
||||
code = tBlockDataInit(pBlockDatal, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema);
|
||||
|
@ -978,7 +974,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
|
|||
pBlock = NULL;
|
||||
}
|
||||
|
||||
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer /*TODO*/);
|
||||
code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid);
|
||||
if (code) goto _err;
|
||||
|
||||
tMapDataReset(&pCommitter->dWriter.mBlock);
|
||||
|
@ -1160,7 +1156,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
|
|||
|
||||
// set block data schema if need
|
||||
if (pBlockDataW->suid == 0 && pBlockDataW->uid == 0) {
|
||||
code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid, 1 /*TOOD*/);
|
||||
code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tBlockDataInit(pBlockDataW, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema);
|
||||
|
|
|
@ -46,8 +46,6 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
|
|||
pMemTable->nRef = 1;
|
||||
pMemTable->minKey = TSKEY_MAX;
|
||||
pMemTable->maxKey = TSKEY_MIN;
|
||||
pMemTable->minVersion = VERSION_MAX;
|
||||
pMemTable->maxVersion = VERSION_MIN;
|
||||
pMemTable->nRow = 0;
|
||||
pMemTable->nDel = 0;
|
||||
pMemTable->nTbData = 0;
|
||||
|
@ -180,10 +178,6 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
|||
pTbData->pTail = pDelData;
|
||||
}
|
||||
|
||||
// update the state of pMemTable and other (todo)
|
||||
|
||||
pMemTable->minVersion = TMIN(pMemTable->minVersion, version);
|
||||
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, version);
|
||||
pMemTable->nDel++;
|
||||
|
||||
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
|
||||
|
@ -368,9 +362,6 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
|
|||
pTbData->uid = uid;
|
||||
pTbData->minKey = TSKEY_MAX;
|
||||
pTbData->maxKey = TSKEY_MIN;
|
||||
pTbData->minVersion = VERSION_MAX;
|
||||
pTbData->maxVersion = VERSION_MIN;
|
||||
pTbData->maxSkmVer = -1;
|
||||
pTbData->pHead = NULL;
|
||||
pTbData->pTail = NULL;
|
||||
pTbData->sl.seed = taosRand();
|
||||
|
@ -615,15 +606,9 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
|
|||
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
|
||||
}
|
||||
|
||||
pTbData->minVersion = TMIN(pTbData->minVersion, version);
|
||||
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
|
||||
pTbData->maxSkmVer = TMAX(pTbData->maxSkmVer, pMsgIter->sversion);
|
||||
|
||||
// SMemTable
|
||||
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
|
||||
pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
|
||||
pMemTable->minVersion = TMIN(pMemTable->minVersion, pTbData->minVersion);
|
||||
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, pTbData->maxVersion);
|
||||
pMemTable->nRow += nRow;
|
||||
|
||||
pRsp->numOfRows = nRow;
|
||||
|
|
Loading…
Reference in New Issue