finish read block sma api
This commit is contained in:
parent
99fb3c678c
commit
1acbb50f24
|
@ -35,7 +35,6 @@ extern "C" {
|
|||
typedef struct TSDBROW TSDBROW;
|
||||
typedef struct TABLEID TABLEID;
|
||||
typedef struct TSDBKEY TSDBKEY;
|
||||
typedef struct KEYINFO KEYINFO;
|
||||
typedef struct SDelData SDelData;
|
||||
typedef struct SDelIdx SDelIdx;
|
||||
typedef struct STbData STbData;
|
||||
|
@ -43,7 +42,6 @@ typedef struct SMemTable SMemTable;
|
|||
typedef struct STbDataIter STbDataIter;
|
||||
typedef struct STable STable;
|
||||
typedef struct SMapData SMapData;
|
||||
typedef struct SBlockSMA SBlockSMA;
|
||||
typedef struct SBlockIdx SBlockIdx;
|
||||
typedef struct SBlock SBlock;
|
||||
typedef struct SBlockStatis SBlockStatis;
|
||||
|
@ -106,14 +104,6 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
|||
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
|
||||
#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2))
|
||||
#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2))
|
||||
// KEYINFO
|
||||
#define tKEYINFOInit() \
|
||||
((KEYINFO){.maxKey = {.ts = TSKEY_MIN, .version = -1}, \
|
||||
.minKey = {.ts = TSKEY_MAX, .version = INT64_MAX}, \
|
||||
.minVerion = INT64_MAX, \
|
||||
.maxVersion = -1})
|
||||
int32_t tPutKEYINFO(uint8_t *p, KEYINFO *pKeyInfo);
|
||||
int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo);
|
||||
// SBlockCol
|
||||
int32_t tPutBlockCol(uint8_t *p, void *ph);
|
||||
int32_t tGetBlockCol(uint8_t *p, void *ph);
|
||||
|
@ -123,6 +113,7 @@ void tBlockReset(SBlock *pBlock);
|
|||
int32_t tPutBlock(uint8_t *p, void *ph);
|
||||
int32_t tGetBlock(uint8_t *p, void *ph);
|
||||
int32_t tBlockCmprFn(const void *p1, const void *p2);
|
||||
bool tBlockHasSma(SBlock *pBlock);
|
||||
// SBlockIdx
|
||||
// #define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()})
|
||||
void tBlockIdxReset(SBlockIdx *pBlockIdx);
|
||||
|
@ -173,6 +164,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
|
|||
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision);
|
||||
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey);
|
||||
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
|
||||
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
|
||||
// tsdbMemTable ==============================================================================================
|
||||
// SMemTable
|
||||
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
|
||||
|
@ -230,7 +222,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
|
|||
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
|
||||
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
|
||||
uint8_t **ppBuf1, uint8_t **ppBuf2);
|
||||
int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
|
||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf);
|
||||
// SDelFWriter
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
|
||||
int32_t tsdbDelFWriterClose(SDelFWriter *pWriter, int8_t sync);
|
||||
|
@ -287,13 +279,6 @@ struct TSDBKEY {
|
|||
TSKEY ts;
|
||||
};
|
||||
|
||||
struct KEYINFO {
|
||||
TSDBKEY minKey;
|
||||
TSDBKEY maxKey;
|
||||
int64_t minVerion;
|
||||
int64_t maxVersion;
|
||||
};
|
||||
|
||||
typedef struct SMemSkipListNode SMemSkipListNode;
|
||||
struct SMemSkipListNode {
|
||||
int8_t level;
|
||||
|
@ -477,21 +462,6 @@ struct SDelFile {
|
|||
int64_t offset;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int16_t colId;
|
||||
int16_t maxIndex;
|
||||
int16_t minIndex;
|
||||
int16_t numOfNull;
|
||||
int64_t sum;
|
||||
int64_t max;
|
||||
int64_t min;
|
||||
} SColSMA;
|
||||
|
||||
struct SBlockSMA {
|
||||
int32_t nCol;
|
||||
SColSMA *aColSMA;
|
||||
};
|
||||
|
||||
#pragma pack(push, 1)
|
||||
struct SBlockDataHdr {
|
||||
uint32_t delimiter;
|
||||
|
|
|
@ -1139,9 +1139,55 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA) {
|
||||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
TdFilePtr pFD = pReader->pSmaFD;
|
||||
int64_t offset = pBlock->aSubBlock[0].offset;
|
||||
int64_t size = pBlock->aSubBlock[0].nSma * sizeof(SColumnDataAgg) + sizeof(TSCKSUM);
|
||||
uint8_t *pBuf = NULL;
|
||||
int64_t n;
|
||||
|
||||
ASSERT(tBlockHasSma(pBlock));
|
||||
|
||||
if (!ppBuf) ppBuf = &pBuf;
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// lseek
|
||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// read
|
||||
n = taosReadFile(pFD, *ppBuf, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// check
|
||||
if (!taosCheckChecksumWhole(NULL, size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// decode
|
||||
taosArrayClear(aColumnDataAgg);
|
||||
for (int32_t iSma = 0; iSma < pBlock->aSubBlock[0].nSma; iSma++) {
|
||||
if (taosArrayPush(aColumnDataAgg, &((SColumnDataAgg *)(*ppBuf))[iSma]) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
tsdbFree(pBuf);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1699,24 +1745,7 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
|
||||
SColVal colVal;
|
||||
SColVal *pColVal = &colVal;
|
||||
|
||||
*pColAgg = (SColumnDataAgg){.colId = pColData->cid};
|
||||
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
|
||||
tColDataGetValue(pColData, iVal, pColVal);
|
||||
|
||||
if (pColVal->isNone || pColVal->isNull) {
|
||||
pColAgg->numOfNull++;
|
||||
} else {
|
||||
// TODO:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbWriteBlockSMA(TdFilePtr pFD, SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t **ppBuf) {
|
||||
static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t **ppBuf) {
|
||||
int32_t code = 0;
|
||||
int64_t n;
|
||||
SColData *pColData;
|
||||
|
@ -1843,7 +1872,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
|
|||
|
||||
if (pBlock->nSubBlock > 1 || pBlock->last || pBlock->hasDup) goto _exit;
|
||||
|
||||
code = tsdbWriteBlockSMA(pWriter->pSmaFD, pBlockData, pSubBlock, ppBuf1);
|
||||
code = tsdbWriteBlockSma(pWriter->pSmaFD, pBlockData, pSubBlock, ppBuf1);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pSubBlock->nSma > 0) {
|
||||
|
|
|
@ -353,6 +353,14 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool tBlockHasSma(SBlock *pBlock) {
|
||||
if (pBlock->nSubBlock > 1) return false;
|
||||
if (pBlock->last) return false;
|
||||
if (pBlock->hasDup) return false;
|
||||
|
||||
return pBlock->aSubBlock[0].nSma > 0;
|
||||
}
|
||||
|
||||
// SBlockCol ======================================================
|
||||
int32_t tPutBlockCol(uint8_t *p, void *ph) {
|
||||
int32_t n = 0;
|
||||
|
@ -769,29 +777,6 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
|
|||
return code;
|
||||
}
|
||||
|
||||
// KEYINFO ======================================================
|
||||
int32_t tPutKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutTSDBKEY(p ? p + n : p, &pKeyInfo->minKey);
|
||||
n += tPutTSDBKEY(p ? p + n : p, &pKeyInfo->maxKey);
|
||||
n += tPutI64v(p ? p + n : p, pKeyInfo->minVerion);
|
||||
n += tPutI64v(p ? p + n : p, pKeyInfo->maxVersion);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetTSDBKEY(p + n, &pKeyInfo->minKey);
|
||||
n += tGetTSDBKEY(p + n, &pKeyInfo->maxKey);
|
||||
n += tGetI64v(p + n, &pKeyInfo->minVerion);
|
||||
n += tGetI64v(p + n, &pKeyInfo->maxVersion);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
// SColData ========================================
|
||||
void tColDataReset(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) {
|
||||
pColData->cid = cid;
|
||||
|
@ -1192,3 +1177,63 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
|
|||
*ppColData = *(SColData **)p;
|
||||
}
|
||||
}
|
||||
|
||||
// ALGORITHM ==============================
|
||||
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
|
||||
SColVal colVal;
|
||||
SColVal *pColVal = &colVal;
|
||||
|
||||
*pColAgg = (SColumnDataAgg){.colId = pColData->cid};
|
||||
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
|
||||
tColDataGetValue(pColData, iVal, pColVal);
|
||||
|
||||
if (pColVal->isNone || pColVal->isNull) {
|
||||
pColAgg->numOfNull++;
|
||||
} else {
|
||||
switch (pColData->type) {
|
||||
case TSDB_DATA_TYPE_NULL:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue