TD-90
This commit is contained in:
parent
5b0534b4f4
commit
b3a0c4291e
|
@ -248,7 +248,7 @@ void tdResetDataCols(SDataCols *pCols);
|
|||
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
||||
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
||||
void tdFreeDataCols(SDataCols *pCols);
|
||||
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
|
||||
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
|
||||
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
|
||||
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
|
||||
|
|
|
@ -265,25 +265,29 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
|
|||
}
|
||||
}
|
||||
|
||||
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
|
||||
char *ptr = NULL;
|
||||
switch (pCol->type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
pCol->len = 0;
|
||||
for (int i = 0; i < nEle; i++) {
|
||||
pCol->dataOff[i] = pCol->len;
|
||||
ptr = (char *)pCol->pData + pCol->len;
|
||||
varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
|
||||
setNull(ptr + sizeof(VarDataLenT), pCol->type, pCol->bytes);
|
||||
pCol->len += varDataTLen(ptr);
|
||||
}
|
||||
void dataColSetNullAt(SDataCol *pCol, int index) {
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
pCol->dataOff[index] = pCol->len;
|
||||
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
|
||||
varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
|
||||
setNull(varDataVal(ptr), pCol->type, pCol->bytes);
|
||||
pCol->len += varDataTLen(ptr);
|
||||
} else {
|
||||
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
|
||||
pCol->len += TYPE_BYTES[pCol->type];
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
|
||||
pCol->len = TYPE_BYTES[pCol->type] * nEle;
|
||||
break;
|
||||
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
pCol->len = 0;
|
||||
for (int i = 0; i < nEle; i++) {
|
||||
dataColSetNullAt(pCol, i);
|
||||
}
|
||||
} else {
|
||||
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
|
||||
pCol->len = TYPE_BYTES[pCol->type] * nEle;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -380,14 +384,32 @@ void tdResetDataCols(SDataCols *pCols) {
|
|||
}
|
||||
}
|
||||
|
||||
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
|
||||
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
|
||||
ASSERT(dataColsKeyLast(pCols) < dataRowKey(row));
|
||||
|
||||
for (int i = 0; i < pCols->numOfCols; i++) {
|
||||
SDataCol *pCol = pCols->cols + i;
|
||||
void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset);
|
||||
int rcol = 0;
|
||||
int dcol = 0;
|
||||
|
||||
dataColAppendVal(pCol, value, pCols->numOfRows, pCols->maxPoints);
|
||||
while (dcol < pCols->numOfCols) {
|
||||
SDataCol *pDataCol = &(pCols->cols[dcol]);
|
||||
if (rcol >= schemaNCols(pSchema)) {
|
||||
dataColSetNullAt(pDataCol, pCols->numOfRows);
|
||||
dcol++;
|
||||
continue;
|
||||
}
|
||||
|
||||
STColumn *pRowCol = schemaColAt(pSchema, rcol);
|
||||
if (pRowCol->colId == pDataCol->colId) {
|
||||
dataColAppendVal(pDataCol, tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset), pCols->numOfRows,
|
||||
pCols->maxPoints);
|
||||
dcol++;
|
||||
rcol++;
|
||||
} else if (pRowCol->colId < pDataCol->colId) {
|
||||
rcol++;
|
||||
} else {
|
||||
dataColSetNullAt(pDataCol, pCols->numOfRows);
|
||||
dcol++;
|
||||
}
|
||||
}
|
||||
pCols->numOfRows++;
|
||||
}
|
||||
|
|
|
@ -123,7 +123,6 @@ typedef struct STableIndexElem {
|
|||
|
||||
STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo);
|
||||
int32_t tsdbFreeMeta(STsdbMeta *pMeta);
|
||||
STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
|
||||
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
|
||||
|
||||
// ---- Operation on STable
|
||||
|
@ -503,14 +502,16 @@ int tsdbWriteCompInfo(SRWHelper *pHelper);
|
|||
int tsdbWriteCompIdx(SRWHelper *pHelper);
|
||||
|
||||
// --------- Other functions need to further organize
|
||||
void tsdbFitRetention(STsdbRepo *pRepo);
|
||||
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
|
||||
void tsdbAdjustCacheBlocks(STsdbCache *pCache);
|
||||
int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
|
||||
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
|
||||
int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg);
|
||||
int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
||||
int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
||||
void tsdbFitRetention(STsdbRepo *pRepo);
|
||||
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
|
||||
void tsdbAdjustCacheBlocks(STsdbCache *pCache);
|
||||
int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
|
||||
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
|
||||
int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg);
|
||||
int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
||||
int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
||||
STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version);
|
||||
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
|
||||
|
||||
#define DEFAULT_TAG_INDEX_COLUMN 0 // skip list built based on the first column of tags
|
||||
|
||||
|
|
|
@ -974,9 +974,10 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
||||
static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
||||
ASSERT(maxRowsToRead > 0);
|
||||
if (pIter == NULL) return 0;
|
||||
STSchema *pSchema = NULL;
|
||||
|
||||
int numOfRows = 0;
|
||||
|
||||
|
@ -989,7 +990,15 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
|
|||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
if (dataRowKey(row) > maxKey) break;
|
||||
|
||||
tdAppendDataRowToDataCol(row, pCols);
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row));
|
||||
if (pSchema == NULL) {
|
||||
// TODO: deal with the error here
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pCols);
|
||||
numOfRows++;
|
||||
} while (tSkipListIterNext(pIter));
|
||||
|
||||
|
@ -1139,7 +1148,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||
int nLoop = 0;
|
||||
while (true) {
|
||||
int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols);
|
||||
int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols);
|
||||
assert(rowsRead >= 0);
|
||||
if (pDataCols->numOfRows == 0) break;
|
||||
nLoop++;
|
||||
|
|
|
@ -248,6 +248,32 @@ STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
|
|||
}
|
||||
}
|
||||
|
||||
static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||
if (*(int16_t *)key1 < (*(STSchema **)key2)->version) {
|
||||
return -1;
|
||||
} else if (*(int16_t *)key1 > (*(STSchema **)key2)->version) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version) {
|
||||
STable *pSearchTable = NULL;
|
||||
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
pSearchTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
||||
} else {
|
||||
pSearchTable = pTable;
|
||||
}
|
||||
ASSERT(pSearchTable != NULL);
|
||||
|
||||
void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *),
|
||||
tsdbCompareSchemaVersion, TD_EQ);
|
||||
if (ptr == NULL) return NULL;
|
||||
|
||||
return (STSchema *)ptr;
|
||||
}
|
||||
|
||||
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
|
||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||
return pTable->tagSchema;
|
||||
|
@ -392,9 +418,21 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) {
|
|||
isChanged = true;
|
||||
}
|
||||
|
||||
{
|
||||
// TODO: try to update the data schema
|
||||
STSchema *pTSchema = tsdbGetTableSchema(pMeta, pTable);
|
||||
if (schemaVersion(pTSchema) < schemaVersion(pCfg->schema)) {
|
||||
if (pTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
|
||||
pTable->schema[pTable->numOfSchemas++] = tdDupSchema(pCfg->schema);
|
||||
} else {
|
||||
ASSERT(pTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
|
||||
STSchema *tSchema = tdDupSchema(pCfg->schema);
|
||||
tdFreeSchema(pTable->schema[0]);
|
||||
memmove(pTable->schema, pTable->schema+1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
|
||||
pTable->schema[pTable->numOfSchemas-1] = tSchema;
|
||||
}
|
||||
|
||||
isChanged = true;
|
||||
}
|
||||
|
||||
if (isChanged) {
|
||||
char *buf = malloc(1024 * 1024);
|
||||
int bufLen = 0;
|
||||
|
|
Loading…
Reference in New Issue