feature<TD-2546>: solve insert time spike problem
This commit is contained in:
parent
39acac5c6b
commit
ab9988c137
|
@ -161,6 +161,11 @@ _err:
|
|||
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
|
||||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno);
|
||||
SMemTable *pIMem = pRepo->imem;
|
||||
tsdbLockRepo(pRepo);
|
||||
pRepo->imem = NULL;
|
||||
tsdbUnlockRepo(pRepo);
|
||||
tsdbUnRefMemTable(pRepo, pIMem);
|
||||
sem_post(&(pRepo->readyToCommit));
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "tsdbMain.h"
|
||||
|
||||
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
||||
#define TSDB_MAX_INSERT_BATCH 512
|
||||
|
||||
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
|
||||
static void tsdbFreeMemTable(SMemTable *pMemTable);
|
||||
|
@ -205,7 +206,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
|||
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||
if (pRepo->mem == NULL) return 0;
|
||||
|
||||
SMemTable *pIMem = pRepo->imem;
|
||||
ASSERT(pRepo->imem == NULL);
|
||||
|
||||
sem_wait(&(pRepo->readyToCommit));
|
||||
|
||||
|
@ -220,8 +221,6 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
|||
tsdbScheduleCommit(pRepo);
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
|
||||
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -606,19 +605,13 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
|
|||
STable * pTable = NULL;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
SDataRow row = NULL;
|
||||
void ** rows = NULL;
|
||||
void * rows[TSDB_MAX_INSERT_BATCH] = {0};
|
||||
int rowCounter = 0;
|
||||
|
||||
ASSERT(pBlock->tid < pMeta->maxTables);
|
||||
pTable = pMeta->tables[pBlock->tid];
|
||||
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
|
||||
|
||||
rows = (void **)calloc(pBlock->numOfRows, sizeof(void *));
|
||||
if (rows == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
|
||||
|
@ -632,9 +625,18 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
|
|||
if (rows[rowCounter] != NULL) {
|
||||
rowCounter++;
|
||||
}
|
||||
|
||||
if (rowCounter == TSDB_MAX_INSERT_BATCH) {
|
||||
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
rowCounter = 0;
|
||||
memset(rows, 0, sizeof(rows));
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
|
||||
if (rowCounter > 0 && tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -642,11 +644,9 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
|
|||
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
|
||||
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
|
||||
|
||||
free(rows);
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
free(rows);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue