more progress

This commit is contained in:
Hongze Cheng 2022-01-05 09:14:20 +00:00
parent 71358c8709
commit e9db1ee7c6
3 changed files with 209 additions and 140 deletions

View File

@ -194,8 +194,10 @@ typedef struct {
void* pMsg;
} SSubmitMsgIter;
int tsdbInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int tsdbGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int tsdbInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int tsdbGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int tsdbInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter* pIter);
typedef struct {
int32_t index; // index of failed block in submit blocks

View File

@ -63,6 +63,28 @@ int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
return 0;
}
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen;
pIter->len = 0;
pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen);
return 0;
}
SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
SMemRow row = pIter->row; // firstly, get current row
if (row == NULL) return NULL;
pIter->len += memRowTLen(row);
if (pIter->len >= pIter->totalLen) { // reach the end
pIter->row = NULL;
} else {
pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row
}
return row;
}
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0;

View File

@ -15,9 +15,13 @@
#include "tsdbDef.h"
struct STbData {
tb_uid_t uid;
};
typedef struct STbData {
tb_uid_t uid;
TSKEY keyMin;
TSKEY keyMax;
int64_t nrows;
SSkipList *pData;
} STbData;
struct STsdbMemTable {
T_REF_DECLARE()
@ -28,14 +32,20 @@ struct STsdbMemTable {
SMemAllocator *pMA;
// Container
#if 1
SSkipList *pData; // SSkiplist<STbData>
SSkipList *pSlIdx; // SSkiplist<STbData>
SHashObj * pHashIdx;
#else
TD_SLIST(STbData) list;
#endif
};
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg);
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg);
static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows);
static STbData *tsdbNewTbData(tb_uid_t uid);
static void tsdbFreeTbData(STbData *pTbData);
static char * tsdbGetTsTupleKey(const void *data);
static int tsdbTbDataComp(const void *arg1, const void *arg2);
static char * tsdbTbDataGetUid(const void *arg);
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
STsdbMemTable *pMemTable = (STsdbMemTable *)calloc(1, sizeof(*pMemTable));
@ -56,9 +66,9 @@ STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
}
// Initialize the container
pMemTable->pData =
tSkipListCreate(5, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), NULL /*TODO*/, SL_DISCARD_DUP_KEY, NULL /* TODO */);
if (pMemTable->pData == NULL) {
pMemTable->pSlIdx =
tSkipListCreate(5, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid);
if (pMemTable->pSlIdx == NULL) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
free(pMemTable);
return NULL;
@ -67,7 +77,7 @@ STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pMemTable->pHashIdx == NULL) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
tSkipListDestroy(pMemTable->pData);
tSkipListDestroy(pMemTable->pSlIdx);
free(pMemTable);
return NULL;
}
@ -78,7 +88,7 @@ STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) {
if (pMemTable) {
taosHashCleanup(pMemTable->pHashIdx);
tSkipListDestroy(pMemTable->pData);
tSkipListDestroy(pMemTable->pSlIdx);
if (pMemTable->pMA) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
}
@ -102,11 +112,10 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg,
while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
#if 0
if (tsdbInsertDataToTable(pTsdb, pBlock, &affectedrows) < 0) {
if (tsdbMemTableInsertTbData(pTsdb, pBlock, &affectedrows) < 0) {
return -1;
}
#endif
numOfRows += pBlock->numOfRows;
}
@ -189,6 +198,167 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) {
return 0;
}
static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
// STsdbMeta *pMeta = pRepo->tsdbMeta;
// int32_t points = 0;
// STable *pTable = NULL;
SSubmitBlkIter blkIter = {0};
STsdbMemTable *pMemTable = pTsdb->mem;
void * tptr;
STbData * pTbData;
// SMemTable *pMemTable = NULL;
// STableData *pTableData = NULL;
// STsdbCfg *pCfg = &(pRepo->config);
tptr = taosHashGet(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid));
if (tptr == NULL) {
pTbData = tsdbNewTbData(pBlock->uid);
if (pTbData == NULL) {
return -1;
}
// Put into hash
taosHashPut(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid), &(pTbData), sizeof(pTbData));
// Put into skiplist
tSkipListPut(pMemTable->pSlIdx, pTbData);
} else {
pTbData = *(STbData **)tptr;
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
if (blkIter.row == NULL) return 0;
TSKEY firstRowKey = memRowKey(blkIter.row);
// tsdbAllocBytes(pRepo, 0);
// pMemTable = pRepo->mem;
// ASSERT(pMemTable != NULL);
// ASSERT(pBlock->tid < pMeta->maxTables);
// pTable = pMeta->tables[pBlock->tid];
// ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
// if (TABLE_TID(pTable) >= pMemTable->maxTables) {
// if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
// return -1;
// }
// }
// pTableData = pMemTable->tData[TABLE_TID(pTable)];
// if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
// if (pTableData != NULL) {
// taosWLockLatch(&(pMemTable->latch));
// pMemTable->tData[TABLE_TID(pTable)] = NULL;
// tsdbFreeTableData(pTableData);
// taosWUnLockLatch(&(pMemTable->latch));
// }
// pTableData = tsdbNewTableData(pCfg, pTable);
// if (pTableData == NULL) {
// tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
// TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
// return -1;
// }
// pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
// }
// ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
// SMemRow lastRow = NULL;
// int64_t osize = SL_SIZE(pTableData->pData);
// tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow);
// tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
// int64_t dsize = SL_SIZE(pTableData->pData) - osize;
// (*pAffectedRows) += points;
// if(lastRow != NULL) {
// TSKEY lastRowKey = memRowKey(lastRow);
// if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey;
// pMemTable->numOfRows += dsize;
// if (pTableData->keyFirst > firstRowKey) pTableData->keyFirst = firstRowKey;
// pTableData->numOfRows += dsize;
// if (pMemTable->keyLast < lastRowKey) pMemTable->keyLast = lastRowKey;
// if (pTableData->keyLast < lastRowKey) pTableData->keyLast = lastRowKey;
// if (tsdbUpdateTableLatestInfo(pRepo, pTable, lastRow) < 0) {
// return -1;
// }
// }
// STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion, -1);
// pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
// pRepo->stat.totalStorage += points * schemaVLen(pSchema);
return 0;
}
static STbData *tsdbNewTbData(tb_uid_t uid) {
STbData *pTbData = (STbData *)calloc(1, sizeof(*pTbData));
if (pTbData == NULL) {
return NULL;
}
pTbData->uid = uid;
pTbData->keyMin = TSKEY_MAX;
pTbData->keyMax = TSKEY_MIN;
pTbData->nrows = 0;
// uint8_t skipListCreateFlags;
// if (pCfg->update == TD_ROW_DISCARD_UPDATE)
// skipListCreateFlags = SL_DISCARD_DUP_KEY;
// else
// skipListCreateFlags = SL_UPDATE_DUP_KEY;
// pTableData->pData =
// tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP],
// tkeyComparFn, skipListCreateFlags, tsdbGetTsTupleKey);
// if (pTableData->pData == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// free(pTableData);
// return NULL;
// }
pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_DISCARD_DUP_KEY,
tsdbGetTsTupleKey);
if (pTbData->pData == NULL) {
free(pTbData);
return NULL;
}
return pTbData;
}
static void tsdbFreeTbData(STbData *pTbData) {
if (pTbData) {
tSkipListDestroy(pTbData->pData);
free(pTbData);
}
}
static char *tsdbGetTsTupleKey(const void *data) { return memRowKey((SMemRow)data); }
static int tsdbTbDataComp(const void *arg1, const void *arg2) {
STbData *pTbData1 = (STbData *)arg1;
STbData *pTbData2 = (STbData *)arg2;
if (pTbData1->uid > pTbData2->uid) {
return 1;
} else if (pTbData1->uid == pTbData2->uid) {
return 0;
} else {
return -1;
}
}
static char *tsdbTbDataGetUid(const void *arg) {
STbData *pTbData = (STbData *)arg;
return &(pTbData->uid);
}
/* ------------------------ REFACTORING ------------------------ */
#if 0
int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) {
SMemAllocator *pMA = pMemTable->pMA;
@ -227,7 +397,6 @@ static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
@ -627,51 +796,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
// ---------------- LOCAL FUNCTIONS ----------------
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData));
if (pTableData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
pTableData->uid = TABLE_UID(pTable);
pTableData->keyFirst = INT64_MAX;
pTableData->keyLast = 0;
pTableData->numOfRows = 0;
uint8_t skipListCreateFlags;
if(pCfg->update == TD_ROW_DISCARD_UPDATE)
skipListCreateFlags = SL_DISCARD_DUP_KEY;
else
skipListCreateFlags = SL_UPDATE_DUP_KEY;
pTableData->pData =
tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP],
tkeyComparFn, skipListCreateFlags, tsdbGetTsTupleKey);
if (pTableData->pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pTableData);
return NULL;
}
T_REF_INC(pTableData);
return pTableData;
}
static void tsdbFreeTableData(STableData *pTableData) {
if (pTableData) {
int32_t ref = T_REF_DEC(pTableData);
if (ref == 0) {
tSkipListDestroy(pTableData->pData);
free(pTableData);
}
}
}
static char *tsdbGetTsTupleKey(const void *data) { return memRowKeys((SMemRow)data); }
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
ASSERT(pMemTable->maxTables < maxTables);
@ -824,85 +948,6 @@ static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STa
pSkipList->insertHandleFn->args[7] = pLastRow;
}
static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
int32_t points = 0;
STable *pTable = NULL;
SSubmitBlkIter blkIter = {0};
SMemTable *pMemTable = NULL;
STableData *pTableData = NULL;
STsdbCfg *pCfg = &(pRepo->config);
tsdbInitSubmitBlkIter(pBlock, &blkIter);
if(blkIter.row == NULL) return 0;
TSKEY firstRowKey = memRowKey(blkIter.row);
tsdbAllocBytes(pRepo, 0);
pMemTable = pRepo->mem;
ASSERT(pMemTable != NULL);
ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
return -1;
}
}
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) {
taosWLockLatch(&(pMemTable->latch));
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
taosWUnLockLatch(&(pMemTable->latch));
}
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
return -1;
}
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
SMemRow lastRow = NULL;
int64_t osize = SL_SIZE(pTableData->pData);
tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow);
tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
(*pAffectedRows) += points;
if(lastRow != NULL) {
TSKEY lastRowKey = memRowKey(lastRow);
if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey;
pMemTable->numOfRows += dsize;
if (pTableData->keyFirst > firstRowKey) pTableData->keyFirst = firstRowKey;
pTableData->numOfRows += dsize;
if (pMemTable->keyLast < lastRowKey) pMemTable->keyLast = lastRowKey;
if (pTableData->keyLast < lastRowKey) pTableData->keyLast = lastRowKey;
if (tsdbUpdateTableLatestInfo(pRepo, pTable, lastRow) < 0) {
return -1;
}
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion, -1);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
return 0;
}
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {