refact more
This commit is contained in:
parent
705310a901
commit
1c3e3db167
|
@ -36,12 +36,10 @@ target_sources(
|
||||||
|
|
||||||
# tsdb
|
# tsdb
|
||||||
"src/tsdb/tsdbCommit.c"
|
"src/tsdb/tsdbCommit.c"
|
||||||
# "src/tsdb/tsdbCommit2.c"
|
|
||||||
"src/tsdb/tsdbFile.c"
|
"src/tsdb/tsdbFile.c"
|
||||||
"src/tsdb/tsdbFS.c"
|
"src/tsdb/tsdbFS.c"
|
||||||
"src/tsdb/tsdbOpen.c"
|
"src/tsdb/tsdbOpen.c"
|
||||||
"src/tsdb/tsdbMemTable.c"
|
"src/tsdb/tsdbMemTable.c"
|
||||||
# "src/tsdb/tsdbMemTable2.c"
|
|
||||||
"src/tsdb/tsdbRead.c"
|
"src/tsdb/tsdbRead.c"
|
||||||
"src/tsdb/tsdbReadImpl.c"
|
"src/tsdb/tsdbReadImpl.c"
|
||||||
"src/tsdb/tsdbWrite.c"
|
"src/tsdb/tsdbWrite.c"
|
||||||
|
|
|
@ -58,9 +58,6 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, ST
|
||||||
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
||||||
bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow);
|
bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow);
|
||||||
|
|
||||||
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
|
|
||||||
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
|
|
||||||
|
|
||||||
// tsdbFile.c ==============================================================================================
|
// tsdbFile.c ==============================================================================================
|
||||||
typedef int32_t TSDB_FILE_T;
|
typedef int32_t TSDB_FILE_T;
|
||||||
typedef struct SDFInfo SDFInfo;
|
typedef struct SDFInfo SDFInfo;
|
||||||
|
|
|
@ -93,6 +93,9 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
|
||||||
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
|
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
|
||||||
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
||||||
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
||||||
|
static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
|
||||||
|
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup,
|
||||||
|
SMergeInfo *pMergeInfo);
|
||||||
|
|
||||||
int32_t tsdbBegin(STsdb *pTsdb) {
|
int32_t tsdbBegin(STsdb *pTsdb) {
|
||||||
if (!pTsdb) return 0;
|
if (!pTsdb) return 0;
|
||||||
|
@ -1658,4 +1661,171 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
|
||||||
|
bool merge) {
|
||||||
|
if (pCols) {
|
||||||
|
if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) {
|
||||||
|
*ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row));
|
||||||
|
if (*ppSchema == NULL) {
|
||||||
|
ASSERT(false);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
|
||||||
|
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup,
|
||||||
|
SMergeInfo *pMergeInfo) {
|
||||||
|
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
|
||||||
|
if (pIter == NULL) return 0;
|
||||||
|
STSchema *pSchema = NULL;
|
||||||
|
TSKEY rowKey = 0;
|
||||||
|
TSKEY fKey = 0;
|
||||||
|
// only fetch lastKey from mem data as file data not used in this function actually
|
||||||
|
TSKEY lastKey = TSKEY_INITIAL_VAL;
|
||||||
|
bool isRowDel = false;
|
||||||
|
int filterIter = 0;
|
||||||
|
STSRow *row = NULL;
|
||||||
|
SMergeInfo mInfo;
|
||||||
|
|
||||||
|
// TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
|
||||||
|
// query handle)
|
||||||
|
|
||||||
|
if (pMergeInfo == NULL) pMergeInfo = &mInfo;
|
||||||
|
|
||||||
|
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
|
||||||
|
pMergeInfo->keyFirst = INT64_MAX;
|
||||||
|
pMergeInfo->keyLast = INT64_MIN;
|
||||||
|
if (pCols) tdResetDataCols(pCols);
|
||||||
|
|
||||||
|
row = tsdbNextIterRow(pIter);
|
||||||
|
if (row == NULL || TD_ROW_KEY(row) > maxKey) {
|
||||||
|
rowKey = INT64_MAX;
|
||||||
|
isRowDel = false;
|
||||||
|
} else {
|
||||||
|
rowKey = TD_ROW_KEY(row);
|
||||||
|
isRowDel = TD_ROW_IS_DELETED(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (filterIter >= nFilterKeys) {
|
||||||
|
fKey = INT64_MAX;
|
||||||
|
} else {
|
||||||
|
fKey = tdGetKey(filterKeys[filterIter]);
|
||||||
|
}
|
||||||
|
// 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
|
||||||
|
// 2. rowKey - would dup since Multi-Version supported
|
||||||
|
while (true) {
|
||||||
|
if (fKey == INT64_MAX && rowKey == INT64_MAX) break;
|
||||||
|
|
||||||
|
if (fKey < rowKey) {
|
||||||
|
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
|
||||||
|
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
|
||||||
|
|
||||||
|
filterIter++;
|
||||||
|
if (filterIter >= nFilterKeys) {
|
||||||
|
fKey = INT64_MAX;
|
||||||
|
} else {
|
||||||
|
fKey = tdGetKey(filterKeys[filterIter]);
|
||||||
|
}
|
||||||
|
#if 1
|
||||||
|
} else if (fKey > rowKey) {
|
||||||
|
if (isRowDel) {
|
||||||
|
// TODO: support delete function
|
||||||
|
pMergeInfo->rowsDeleteFailed++;
|
||||||
|
} else {
|
||||||
|
if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
|
||||||
|
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
|
||||||
|
|
||||||
|
if (lastKey != rowKey) {
|
||||||
|
pMergeInfo->rowsInserted++;
|
||||||
|
pMergeInfo->nOperations++;
|
||||||
|
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
|
||||||
|
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
|
||||||
|
if (pCols) {
|
||||||
|
if (lastKey != TSKEY_INITIAL_VAL) {
|
||||||
|
++pCols->numOfRows;
|
||||||
|
}
|
||||||
|
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
|
||||||
|
}
|
||||||
|
lastKey = rowKey;
|
||||||
|
} else {
|
||||||
|
if (keepDup) {
|
||||||
|
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
|
||||||
|
} else {
|
||||||
|
// discard
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbTbDataIterNext(pIter);
|
||||||
|
row = tsdbNextIterRow(pIter);
|
||||||
|
if (row == NULL || TD_ROW_KEY(row) > maxKey) {
|
||||||
|
rowKey = INT64_MAX;
|
||||||
|
isRowDel = false;
|
||||||
|
} else {
|
||||||
|
rowKey = TD_ROW_KEY(row);
|
||||||
|
isRowDel = TD_ROW_IS_DELETED(row);
|
||||||
|
}
|
||||||
|
} else { // fkey == rowKey
|
||||||
|
if (isRowDel) { // TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
|
||||||
|
ASSERT(!keepDup);
|
||||||
|
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
|
||||||
|
pMergeInfo->rowsDeleteSucceed++;
|
||||||
|
pMergeInfo->nOperations++;
|
||||||
|
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
|
||||||
|
} else {
|
||||||
|
if (keepDup) {
|
||||||
|
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
|
||||||
|
if (lastKey != rowKey) {
|
||||||
|
pMergeInfo->rowsUpdated++;
|
||||||
|
pMergeInfo->nOperations++;
|
||||||
|
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
|
||||||
|
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
|
||||||
|
if (pCols) {
|
||||||
|
if (lastKey != TSKEY_INITIAL_VAL) {
|
||||||
|
++pCols->numOfRows;
|
||||||
|
}
|
||||||
|
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
|
||||||
|
}
|
||||||
|
lastKey = rowKey;
|
||||||
|
} else {
|
||||||
|
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
|
||||||
|
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbTbDataIterNext(pIter);
|
||||||
|
row = tsdbNextIterRow(pIter);
|
||||||
|
if (row == NULL || TD_ROW_KEY(row) > maxKey) {
|
||||||
|
rowKey = INT64_MAX;
|
||||||
|
isRowDel = false;
|
||||||
|
} else {
|
||||||
|
rowKey = TD_ROW_KEY(row);
|
||||||
|
isRowDel = TD_ROW_IS_DELETED(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
filterIter++;
|
||||||
|
if (filterIter >= nFilterKeys) {
|
||||||
|
fKey = INT64_MAX;
|
||||||
|
} else {
|
||||||
|
fKey = tdGetKey(filterKeys[filterIter]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
|
||||||
|
++pCols->numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
|
@ -1,436 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "tsdb.h"
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMemTable2 *pMemTable;
|
|
||||||
int32_t minutes;
|
|
||||||
int8_t precision;
|
|
||||||
TSKEY nCommitKey;
|
|
||||||
int32_t fid;
|
|
||||||
TSKEY minKey;
|
|
||||||
TSKEY maxKey;
|
|
||||||
SReadH readh;
|
|
||||||
SDFileSet wSet;
|
|
||||||
SArray *aBlkIdx;
|
|
||||||
SArray *aSupBlk;
|
|
||||||
SArray *aSubBlk;
|
|
||||||
SArray *aDelInfo;
|
|
||||||
} SCommitH;
|
|
||||||
|
|
||||||
static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb);
|
|
||||||
static int32_t tsdbCommitEnd(SCommitH *pCHandle);
|
|
||||||
static int32_t tsdbCommitImpl(SCommitH *pCHandle);
|
|
||||||
|
|
||||||
int32_t tsdbBegin2(STsdb *pTsdb) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
ASSERT(pTsdb->mem == NULL);
|
|
||||||
code = tsdbMemTableCreate2(pTsdb, (SMemTable2 **)&pTsdb->mem);
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d failed to begin TSDB since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbCommit2(STsdb *pTsdb) {
|
|
||||||
int32_t code = 0;
|
|
||||||
SCommitH ch = {0};
|
|
||||||
|
|
||||||
// start to commit
|
|
||||||
code = tsdbCommitStart(&ch, pTsdb);
|
|
||||||
if (code) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit
|
|
||||||
code = tsdbCommitImpl(&ch);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// end commit
|
|
||||||
code = tsdbCommitEnd(&ch);
|
|
||||||
if (code) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tsdbError("vgId:%d failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb) {
|
|
||||||
int32_t code = 0;
|
|
||||||
SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->mem;
|
|
||||||
|
|
||||||
tsdbInfo("vgId:%d start to commit", TD_VID(pTsdb->pVnode));
|
|
||||||
|
|
||||||
// switch to commit
|
|
||||||
ASSERT(pTsdb->imem == NULL && pTsdb->mem);
|
|
||||||
pTsdb->imem = pTsdb->mem;
|
|
||||||
pTsdb->mem = NULL;
|
|
||||||
|
|
||||||
// open handle
|
|
||||||
pCHandle->pMemTable = pMemTable;
|
|
||||||
pCHandle->minutes = pTsdb->keepCfg.days;
|
|
||||||
pCHandle->precision = pTsdb->keepCfg.precision;
|
|
||||||
pCHandle->nCommitKey = pMemTable->minKey.ts;
|
|
||||||
|
|
||||||
code = tsdbInitReadH(&pCHandle->readh, pTsdb);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pCHandle->aBlkIdx = taosArrayInit(0, sizeof(SBlockIdx));
|
|
||||||
if (pCHandle->aBlkIdx == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pCHandle->aSupBlk = taosArrayInit(0, sizeof(SBlock));
|
|
||||||
if (pCHandle->aSupBlk == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pCHandle->aSubBlk = taosArrayInit(0, sizeof(SBlock));
|
|
||||||
if (pCHandle->aSubBlk == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pCHandle->aDelInfo = taosArrayInit(0, sizeof(SDelInfo));
|
|
||||||
if (pCHandle->aDelInfo == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// start FS transaction
|
|
||||||
tsdbStartFSTxn(pTsdb, 0, 0);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitEnd(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
STsdb *pTsdb = pCHandle->pMemTable->pTsdb;
|
|
||||||
SMemTable2 *pMemTable = (SMemTable2 *)pTsdb->imem;
|
|
||||||
|
|
||||||
// end transaction
|
|
||||||
code = tsdbEndFSTxn(pTsdb);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// close handle
|
|
||||||
taosArrayClear(pCHandle->aDelInfo);
|
|
||||||
taosArrayClear(pCHandle->aSubBlk);
|
|
||||||
taosArrayClear(pCHandle->aSupBlk);
|
|
||||||
taosArrayClear(pCHandle->aBlkIdx);
|
|
||||||
tsdbDestroyReadH(&pCHandle->readh);
|
|
||||||
|
|
||||||
// destroy memtable (todo: unref it)
|
|
||||||
pTsdb->imem = NULL;
|
|
||||||
tsdbMemTableDestroy2(pMemTable);
|
|
||||||
|
|
||||||
tsdbInfo("vgId:%d commit over", TD_VID(pTsdb->pVnode));
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTableStart(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTableEnd(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitTable(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx *pBlockIdx) {
|
|
||||||
int32_t code = 0;
|
|
||||||
SMemDataIter iter = {0};
|
|
||||||
|
|
||||||
// commit table start
|
|
||||||
code = tsdbCommitTableStart(pCHandle);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit table impl
|
|
||||||
if (pMemData && pBlockIdx) {
|
|
||||||
// TODO
|
|
||||||
} else if (pMemData) {
|
|
||||||
// TODO
|
|
||||||
} else {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit table end
|
|
||||||
code = tsdbCommitTableEnd(pCHandle);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbTableIdCmprFn(const void *p1, const void *p2) {
|
|
||||||
TABLEID *pId1 = (TABLEID *)p1;
|
|
||||||
TABLEID *pId2 = (TABLEID *)p2;
|
|
||||||
|
|
||||||
if (pId1->suid < pId2->suid) {
|
|
||||||
return -1;
|
|
||||||
} else if (pId1->suid > pId2->suid) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pId1->uid < pId2->uid) {
|
|
||||||
return -1;
|
|
||||||
} else if (pId1->uid > pId2->uid) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbWriteBlockIdx(SDFile *pFile, SArray *pArray, uint8_t **ppBuf) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFileStart(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
STsdb *pTsdb = pCHandle->pMemTable->pTsdb;
|
|
||||||
SDFileSet *pSet = NULL;
|
|
||||||
|
|
||||||
taosArrayClear(pCHandle->aBlkIdx);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFileEnd(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFile(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
SMemData *pMemData;
|
|
||||||
SBlockIdx *pBlockIdx;
|
|
||||||
int32_t iMemData;
|
|
||||||
int32_t nMemData;
|
|
||||||
int32_t iBlockIdx;
|
|
||||||
int32_t nBlockIdx;
|
|
||||||
|
|
||||||
// commit file start
|
|
||||||
code = tsdbCommitFileStart(pCHandle);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit file impl
|
|
||||||
iMemData = 0;
|
|
||||||
nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData);
|
|
||||||
iBlockIdx = 0;
|
|
||||||
nBlockIdx = 0; // todo
|
|
||||||
|
|
||||||
for (;;) {
|
|
||||||
if (iMemData >= nMemData && iBlockIdx >= nBlockIdx) break;
|
|
||||||
|
|
||||||
pMemData = NULL;
|
|
||||||
pBlockIdx = NULL;
|
|
||||||
if (iMemData < nMemData) {
|
|
||||||
pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
|
|
||||||
}
|
|
||||||
if (iBlockIdx < nBlockIdx) {
|
|
||||||
// pBlockIdx = ;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMemData && pBlockIdx) {
|
|
||||||
int32_t c = tsdbTableIdCmprFn(pMemData, pBlockIdx);
|
|
||||||
if (c < 0) {
|
|
||||||
iMemData++;
|
|
||||||
pBlockIdx = NULL;
|
|
||||||
} else if (c == 0) {
|
|
||||||
iMemData++;
|
|
||||||
iBlockIdx++;
|
|
||||||
} else {
|
|
||||||
iBlockIdx++;
|
|
||||||
pMemData = NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (pMemData) {
|
|
||||||
iMemData++;
|
|
||||||
} else {
|
|
||||||
iBlockIdx++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbCommitTable(pCHandle, pMemData, pBlockIdx);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit file end
|
|
||||||
code = tsdbCommitFileEnd(pCHandle);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitData(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t fid;
|
|
||||||
|
|
||||||
if (pCHandle->pMemTable->nRows == 0) goto _exit;
|
|
||||||
|
|
||||||
// loop to commit to each file
|
|
||||||
for (;;) {
|
|
||||||
if (pCHandle->nCommitKey == TSKEY_MAX) break;
|
|
||||||
|
|
||||||
pCHandle->fid = TSDB_KEY_FID(pCHandle->nCommitKey, pCHandle->minutes, pCHandle->precision);
|
|
||||||
tsdbGetFidKeyRange(pCHandle->minutes, pCHandle->precision, pCHandle->fid, &pCHandle->minKey, &pCHandle->maxKey);
|
|
||||||
code = tsdbCommitFile(pCHandle);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t delInfoCmprFn(const void *p1, const void *p2) {
|
|
||||||
SDelInfo *pDelInfo1 = (SDelInfo *)p1;
|
|
||||||
SDelInfo *pDelInfo2 = (SDelInfo *)p2;
|
|
||||||
|
|
||||||
if (pDelInfo1->suid < pDelInfo2->suid) {
|
|
||||||
return -1;
|
|
||||||
} else if (pDelInfo1->suid > pDelInfo2->suid) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pDelInfo1->uid < pDelInfo2->uid) {
|
|
||||||
return -1;
|
|
||||||
} else if (pDelInfo1->uid > pDelInfo2->uid) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pDelInfo1->version < pDelInfo2->version) {
|
|
||||||
return -1;
|
|
||||||
} else if (pDelInfo1->version > pDelInfo2->version) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
static int32_t tsdbCommitDelete(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
SDelInfo delInfo;
|
|
||||||
SMemData *pMemData;
|
|
||||||
|
|
||||||
if (pCHandle->pMemTable->nDelOp == 0) goto _exit;
|
|
||||||
|
|
||||||
// load del array (todo)
|
|
||||||
|
|
||||||
// loop to append SDelInfo
|
|
||||||
for (int32_t iMemData = 0; iMemData < taosArrayGetSize(pCHandle->pMemTable->aMemData); iMemData++) {
|
|
||||||
pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
|
|
||||||
|
|
||||||
for (SDelOp *pDelOp = pMemData->delOpHead; pDelOp; pDelOp = pDelOp->pNext) {
|
|
||||||
delInfo = (SDelInfo){.suid = pMemData->suid,
|
|
||||||
.uid = pMemData->uid,
|
|
||||||
.version = pDelOp->version,
|
|
||||||
.sKey = pDelOp->sKey,
|
|
||||||
.eKey = pDelOp->eKey};
|
|
||||||
if (taosArrayPush(pCHandle->aDelInfo, &delInfo) == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArraySort(pCHandle->aDelInfo, delInfoCmprFn);
|
|
||||||
|
|
||||||
// write to new file
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitCache(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitImpl(SCommitH *pCHandle) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// commit data
|
|
||||||
code = tsdbCommitData(pCHandle);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit delete
|
|
||||||
code = tsdbCommitDelete(pCHandle);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// commit cache if need (todo)
|
|
||||||
if (0) {
|
|
||||||
code = tsdbCommitCache(pCHandle);
|
|
||||||
if (code) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
return code;
|
|
||||||
}
|
|
|
@ -188,23 +188,6 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
|
|
||||||
bool merge) {
|
|
||||||
if (pCols) {
|
|
||||||
if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) {
|
|
||||||
*ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row));
|
|
||||||
if (*ppSchema == NULL) {
|
|
||||||
ASSERT(false);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter) {
|
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -310,166 +293,6 @@ bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This is an important function to load data or try to load data from memory skiplist iterator.
|
|
||||||
*
|
|
||||||
* This function load memory data until:
|
|
||||||
* 1. iterator ends
|
|
||||||
* 2. data key exceeds maxKey
|
|
||||||
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
|
|
||||||
* 4. operations in pCols not exceeds its max capacity if pCols is given
|
|
||||||
*
|
|
||||||
* The function tries to procceed AS MUCH AS POSSIBLE.
|
|
||||||
*/
|
|
||||||
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
|
|
||||||
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
|
|
||||||
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
|
|
||||||
if (pIter == NULL) return 0;
|
|
||||||
STSchema *pSchema = NULL;
|
|
||||||
TSKEY rowKey = 0;
|
|
||||||
TSKEY fKey = 0;
|
|
||||||
// only fetch lastKey from mem data as file data not used in this function actually
|
|
||||||
TSKEY lastKey = TSKEY_INITIAL_VAL;
|
|
||||||
bool isRowDel = false;
|
|
||||||
int filterIter = 0;
|
|
||||||
STSRow *row = NULL;
|
|
||||||
SMergeInfo mInfo;
|
|
||||||
|
|
||||||
// TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
|
|
||||||
// query handle)
|
|
||||||
|
|
||||||
if (pMergeInfo == NULL) pMergeInfo = &mInfo;
|
|
||||||
|
|
||||||
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
|
|
||||||
pMergeInfo->keyFirst = INT64_MAX;
|
|
||||||
pMergeInfo->keyLast = INT64_MIN;
|
|
||||||
if (pCols) tdResetDataCols(pCols);
|
|
||||||
|
|
||||||
row = tsdbNextIterRow(pIter);
|
|
||||||
if (row == NULL || TD_ROW_KEY(row) > maxKey) {
|
|
||||||
rowKey = INT64_MAX;
|
|
||||||
isRowDel = false;
|
|
||||||
} else {
|
|
||||||
rowKey = TD_ROW_KEY(row);
|
|
||||||
isRowDel = TD_ROW_IS_DELETED(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (filterIter >= nFilterKeys) {
|
|
||||||
fKey = INT64_MAX;
|
|
||||||
} else {
|
|
||||||
fKey = tdGetKey(filterKeys[filterIter]);
|
|
||||||
}
|
|
||||||
// 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
|
|
||||||
// 2. rowKey - would dup since Multi-Version supported
|
|
||||||
while (true) {
|
|
||||||
if (fKey == INT64_MAX && rowKey == INT64_MAX) break;
|
|
||||||
|
|
||||||
if (fKey < rowKey) {
|
|
||||||
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
|
|
||||||
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
|
|
||||||
|
|
||||||
filterIter++;
|
|
||||||
if (filterIter >= nFilterKeys) {
|
|
||||||
fKey = INT64_MAX;
|
|
||||||
} else {
|
|
||||||
fKey = tdGetKey(filterKeys[filterIter]);
|
|
||||||
}
|
|
||||||
#if 1
|
|
||||||
} else if (fKey > rowKey) {
|
|
||||||
if (isRowDel) {
|
|
||||||
// TODO: support delete function
|
|
||||||
pMergeInfo->rowsDeleteFailed++;
|
|
||||||
} else {
|
|
||||||
if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
|
|
||||||
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
|
|
||||||
|
|
||||||
if (lastKey != rowKey) {
|
|
||||||
pMergeInfo->rowsInserted++;
|
|
||||||
pMergeInfo->nOperations++;
|
|
||||||
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
|
|
||||||
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
|
|
||||||
if (pCols) {
|
|
||||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
|
||||||
++pCols->numOfRows;
|
|
||||||
}
|
|
||||||
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
|
|
||||||
}
|
|
||||||
lastKey = rowKey;
|
|
||||||
} else {
|
|
||||||
if (keepDup) {
|
|
||||||
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
|
|
||||||
} else {
|
|
||||||
// discard
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbTbDataIterNext(pIter);
|
|
||||||
row = tsdbNextIterRow(pIter);
|
|
||||||
if (row == NULL || TD_ROW_KEY(row) > maxKey) {
|
|
||||||
rowKey = INT64_MAX;
|
|
||||||
isRowDel = false;
|
|
||||||
} else {
|
|
||||||
rowKey = TD_ROW_KEY(row);
|
|
||||||
isRowDel = TD_ROW_IS_DELETED(row);
|
|
||||||
}
|
|
||||||
} else { // fkey == rowKey
|
|
||||||
if (isRowDel) { // TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
|
|
||||||
ASSERT(!keepDup);
|
|
||||||
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
|
|
||||||
pMergeInfo->rowsDeleteSucceed++;
|
|
||||||
pMergeInfo->nOperations++;
|
|
||||||
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
|
|
||||||
} else {
|
|
||||||
if (keepDup) {
|
|
||||||
if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
|
|
||||||
if (lastKey != rowKey) {
|
|
||||||
pMergeInfo->rowsUpdated++;
|
|
||||||
pMergeInfo->nOperations++;
|
|
||||||
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
|
|
||||||
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
|
|
||||||
if (pCols) {
|
|
||||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
|
||||||
++pCols->numOfRows;
|
|
||||||
}
|
|
||||||
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
|
|
||||||
}
|
|
||||||
lastKey = rowKey;
|
|
||||||
} else {
|
|
||||||
tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
|
|
||||||
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbTbDataIterNext(pIter);
|
|
||||||
row = tsdbNextIterRow(pIter);
|
|
||||||
if (row == NULL || TD_ROW_KEY(row) > maxKey) {
|
|
||||||
rowKey = INT64_MAX;
|
|
||||||
isRowDel = false;
|
|
||||||
} else {
|
|
||||||
rowKey = TD_ROW_KEY(row);
|
|
||||||
isRowDel = TD_ROW_IS_DELETED(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
filterIter++;
|
|
||||||
if (filterIter >= nFilterKeys) {
|
|
||||||
fKey = INT64_MAX;
|
|
||||||
} else {
|
|
||||||
fKey = tdGetKey(filterKeys[filterIter]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
|
|
||||||
++pCols->numOfRows;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
|
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t idx = 0;
|
int32_t idx = 0;
|
||||||
|
|
Loading…
Reference in New Issue