diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0150887813..b463d71f4b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -194,10 +194,10 @@ typedef struct { void* pMsg; } SSubmitMsgIter; -int tsdbInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); -int tsdbGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); -int tsdbInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); -SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter* pIter); +int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); +int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +SMemRow tGetSubmitBlkNext(SSubmitBlkIter* pIter); typedef struct { int32_t index; // index of failed block in submit blocks diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 818245d8b8..10eecc1b77 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -27,7 +27,7 @@ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" -int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { +int tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return -1; @@ -44,7 +44,7 @@ int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { return 0; } -int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { +int tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { if (pIter->len == 0) { pIter->len += sizeof(SSubmitMsg); } else { @@ -63,7 +63,7 @@ int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { return 0; } -int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { +int tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { if (pBlock->dataLen <= 0) return -1; pIter->totalLen = pBlock->dataLen; pIter->len = 0; @@ -71,18 +71,18 @@ int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { return 0; } -SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { - SMemRow row = pIter->row; // firstly, get current row - if (row == NULL) return NULL; +SMemRow tGetSubmitBlkNext(SSubmitBlkIter *pIter) { + SMemRow row = pIter->row; - pIter->len += memRowTLen(row); - if (pIter->len >= pIter->totalLen) { // reach the end - pIter->row = NULL; + if (pIter->len >= pIter->totalLen) { + return NULL; } else { - pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row + pIter->len += memRowTLen(row); + if (pIter->len < pIter->totalLen) { + pIter->row = POINTER_SHIFT(row, memRowTLen(row)); + } + return row; } - - return row; } int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { diff --git a/source/dnode/vnode/tsdb/src/tsdbMemTable.c b/source/dnode/vnode/tsdb/src/tsdbMemTable.c index d10952e066..1bfba55c86 100644 --- a/source/dnode/vnode/tsdb/src/tsdbMemTable.c +++ b/source/dnode/vnode/tsdb/src/tsdbMemTable.c @@ -108,9 +108,9 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, return -1; } - tsdbInitSubmitMsgIter(pMsg, &msgIter); + tInitSubmitMsgIter(pMsg, &msgIter); while (true) { - tsdbGetSubmitMsgNext(&msgIter, &pBlock); + tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; if (tsdbMemTableInsertTbData(pTsdb, pBlock, &affectedrows) < 0) { return -1; @@ -142,9 +142,9 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) { pMsg->length = htonl(pMsg->length); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; while (true) { - if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (pBlock == NULL) break; pBlock->uid = htobe64(pBlock->uid); @@ -206,6 +206,10 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p STsdbMemTable *pMemTable = pTsdb->mem; void * tptr; STbData * pTbData; + SMemRow row; + TSKEY keyMin; + TSKEY keyMax; + // SMemTable *pMemTable = NULL; // STableData *pTableData = NULL; // STsdbCfg *pCfg = &(pRepo->config); @@ -226,46 +230,22 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p pTbData = *(STbData **)tptr; } - tsdbInitSubmitBlkIter(pBlock, &blkIter); + tInitSubmitBlkIter(pBlock, &blkIter); if (blkIter.row == NULL) return 0; - TSKEY firstRowKey = memRowKey(blkIter.row); + keyMin = memRowKey(blkIter.row); - // tsdbAllocBytes(pRepo, 0); - // pMemTable = pRepo->mem; + tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext); - // ASSERT(pMemTable != NULL); - // ASSERT(pBlock->tid < pMeta->maxTables); + // Set statistics + keyMax = memRowKey(blkIter.row); - // pTable = pMeta->tables[pBlock->tid]; + pTbData->nrows += pBlock->numOfRows; + if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin; + if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax; - // ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); - - // if (TABLE_TID(pTable) >= pMemTable->maxTables) { - // if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { - // return -1; - // } - // } - // pTableData = pMemTable->tData[TABLE_TID(pTable)]; - - // if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { - // if (pTableData != NULL) { - // taosWLockLatch(&(pMemTable->latch)); - // pMemTable->tData[TABLE_TID(pTable)] = NULL; - // tsdbFreeTableData(pTableData); - // taosWUnLockLatch(&(pMemTable->latch)); - // } - - // pTableData = tsdbNewTableData(pCfg, pTable); - // if (pTableData == NULL) { - // tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo), - // TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno)); - // return -1; - // } - - // pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; - // } - - // ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); + pMemTable->nRow += pBlock->numOfRows; + if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; + if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; // SMemRow lastRow = NULL; // int64_t osize = SL_SIZE(pTableData->pData); @@ -632,45 +612,6 @@ int tsdbSyncCommitConfig(STsdbRepo* pRepo) { return 0; } -int tsdbAsyncCommit(STsdbRepo *pRepo) { - tsem_wait(&(pRepo->readyToCommit)); - - ASSERT(pRepo->imem == NULL); - if (pRepo->mem == NULL) { - tsem_post(&(pRepo->readyToCommit)); - return 0; - } - - if (pRepo->code != TSDB_CODE_SUCCESS) { - tsdbWarn("vgId:%d try to commit when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno)); - } - - if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS); - if (tsdbLockRepo(pRepo) < 0) return -1; - pRepo->imem = pRepo->mem; - pRepo->mem = NULL; - tsdbScheduleCommit(pRepo, COMMIT_REQ); - if (tsdbUnlockRepo(pRepo) < 0) return -1; - - return 0; -} - -int tsdbSyncCommit(STsdbRepo *repo) { - STsdbRepo *pRepo = repo; - - tsdbAsyncCommit(pRepo); - tsem_wait(&(pRepo->readyToCommit)); - tsem_post(&(pRepo->readyToCommit)); - - if (pRepo->code != TSDB_CODE_SUCCESS) { - terrno = pRepo->code; - return -1; - } else { - terrno = TSDB_CODE_SUCCESS; - return 0; - } -} - /** * This is an important function to load data or try to load data from memory skiplist iterator. * @@ -834,28 +775,6 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * return 0; } -static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { - if (pBlock->dataLen <= 0) return -1; - pIter->totalLen = pBlock->dataLen; - pIter->len = 0; - pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen); - return 0; -} - -static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { - SMemRow row = pIter->row; // firstly, get current row - if (row == NULL) return NULL; - - pIter->len += memRowTLen(row); - if (pIter->len >= pIter->totalLen) { // reach the end - pIter->row = NULL; - } else { - pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row - } - - return row; -} - static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, TSKEY now) { TSKEY rowKey = memRowKey(row);