more code
This commit is contained in:
parent
5a56e39613
commit
e22e4f626a
|
@ -15,692 +15,6 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
#if 0
|
|
||||||
|
|
||||||
#define TSDB_ITER_TYPE_MEM 0x0
|
|
||||||
#define TSDB_ITER_TYPE_DAT 0x1
|
|
||||||
#define TSDB_ITER_TYPE_STT 0x2
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
} SMemDIter;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SDataFReader *pReader;
|
|
||||||
SArray *aBlockIdx; // SArray<SBlockIdx>
|
|
||||||
SMapData mDataBlk; // SMapData<SDataBlk>
|
|
||||||
SBlockData bData;
|
|
||||||
int32_t iBlockIdx;
|
|
||||||
int32_t iDataBlk;
|
|
||||||
int32_t iRow;
|
|
||||||
} SDataDIter;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SDataFReader *pReader;
|
|
||||||
int32_t iStt;
|
|
||||||
SArray *aSttBlk; // SArray<SSttBlk>
|
|
||||||
SBlockData bData;
|
|
||||||
int32_t iSttBlk;
|
|
||||||
int32_t iRow;
|
|
||||||
} SSttDIter;
|
|
||||||
|
|
||||||
typedef struct STsdbDataIter {
|
|
||||||
struct STsdbDataIter *next;
|
|
||||||
|
|
||||||
int32_t flag;
|
|
||||||
SRowInfo rowInfo;
|
|
||||||
SRBTreeNode n;
|
|
||||||
char handle[];
|
|
||||||
} STsdbDataIter;
|
|
||||||
|
|
||||||
#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n)))
|
|
||||||
|
|
||||||
#define TSDB_FLG_DEEP_COMPACT 0x1
|
|
||||||
|
|
||||||
// ITER =========================
|
|
||||||
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId);
|
|
||||||
|
|
||||||
static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
|
|
||||||
const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n));
|
|
||||||
const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n));
|
|
||||||
|
|
||||||
return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SMemDIter));
|
|
||||||
if (pIter == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
*ppIter = NULL;
|
|
||||||
} else {
|
|
||||||
*ppIter = pIter;
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SDataDIter));
|
|
||||||
if (NULL == pIter) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
pIter->flag = TSDB_ITER_TYPE_DAT;
|
|
||||||
|
|
||||||
SDataDIter *pDataDIter = (SDataDIter *)pIter->handle;
|
|
||||||
pDataDIter->pReader = pReader;
|
|
||||||
pDataDIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
|
||||||
if (pDataDIter->aBlockIdx == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbReadBlockIdx(pReader, pDataDIter->aBlockIdx);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _clear_exit;
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
code = tBlockDataCreate(&pDataDIter->bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pDataDIter->iBlockIdx = -1;
|
|
||||||
pDataDIter->iDataBlk = 0;
|
|
||||||
pDataDIter->iRow = 0;
|
|
||||||
|
|
||||||
code = tsdbDataIterNext(pIter, NULL);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
_clear_exit:
|
|
||||||
*ppIter = NULL;
|
|
||||||
if (pIter) {
|
|
||||||
tBlockDataDestroy(&pDataDIter->bData);
|
|
||||||
tMapDataClear(&pDataDIter->mDataBlk);
|
|
||||||
taosArrayDestroy(pDataDIter->aBlockIdx);
|
|
||||||
taosMemoryFree(pIter);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
*ppIter = pIter;
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIter **ppIter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SSttDIter));
|
|
||||||
if (pIter == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
pIter->flag = TSDB_ITER_TYPE_STT;
|
|
||||||
|
|
||||||
SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
|
|
||||||
pSttDIter->pReader = pReader;
|
|
||||||
pSttDIter->iStt = iStt;
|
|
||||||
pSttDIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
|
||||||
if (pSttDIter->aSttBlk == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbReadSttBlk(pReader, pSttDIter->iStt, pSttDIter->aSttBlk);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pSttDIter->aSttBlk) == 0) goto _clear_exit;
|
|
||||||
|
|
||||||
code = tBlockDataCreate(&pSttDIter->bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pSttDIter->iSttBlk = -1;
|
|
||||||
pSttDIter->iRow = -1;
|
|
||||||
|
|
||||||
code = tsdbDataIterNext(pIter, NULL);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
_clear_exit:
|
|
||||||
*ppIter = NULL;
|
|
||||||
if (pIter) {
|
|
||||||
tBlockDataDestroy(&pSttDIter->bData);
|
|
||||||
taosArrayDestroy(pSttDIter->aSttBlk);
|
|
||||||
taosMemoryFree(pIter);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
*ppIter = pIter;
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbDataIterClose(STsdbDataIter *pIter) {
|
|
||||||
if (pIter == NULL) return;
|
|
||||||
|
|
||||||
if (pIter->flag & TSDB_ITER_TYPE_MEM) {
|
|
||||||
ASSERT(0);
|
|
||||||
} else if (pIter->flag & TSDB_ITER_TYPE_DAT) {
|
|
||||||
ASSERT(0);
|
|
||||||
} else if (pIter->flag & TSDB_ITER_TYPE_STT) {
|
|
||||||
SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
|
|
||||||
|
|
||||||
tBlockDataDestroy(&pSttDIter->bData);
|
|
||||||
taosArrayDestroy(pSttDIter->aSttBlk);
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(pIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
if (pIter->flag & TSDB_ITER_TYPE_MEM) {
|
|
||||||
// TODO
|
|
||||||
ASSERT(0);
|
|
||||||
} else if (pIter->flag & TSDB_ITER_TYPE_DAT) {
|
|
||||||
// TODO
|
|
||||||
ASSERT(0);
|
|
||||||
} else if (pIter->flag & TSDB_ITER_TYPE_STT) {
|
|
||||||
SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
if (++pSttDIter->iRow >= pSttDIter->bData.nRow) {
|
|
||||||
for (;;) {
|
|
||||||
if (++pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) {
|
|
||||||
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk);
|
|
||||||
|
|
||||||
// check exclusion
|
|
||||||
if (pExcludeTableId) {
|
|
||||||
if (pExcludeTableId->uid) { // exclude (suid, uid)
|
|
||||||
if (pSttBlk->minUid == pExcludeTableId->uid && pSttBlk->maxUid == pExcludeTableId->uid) continue;
|
|
||||||
} else { // exclude (suid, *)
|
|
||||||
if (pSttBlk->suid == pExcludeTableId->suid) continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt, pSttBlk, &pSttDIter->bData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pIter->rowInfo.suid = pSttBlk->suid;
|
|
||||||
pSttDIter->iRow = 0;
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
// iter end, all set 0 and exit
|
|
||||||
pIter->rowInfo.suid = 0;
|
|
||||||
pIter->rowInfo.uid = 0;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
|
|
||||||
pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);
|
|
||||||
|
|
||||||
// check exclusion
|
|
||||||
if (pExcludeTableId) {
|
|
||||||
if (pExcludeTableId->uid) { // exclude (suid, uid)
|
|
||||||
if (pIter->rowInfo.uid == pExcludeTableId->uid) continue;
|
|
||||||
} else { // exclude (suid, *)
|
|
||||||
if (pIter->rowInfo.suid == pExcludeTableId->suid) continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// COMPACT =========================
|
|
||||||
|
|
||||||
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdb *pTsdb = pCompactor->pTsdb;
|
|
||||||
|
|
||||||
code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
|
||||||
|
|
||||||
code = tsdbFSCommit(pTsdb);
|
|
||||||
if (code) {
|
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdb *pTsdb = pCompactor->pTsdb;
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdb *pTsdb = pCompactor->pTsdb;
|
|
||||||
|
|
||||||
code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbShallowCompact(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdb *pTsdb = pCompactor->pTsdb;
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactNextRowImpl(STsdbCompactor *pCompactor, TABLEID *pExcludeTableId) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
if (pCompactor->pIter) {
|
|
||||||
code = tsdbDataIterNext(pCompactor->pIter, pExcludeTableId);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
|
|
||||||
pCompactor->pIter = NULL;
|
|
||||||
} else {
|
|
||||||
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
|
|
||||||
if (pNode) {
|
|
||||||
STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
|
|
||||||
|
|
||||||
int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo);
|
|
||||||
ASSERT(c);
|
|
||||||
|
|
||||||
if (c > 0) {
|
|
||||||
tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n);
|
|
||||||
pCompactor->pIter = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCompactor->pIter == NULL) {
|
|
||||||
SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
|
|
||||||
if (pNode) {
|
|
||||||
pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
|
|
||||||
tRBTreeDrop(&pCompactor->rtree, pNode);
|
|
||||||
|
|
||||||
if (pExcludeTableId) {
|
|
||||||
if (pExcludeTableId->uid) {
|
|
||||||
if (pCompactor->pIter->rowInfo.uid == pExcludeTableId->uid) continue;
|
|
||||||
} else {
|
|
||||||
if (pCompactor->pIter->rowInfo.suid == pExcludeTableId->suid) continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tDelIdxCmprFn(const SDelIdx *pDelIdx1, const SDelIdx *pDelIdx2) {
|
|
||||||
if (pDelIdx1->suid < pDelIdx2->suid) {
|
|
||||||
return -1;
|
|
||||||
} else if (pDelIdx1->suid > pDelIdx2->suid) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pDelIdx1->uid < pDelIdx2->uid) {
|
|
||||||
return -1;
|
|
||||||
} else if (pDelIdx1->uid > pDelIdx2->uid) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
TABLEID excludeTableId;
|
|
||||||
TABLEID *pExcludeTableId = NULL;
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
code = tsdbCompactNextRowImpl(pCompactor, pExcludeTableId);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// check if the table of the row exists
|
|
||||||
if (pCompactor->pIter) {
|
|
||||||
SRowInfo *pRowInfo = &pCompactor->pIter->rowInfo;
|
|
||||||
|
|
||||||
if (pRowInfo->uid != pCompactor->tbSkm.uid) {
|
|
||||||
SMetaInfo info;
|
|
||||||
if (pRowInfo->suid) { // child table
|
|
||||||
|
|
||||||
// check if super table exists
|
|
||||||
if (pRowInfo->suid != pCompactor->tbSkm.suid) {
|
|
||||||
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
|
|
||||||
excludeTableId.suid = pRowInfo->suid;
|
|
||||||
excludeTableId.uid = 0;
|
|
||||||
pExcludeTableId = &excludeTableId;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// super table exists
|
|
||||||
pCompactor->tbSkm.suid = pRowInfo->suid;
|
|
||||||
pCompactor->tbSkm.uid = 0;
|
|
||||||
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
|
|
||||||
pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, -1, 1);
|
|
||||||
if (pCompactor->tbSkm.pTSchema == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if table exists
|
|
||||||
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
|
|
||||||
excludeTableId.suid = pRowInfo->suid;
|
|
||||||
excludeTableId.uid = pRowInfo->uid;
|
|
||||||
pExcludeTableId = &excludeTableId;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// table exists
|
|
||||||
pCompactor->tbSkm.uid = pRowInfo->uid;
|
|
||||||
} else { // normal table
|
|
||||||
// check if table exists
|
|
||||||
if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
|
|
||||||
excludeTableId.suid = pRowInfo->suid;
|
|
||||||
excludeTableId.uid = pRowInfo->uid;
|
|
||||||
pExcludeTableId = &excludeTableId;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// table exists
|
|
||||||
pCompactor->tbSkm.suid = pRowInfo->suid;
|
|
||||||
pCompactor->tbSkm.uid = pRowInfo->uid;
|
|
||||||
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
|
|
||||||
|
|
||||||
pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, -1, 1);
|
|
||||||
if (pCompactor->tbSkm.pTSchema == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// load delData and build the skyline
|
|
||||||
if (pCompactor->pDelFReader) {
|
|
||||||
SDelIdx *pDelIdx =
|
|
||||||
taosArraySearch(pCompactor->aDelIdx, &(SDelIdx){.suid = pRowInfo->suid, .uid = pRowInfo->uid},
|
|
||||||
(__compar_fn_t)tDelIdxCmprFn, TD_EQ);
|
|
||||||
if (pDelIdx) {
|
|
||||||
code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1,
|
|
||||||
pCompactor->aSkyLine);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pCompactor->aTSDBKEY = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine);
|
|
||||||
pCompactor->iKey = 0;
|
|
||||||
pCompactor->sKey.version = 0;
|
|
||||||
pCompactor->sKey.ts = pCompactor->aTSDBKEY[0].ts;
|
|
||||||
} else {
|
|
||||||
pCompactor->aTSDBKEY = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pRowInfo->uid == pCompactor->tbSkm.uid);
|
|
||||||
|
|
||||||
// check if the row is deleted
|
|
||||||
if (pCompactor->aTSDBKEY && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// iter end, just break out
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo, STSchema **ppTSchema) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
if (pCompactor->pIter == NULL) {
|
|
||||||
code = tsdbCompactNextRow(pCompactor);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCompactor->pIter) {
|
|
||||||
ASSERT(pCompactor->pIter->rowInfo.suid == pCompactor->tbSkm.suid);
|
|
||||||
ASSERT(pCompactor->pIter->rowInfo.uid == pCompactor->tbSkm.uid);
|
|
||||||
*ppRowInfo = &pCompactor->pIter->rowInfo;
|
|
||||||
*ppTSchema = pCompactor->tbSkm.pTSchema;
|
|
||||||
} else {
|
|
||||||
*ppRowInfo = NULL;
|
|
||||||
*ppTSchema = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
STsdb *pTsdb = pCompactor->pTsdb;
|
|
||||||
|
|
||||||
// reader
|
|
||||||
code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
// open iters
|
|
||||||
STsdbDataIter *pIter;
|
|
||||||
|
|
||||||
pCompactor->iterList = NULL;
|
|
||||||
tRBTreeCreate(&pCompactor->rtree, tsdbDataIterCmprFn);
|
|
||||||
|
|
||||||
code = tsdbDataDIterOpen(pCompactor->pReader, &pIter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pIter) {
|
|
||||||
pIter->next = pCompactor->iterList;
|
|
||||||
pCompactor->iterList = pIter;
|
|
||||||
tRBTreePut(&pCompactor->rtree, &pIter->n);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t iStt = 0; iStt < pCompactor->pReader->pSet->nSttF; iStt++) {
|
|
||||||
code = tsdbSttDIterOpen(pCompactor->pReader, iStt, &pIter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pIter) {
|
|
||||||
pIter->next = pCompactor->iterList;
|
|
||||||
pCompactor->iterList = pIter;
|
|
||||||
tRBTreePut(&pCompactor->rtree, &pIter->n);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pCompactor->pIter = NULL;
|
|
||||||
tBlockDataReset(&pCompactor->bData);
|
|
||||||
|
|
||||||
// writer
|
|
||||||
SDFileSet wSet = {.diskId = (SDiskID){0}, // TODO
|
|
||||||
.fid = pCompactor->pDFileSet->fid,
|
|
||||||
.pHeadF = &(SHeadFile){.commitID = pCompactor->cid},
|
|
||||||
.pDataF = &(SDataFile){.commitID = pCompactor->cid},
|
|
||||||
.pSmaF = &(SSmaFile){.commitID = pCompactor->cid},
|
|
||||||
.nSttF = 1,
|
|
||||||
.aSttF = {&(SSttFile){.commitID = pCompactor->cid}}};
|
|
||||||
code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, &wSet);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (pCompactor->aBlockIdx == NULL) {
|
|
||||||
pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
|
||||||
if (pCompactor->aBlockIdx == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosArrayClear(pCompactor->aBlockIdx);
|
|
||||||
}
|
|
||||||
|
|
||||||
tMapDataReset(&pCompactor->mDataBlk);
|
|
||||||
|
|
||||||
if (pCompactor->aSttBlk == NULL) {
|
|
||||||
pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
|
||||||
if (pCompactor->aSttBlk == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosArrayClear(pCompactor->aSttBlk);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
|
||||||
} else {
|
|
||||||
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbCloseCompactor(STsdbCompactor *pCompactor) {
|
|
||||||
for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) {
|
|
||||||
STsdbDataIter *pIterNext = pIter->next;
|
|
||||||
tsdbDataIterClose(pIter);
|
|
||||||
pIter = pIterNext;
|
|
||||||
}
|
|
||||||
|
|
||||||
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
|
|
||||||
pCompactor->tbSkm.pTSchema = NULL;
|
|
||||||
|
|
||||||
tsdbDataFReaderClose(&pCompactor->pReader);
|
|
||||||
}
|
|
||||||
|
|
||||||
extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg);
|
|
||||||
extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg);
|
|
||||||
static int32_t tsdbCompactWriteBlockData(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
SBlockData *pBData = &pCompactor->bData;
|
|
||||||
|
|
||||||
if (pBData->nRow == 0) goto _exit;
|
|
||||||
|
|
||||||
if (pBData->uid && pBData->nRow >= pCompactor->minRows) { // write to .data file
|
|
||||||
code = tsdbWriteDataBlock(pCompactor->pWriter, pBData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
pCompactor->tableId.suid = pBData->suid;
|
|
||||||
pCompactor->tableId.uid = pBData->uid;
|
|
||||||
} else { // write to .stt file
|
|
||||||
code = tsdbWriteSttBlock(pCompactor->pWriter, pBData, pCompactor->aSttBlk, pCompactor->cmprAlg);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
tBlockDataClear(&pCompactor->bData);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCompactWriteDataBlk(STsdbCompactor *pCompactor) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
if (pCompactor->mDataBlk.nItem == 0) goto _exit;
|
|
||||||
|
|
||||||
SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1);
|
|
||||||
if (pBlockIdx == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
pBlockIdx->suid = pCompactor->tableId.suid;
|
|
||||||
pBlockIdx->uid = pCompactor->tableId.uid;
|
|
||||||
|
|
||||||
code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
tMapDataReset(&pCompactor->mDataBlk);
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
|
|
||||||
tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo);
|
extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo);
|
||||||
extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg);
|
extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg);
|
||||||
extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg);
|
extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg);
|
||||||
|
@ -743,6 +57,48 @@ typedef struct {
|
||||||
SBlockData sData;
|
SBlockData sData;
|
||||||
} STsdbCompactor;
|
} STsdbCompactor;
|
||||||
|
|
||||||
|
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
STsdb *pTsdb = pCompactor->pTsdb;
|
||||||
|
|
||||||
|
code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||||
|
|
||||||
|
code = tsdbFSCommit(pTsdb);
|
||||||
|
if (code) {
|
||||||
|
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
STsdb *pTsdb = pCompactor->pTsdb;
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
ASSERT(0);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEID *pId) {
|
static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEID *pId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1253,20 +609,18 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
|
|
||||||
code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL);
|
code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
#endif
|
|
||||||
_exit:
|
_exit:
|
||||||
// // commit/abort compact
|
// commit/abort compact
|
||||||
// if (code) {
|
if (code) {
|
||||||
// tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
tsdbError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, lino,
|
||||||
// tsdbAbortCompact(pCompactor);
|
tstrerror(code), pCompactor->commitID);
|
||||||
// } else {
|
tsdbAbortCompact(pCompactor);
|
||||||
// tsdbCommitCompact(pCompactor);
|
} else {
|
||||||
// }
|
tsdbCommitCompact(pCompactor);
|
||||||
|
}
|
||||||
tsdbEndCompact(pCompactor);
|
tsdbEndCompact(pCompactor);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue