From e9db1ee7c64f9fb8f73a949cd6718321a5e88d28 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 5 Jan 2022 09:14:20 +0000 Subject: [PATCH] more progress --- include/common/tmsg.h | 6 +- source/common/src/tmsg.c | 22 ++ source/dnode/vnode/tsdb/src/tsdbMemTable.c | 321 ++++++++++++--------- 3 files changed, 209 insertions(+), 140 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bdd4a93856..0150887813 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -194,8 +194,10 @@ typedef struct { void* pMsg; } SSubmitMsgIter; -int tsdbInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); -int tsdbGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +int tsdbInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); +int tsdbGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +int tsdbInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter* pIter); typedef struct { int32_t index; // index of failed block in submit blocks diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 50aa3267cd..818245d8b8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -63,6 +63,28 @@ int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { return 0; } +int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { + if (pBlock->dataLen <= 0) return -1; + pIter->totalLen = pBlock->dataLen; + pIter->len = 0; + pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen); + return 0; +} + +SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { + SMemRow row = pIter->row; // firstly, get current row + if (row == NULL) return NULL; + + pIter->len += memRowTLen(row); + if (pIter->len >= pIter->totalLen) { // reach the end + pIter->row = NULL; + } else { + pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row + } + + return row; +} + int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; diff --git a/source/dnode/vnode/tsdb/src/tsdbMemTable.c b/source/dnode/vnode/tsdb/src/tsdbMemTable.c index a342a7dd04..d10952e066 100644 --- a/source/dnode/vnode/tsdb/src/tsdbMemTable.c +++ b/source/dnode/vnode/tsdb/src/tsdbMemTable.c @@ -15,9 +15,13 @@ #include "tsdbDef.h" -struct STbData { - tb_uid_t uid; -}; +typedef struct STbData { + tb_uid_t uid; + TSKEY keyMin; + TSKEY keyMax; + int64_t nrows; + SSkipList *pData; +} STbData; struct STsdbMemTable { T_REF_DECLARE() @@ -28,14 +32,20 @@ struct STsdbMemTable { SMemAllocator *pMA; // Container #if 1 - SSkipList *pData; // SSkiplist + SSkipList *pSlIdx; // SSkiplist SHashObj * pHashIdx; #else TD_SLIST(STbData) list; #endif }; -static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg); +static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg); +static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows); +static STbData *tsdbNewTbData(tb_uid_t uid); +static void tsdbFreeTbData(STbData *pTbData); +static char * tsdbGetTsTupleKey(const void *data); +static int tsdbTbDataComp(const void *arg1, const void *arg2); +static char * tsdbTbDataGetUid(const void *arg); STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { STsdbMemTable *pMemTable = (STsdbMemTable *)calloc(1, sizeof(*pMemTable)); @@ -56,9 +66,9 @@ STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { } // Initialize the container - pMemTable->pData = - tSkipListCreate(5, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), NULL /*TODO*/, SL_DISCARD_DUP_KEY, NULL /* TODO */); - if (pMemTable->pData == NULL) { + pMemTable->pSlIdx = + tSkipListCreate(5, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid); + if (pMemTable->pSlIdx == NULL) { pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); free(pMemTable); return NULL; @@ -67,7 +77,7 @@ STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pMemTable->pHashIdx == NULL) { pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); - tSkipListDestroy(pMemTable->pData); + tSkipListDestroy(pMemTable->pSlIdx); free(pMemTable); return NULL; } @@ -78,7 +88,7 @@ STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) { if (pMemTable) { taosHashCleanup(pMemTable->pHashIdx); - tSkipListDestroy(pMemTable->pData); + tSkipListDestroy(pMemTable->pSlIdx); if (pMemTable->pMA) { pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA); } @@ -102,11 +112,10 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, while (true) { tsdbGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; -#if 0 - if (tsdbInsertDataToTable(pTsdb, pBlock, &affectedrows) < 0) { + if (tsdbMemTableInsertTbData(pTsdb, pBlock, &affectedrows) < 0) { return -1; } -#endif + numOfRows += pBlock->numOfRows; } @@ -189,6 +198,167 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) { return 0; } +static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows) { + // STsdbMeta *pMeta = pRepo->tsdbMeta; + // int32_t points = 0; + // STable *pTable = NULL; + SSubmitBlkIter blkIter = {0}; + STsdbMemTable *pMemTable = pTsdb->mem; + void * tptr; + STbData * pTbData; + // SMemTable *pMemTable = NULL; + // STableData *pTableData = NULL; + // STsdbCfg *pCfg = &(pRepo->config); + + tptr = taosHashGet(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid)); + if (tptr == NULL) { + pTbData = tsdbNewTbData(pBlock->uid); + if (pTbData == NULL) { + return -1; + } + + // Put into hash + taosHashPut(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid), &(pTbData), sizeof(pTbData)); + + // Put into skiplist + tSkipListPut(pMemTable->pSlIdx, pTbData); + } else { + pTbData = *(STbData **)tptr; + } + + tsdbInitSubmitBlkIter(pBlock, &blkIter); + if (blkIter.row == NULL) return 0; + TSKEY firstRowKey = memRowKey(blkIter.row); + + // tsdbAllocBytes(pRepo, 0); + // pMemTable = pRepo->mem; + + // ASSERT(pMemTable != NULL); + // ASSERT(pBlock->tid < pMeta->maxTables); + + // pTable = pMeta->tables[pBlock->tid]; + + // ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); + + // if (TABLE_TID(pTable) >= pMemTable->maxTables) { + // if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { + // return -1; + // } + // } + // pTableData = pMemTable->tData[TABLE_TID(pTable)]; + + // if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { + // if (pTableData != NULL) { + // taosWLockLatch(&(pMemTable->latch)); + // pMemTable->tData[TABLE_TID(pTable)] = NULL; + // tsdbFreeTableData(pTableData); + // taosWUnLockLatch(&(pMemTable->latch)); + // } + + // pTableData = tsdbNewTableData(pCfg, pTable); + // if (pTableData == NULL) { + // tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo), + // TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno)); + // return -1; + // } + + // pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; + // } + + // ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); + + // SMemRow lastRow = NULL; + // int64_t osize = SL_SIZE(pTableData->pData); + // tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow); + // tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext); + // int64_t dsize = SL_SIZE(pTableData->pData) - osize; + // (*pAffectedRows) += points; + + // if(lastRow != NULL) { + // TSKEY lastRowKey = memRowKey(lastRow); + // if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey; + // pMemTable->numOfRows += dsize; + + // if (pTableData->keyFirst > firstRowKey) pTableData->keyFirst = firstRowKey; + // pTableData->numOfRows += dsize; + // if (pMemTable->keyLast < lastRowKey) pMemTable->keyLast = lastRowKey; + // if (pTableData->keyLast < lastRowKey) pTableData->keyLast = lastRowKey; + // if (tsdbUpdateTableLatestInfo(pRepo, pTable, lastRow) < 0) { + // return -1; + // } + // } + + // STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion, -1); + // pRepo->stat.pointsWritten += points * schemaNCols(pSchema); + // pRepo->stat.totalStorage += points * schemaVLen(pSchema); + + return 0; +} + +static STbData *tsdbNewTbData(tb_uid_t uid) { + STbData *pTbData = (STbData *)calloc(1, sizeof(*pTbData)); + if (pTbData == NULL) { + return NULL; + } + + pTbData->uid = uid; + pTbData->keyMin = TSKEY_MAX; + pTbData->keyMax = TSKEY_MIN; + pTbData->nrows = 0; + + // uint8_t skipListCreateFlags; + // if (pCfg->update == TD_ROW_DISCARD_UPDATE) + // skipListCreateFlags = SL_DISCARD_DUP_KEY; + // else + // skipListCreateFlags = SL_UPDATE_DUP_KEY; + + // pTableData->pData = + // tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], + // tkeyComparFn, skipListCreateFlags, tsdbGetTsTupleKey); + // if (pTableData->pData == NULL) { + // terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + // free(pTableData); + // return NULL; + // } + + pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_DISCARD_DUP_KEY, + tsdbGetTsTupleKey); + if (pTbData->pData == NULL) { + free(pTbData); + return NULL; + } + + return pTbData; +} + +static void tsdbFreeTbData(STbData *pTbData) { + if (pTbData) { + tSkipListDestroy(pTbData->pData); + free(pTbData); + } +} + +static char *tsdbGetTsTupleKey(const void *data) { return memRowKey((SMemRow)data); } + +static int tsdbTbDataComp(const void *arg1, const void *arg2) { + STbData *pTbData1 = (STbData *)arg1; + STbData *pTbData2 = (STbData *)arg2; + + if (pTbData1->uid > pTbData2->uid) { + return 1; + } else if (pTbData1->uid == pTbData2->uid) { + return 0; + } else { + return -1; + } +} + +static char *tsdbTbDataGetUid(const void *arg) { + STbData *pTbData = (STbData *)arg; + return &(pTbData->uid); +} + +/* ------------------------ REFACTORING ------------------------ */ #if 0 int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) { SMemAllocator *pMA = pMemTable->pMA; @@ -227,7 +397,6 @@ static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); -static char * tsdbGetTsTupleKey(const void *data); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); @@ -627,51 +796,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey // ---------------- LOCAL FUNCTIONS ---------------- - -static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { - STableData *pTableData = (STableData *)calloc(1, sizeof(*pTableData)); - if (pTableData == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - pTableData->uid = TABLE_UID(pTable); - pTableData->keyFirst = INT64_MAX; - pTableData->keyLast = 0; - pTableData->numOfRows = 0; - - uint8_t skipListCreateFlags; - if(pCfg->update == TD_ROW_DISCARD_UPDATE) - skipListCreateFlags = SL_DISCARD_DUP_KEY; - else - skipListCreateFlags = SL_UPDATE_DUP_KEY; - - pTableData->pData = - tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], - tkeyComparFn, skipListCreateFlags, tsdbGetTsTupleKey); - if (pTableData->pData == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - free(pTableData); - return NULL; - } - - T_REF_INC(pTableData); - - return pTableData; -} - -static void tsdbFreeTableData(STableData *pTableData) { - if (pTableData) { - int32_t ref = T_REF_DEC(pTableData); - if (ref == 0) { - tSkipListDestroy(pTableData->pData); - free(pTableData); - } - } -} - -static char *tsdbGetTsTupleKey(const void *data) { return memRowKeys((SMemRow)data); } - static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { ASSERT(pMemTable->maxTables < maxTables); @@ -824,85 +948,6 @@ static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STa pSkipList->insertHandleFn->args[7] = pLastRow; } -static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) { - - STsdbMeta *pMeta = pRepo->tsdbMeta; - int32_t points = 0; - STable *pTable = NULL; - SSubmitBlkIter blkIter = {0}; - SMemTable *pMemTable = NULL; - STableData *pTableData = NULL; - STsdbCfg *pCfg = &(pRepo->config); - - tsdbInitSubmitBlkIter(pBlock, &blkIter); - if(blkIter.row == NULL) return 0; - TSKEY firstRowKey = memRowKey(blkIter.row); - - tsdbAllocBytes(pRepo, 0); - pMemTable = pRepo->mem; - - ASSERT(pMemTable != NULL); - ASSERT(pBlock->tid < pMeta->maxTables); - - pTable = pMeta->tables[pBlock->tid]; - - ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); - - - if (TABLE_TID(pTable) >= pMemTable->maxTables) { - if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { - return -1; - } - } - pTableData = pMemTable->tData[TABLE_TID(pTable)]; - - if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { - if (pTableData != NULL) { - taosWLockLatch(&(pMemTable->latch)); - pMemTable->tData[TABLE_TID(pTable)] = NULL; - tsdbFreeTableData(pTableData); - taosWUnLockLatch(&(pMemTable->latch)); - } - - pTableData = tsdbNewTableData(pCfg, pTable); - if (pTableData == NULL) { - tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo), - TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno)); - return -1; - } - - pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; - } - - ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); - - SMemRow lastRow = NULL; - int64_t osize = SL_SIZE(pTableData->pData); - tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow); - tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext); - int64_t dsize = SL_SIZE(pTableData->pData) - osize; - (*pAffectedRows) += points; - - if(lastRow != NULL) { - TSKEY lastRowKey = memRowKey(lastRow); - if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey; - pMemTable->numOfRows += dsize; - - if (pTableData->keyFirst > firstRowKey) pTableData->keyFirst = firstRowKey; - pTableData->numOfRows += dsize; - if (pMemTable->keyLast < lastRowKey) pMemTable->keyLast = lastRowKey; - if (pTableData->keyLast < lastRowKey) pTableData->keyLast = lastRowKey; - if (tsdbUpdateTableLatestInfo(pRepo, pTable, lastRow) < 0) { - return -1; - } - } - - STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion, -1); - pRepo->stat.pointsWritten += points * schemaNCols(pSchema); - pRepo->stat.totalStorage += points * schemaVLen(pSchema); - - return 0; -} static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {