more progress
This commit is contained in:
parent
c7c47788f3
commit
de32cab384
|
@ -36,6 +36,7 @@ target_sources(
|
||||||
|
|
||||||
# tsdb
|
# tsdb
|
||||||
"src/tsdb/tsdbCommit.c"
|
"src/tsdb/tsdbCommit.c"
|
||||||
|
"src/tsdb/tsdbCommit2.c"
|
||||||
"src/tsdb/tsdbFile.c"
|
"src/tsdb/tsdbFile.c"
|
||||||
"src/tsdb/tsdbFS.c"
|
"src/tsdb/tsdbFS.c"
|
||||||
"src/tsdb/tsdbOpen.c"
|
"src/tsdb/tsdbOpen.c"
|
||||||
|
|
|
@ -46,6 +46,10 @@ void tsdbMemTableDestroy2(SMemTable *pMemTable);
|
||||||
int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
|
int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
|
||||||
int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
|
|
||||||
|
// tsdbCommit2.c ==============================================================================================
|
||||||
|
int32_t tsdbBegin2(STsdb *pTsdb);
|
||||||
|
int32_t tsdbCommit2(STsdb *pTsdb);
|
||||||
|
|
||||||
// tsdbMemTable ================
|
// tsdbMemTable ================
|
||||||
typedef struct STsdbRow STsdbRow;
|
typedef struct STsdbRow STsdbRow;
|
||||||
typedef struct STbData STbData;
|
typedef struct STbData STbData;
|
||||||
|
@ -877,6 +881,16 @@ struct SDelOp {
|
||||||
SDelOp *pNext;
|
SDelOp *pNext;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SMemTable {
|
||||||
|
STsdb *pTsdb;
|
||||||
|
int32_t nRef;
|
||||||
|
TSDBKEY minKey;
|
||||||
|
TSDBKEY maxKey;
|
||||||
|
int64_t nRows;
|
||||||
|
SArray *aSkmInfo;
|
||||||
|
SArray *aMemData;
|
||||||
|
};
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
|
static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||||
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
TSDBKEY *pKey1 = (TSDBKEY *)p1;
|
||||||
TSDBKEY *pKey2 = (TSDBKEY *)p2;
|
TSDBKEY *pKey2 = (TSDBKEY *)p2;
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "tsdb.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SMemTable *pMemTable;
|
||||||
|
SArray *aBlkIdx;
|
||||||
|
} SCommitH;
|
||||||
|
|
||||||
|
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb);
|
||||||
|
static int32_t tsdbEndCommit(SCommitH *pCHandle);
|
||||||
|
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid);
|
||||||
|
|
||||||
|
int32_t tsdbBegin2(STsdb *pTsdb) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
ASSERT(pTsdb->mem == NULL);
|
||||||
|
code = tsdbMemTableCreate2(pTsdb, (SMemTable **)&pTsdb->mem);
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d failed to begin TSDB since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbCommit2(STsdb *pTsdb) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCommitH ch = {0};
|
||||||
|
|
||||||
|
// start to commit
|
||||||
|
code = tsdbStartCommit(&ch, pTsdb);
|
||||||
|
if (code) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit
|
||||||
|
int32_t sfid; // todo
|
||||||
|
int32_t efid; // todo
|
||||||
|
for (int32_t fid = sfid; fid <= efid; fid++) {
|
||||||
|
code = tsdbCommitToFile(&ch, fid);
|
||||||
|
if (code) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// end commit
|
||||||
|
code = tsdbEndCommit(&ch);
|
||||||
|
if (code) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
// TODO: rollback
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
ASSERT(pTsdb->imem == NULL && pTsdb->mem);
|
||||||
|
pTsdb->imem = pTsdb->mem;
|
||||||
|
pTsdb->mem = NULL;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbEndCommit(SCommitH *pCHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TSKEY fidSKey;
|
||||||
|
TSKEY fidEKey;
|
||||||
|
|
||||||
|
// check if there are data in the time range
|
||||||
|
for (int32_t iMemData = 0; iMemData < taosArrayGetSize(pCHandle->pMemTable->aMemData); iMemData++) {
|
||||||
|
/* code */
|
||||||
|
}
|
||||||
|
|
||||||
|
// has data, do commit to file
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -43,14 +43,10 @@ struct SMemData {
|
||||||
SMemSkipList sl;
|
SMemSkipList sl;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SMemTable {
|
typedef struct {
|
||||||
STsdb *pTsdb;
|
tb_uid_t uid;
|
||||||
int32_t nRef;
|
STSchema *pTSchema;
|
||||||
TSDBKEY minKey;
|
} SSkmInfo;
|
||||||
TSDBKEY maxKey;
|
|
||||||
int64_t nRows;
|
|
||||||
SArray *pArray; // SArray<SMemData>
|
|
||||||
};
|
|
||||||
|
|
||||||
#define SL_MAX_LEVEL 5
|
#define SL_MAX_LEVEL 5
|
||||||
|
|
||||||
|
@ -85,8 +81,8 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
|
||||||
pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
|
pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
|
||||||
pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
|
pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
|
||||||
pMemTable->nRows = 0;
|
pMemTable->nRows = 0;
|
||||||
pMemTable->pArray = taosArrayInit(512, sizeof(SMemData *));
|
pMemTable->aMemData = taosArrayInit(512, sizeof(SMemData *));
|
||||||
if (pMemTable->pArray == NULL) {
|
if (pMemTable->aMemData == NULL) {
|
||||||
taosMemoryFree(pMemTable);
|
taosMemoryFree(pMemTable);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -101,7 +97,7 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbMemTableDestroy2(SMemTable *pMemTable) {
|
void tsdbMemTableDestroy2(SMemTable *pMemTable) {
|
||||||
taosArrayDestroyEx(pMemTable->pArray, NULL /*TODO*/);
|
taosArrayDestroyEx(pMemTable->aMemData, NULL /*TODO*/);
|
||||||
taosMemoryFree(pMemTable);
|
taosMemoryFree(pMemTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,9 +192,9 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui
|
||||||
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
|
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
|
||||||
|
|
||||||
// get
|
// get
|
||||||
idx = taosArraySearchIdx(pMemTable->pArray, &pMemDataT, memDataPCmprFn, TD_GE);
|
idx = taosArraySearchIdx(pMemTable->aMemData, &pMemDataT, memDataPCmprFn, TD_GE);
|
||||||
if (idx >= 0) {
|
if (idx >= 0) {
|
||||||
pMemData = (SMemData *)taosArrayGet(pMemTable->pArray, idx);
|
pMemData = (SMemData *)taosArrayGet(pMemTable->aMemData, idx);
|
||||||
if (memDataPCmprFn(&pMemDataT, &pMemData) == 0) goto _exit;
|
if (memDataPCmprFn(&pMemDataT, &pMemData) == 0) goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,7 +226,7 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui
|
||||||
}
|
}
|
||||||
|
|
||||||
if (idx < 0) idx = 0;
|
if (idx < 0) idx = 0;
|
||||||
if (taosArrayInsert(pMemTable->pArray, idx, &pMemData) == NULL) {
|
if (taosArrayInsert(pMemTable->aMemData, idx, &pMemData) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue