Merge pull request #2951 from taosdata/hotfix/sync_loss_data
Hotfix/sync loss data
This commit is contained in:
commit
a674c324c8
|
@ -292,7 +292,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
||||||
#define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode
|
#define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode
|
||||||
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16
|
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16
|
||||||
|
|
||||||
#define TSDB_MIN_TOTAL_BLOCKS 2
|
#define TSDB_MIN_TOTAL_BLOCKS 3
|
||||||
#define TSDB_MAX_TOTAL_BLOCKS 10000
|
#define TSDB_MAX_TOTAL_BLOCKS 10000
|
||||||
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
|
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
|
||||||
|
|
||||||
|
|
|
@ -83,7 +83,7 @@ int checkTcpPort(info_s *info) {
|
||||||
struct sockaddr_in serverAddr;
|
struct sockaddr_in serverAddr;
|
||||||
char sendbuf[BUFFER_SIZE];
|
char sendbuf[BUFFER_SIZE];
|
||||||
char recvbuf[BUFFER_SIZE];
|
char recvbuf[BUFFER_SIZE];
|
||||||
int iDataNum;
|
int iDataNum = 0;
|
||||||
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||||
printf("socket() fail: %s\n", strerror(errno));
|
printf("socket() fail: %s\n", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -149,7 +149,7 @@ int checkUdpPort(info_s *info) {
|
||||||
struct sockaddr_in serverAddr;
|
struct sockaddr_in serverAddr;
|
||||||
char sendbuf[BUFFER_SIZE];
|
char sendbuf[BUFFER_SIZE];
|
||||||
char recvbuf[BUFFER_SIZE];
|
char recvbuf[BUFFER_SIZE];
|
||||||
int iDataNum;
|
int iDataNum = 0;
|
||||||
if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
|
if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
|
||||||
perror("socket");
|
perror("socket");
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -149,9 +149,9 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
|
||||||
|
|
||||||
if (cmd == MONITOR_CMD_CREATE_DB) {
|
if (cmd == MONITOR_CMD_CREATE_DB) {
|
||||||
snprintf(sql, SQL_LENGTH,
|
snprintf(sql, SQL_LENGTH,
|
||||||
"create database if not exists %s replica 1 days 10 keep 30 cache 1 "
|
"create database if not exists %s replica 1 days 10 keep 30 cache %d "
|
||||||
"blocks 2 maxtables 16 precision 'us'",
|
"blocks %d maxtables 16 precision 'us'",
|
||||||
tsMonitorDbName);
|
tsMonitorDbName, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MIN_TOTAL_BLOCKS);
|
||||||
} else if (cmd == MONITOR_CMD_CREATE_MT_DN) {
|
} else if (cmd == MONITOR_CMD_CREATE_MT_DN) {
|
||||||
snprintf(sql, SQL_LENGTH,
|
snprintf(sql, SQL_LENGTH,
|
||||||
"create table if not exists %s.dn(ts timestamp"
|
"create table if not exists %s.dn(ts timestamp"
|
||||||
|
|
|
@ -1007,9 +1007,15 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
pConn = rpcProcessMsgHead(pRpc, pRecv);
|
pConn = rpcProcessMsgHead(pRpc, pRecv);
|
||||||
|
|
||||||
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x",
|
if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
|
||||||
pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
|
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||||
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
|
||||||
|
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
||||||
|
} else {
|
||||||
|
tDebug("%s %p %p, %d received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||||
|
pConn, (void *)pHead->ahandle, pHead->msgType, pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
|
||||||
|
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = terrno;
|
int32_t code = terrno;
|
||||||
if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
|
if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) {
|
||||||
|
|
|
@ -123,6 +123,7 @@ typedef struct {
|
||||||
int32_t maxTables;
|
int32_t maxTables;
|
||||||
STableData** tData;
|
STableData** tData;
|
||||||
SList* actList;
|
SList* actList;
|
||||||
|
SList* extraBuffList;
|
||||||
SList* bufBlockList;
|
SList* bufBlockList;
|
||||||
} SMemTable;
|
} SMemTable;
|
||||||
|
|
||||||
|
@ -392,6 +393,8 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------ tsdbBuffer.c
|
// ------------------ tsdbBuffer.c
|
||||||
|
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
||||||
|
|
||||||
STsdbBufPool* tsdbNewBufPool();
|
STsdbBufPool* tsdbNewBufPool();
|
||||||
void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
||||||
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
||||||
|
@ -415,7 +418,7 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pIter);
|
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||||
if (node == NULL) return NULL;
|
if (node == NULL) return NULL;
|
||||||
|
|
||||||
return SL_GET_NODE_DATA(node);
|
return *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
||||||
|
@ -425,6 +428,19 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
||||||
return dataRowKey(row);
|
return dataRowKey(row);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
|
||||||
|
ASSERT(pRepo != NULL);
|
||||||
|
if (pRepo->mem == NULL) return NULL;
|
||||||
|
|
||||||
|
SListNode* pNode = listTail(pRepo->mem->bufBlockList);
|
||||||
|
if (pNode == NULL) return NULL;
|
||||||
|
|
||||||
|
STsdbBufBlock* pBufBlock = NULL;
|
||||||
|
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock));
|
||||||
|
|
||||||
|
return pBufBlock;
|
||||||
|
}
|
||||||
|
|
||||||
// ------------------ tsdbFile.c
|
// ------------------ tsdbFile.c
|
||||||
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
||||||
#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
|
#define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
|
||||||
|
@ -523,6 +539,7 @@ char* tsdbGetDataDirName(char* rootDir);
|
||||||
int tsdbGetNextMaxTables(int tid);
|
int tsdbGetNextMaxTables(int tid);
|
||||||
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
|
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
|
||||||
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
|
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
|
||||||
|
int tsdbCheckCommit(STsdbRepo* pRepo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -192,6 +192,8 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
|
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
|
||||||
|
|
||||||
|
if (tsdbCheckCommit(pRepo) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,6 +389,21 @@ int tsdbGetNextMaxTables(int tid) {
|
||||||
return maxTables + 1;
|
return maxTables + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbCheckCommit(STsdbRepo *pRepo) {
|
||||||
|
ASSERT(pRepo->mem != NULL);
|
||||||
|
STsdbCfg *pCfg = &(pRepo->config);
|
||||||
|
|
||||||
|
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||||
|
ASSERT(pBufBlock != NULL);
|
||||||
|
if ((pRepo->mem->extraBuffList != NULL) ||
|
||||||
|
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
|
||||||
|
// trigger commit
|
||||||
|
if (tsdbAsyncCommit(pRepo) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
|
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
|
||||||
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
|
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
|
||||||
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
|
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
|
|
||||||
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
||||||
|
|
||||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
|
|
||||||
|
|
||||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
||||||
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
|
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
|
||||||
static void tsdbFreeMemTable(SMemTable *pMemTable);
|
static void tsdbFreeMemTable(SMemTable *pMemTable);
|
||||||
|
@ -45,7 +43,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
SMemTable * pMemTable = pRepo->mem;
|
SMemTable * pMemTable = pRepo->mem;
|
||||||
STableData *pTableData = NULL;
|
STableData *pTableData = NULL;
|
||||||
SSkipList * pSList = NULL;
|
SSkipList * pSList = NULL;
|
||||||
int bytes = 0;
|
|
||||||
|
|
||||||
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
|
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
|
||||||
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
|
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
|
||||||
|
@ -55,27 +52,39 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
|
|
||||||
tSkipListNewNodeInfo(pSList, &level, &headSize);
|
tSkipListNewNodeInfo(pSList, &level, &headSize);
|
||||||
|
|
||||||
bytes = headSize + dataRowLen(row);
|
SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *));
|
||||||
SSkipListNode *pNode = tsdbAllocBytes(pRepo, bytes);
|
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), bytes, tstrerror(terrno));
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
|
||||||
|
if (pRow == NULL) {
|
||||||
|
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
|
||||||
|
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
|
||||||
|
free(pNode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
pNode->level = level;
|
pNode->level = level;
|
||||||
dataRowCpy(SL_GET_NODE_DATA(pNode), row);
|
dataRowCpy(pRow, row);
|
||||||
|
*(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow;
|
||||||
|
|
||||||
// Operations above may change pRepo->mem, retake those values
|
// Operations above may change pRepo->mem, retake those values
|
||||||
ASSERT(pRepo->mem != NULL);
|
ASSERT(pRepo->mem != NULL);
|
||||||
pMemTable = pRepo->mem;
|
pMemTable = pRepo->mem;
|
||||||
|
|
||||||
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
|
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
|
||||||
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) return -1;;
|
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
|
||||||
|
tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
|
||||||
|
free(pNode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
||||||
|
|
||||||
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
|
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
|
||||||
if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem)
|
if (pTableData != NULL) {
|
||||||
taosWLockLatch(&(pMemTable->latch));
|
taosWLockLatch(&(pMemTable->latch));
|
||||||
pMemTable->tData[TABLE_TID(pTable)] = NULL;
|
pMemTable->tData[TABLE_TID(pTable)] = NULL;
|
||||||
tsdbFreeTableData(pTableData);
|
tsdbFreeTableData(pTableData);
|
||||||
|
@ -87,7 +96,8 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
tsdbError("vgId:%d failed to insert row with key %" PRId64
|
tsdbError("vgId:%d failed to insert row with key %" PRId64
|
||||||
" to table %s while create new table data object since %s",
|
" to table %s while create new table data object since %s",
|
||||||
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
|
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
|
||||||
tsdbFreeBytes(pRepo, (void *)pNode, bytes);
|
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
|
||||||
|
free(pNode);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +107,8 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
|
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
|
||||||
|
|
||||||
if (tSkipListPut(pTableData->pData, pNode) == NULL) {
|
if (tSkipListPut(pTableData->pData, pNode) == NULL) {
|
||||||
tsdbFreeBytes(pRepo, (void *)pNode, bytes);
|
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
|
||||||
|
free(pNode);
|
||||||
} else {
|
} else {
|
||||||
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
|
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
|
||||||
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
|
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
|
||||||
|
@ -189,44 +200,59 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
|
||||||
|
|
||||||
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||||
STsdbCfg * pCfg = &pRepo->config;
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
STsdbBufBlock *pBufBlock = NULL;
|
||||||
|
void * ptr = NULL;
|
||||||
|
|
||||||
if (pBufBlock != NULL && pBufBlock->remain < bytes) {
|
// Either allocate from buffer blocks or from SYSTEM memory pool
|
||||||
if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) { // need to commit mem
|
if (pRepo->mem == NULL) {
|
||||||
if (tsdbAsyncCommit(pRepo) < 0) return NULL;
|
SMemTable *pMemTable = tsdbNewMemTable(pRepo);
|
||||||
} else {
|
if (pMemTable == NULL) return NULL;
|
||||||
|
pRepo->mem = pMemTable;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pRepo->mem != NULL);
|
||||||
|
|
||||||
|
pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||||
|
if ((pRepo->mem->extraBuffList != NULL) ||
|
||||||
|
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < bytes))) {
|
||||||
|
// allocate from SYSTEM buffer pool
|
||||||
|
if (pRepo->mem->extraBuffList == NULL) {
|
||||||
|
pRepo->mem->extraBuffList = tdListNew(0);
|
||||||
|
if (pRepo->mem->extraBuffList == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pRepo->mem->extraBuffList != NULL);
|
||||||
|
SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes);
|
||||||
|
if (pNode == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pNode->next = pNode->prev = NULL;
|
||||||
|
tdListAppend(pRepo->mem->extraBuffList, pNode);
|
||||||
|
ptr = (void *)(pNode->data);
|
||||||
|
tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes);
|
||||||
|
} else { // allocate from TSDB buffer pool
|
||||||
|
if (pBufBlock == NULL || pBufBlock->remain < bytes) {
|
||||||
|
ASSERT(listNEles(pRepo->mem->bufBlockList) < pCfg->totalBlocks / 3);
|
||||||
if (tsdbLockRepo(pRepo) < 0) return NULL;
|
if (tsdbLockRepo(pRepo) < 0) return NULL;
|
||||||
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
|
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
|
||||||
tdListAppendNode(pRepo->mem->bufBlockList, pNode);
|
tdListAppendNode(pRepo->mem->bufBlockList, pNode);
|
||||||
if (tsdbUnlockRepo(pRepo) < 0) return NULL;
|
if (tsdbUnlockRepo(pRepo) < 0) return NULL;
|
||||||
}
|
pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||||
}
|
|
||||||
|
|
||||||
if (pRepo->mem == NULL) {
|
|
||||||
SMemTable *pMemTable = tsdbNewMemTable(pRepo);
|
|
||||||
if (pMemTable == NULL) return NULL;
|
|
||||||
|
|
||||||
if (tsdbLockRepo(pRepo) < 0) {
|
|
||||||
tsdbFreeMemTable(pMemTable);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo);
|
ASSERT(pBufBlock->remain >= bytes);
|
||||||
tdListAppendNode(pMemTable->bufBlockList, pNode);
|
ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset);
|
||||||
pRepo->mem = pMemTable;
|
pBufBlock->offset += bytes;
|
||||||
|
pBufBlock->remain -= bytes;
|
||||||
if (tsdbUnlockRepo(pRepo) < 0) return NULL;
|
tsdbTrace("vgId:%d allocate %d bytes from TSDB buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
||||||
|
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||||
}
|
}
|
||||||
|
|
||||||
pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
|
||||||
ASSERT(pBufBlock->remain >= bytes);
|
|
||||||
void *ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset);
|
|
||||||
pBufBlock->offset += bytes;
|
|
||||||
pBufBlock->remain -= bytes;
|
|
||||||
|
|
||||||
tsdbTrace("vgId:%d allocate %d bytes from buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
|
||||||
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
|
||||||
|
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,27 +353,23 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------- LOCAL FUNCTIONS ----------------
|
// ---------------- LOCAL FUNCTIONS ----------------
|
||||||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
|
|
||||||
ASSERT(pRepo != NULL);
|
|
||||||
if (pRepo->mem == NULL) return NULL;
|
|
||||||
|
|
||||||
SListNode *pNode = listTail(pRepo->mem->bufBlockList);
|
|
||||||
if (pNode == NULL) return NULL;
|
|
||||||
|
|
||||||
STsdbBufBlock *pBufBlock = NULL;
|
|
||||||
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock));
|
|
||||||
|
|
||||||
return pBufBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
|
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
|
||||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
ASSERT(pRepo->mem != NULL);
|
||||||
ASSERT(pBufBlock != NULL);
|
if (pRepo->mem->extraBuffList == NULL) {
|
||||||
pBufBlock->offset -= bytes;
|
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||||
pBufBlock->remain += bytes;
|
ASSERT(pBufBlock != NULL);
|
||||||
ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
|
pBufBlock->offset -= bytes;
|
||||||
tsdbTrace("vgId:%d return %d bytes to buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
pBufBlock->remain += bytes;
|
||||||
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
|
||||||
|
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
||||||
|
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||||
|
} else {
|
||||||
|
SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -sizeof(SListNode));
|
||||||
|
ASSERT(listTail(pRepo->mem->extraBuffList) == pNode);
|
||||||
|
tdListPopNode(pRepo->mem->extraBuffList, pNode);
|
||||||
|
free(pNode);
|
||||||
|
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
|
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
|
||||||
|
@ -396,6 +418,7 @@ static void tsdbFreeMemTable(SMemTable* pMemTable) {
|
||||||
ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0));
|
ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0));
|
||||||
ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0));
|
ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0));
|
||||||
|
|
||||||
|
tdListFree(pMemTable->extraBuffList);
|
||||||
tdListFree(pMemTable->bufBlockList);
|
tdListFree(pMemTable->bufBlockList);
|
||||||
tdListFree(pMemTable->actList);
|
tdListFree(pMemTable->actList);
|
||||||
taosTFree(pMemTable->tData);
|
taosTFree(pMemTable->tData);
|
||||||
|
@ -416,7 +439,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
|
||||||
pTableData->numOfRows = 0;
|
pTableData->numOfRows = 0;
|
||||||
|
|
||||||
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
|
pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
|
||||||
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, tsdbGetTsTupleKey);
|
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey);
|
||||||
if (pTableData->pData == NULL) {
|
if (pTableData->pData == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -436,7 +459,7 @@ static void tsdbFreeTableData(STableData *pTableData) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); }
|
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); }
|
||||||
|
|
||||||
static void *tsdbCommitData(void *arg) {
|
static void *tsdbCommitData(void *arg) {
|
||||||
STsdbRepo * pRepo = (STsdbRepo *)arg;
|
STsdbRepo * pRepo = (STsdbRepo *)arg;
|
||||||
|
|
|
@ -120,20 +120,23 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
||||||
tsdbUnlockRepoMeta(pRepo);
|
tsdbUnlockRepoMeta(pRepo);
|
||||||
|
|
||||||
// Write to memtable action
|
// Write to memtable action
|
||||||
int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0;
|
// TODO: refactor duplicate codes
|
||||||
int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table);
|
int tlen = 0;
|
||||||
int tlen = tlen1 + tlen2;
|
void *pBuf = NULL;
|
||||||
void *buf = tsdbAllocBytes(pRepo, tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (newSuper) {
|
if (newSuper) {
|
||||||
void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, super);
|
tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, super);
|
||||||
ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1);
|
pBuf = tsdbAllocBytes(pRepo, tlen);
|
||||||
buf = pBuf;
|
if (pBuf == NULL) goto _err;
|
||||||
|
void *tBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, pBuf, super);
|
||||||
|
ASSERT(POINTER_DISTANCE(tBuf, pBuf) == tlen);
|
||||||
}
|
}
|
||||||
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, table);
|
tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table);
|
||||||
|
pBuf = tsdbAllocBytes(pRepo, tlen);
|
||||||
|
if (pBuf == NULL) goto _err;
|
||||||
|
void *tBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, pBuf, table);
|
||||||
|
ASSERT(POINTER_DISTANCE(tBuf, pBuf) == tlen);
|
||||||
|
|
||||||
|
if (tsdbCheckCommit(pRepo) < 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -182,6 +185,8 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
|
||||||
tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid);
|
tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid);
|
||||||
free(tbname);
|
free(tbname);
|
||||||
|
|
||||||
|
if (tsdbCheckCommit(pRepo) < 0) goto _err;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -405,6 +410,8 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
}
|
}
|
||||||
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
|
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
|
||||||
|
|
||||||
|
if (tsdbCheckCommit(pRepo) < 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -343,7 +343,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||||
assert(node != NULL);
|
assert(node != NULL);
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
||||||
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
||||||
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
||||||
|
@ -356,7 +356,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
||||||
assert(node != NULL);
|
assert(node != NULL);
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
||||||
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
||||||
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
||||||
|
@ -378,14 +378,14 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) {
|
||||||
if (pCheckInfo->iter) {
|
if (pCheckInfo->iter) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||||
if (node != NULL) {
|
if (node != NULL) {
|
||||||
rmem = SL_GET_NODE_DATA(node);
|
rmem = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCheckInfo->iiter) {
|
if (pCheckInfo->iiter) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
||||||
if (node != NULL) {
|
if (node != NULL) {
|
||||||
rimem = SL_GET_NODE_DATA(node);
|
rimem = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1184,8 +1184,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
* copy them all to result buffer, since it may be overlapped with file data block.
|
* copy them all to result buffer, since it may be overlapped with file data block.
|
||||||
*/
|
*/
|
||||||
if (node == NULL ||
|
if (node == NULL ||
|
||||||
((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||||
((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||||
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
|
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = tsArray[pos];
|
cur->win.skey = tsArray[pos];
|
||||||
|
|
|
@ -35,7 +35,7 @@ static int insertData(SInsertInfo *pInfo) {
|
||||||
|
|
||||||
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
|
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
|
||||||
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
|
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
|
||||||
SSubmitBlk *pBlock = pMsg->blocks;
|
SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks;
|
||||||
pBlock->uid = pInfo->uid;
|
pBlock->uid = pInfo->uid;
|
||||||
pBlock->tid = pInfo->tid;
|
pBlock->tid = pInfo->tid;
|
||||||
pBlock->sversion = pInfo->sversion;
|
pBlock->sversion = pInfo->sversion;
|
||||||
|
|
|
@ -10,7 +10,7 @@ sleep 3000
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
print ======== step1
|
print ======== step1
|
||||||
sql create database db blocks 2
|
sql create database db blocks 3
|
||||||
sql create table db.mt (ts timestamp, tbcol int) TAGS(tgcol int)
|
sql create table db.mt (ts timestamp, tbcol int) TAGS(tgcol int)
|
||||||
|
|
||||||
$tbPrefix = db.t
|
$tbPrefix = db.t
|
||||||
|
|
|
@ -17,7 +17,7 @@ print ========== prepare data
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 3000
|
sleep 3000
|
||||||
sql connect
|
sql connect
|
||||||
sql create database db blocks 2 cache 1
|
sql create database db blocks 3 cache 1
|
||||||
sql use db
|
sql use db
|
||||||
|
|
||||||
print ========== step1
|
print ========== step1
|
||||||
|
|
|
@ -18,7 +18,7 @@ print ========== prepare data
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 3000
|
sleep 3000
|
||||||
sql connect
|
sql connect
|
||||||
sql create database db blocks 2 cache 1 maxTables $maxTables
|
sql create database db blocks 3 cache 1 maxTables $maxTables
|
||||||
sql use db
|
sql use db
|
||||||
|
|
||||||
print ========== step1
|
print ========== step1
|
||||||
|
|
|
@ -16,7 +16,7 @@ print ========== prepare data
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 3000
|
sleep 3000
|
||||||
sql connect
|
sql connect
|
||||||
sql create database db blocks 2 cache 1 maxTables $maxTables
|
sql create database db blocks 3 cache 1 maxTables $maxTables
|
||||||
sql use db
|
sql use db
|
||||||
|
|
||||||
print ========== step1
|
print ========== step1
|
||||||
|
|
|
@ -26,7 +26,7 @@ sql create dnode $hostname3
|
||||||
system sh/exec.sh -n dnode3 -s start
|
system sh/exec.sh -n dnode3 -s start
|
||||||
|
|
||||||
print ======== step1
|
print ======== step1
|
||||||
sql create database db replica 3 blocks 2
|
sql create database db replica 3 blocks 3
|
||||||
sql create table db.mt (ts timestamp, tbcol int) TAGS(tgcol int)
|
sql create table db.mt (ts timestamp, tbcol int) TAGS(tgcol int)
|
||||||
|
|
||||||
$tbPrefix = db.t
|
$tbPrefix = db.t
|
||||||
|
|
Loading…
Reference in New Issue