refact
This commit is contained in:
parent
0db40add06
commit
381cd7e4d7
|
@ -526,68 +526,6 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot) {
|
|||
pSnapshot->omem = NULL;
|
||||
}
|
||||
|
||||
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbBufBlock *pBufBlock = NULL;
|
||||
void * ptr = NULL;
|
||||
|
||||
// Either allocate from buffer blocks or from SYSTEM memory pool
|
||||
if (pRepo->mem == NULL) {
|
||||
SMemTable *pMemTable = tsdbNewMemTable(pRepo);
|
||||
if (pMemTable == NULL) return NULL;
|
||||
pRepo->mem = pMemTable;
|
||||
}
|
||||
|
||||
ASSERT(pRepo->mem != NULL);
|
||||
|
||||
pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||
if ((pRepo->mem->extraBuffList != NULL) ||
|
||||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < bytes))) {
|
||||
// allocate from SYSTEM buffer pool
|
||||
if (pRepo->mem->extraBuffList == NULL) {
|
||||
pRepo->mem->extraBuffList = tdListNew(0);
|
||||
if (pRepo->mem->extraBuffList == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pRepo->mem->extraBuffList != NULL);
|
||||
SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes);
|
||||
if (pNode == NULL) {
|
||||
if (listNEles(pRepo->mem->extraBuffList) == 0) {
|
||||
tdListFree(pRepo->mem->extraBuffList);
|
||||
pRepo->mem->extraBuffList = NULL;
|
||||
}
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pNode->next = pNode->prev = NULL;
|
||||
tdListAppendNode(pRepo->mem->extraBuffList, pNode);
|
||||
ptr = (void *)(pNode->data);
|
||||
tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes);
|
||||
} else { // allocate from TSDB buffer pool
|
||||
if (pBufBlock == NULL || pBufBlock->remain < bytes) {
|
||||
ASSERT(listNEles(pRepo->mem->bufBlockList) < pCfg->totalBlocks / 3);
|
||||
if (tsdbLockRepo(pRepo) < 0) return NULL;
|
||||
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
|
||||
tdListAppendNode(pRepo->mem->bufBlockList, pNode);
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return NULL;
|
||||
pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||
}
|
||||
|
||||
ASSERT(pBufBlock->remain >= bytes);
|
||||
ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset);
|
||||
pBufBlock->offset += bytes;
|
||||
pBufBlock->remain -= bytes;
|
||||
tsdbTrace("vgId:%d allocate %d bytes from TSDB buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
||||
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||
}
|
||||
|
||||
return ptr;
|
||||
}
|
||||
|
||||
int tsdbSyncCommitConfig(STsdbRepo* pRepo) {
|
||||
ASSERT(pRepo->config_changed == true);
|
||||
tsem_wait(&(pRepo->readyToCommit));
|
||||
|
@ -736,29 +674,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
|||
}
|
||||
|
||||
// ---------------- LOCAL FUNCTIONS ----------------
|
||||
|
||||
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
|
||||
ASSERT(pMemTable->maxTables < maxTables);
|
||||
|
||||
STableData **pTableData = (STableData **)calloc(maxTables, sizeof(STableData *));
|
||||
if (pTableData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy((void *)pTableData, (void *)pMemTable->tData, sizeof(STableData *) * pMemTable->maxTables);
|
||||
|
||||
STableData **tData = pMemTable->tData;
|
||||
|
||||
taosWLockLatch(&(pMemTable->latch));
|
||||
pMemTable->maxTables = maxTables;
|
||||
pMemTable->tData = pTableData;
|
||||
taosWUnLockLatch(&(pMemTable->latch));
|
||||
|
||||
tfree(tData);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
|
||||
if (pCols) {
|
||||
if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
|
||||
|
@ -867,44 +782,6 @@ static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STa
|
|||
pSkipList->insertHandleFn->args[7] = pLastRow;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pIter->totalLen = pMsg->length;
|
||||
pIter->len = 0;
|
||||
pIter->pMsg = pMsg;
|
||||
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||
if (pIter->len == 0) {
|
||||
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
|
||||
} else {
|
||||
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
|
||||
}
|
||||
|
||||
if (pIter->len > pIter->totalLen) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
*pPBlock = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
|
||||
ASSERT(pTable != NULL);
|
||||
|
||||
|
@ -976,7 +853,6 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow row) {
|
||||
tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data,
|
||||
memRowVersion(row));
|
||||
|
|
Loading…
Reference in New Issue