Merge pull request #2198 from taosdata/feature/usagedev
Feature/usagedev
This commit is contained in:
commit
9226ae393e
|
@ -69,7 +69,8 @@ typedef struct {
|
||||||
int version; // version
|
int version; // version
|
||||||
int numOfCols; // Number of columns appended
|
int numOfCols; // Number of columns appended
|
||||||
int tlen; // maximum length of a SDataRow without the header part
|
int tlen; // maximum length of a SDataRow without the header part
|
||||||
int flen; // First part length in a SDataRow after the header part
|
int16_t flen; // First part length in a SDataRow after the header part
|
||||||
|
int16_t vlen; // pure value part length, excluded the overhead
|
||||||
STColumn columns[];
|
STColumn columns[];
|
||||||
} STSchema;
|
} STSchema;
|
||||||
|
|
||||||
|
@ -77,6 +78,7 @@ typedef struct {
|
||||||
#define schemaVersion(s) ((s)->version)
|
#define schemaVersion(s) ((s)->version)
|
||||||
#define schemaTLen(s) ((s)->tlen)
|
#define schemaTLen(s) ((s)->tlen)
|
||||||
#define schemaFLen(s) ((s)->flen)
|
#define schemaFLen(s) ((s)->flen)
|
||||||
|
#define schemaVLen(s) ((s)->vlen)
|
||||||
#define schemaColAt(s, i) ((s)->columns + i)
|
#define schemaColAt(s, i) ((s)->columns + i)
|
||||||
#define tdFreeSchema(s) tfree((s))
|
#define tdFreeSchema(s) tfree((s))
|
||||||
|
|
||||||
|
@ -105,7 +107,8 @@ typedef struct {
|
||||||
int tCols;
|
int tCols;
|
||||||
int nCols;
|
int nCols;
|
||||||
int tlen;
|
int tlen;
|
||||||
int flen;
|
int16_t flen;
|
||||||
|
int16_t vlen;
|
||||||
int version;
|
int version;
|
||||||
STColumn *columns;
|
STColumn *columns;
|
||||||
} STSchemaBuilder;
|
} STSchemaBuilder;
|
||||||
|
|
|
@ -100,6 +100,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
|
||||||
pBuilder->nCols = 0;
|
pBuilder->nCols = 0;
|
||||||
pBuilder->tlen = 0;
|
pBuilder->tlen = 0;
|
||||||
pBuilder->flen = 0;
|
pBuilder->flen = 0;
|
||||||
|
pBuilder->vlen = 0;
|
||||||
pBuilder->version = version;
|
pBuilder->version = version;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,10 +125,12 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int3
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
colSetBytes(pCol, bytes);
|
colSetBytes(pCol, bytes);
|
||||||
pBuilder->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
|
pBuilder->tlen += (TYPE_BYTES[type] + bytes);
|
||||||
|
pBuilder->vlen += bytes - sizeof(VarDataLenT);
|
||||||
} else {
|
} else {
|
||||||
colSetBytes(pCol, TYPE_BYTES[type]);
|
colSetBytes(pCol, TYPE_BYTES[type]);
|
||||||
pBuilder->tlen += TYPE_BYTES[type];
|
pBuilder->tlen += TYPE_BYTES[type];
|
||||||
|
pBuilder->vlen += TYPE_BYTES[type];
|
||||||
}
|
}
|
||||||
|
|
||||||
pBuilder->nCols++;
|
pBuilder->nCols++;
|
||||||
|
@ -150,6 +153,7 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
|
||||||
schemaNCols(pSchema) = pBuilder->nCols;
|
schemaNCols(pSchema) = pBuilder->nCols;
|
||||||
schemaTLen(pSchema) = pBuilder->tlen;
|
schemaTLen(pSchema) = pBuilder->tlen;
|
||||||
schemaFLen(pSchema) = pBuilder->flen;
|
schemaFLen(pSchema) = pBuilder->flen;
|
||||||
|
schemaVLen(pSchema) = pBuilder->vlen;
|
||||||
|
|
||||||
memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
|
memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
|
||||||
|
|
||||||
|
|
|
@ -65,6 +65,13 @@ typedef struct {
|
||||||
int8_t compression;
|
int8_t compression;
|
||||||
} STsdbCfg;
|
} STsdbCfg;
|
||||||
|
|
||||||
|
// --------- TSDB REPOSITORY USAGE STATISTICS
|
||||||
|
typedef struct {
|
||||||
|
int64_t totalStorage; // total bytes occupie
|
||||||
|
int64_t compStorage;
|
||||||
|
int64_t pointsWritten; // total data points written
|
||||||
|
} STsdbStat;
|
||||||
|
|
||||||
typedef void TsdbRepoT; // use void to hide implementation details from outside
|
typedef void TsdbRepoT; // use void to hide implementation details from outside
|
||||||
|
|
||||||
void tsdbSetDefaultCfg(STsdbCfg *pCfg);
|
void tsdbSetDefaultCfg(STsdbCfg *pCfg);
|
||||||
|
@ -306,6 +313,15 @@ int32_t tsdbGetOneTableGroup(TsdbRepoT *tsdb, uint64_t uid, STableGroupInfo *pGr
|
||||||
*/
|
*/
|
||||||
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
|
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get the statistics of repo usage
|
||||||
|
* @param repo. point to the tsdbrepo
|
||||||
|
* @param totalPoints. total data point written
|
||||||
|
* @param totalStorage. total bytes took by the tsdb
|
||||||
|
* @param compStorage. total bytes took by the tsdb after compressed
|
||||||
|
*/
|
||||||
|
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -357,6 +357,8 @@ typedef struct STsdbRepo {
|
||||||
|
|
||||||
STsdbAppH appH;
|
STsdbAppH appH;
|
||||||
|
|
||||||
|
STsdbStat stat;
|
||||||
|
|
||||||
// The meter meta handle of this TSDB repository
|
// The meter meta handle of this TSDB repository
|
||||||
STsdbMeta *tsdbMeta;
|
STsdbMeta *tsdbMeta;
|
||||||
|
|
||||||
|
|
|
@ -953,6 +953,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
||||||
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
|
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
int64_t points = 0;
|
||||||
|
|
||||||
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
|
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
|
||||||
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
|
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
|
||||||
|
@ -964,7 +965,9 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
|
||||||
|
|
||||||
// Check schema version
|
// Check schema version
|
||||||
int32_t tversion = pBlock->sversion;
|
int32_t tversion = pBlock->sversion;
|
||||||
int16_t nversion = schemaVersion(tsdbGetTableSchema(pMeta, pTable));
|
STSchema * pSchema = tsdbGetTableSchema(pMeta, pTable);
|
||||||
|
ASSERT(pSchema != NULL);
|
||||||
|
int16_t nversion = schemaVersion(pSchema);
|
||||||
if (tversion > nversion) {
|
if (tversion > nversion) {
|
||||||
tsdbTrace("vgId:%d table:%s tid:%d server schema version %d is older than clien version %d, try to config.",
|
tsdbTrace("vgId:%d table:%s tid:%d server schema version %d is older than clien version %d, try to config.",
|
||||||
pRepo->config.tsdbId, varDataVal(pTable->name), pTable->tableId.tid, nversion, tversion);
|
pRepo->config.tsdbId, varDataVal(pTable->name), pTable->tableId.tid, nversion, tversion);
|
||||||
|
@ -1014,7 +1017,10 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
(*affectedrows)++;
|
(*affectedrows)++;
|
||||||
|
points++;
|
||||||
}
|
}
|
||||||
|
atomic_fetch_add_64(&(pRepo->stat.pointsWritten), points * (pSchema->numOfCols));
|
||||||
|
atomic_fetch_add_64(&(pRepo->stat.totalStorage), points * pSchema->vlen);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1381,3 +1387,11 @@ uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t
|
||||||
|
|
||||||
return magic;
|
return magic;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage){
|
||||||
|
ASSERT(repo != NULL);
|
||||||
|
STsdbRepo * pRepo = repo;
|
||||||
|
*totalPoints = pRepo->stat.pointsWritten;
|
||||||
|
*totalStorage = pRepo->stat.totalStorage;
|
||||||
|
*compStorage = pRepo->stat.compStorage;
|
||||||
|
}
|
|
@ -381,16 +381,18 @@ void *vnodeGetWal(void *pVnode) {
|
||||||
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
|
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
|
||||||
if (pVnode->status == TAOS_VN_STATUS_DELETING) return;
|
if (pVnode->status == TAOS_VN_STATUS_DELETING) return;
|
||||||
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
|
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
|
||||||
|
int64_t totalStorage, compStorage, pointsWritten = 0;
|
||||||
|
tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
|
||||||
|
|
||||||
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
|
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
|
||||||
pLoad->vgId = htonl(pVnode->vgId);
|
pLoad->vgId = htonl(pVnode->vgId);
|
||||||
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
|
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
|
||||||
pLoad->totalStorage = htobe64(pLoad->totalStorage);
|
pLoad->totalStorage = htobe64(totalStorage);
|
||||||
pLoad->compStorage = htobe64(pLoad->compStorage);
|
pLoad->compStorage = htobe64(compStorage);
|
||||||
pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
|
pLoad->pointsWritten = htobe64(pointsWritten);
|
||||||
pLoad->status = pVnode->status;
|
pLoad->status = pVnode->status;
|
||||||
pLoad->role = pVnode->role;
|
pLoad->role = pVnode->role;
|
||||||
pLoad->replica = pVnode->syncCfg.replica;
|
pLoad->replica = pVnode->syncCfg.replica;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeBuildStatusMsg(void *param) {
|
void vnodeBuildStatusMsg(void *param) {
|
||||||
|
|
Loading…
Reference in New Issue