more work
This commit is contained in:
parent
f0caae86af
commit
bc326593d6
|
@ -64,6 +64,7 @@ typedef struct SDelFWriter SDelFWriter;
|
||||||
typedef struct SDelFReader SDelFReader;
|
typedef struct SDelFReader SDelFReader;
|
||||||
typedef struct SRowIter SRowIter;
|
typedef struct SRowIter SRowIter;
|
||||||
typedef struct STsdbFS STsdbFS;
|
typedef struct STsdbFS STsdbFS;
|
||||||
|
typedef struct SRowMerger SRowMerger;
|
||||||
|
|
||||||
#define TSDB_MAX_SUBBLOCKS 8
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
|
|
||||||
|
@ -86,6 +87,11 @@ int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||||
// SRowIter
|
// SRowIter
|
||||||
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
SColVal *tRowIterNext(SRowIter *pIter);
|
SColVal *tRowIterNext(SRowIter *pIter);
|
||||||
|
// SRowMerger
|
||||||
|
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
void tRowMergerClear(SRowMerger *pMerger);
|
||||||
|
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
|
||||||
|
int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow);
|
||||||
// TABLEID
|
// TABLEID
|
||||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
||||||
// TSDBKEY
|
// TSDBKEY
|
||||||
|
@ -429,6 +435,7 @@ struct SDelIdx {
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SDelFile {
|
struct SDelFile {
|
||||||
|
int64_t commitID;
|
||||||
TSKEY minKey;
|
TSKEY minKey;
|
||||||
TSKEY maxKey;
|
TSKEY maxKey;
|
||||||
int64_t minVersion;
|
int64_t minVersion;
|
||||||
|
@ -462,25 +469,21 @@ struct SHeadFile {
|
||||||
int64_t commitID;
|
int64_t commitID;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
int32_t nRef;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SDataFile {
|
struct SDataFile {
|
||||||
int64_t commitID;
|
int64_t commitID;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
int32_t nRef;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SLastFile {
|
struct SLastFile {
|
||||||
int64_t commitID;
|
int64_t commitID;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
int32_t nRef;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SSmaFile {
|
struct SSmaFile {
|
||||||
int64_t commitID;
|
int64_t commitID;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
int32_t nRef;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SDFileSet {
|
struct SDFileSet {
|
||||||
|
@ -490,7 +493,10 @@ struct SDFileSet {
|
||||||
SDataFile *pDataFile;
|
SDataFile *pDataFile;
|
||||||
SLastFile *pLastFile;
|
SLastFile *pLastFile;
|
||||||
SSmaFile *pSmaFile;
|
SSmaFile *pSmaFile;
|
||||||
int32_t nRef;
|
// SHeadFile headFile;
|
||||||
|
// SDataFile dataFile;
|
||||||
|
// SLastFile lastFile;
|
||||||
|
// SSmaFile smaFile;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SRowIter {
|
struct SRowIter {
|
||||||
|
@ -499,6 +505,11 @@ struct SRowIter {
|
||||||
SColVal colVal;
|
SColVal colVal;
|
||||||
int32_t i;
|
int32_t i;
|
||||||
};
|
};
|
||||||
|
struct SRowMerger {
|
||||||
|
STSchema *pTSchema;
|
||||||
|
int64_t version;
|
||||||
|
SArray *pArray; // SArray<SColVal>
|
||||||
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -896,7 +896,7 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) {
|
||||||
|
|
||||||
// start ====================
|
// start ====================
|
||||||
code = tsdbCommitDataStart(pCommitter);
|
code = tsdbCommitDataStart(pCommitter);
|
||||||
if (code) return code;
|
if (code) goto _err;
|
||||||
|
|
||||||
// impl ====================
|
// impl ====================
|
||||||
pCommitter->nextKey = pMemTable->minKey;
|
pCommitter->nextKey = pMemTable->minKey;
|
||||||
|
|
|
@ -586,6 +586,83 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SRowMerger ======================================================
|
||||||
|
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TSDBKEY key = tsdbRowKey(pRow);
|
||||||
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
STColumn *pTColumn;
|
||||||
|
|
||||||
|
pMerger->pTSchema = pTSchema;
|
||||||
|
pMerger->version = key.version;
|
||||||
|
|
||||||
|
pMerger->pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
|
||||||
|
if (pMerger->pArray == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ts
|
||||||
|
pTColumn = &pTSchema->columns[0];
|
||||||
|
|
||||||
|
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
|
|
||||||
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts});
|
||||||
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// other
|
||||||
|
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||||
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); }
|
||||||
|
|
||||||
|
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TSDBKEY key = tsdbRowKey(pRow);
|
||||||
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
|
||||||
|
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts);
|
||||||
|
|
||||||
|
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
|
||||||
|
tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal);
|
||||||
|
if (pColVal->isNone) continue;
|
||||||
|
|
||||||
|
// SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
|
||||||
|
if (key.version > pMerger->version) {
|
||||||
|
// forward merge (todo)
|
||||||
|
} else if (key.version < pMerger->version) {
|
||||||
|
// backward merge (todo)
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pMerger->version = key.version;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
ASSERT(0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// delete skyline ======================================================
|
// delete skyline ======================================================
|
||||||
static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) {
|
static int32_t tsdbMergeSkyline(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
Loading…
Reference in New Issue