Merge pull request #11067 from taosdata/feature/TD-11463-3.0
Feature/td 11463 3.0
This commit is contained in:
commit
4bad58c159
|
@ -63,7 +63,7 @@ extern "C" {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
|
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
|
||||||
int32_t type : 8; // column type
|
int32_t type : 8; // column type
|
||||||
int32_t bytes : 24; // column bytes (restore to int32_t in case of misuse)
|
int32_t bytes : 24; // column bytes (0~16M)
|
||||||
int32_t sma : 8; // block SMA: 0, no SMA, 1, sum/min/max, 2, ...
|
int32_t sma : 8; // block SMA: 0, no SMA, 1, sum/min/max, 2, ...
|
||||||
int32_t offset : 24; // point offset in STpRow after the header part.
|
int32_t offset : 24; // point offset in STpRow after the header part.
|
||||||
} STColumn;
|
} STColumn;
|
||||||
|
@ -81,12 +81,12 @@ typedef struct {
|
||||||
|
|
||||||
// ----------------- TSDB SCHEMA DEFINITION
|
// ----------------- TSDB SCHEMA DEFINITION
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t version; // version
|
|
||||||
int32_t numOfCols; // Number of columns appended
|
int32_t numOfCols; // Number of columns appended
|
||||||
int32_t tlen; // maximum length of a STpRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) +
|
schema_ver_t version; // schema version
|
||||||
// (bytes))
|
|
||||||
uint16_t flen; // First part length in a STpRow after the header part
|
uint16_t flen; // First part length in a STpRow after the header part
|
||||||
uint16_t vlen; // pure value part length, excluded the overhead (bytes only)
|
int32_t vlen; // pure value part length, excluded the overhead (bytes only)
|
||||||
|
int32_t tlen; // maximum length of a STpRow without the header part
|
||||||
|
// (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + (bytes))
|
||||||
STColumn columns[];
|
STColumn columns[];
|
||||||
} STSchema;
|
} STSchema;
|
||||||
|
|
||||||
|
@ -122,10 +122,10 @@ static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t tCols;
|
int32_t tCols;
|
||||||
int32_t nCols;
|
int32_t nCols;
|
||||||
int32_t tlen;
|
schema_ver_t version;
|
||||||
uint16_t flen;
|
uint16_t flen;
|
||||||
uint16_t vlen;
|
int32_t vlen;
|
||||||
int32_t version;
|
int32_t tlen;
|
||||||
STColumn *columns;
|
STColumn *columns;
|
||||||
} STSchemaBuilder;
|
} STSchemaBuilder;
|
||||||
|
|
||||||
|
@ -136,9 +136,9 @@ typedef struct {
|
||||||
#define TD_BITMAP_BYTES(cnt) (ceil((double)cnt / TD_VTYPE_PARTS))
|
#define TD_BITMAP_BYTES(cnt) (ceil((double)cnt / TD_VTYPE_PARTS))
|
||||||
#define TD_BIT_TO_BYTES(cnt) (ceil((double)cnt / 8))
|
#define TD_BIT_TO_BYTES(cnt) (ceil((double)cnt / 8))
|
||||||
|
|
||||||
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
|
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
|
||||||
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
|
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
|
||||||
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
|
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
|
||||||
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes);
|
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes);
|
||||||
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
|
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ typedef uint8_t TDRowValT;
|
||||||
typedef int16_t col_id_t;
|
typedef int16_t col_id_t;
|
||||||
typedef int8_t col_type_t;
|
typedef int8_t col_type_t;
|
||||||
typedef int32_t col_bytes_t;
|
typedef int32_t col_bytes_t;
|
||||||
|
typedef uint16_t schema_ver_t;
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -123,7 +123,7 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
|
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
||||||
if (pBuilder == NULL) return -1;
|
if (pBuilder == NULL) return -1;
|
||||||
|
|
||||||
pBuilder->tCols = 256;
|
pBuilder->tCols = 256;
|
||||||
|
@ -140,7 +140,7 @@ void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
|
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
||||||
pBuilder->nCols = 0;
|
pBuilder->nCols = 0;
|
||||||
pBuilder->tlen = 0;
|
pBuilder->tlen = 0;
|
||||||
pBuilder->flen = 0;
|
pBuilder->flen = 0;
|
||||||
|
@ -168,6 +168,9 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col
|
||||||
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
|
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: set sma value by user input
|
||||||
|
pCol->sma = 1;
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
colSetBytes(pCol, bytes);
|
colSetBytes(pCol, bytes);
|
||||||
pBuilder->tlen += (TYPE_BYTES[type] + bytes);
|
pBuilder->tlen += (TYPE_BYTES[type] + bytes);
|
||||||
|
|
|
@ -67,12 +67,13 @@ typedef struct {
|
||||||
uint8_t last : 1;
|
uint8_t last : 1;
|
||||||
uint8_t blkVer : 7;
|
uint8_t blkVer : 7;
|
||||||
uint8_t numOfSubBlocks;
|
uint8_t numOfSubBlocks;
|
||||||
int16_t numOfCols; // not including timestamp column
|
col_id_t numOfCols; // not including timestamp column
|
||||||
uint32_t len; // data block length
|
uint32_t len; // data block length
|
||||||
uint32_t keyLen : 24; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
|
uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
|
||||||
|
uint32_t algorithm : 4;
|
||||||
uint32_t reserve : 8;
|
uint32_t reserve : 8;
|
||||||
int32_t algorithm : 8;
|
col_id_t numOfBSma;
|
||||||
int32_t numOfRows : 24;
|
uint16_t numOfRows;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
uint64_t aggrStat : 1;
|
uint64_t aggrStat : 1;
|
||||||
uint64_t aggrOffset : 63;
|
uint64_t aggrOffset : 63;
|
||||||
|
|
|
@ -1207,6 +1207,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
|
||||||
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||||
SBlockData *pBlockData = NULL;
|
SBlockData *pBlockData = NULL;
|
||||||
SAggrBlkData *pAggrBlkData = NULL;
|
SAggrBlkData *pAggrBlkData = NULL;
|
||||||
|
STSchema *pSchema = pTable->pSchema;
|
||||||
int64_t offset = 0, offsetAggr = 0;
|
int64_t offset = 0, offsetAggr = 0;
|
||||||
int rowsToWrite = pDataCols->numOfRows;
|
int rowsToWrite = pDataCols->numOfRows;
|
||||||
|
|
||||||
|
@ -1225,8 +1226,10 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
|
||||||
pAggrBlkData = (SAggrBlkData *)(*ppExBuf);
|
pAggrBlkData = (SAggrBlkData *)(*ppExBuf);
|
||||||
|
|
||||||
// Get # of cols not all NULL(not including key column)
|
// Get # of cols not all NULL(not including key column)
|
||||||
int nColsNotAllNull = 0;
|
col_id_t nColsNotAllNull = 0;
|
||||||
|
col_id_t nColsOfBlockSma = 0;
|
||||||
for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column
|
for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column
|
||||||
|
STColumn *pColumn = pSchema->columns + ncol;
|
||||||
SDataCol *pDataCol = pDataCols->cols + ncol;
|
SDataCol *pDataCol = pDataCols->cols + ncol;
|
||||||
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
|
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
|
||||||
SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsNotAllNull;
|
SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsNotAllNull;
|
||||||
|
@ -1260,7 +1263,12 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
|
||||||
} else {
|
} else {
|
||||||
TD_SET_COL_ROWS_MISC(pBlockCol);
|
TD_SET_COL_ROWS_MISC(pBlockCol);
|
||||||
}
|
}
|
||||||
nColsNotAllNull++;
|
|
||||||
|
++nColsNotAllNull;
|
||||||
|
|
||||||
|
if (pColumn->sma) {
|
||||||
|
++nColsOfBlockSma;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
|
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
|
||||||
|
@ -1357,9 +1365,8 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0;
|
uint32_t aggrStatus = nColsOfBlockSma > 0 ? 1 : 0;
|
||||||
if (aggrStatus > 0) {
|
if (aggrStatus > 0) {
|
||||||
|
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
|
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
|
||||||
tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));
|
tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));
|
||||||
|
|
||||||
|
@ -1378,6 +1385,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
|
||||||
pBlock->keyLen = keyLen;
|
pBlock->keyLen = keyLen;
|
||||||
pBlock->numOfSubBlocks = isSuper ? 1 : 0;
|
pBlock->numOfSubBlocks = isSuper ? 1 : 0;
|
||||||
pBlock->numOfCols = nColsNotAllNull;
|
pBlock->numOfCols = nColsNotAllNull;
|
||||||
|
pBlock->numOfBSma = nColsOfBlockSma;
|
||||||
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
|
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
|
||||||
pBlock->keyLast = dataColsKeyLast(pDataCols);
|
pBlock->keyLast = dataColsKeyLast(pDataCols);
|
||||||
pBlock->aggrStat = aggrStatus;
|
pBlock->aggrStat = aggrStatus;
|
||||||
|
|
|
@ -321,7 +321,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
|
size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfBSma, (uint32_t)pBlock->blkVer);
|
||||||
if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;
|
if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;
|
||||||
|
|
||||||
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
|
int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
|
||||||
|
|
|
@ -189,12 +189,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
||||||
#if 0
|
#if 1
|
||||||
|
|
||||||
SSmaCfg vCreateSmaReq = {0};
|
SSmaCfg vCreateSmaReq = {0};
|
||||||
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
|
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, terrstr(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
vWarn("vgId%d: TDMT_VND_CREATE_SMA received for %s:%" PRIi64, pVnode->config.vgId, vCreateSmaReq.tSma.indexName,
|
||||||
|
vCreateSmaReq.tSma.indexUid);
|
||||||
|
|
||||||
// record current timezone of server side
|
// record current timezone of server side
|
||||||
tstrncpy(vCreateSmaReq.tSma.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
|
tstrncpy(vCreateSmaReq.tSma.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
|
||||||
|
|
|
@ -330,7 +330,6 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||||
ASSERT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
ASSERT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
||||||
|
|
||||||
// step 2: insert data
|
// step 2: insert data
|
||||||
STSmaDataWrapper *pSmaData = NULL;
|
|
||||||
STsdb *pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb));
|
STsdb *pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb));
|
||||||
STsdbCfg *pCfg = &pTsdb->config;
|
STsdbCfg *pCfg = &pTsdb->config;
|
||||||
|
|
||||||
|
@ -416,6 +415,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||||
col_id_t numOfCols = 4096;
|
col_id_t numOfCols = 4096;
|
||||||
ASSERT_GT(numOfCols, 0);
|
ASSERT_GT(numOfCols, 0);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
STSmaDataWrapper *pSmaData = NULL;
|
||||||
pSmaData = (STSmaDataWrapper *)buf;
|
pSmaData = (STSmaDataWrapper *)buf;
|
||||||
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
|
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
|
||||||
pSmaData->skey = skey1;
|
pSmaData->skey = skey1;
|
||||||
|
@ -459,9 +460,13 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||||
pSmaData->dataLen = (len - sizeof(STSmaDataWrapper));
|
pSmaData->dataLen = (len - sizeof(STSmaDataWrapper));
|
||||||
|
|
||||||
ASSERT_GE(bufSize, pSmaData->dataLen);
|
ASSERT_GE(bufSize, pSmaData->dataLen);
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
|
ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
SSDataBlock *pSmaData = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// step 3: query
|
// step 3: query
|
||||||
uint32_t checkDataCnt = 0;
|
uint32_t checkDataCnt = 0;
|
||||||
|
|
Loading…
Reference in New Issue