make skiplist node malloc
This commit is contained in:
parent
a896c60fa5
commit
cae5edeae8
|
@ -415,7 +415,7 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pIter);
|
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||||
if (node == NULL) return NULL;
|
if (node == NULL) return NULL;
|
||||||
|
|
||||||
return SL_GET_NODE_DATA(node);
|
return *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
||||||
|
|
|
@ -45,7 +45,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
SMemTable * pMemTable = pRepo->mem;
|
SMemTable * pMemTable = pRepo->mem;
|
||||||
STableData *pTableData = NULL;
|
STableData *pTableData = NULL;
|
||||||
SSkipList * pSList = NULL;
|
SSkipList * pSList = NULL;
|
||||||
int bytes = 0;
|
|
||||||
|
|
||||||
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
|
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
|
||||||
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
|
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
|
||||||
|
@ -55,27 +54,39 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
|
|
||||||
tSkipListNewNodeInfo(pSList, &level, &headSize);
|
tSkipListNewNodeInfo(pSList, &level, &headSize);
|
||||||
|
|
||||||
bytes = headSize + dataRowLen(row);
|
SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *));
|
||||||
SSkipListNode *pNode = tsdbAllocBytes(pRepo, bytes);
|
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), bytes, tstrerror(terrno));
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
|
||||||
|
if (pRow == NULL) {
|
||||||
|
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
|
||||||
|
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
|
||||||
|
free(pNode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
pNode->level = level;
|
pNode->level = level;
|
||||||
dataRowCpy(SL_GET_NODE_DATA(pNode), row);
|
dataRowCpy(pRow, row);
|
||||||
|
*(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow;
|
||||||
|
|
||||||
// Operations above may change pRepo->mem, retake those values
|
// Operations above may change pRepo->mem, retake those values
|
||||||
ASSERT(pRepo->mem != NULL);
|
ASSERT(pRepo->mem != NULL);
|
||||||
pMemTable = pRepo->mem;
|
pMemTable = pRepo->mem;
|
||||||
|
|
||||||
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
|
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
|
||||||
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) return -1;;
|
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
|
||||||
|
tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
|
||||||
|
free(pNode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
||||||
|
|
||||||
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
|
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
|
||||||
if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem)
|
if (pTableData != NULL) {
|
||||||
taosWLockLatch(&(pMemTable->latch));
|
taosWLockLatch(&(pMemTable->latch));
|
||||||
pMemTable->tData[TABLE_TID(pTable)] = NULL;
|
pMemTable->tData[TABLE_TID(pTable)] = NULL;
|
||||||
tsdbFreeTableData(pTableData);
|
tsdbFreeTableData(pTableData);
|
||||||
|
@ -87,7 +98,8 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
tsdbError("vgId:%d failed to insert row with key %" PRId64
|
tsdbError("vgId:%d failed to insert row with key %" PRId64
|
||||||
" to table %s while create new table data object since %s",
|
" to table %s while create new table data object since %s",
|
||||||
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
|
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
|
||||||
tsdbFreeBytes(pRepo, (void *)pNode, bytes);
|
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
|
||||||
|
free(pNode);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +109,8 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
|
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
|
||||||
|
|
||||||
if (tSkipListPut(pTableData->pData, pNode) == NULL) {
|
if (tSkipListPut(pTableData->pData, pNode) == NULL) {
|
||||||
tsdbFreeBytes(pRepo, (void *)pNode, bytes);
|
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
|
||||||
|
free(pNode);
|
||||||
} else {
|
} else {
|
||||||
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
|
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
|
||||||
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
|
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
|
||||||
|
@ -416,7 +429,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
|
||||||
pTableData->numOfRows = 0;
|
pTableData->numOfRows = 0;
|
||||||
|
|
||||||
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
|
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
|
||||||
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, tsdbGetTsTupleKey);
|
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey);
|
||||||
if (pTableData->pData == NULL) {
|
if (pTableData->pData == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -436,7 +449,7 @@ static void tsdbFreeTableData(STableData *pTableData) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); }
|
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); }
|
||||||
|
|
||||||
static void *tsdbCommitData(void *arg) {
|
static void *tsdbCommitData(void *arg) {
|
||||||
STsdbRepo * pRepo = (STsdbRepo *)arg;
|
STsdbRepo * pRepo = (STsdbRepo *)arg;
|
||||||
|
|
|
@ -343,7 +343,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||||
assert(node != NULL);
|
assert(node != NULL);
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
||||||
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
||||||
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
||||||
|
@ -356,7 +356,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
||||||
assert(node != NULL);
|
assert(node != NULL);
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
||||||
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
||||||
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
||||||
|
@ -378,14 +378,14 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) {
|
||||||
if (pCheckInfo->iter) {
|
if (pCheckInfo->iter) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||||
if (node != NULL) {
|
if (node != NULL) {
|
||||||
rmem = SL_GET_NODE_DATA(node);
|
rmem = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCheckInfo->iiter) {
|
if (pCheckInfo->iiter) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
||||||
if (node != NULL) {
|
if (node != NULL) {
|
||||||
rimem = SL_GET_NODE_DATA(node);
|
rimem = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1184,8 +1184,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
* copy them all to result buffer, since it may be overlapped with file data block.
|
* copy them all to result buffer, since it may be overlapped with file data block.
|
||||||
*/
|
*/
|
||||||
if (node == NULL ||
|
if (node == NULL ||
|
||||||
((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||||
((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||||
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
|
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = tsArray[pos];
|
cur->win.skey = tsArray[pos];
|
||||||
|
|
|
@ -35,7 +35,7 @@ static int insertData(SInsertInfo *pInfo) {
|
||||||
|
|
||||||
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
|
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
|
||||||
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
|
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
|
||||||
SSubmitBlk *pBlock = pMsg->blocks;
|
SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks;
|
||||||
pBlock->uid = pInfo->uid;
|
pBlock->uid = pInfo->uid;
|
||||||
pBlock->tid = pInfo->tid;
|
pBlock->tid = pInfo->tid;
|
||||||
pBlock->sversion = pInfo->sversion;
|
pBlock->sversion = pInfo->sversion;
|
||||||
|
|
Loading…
Reference in New Issue