TD-34
This commit is contained in:
parent
1579f285ce
commit
b01e8b8d11
|
@ -105,11 +105,25 @@ SDataRow tdDataRowDup(SDataRow row);
|
|||
|
||||
// ----------------- Data column structure
|
||||
typedef struct SDataCol {
|
||||
int64_t len;
|
||||
char data[];
|
||||
int32_t len;
|
||||
void * pData;
|
||||
} SDataCol;
|
||||
|
||||
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter);
|
||||
typedef struct {
|
||||
TSKEY firstKey;
|
||||
TSKEY lastKey;
|
||||
int numOfPoints;
|
||||
int numOfCols;
|
||||
void * buf;
|
||||
SDataCol cols[];
|
||||
} SDataCols;
|
||||
|
||||
#define keyCol(cols) (&((cols)->cols[0])) // Key column
|
||||
|
||||
SDataCols *tdNewDataCols(STSchema *pSchema, int nRows);
|
||||
void tdFreeDataCols(SDataCols *pCols);
|
||||
void tdResetDataCols(SDataCols *pCols);
|
||||
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols, STSchema *pSchema);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -294,14 +294,55 @@ SDataRow tdDataRowDup(SDataRow row) {
|
|||
return trow;
|
||||
}
|
||||
|
||||
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) {
|
||||
int row = *iter;
|
||||
SDataCols *tdNewDataCols(STSchema *pSchema, int nRows) {
|
||||
int nCols = schemaNCols(pSchema);
|
||||
|
||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
||||
// TODO
|
||||
SDataCols *pInfo = (SDataCols *)calloc(1, sizeof(SDataCols) + sizeof(SDataCol) * nCols);
|
||||
if (pInfo == NULL) return NULL;
|
||||
|
||||
pInfo->numOfCols = nCols;
|
||||
pInfo->firstKey = INT64_MIN;
|
||||
pInfo->lastKey = INT64_MAX;
|
||||
pInfo->buf = malloc(tdMaxRowBytesFromSchema(pSchema) * nRows);
|
||||
if (pInfo->buf == NULL) {
|
||||
free(pInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
*iter = row + 1;
|
||||
pInfo->cols[0].pData = pInfo->buf;
|
||||
for (int i = 1; i < nCols; i++) {
|
||||
pInfo->cols[i].pData = (char *)(pInfo->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * nRows;
|
||||
}
|
||||
|
||||
return pInfo;
|
||||
}
|
||||
|
||||
void tdFreeDataCols(SDataCols *pCols) {
|
||||
if (pCols) {
|
||||
if (pCols->buf) free(pCols->buf);
|
||||
free(pCols);
|
||||
}
|
||||
}
|
||||
|
||||
void tdResetDataCols(SDataCols *pCols) {
|
||||
pCols->firstKey = INT64_MAX;
|
||||
pCols->lastKey = INT64_MIN;
|
||||
pCols->numOfPoints = 0;
|
||||
for (int i = 0; i < pCols->numOfCols; i++) {
|
||||
pCols->cols[i].len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols, STSchema *pSchema) {
|
||||
TSKEY key = dataRowKey(row);
|
||||
if (pCols->numOfPoints == 0) pCols->firstKey = key;
|
||||
pCols->lastKey = key;
|
||||
for (int i = 0; i < pCols->numOfCols; i++) {
|
||||
SDataCol *pCol = pCols->cols + i;
|
||||
memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, colOffset(schemaColAt(pSchema, i))),
|
||||
colBytes(schemaColAt(pSchema, i)));
|
||||
pCol->len += colBytes(schemaColAt(pSchema, i));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -84,7 +84,7 @@ static int tsdbOpenMetaFile(char *tsdbDir);
|
|||
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
|
||||
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
|
||||
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
|
||||
static void * tsdbCommitToFile(void *arg);
|
||||
static void * tsdbCommitData(void *arg);
|
||||
|
||||
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
|
||||
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
|
||||
|
@ -327,7 +327,7 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
|
|||
pRepo->tsdbCache->curBlock = NULL;
|
||||
|
||||
// TODO: here should set as detached or use join for memory leak
|
||||
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo);
|
||||
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitData, (void *)repo);
|
||||
tsdbUnLockRepo(repo);
|
||||
|
||||
return 0;
|
||||
|
@ -816,7 +816,7 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables)
|
|||
}
|
||||
|
||||
// Commit to file
|
||||
static void *tsdbCommitToFile(void *arg) {
|
||||
static void *tsdbCommitData(void *arg) {
|
||||
// TODO
|
||||
printf("Starting to commit....\n");
|
||||
STsdbRepo * pRepo = (STsdbRepo *)arg;
|
||||
|
@ -894,4 +894,26 @@ static void *tsdbCommitToFile(void *arg) {
|
|||
tsdbUnLockRepo(arg);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, SSkipListIterator **iters, int fid) {
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
TSKEY minKey = 0, maxKey = 0;
|
||||
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
|
||||
|
||||
for (int tid = 0; tid < pCfg->maxTables; tid++) {
|
||||
STable *pTable = pMeta->tables[tid];
|
||||
SSkipListIterator *pIter = iters[tid];
|
||||
|
||||
if (pIter == NULL) continue;
|
||||
|
||||
// Read data
|
||||
// while () {
|
||||
|
||||
// }
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue