more work
This commit is contained in:
parent
5b2637e532
commit
fe27ee2654
|
@ -310,33 +310,13 @@ typedef struct {
|
||||||
SBlock blocks[];
|
SBlock blocks[];
|
||||||
} SBlockInfo;
|
} SBlockInfo;
|
||||||
|
|
||||||
#ifdef TD_REFACTOR_3
|
|
||||||
typedef struct {
|
|
||||||
int16_t colId;
|
|
||||||
uint16_t bitmap : 1; // 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows
|
|
||||||
uint16_t reserve : 15;
|
|
||||||
int32_t len;
|
|
||||||
uint32_t type : 8;
|
|
||||||
uint32_t offset : 24;
|
|
||||||
int64_t sum;
|
|
||||||
int64_t max;
|
|
||||||
int64_t min;
|
|
||||||
int16_t maxIndex;
|
|
||||||
int16_t minIndex;
|
|
||||||
int16_t numOfNull;
|
|
||||||
uint8_t offsetH;
|
|
||||||
char padding[1];
|
|
||||||
} SBlockCol;
|
|
||||||
#else
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int16_t colId;
|
int16_t colId;
|
||||||
uint16_t type : 6;
|
uint16_t type : 6;
|
||||||
uint16_t blen : 10; // 0 no bitmap if all rows are NORM, > 0 bitmap length
|
uint16_t blen : 10; // 0 no bitmap if all rows are NORM, > 0 bitmap length
|
||||||
uint32_t len; // data length + bitmap length
|
uint32_t len; // data length + bitmap length
|
||||||
uint32_t offset;
|
uint32_t offset;
|
||||||
} SBlockColV0;
|
} SBlockCol;
|
||||||
|
|
||||||
#define SBlockCol SBlockColV0 // latest SBlockCol definition
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int16_t colId;
|
int16_t colId;
|
||||||
|
@ -346,11 +326,7 @@ typedef struct {
|
||||||
int64_t sum;
|
int64_t sum;
|
||||||
int64_t max;
|
int64_t max;
|
||||||
int64_t min;
|
int64_t min;
|
||||||
} SAggrBlkColV0;
|
} SAggrBlkCol;
|
||||||
|
|
||||||
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// Code here just for back-ward compatibility
|
// Code here just for back-ward compatibility
|
||||||
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
|
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
|
||||||
|
@ -411,8 +387,7 @@ struct SReadH {
|
||||||
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
|
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
|
||||||
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
|
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
|
||||||
|
|
||||||
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
|
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
|
||||||
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
|
|
||||||
|
|
||||||
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
|
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
|
||||||
switch (blkVer) {
|
switch (blkVer) {
|
||||||
|
@ -422,7 +397,7 @@ static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
|
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM))
|
||||||
|
|
||||||
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
|
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
|
||||||
switch (blkVer) {
|
switch (blkVer) {
|
||||||
|
@ -520,11 +495,11 @@ static FORCE_INLINE void *taosTZfree(void *ptr) {
|
||||||
|
|
||||||
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
|
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
|
||||||
|
|
||||||
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
|
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) {
|
||||||
if (key < 0) {
|
if (key < 0) {
|
||||||
return (int)((key + 1) / tsTickPerMin[precision] / days - 1);
|
return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1);
|
||||||
} else {
|
} else {
|
||||||
return (int)((key / tsTickPerMin[precision] / days));
|
return (int)((key / tsTickPerMin[precision] / minutes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -877,12 +852,21 @@ struct SDelOp {
|
||||||
SDelOp *pNext;
|
SDelOp *pNext;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
tb_uid_t suid;
|
||||||
|
tb_uid_t uid;
|
||||||
|
int64_t version;
|
||||||
|
TSKEY sKey;
|
||||||
|
TSKEY eKey;
|
||||||
|
} SDelInfo;
|
||||||
|
|
||||||
struct SMemTable {
|
struct SMemTable {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
int32_t nRef;
|
int32_t nRef;
|
||||||
TSDBKEY minKey;
|
TSDBKEY minKey;
|
||||||
TSDBKEY maxKey;
|
TSDBKEY maxKey;
|
||||||
int64_t nRows;
|
int64_t nRows;
|
||||||
|
int64_t nDelOp;
|
||||||
SArray *aSkmInfo;
|
SArray *aSkmInfo;
|
||||||
SArray *aMemData;
|
SArray *aMemData;
|
||||||
};
|
};
|
||||||
|
|
|
@ -17,13 +17,22 @@
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMemTable *pMemTable;
|
SMemTable *pMemTable;
|
||||||
|
int32_t minutes;
|
||||||
|
int8_t precision;
|
||||||
|
int32_t sfid;
|
||||||
|
int32_t efid;
|
||||||
SReadH readh;
|
SReadH readh;
|
||||||
|
SDFileSet wSet;
|
||||||
|
SArray *aDelInfo;
|
||||||
SArray *aBlkIdx;
|
SArray *aBlkIdx;
|
||||||
|
SArray *aSupBlk;
|
||||||
|
SArray *aSubBlk;
|
||||||
} SCommitH;
|
} SCommitH;
|
||||||
|
|
||||||
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb);
|
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb);
|
||||||
static int32_t tsdbEndCommit(SCommitH *pCHandle);
|
static int32_t tsdbEndCommit(SCommitH *pCHandle);
|
||||||
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid);
|
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid);
|
||||||
|
static int32_t tsdbCommitDelete(SCommitH *pCHandle);
|
||||||
|
|
||||||
int32_t tsdbBegin2(STsdb *pTsdb) {
|
int32_t tsdbBegin2(STsdb *pTsdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -50,15 +59,18 @@ int32_t tsdbCommit2(STsdb *pTsdb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// commit
|
// commit
|
||||||
int32_t sfid; // todo
|
for (int32_t fid = ch.sfid; fid <= ch.efid; fid++) {
|
||||||
int32_t efid; // todo
|
|
||||||
for (int32_t fid = sfid; fid <= efid; fid++) {
|
|
||||||
code = tsdbCommitToFile(&ch, fid);
|
code = tsdbCommitToFile(&ch, fid);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = tsdbCommitDelete(&ch);
|
||||||
|
if (code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
// end commit
|
// end commit
|
||||||
code = tsdbEndCommit(&ch);
|
code = tsdbEndCommit(&ch);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -75,17 +87,76 @@ _err:
|
||||||
|
|
||||||
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) {
|
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
SMemTable *pMemTable = (SMemTable *)pTsdb->mem;
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d start to commit", TD_VID(pTsdb->pVnode));
|
||||||
|
|
||||||
|
// switch to commit
|
||||||
ASSERT(pTsdb->imem == NULL && pTsdb->mem);
|
ASSERT(pTsdb->imem == NULL && pTsdb->mem);
|
||||||
pTsdb->imem = pTsdb->mem;
|
pTsdb->imem = pTsdb->mem;
|
||||||
pTsdb->mem = NULL;
|
pTsdb->mem = NULL;
|
||||||
// TODO
|
|
||||||
|
// open handle
|
||||||
|
pCHandle->pMemTable = pMemTable;
|
||||||
|
pCHandle->minutes = pTsdb->keepCfg.days;
|
||||||
|
pCHandle->precision = pTsdb->keepCfg.precision;
|
||||||
|
pCHandle->sfid = TSDB_KEY_FID(pMemTable->minKey.ts, pCHandle->minutes, pCHandle->precision);
|
||||||
|
pCHandle->efid = TSDB_KEY_FID(pMemTable->maxKey.ts, pCHandle->minutes, pCHandle->precision);
|
||||||
|
|
||||||
|
code = tsdbInitReadH(&pCHandle->readh, pTsdb);
|
||||||
|
if (code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pCHandle->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
|
||||||
|
if (pCHandle->aBlkIdx == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pCHandle->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
|
||||||
|
if (pCHandle->aSupBlk == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pCHandle->aSubBlk = taosArrayInit(1024, sizeof(SBlock));
|
||||||
|
if (pCHandle->aSubBlk == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// start FS transaction
|
||||||
|
tsdbStartFSTxn(pTsdb, 0, 0);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbEndCommit(SCommitH *pCHandle) {
|
static int32_t tsdbEndCommit(SCommitH *pCHandle) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
// TODO
|
STsdb *pTsdb = pCHandle->pMemTable->pTsdb;
|
||||||
|
SMemTable *pMemTable = (SMemTable *)pTsdb->imem;
|
||||||
|
|
||||||
|
// end transaction
|
||||||
|
code = tsdbEndFSTxn(pTsdb);
|
||||||
|
if (code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// close handle
|
||||||
|
taosArrayClear(pCHandle->aSubBlk);
|
||||||
|
taosArrayClear(pCHandle->aSupBlk);
|
||||||
|
taosArrayClear(pCHandle->aBlkIdx);
|
||||||
|
tsdbDestroyReadH(&pCHandle->readh);
|
||||||
|
|
||||||
|
// destroy memtable (todo: unref it)
|
||||||
|
pTsdb->imem = NULL;
|
||||||
|
tsdbMemTableDestroy2(pMemTable);
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d commit over", TD_VID(pTsdb->pVnode));
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,3 +267,62 @@ static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) {
|
||||||
_err:
|
_err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t delInfoCmprFn(const void *p1, const void *p2) {
|
||||||
|
SDelInfo *pDelInfo1 = (SDelInfo *)p1;
|
||||||
|
SDelInfo *pDelInfo2 = (SDelInfo *)p2;
|
||||||
|
|
||||||
|
if (pDelInfo1->suid < pDelInfo2->suid) {
|
||||||
|
return -1;
|
||||||
|
} else if (pDelInfo1->suid > pDelInfo2->suid) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDelInfo1->uid < pDelInfo2->uid) {
|
||||||
|
return -1;
|
||||||
|
} else if (pDelInfo1->uid > pDelInfo2->uid) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDelInfo1->version < pDelInfo2->version) {
|
||||||
|
return -1;
|
||||||
|
} else if (pDelInfo1->version > pDelInfo2->version) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int32_t tsdbCommitDelete(SCommitH *pCHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SDelInfo delInfo;
|
||||||
|
SMemData *pMemData;
|
||||||
|
|
||||||
|
if (pCHandle->pMemTable->nDelOp == 0) goto _exit;
|
||||||
|
|
||||||
|
// load del array (todo)
|
||||||
|
|
||||||
|
// loop to append SDelInfo
|
||||||
|
for (int32_t iMemData = 0; iMemData < taosArrayGetSize(pCHandle->pMemTable->aMemData); iMemData++) {
|
||||||
|
pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
|
||||||
|
|
||||||
|
for (SDelOp *pDelOp = pMemData->delOpHead; pDelOp; pDelOp = pDelOp->pNext) {
|
||||||
|
delInfo = (SDelInfo){.suid = pMemData->suid,
|
||||||
|
.uid = pMemData->uid,
|
||||||
|
.version = pDelOp->version,
|
||||||
|
.sKey = pDelOp->sKey,
|
||||||
|
.eKey = pDelOp->eKey};
|
||||||
|
if (taosArrayPush(pCHandle->aDelInfo, &delInfo) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArraySort(pCHandle->aDelInfo, delInfoCmprFn);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -59,6 +59,7 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
|
||||||
pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
|
pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
|
||||||
pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
|
pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
|
||||||
pMemTable->nRows = 0;
|
pMemTable->nRows = 0;
|
||||||
|
pMemTable->nDelOp = 0;
|
||||||
pMemTable->aMemData = taosArrayInit(512, sizeof(SMemData *));
|
pMemTable->aMemData = taosArrayInit(512, sizeof(SMemData *));
|
||||||
if (pMemTable->aMemData == NULL) {
|
if (pMemTable->aMemData == NULL) {
|
||||||
taosMemoryFree(pMemTable);
|
taosMemoryFree(pMemTable);
|
||||||
|
@ -149,6 +150,8 @@ int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_ui
|
||||||
// update the state of pMemTable, pMemData, last and lastrow (todo)
|
// update the state of pMemTable, pMemData, last and lastrow (todo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMemTable->nDelOp++;
|
||||||
|
|
||||||
tsdbDebug("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64
|
tsdbDebug("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64
|
||||||
" since %s",
|
" since %s",
|
||||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
||||||
|
|
Loading…
Reference in New Issue