diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index c3a808b765..dcbf96b495 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -292,7 +292,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode #define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 -#define TSDB_MIN_TOTAL_BLOCKS 2 +#define TSDB_MIN_TOTAL_BLOCKS 3 #define TSDB_MAX_TOTAL_BLOCKS 10000 #define TSDB_DEFAULT_TOTAL_BLOCKS 6 diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c index 4b72c5c859..102f9b9d89 100644 --- a/src/kit/taosnetwork/client.c +++ b/src/kit/taosnetwork/client.c @@ -83,7 +83,7 @@ int checkTcpPort(info_s *info) { struct sockaddr_in serverAddr; char sendbuf[BUFFER_SIZE]; char recvbuf[BUFFER_SIZE]; - int iDataNum; + int iDataNum = 0; if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { printf("socket() fail: %s\n", strerror(errno)); return -1; @@ -149,7 +149,7 @@ int checkUdpPort(info_s *info) { struct sockaddr_in serverAddr; char sendbuf[BUFFER_SIZE]; char recvbuf[BUFFER_SIZE]; - int iDataNum; + int iDataNum = 0; if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { perror("socket"); return -1; diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index b31fc368af..8bf9277a43 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -149,9 +149,9 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { if (cmd == MONITOR_CMD_CREATE_DB) { snprintf(sql, SQL_LENGTH, - "create database if not exists %s replica 1 days 10 keep 30 cache 1 " - "blocks 2 maxtables 16 precision 'us'", - tsMonitorDbName); + "create database if not exists %s replica 1 days 10 keep 30 cache %d " + "blocks %d maxtables 16 precision 'us'", + tsMonitorDbName, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MIN_TOTAL_BLOCKS); } else if (cmd == MONITOR_CMD_CREATE_MT_DN) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn(ts timestamp" diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 29afe8d2ab..37bf9c3f34 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1007,9 +1007,15 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { terrno = 0; pConn = rpcProcessMsgHead(pRpc, pRecv); - tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", - pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, - pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); + if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { + tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, + pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, + pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); + } else { + tDebug("%s %p %p, %d received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, + pConn, (void *)pHead->ahandle, pHead->msgType, pRecv->ip, pRecv->port, terrno, pRecv->msgLen, + pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); + } int32_t code = terrno; if (code != TSDB_CODE_RPC_ALREADY_PROCESSED) { diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 5a06dfa65a..2f90fa3921 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -123,6 +123,7 @@ typedef struct { int32_t maxTables; STableData** tData; SList* actList; + SList* extraBuffList; SList* bufBlockList; } SMemTable; @@ -392,6 +393,8 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { } // ------------------ tsdbBuffer.c +#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold + STsdbBufPool* tsdbNewBufPool(); void tsdbFreeBufPool(STsdbBufPool* pBufPool); int tsdbOpenBufPool(STsdbRepo* pRepo); @@ -415,7 +418,7 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { SSkipListNode* node = tSkipListIterGet(pIter); if (node == NULL) return NULL; - return SL_GET_NODE_DATA(node); + return *(SDataRow *)SL_GET_NODE_DATA(node); } static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { @@ -425,6 +428,19 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { return dataRowKey(row); } +static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { + ASSERT(pRepo != NULL); + if (pRepo->mem == NULL) return NULL; + + SListNode* pNode = listTail(pRepo->mem->bufBlockList); + if (pNode == NULL) return NULL; + + STsdbBufBlock* pBufBlock = NULL; + tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock)); + + return pBufBlock; +} + // ------------------ tsdbFile.c #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) @@ -523,6 +539,7 @@ char* tsdbGetDataDirName(char* rootDir); int tsdbGetNextMaxTables(int tid); STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); +int tsdbCheckCommit(STsdbRepo* pRepo); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 1486adb65b..46801d0788 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -192,6 +192,8 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * } if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); + + if (tsdbCheckCommit(pRepo) < 0) return -1; return 0; } @@ -387,6 +389,21 @@ int tsdbGetNextMaxTables(int tid) { return maxTables + 1; } +int tsdbCheckCommit(STsdbRepo *pRepo) { + ASSERT(pRepo->mem != NULL); + STsdbCfg *pCfg = &(pRepo->config); + + STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); + ASSERT(pBufBlock != NULL); + if ((pRepo->mem->extraBuffList != NULL) || + ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { + // trigger commit + if (tsdbAsyncCommit(pRepo) < 0) return -1; + } + + return 0; +} + STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; } STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 44f787a8c0..990db76b7e 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -18,8 +18,6 @@ #define TSDB_DATA_SKIPLIST_LEVEL 5 -static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo); - static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes); static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); @@ -45,7 +43,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { SMemTable * pMemTable = pRepo->mem; STableData *pTableData = NULL; SSkipList * pSList = NULL; - int bytes = 0; if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL && pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) { @@ -55,27 +52,39 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { tSkipListNewNodeInfo(pSList, &level, &headSize); - bytes = headSize + dataRowLen(row); - SSkipListNode *pNode = tsdbAllocBytes(pRepo, bytes); + SSkipListNode *pNode = (SSkipListNode *)malloc(headSize + sizeof(SDataRow *)); if (pNode == NULL) { - tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", - REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), bytes, tstrerror(terrno)); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } + + void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); + if (pRow == NULL) { + tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", + REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); + free(pNode); + return -1; + } + pNode->level = level; - dataRowCpy(SL_GET_NODE_DATA(pNode), row); + dataRowCpy(pRow, row); + *(SDataRow *)SL_GET_NODE_DATA(pNode) = pRow; // Operations above may change pRepo->mem, retake those values ASSERT(pRepo->mem != NULL); pMemTable = pRepo->mem; if (TABLE_TID(pTable) >= pMemTable->maxTables) { - if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) return -1;; + if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { + tsdbFreeBytes(pRepo, pRow, dataRowLen(row)); + free(pNode); + return -1; + } } pTableData = pMemTable->tData[TABLE_TID(pTable)]; if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { - if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem) + if (pTableData != NULL) { taosWLockLatch(&(pMemTable->latch)); pMemTable->tData[TABLE_TID(pTable)] = NULL; tsdbFreeTableData(pTableData); @@ -87,7 +96,8 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while create new table data object since %s", REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno)); - tsdbFreeBytes(pRepo, (void *)pNode, bytes); + tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); + free(pNode); return -1; } @@ -97,7 +107,8 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); if (tSkipListPut(pTableData->pData, pNode) == NULL) { - tsdbFreeBytes(pRepo, (void *)pNode, bytes); + tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); + free(pNode); } else { if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; @@ -189,44 +200,59 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { STsdbCfg * pCfg = &pRepo->config; - STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); + STsdbBufBlock *pBufBlock = NULL; + void * ptr = NULL; - if (pBufBlock != NULL && pBufBlock->remain < bytes) { - if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) { // need to commit mem - if (tsdbAsyncCommit(pRepo) < 0) return NULL; - } else { + // Either allocate from buffer blocks or from SYSTEM memory pool + if (pRepo->mem == NULL) { + SMemTable *pMemTable = tsdbNewMemTable(pRepo); + if (pMemTable == NULL) return NULL; + pRepo->mem = pMemTable; + } + + ASSERT(pRepo->mem != NULL); + + pBufBlock = tsdbGetCurrBufBlock(pRepo); + if ((pRepo->mem->extraBuffList != NULL) || + ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < bytes))) { + // allocate from SYSTEM buffer pool + if (pRepo->mem->extraBuffList == NULL) { + pRepo->mem->extraBuffList = tdListNew(0); + if (pRepo->mem->extraBuffList == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + } + + ASSERT(pRepo->mem->extraBuffList != NULL); + SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes); + if (pNode == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + pNode->next = pNode->prev = NULL; + tdListAppend(pRepo->mem->extraBuffList, pNode); + ptr = (void *)(pNode->data); + tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes); + } else { // allocate from TSDB buffer pool + if (pBufBlock == NULL || pBufBlock->remain < bytes) { + ASSERT(listNEles(pRepo->mem->bufBlockList) < pCfg->totalBlocks / 3); if (tsdbLockRepo(pRepo) < 0) return NULL; SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo); tdListAppendNode(pRepo->mem->bufBlockList, pNode); if (tsdbUnlockRepo(pRepo) < 0) return NULL; - } - } - - if (pRepo->mem == NULL) { - SMemTable *pMemTable = tsdbNewMemTable(pRepo); - if (pMemTable == NULL) return NULL; - - if (tsdbLockRepo(pRepo) < 0) { - tsdbFreeMemTable(pMemTable); - return NULL; + pBufBlock = tsdbGetCurrBufBlock(pRepo); } - SListNode *pNode = tsdbAllocBufBlockFromPool(pRepo); - tdListAppendNode(pMemTable->bufBlockList, pNode); - pRepo->mem = pMemTable; - - if (tsdbUnlockRepo(pRepo) < 0) return NULL; + ASSERT(pBufBlock->remain >= bytes); + ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset); + pBufBlock->offset += bytes; + pBufBlock->remain -= bytes; + tsdbTrace("vgId:%d allocate %d bytes from TSDB buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, + listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); } - pBufBlock = tsdbGetCurrBufBlock(pRepo); - ASSERT(pBufBlock->remain >= bytes); - void *ptr = POINTER_SHIFT(pBufBlock->data, pBufBlock->offset); - pBufBlock->offset += bytes; - pBufBlock->remain -= bytes; - - tsdbTrace("vgId:%d allocate %d bytes from buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, - listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); - return ptr; } @@ -327,27 +353,23 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey } // ---------------- LOCAL FUNCTIONS ---------------- -static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { - ASSERT(pRepo != NULL); - if (pRepo->mem == NULL) return NULL; - - SListNode *pNode = listTail(pRepo->mem->bufBlockList); - if (pNode == NULL) return NULL; - - STsdbBufBlock *pBufBlock = NULL; - tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock)); - - return pBufBlock; -} - static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { - STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); - ASSERT(pBufBlock != NULL); - pBufBlock->offset -= bytes; - pBufBlock->remain += bytes; - ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset)); - tsdbTrace("vgId:%d return %d bytes to buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, - listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); + ASSERT(pRepo->mem != NULL); + if (pRepo->mem->extraBuffList == NULL) { + STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); + ASSERT(pBufBlock != NULL); + pBufBlock->offset -= bytes; + pBufBlock->remain += bytes; + ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset)); + tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, + listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); + } else { + SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -sizeof(SListNode)); + ASSERT(listTail(pRepo->mem->extraBuffList) == pNode); + tdListPopNode(pRepo->mem->extraBuffList, pNode); + free(pNode); + tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes); + } } static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { @@ -396,6 +418,7 @@ static void tsdbFreeMemTable(SMemTable* pMemTable) { ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0)); ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0)); + tdListFree(pMemTable->extraBuffList); tdListFree(pMemTable->bufBlockList); tdListFree(pMemTable->actList); taosTFree(pMemTable->tData); @@ -416,7 +439,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->numOfRows = 0; pTableData->pData = tSkipListCreate(TSDB_DATA_SKIPLIST_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, - TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 0, tsdbGetTsTupleKey); + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, 1, tsdbGetTsTupleKey); if (pTableData->pData == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -436,7 +459,7 @@ static void tsdbFreeTableData(STableData *pTableData) { } } -static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); } +static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); } static void *tsdbCommitData(void *arg) { STsdbRepo * pRepo = (STsdbRepo *)arg; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 8db6a4a32a..c0d58f6332 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -120,20 +120,23 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { tsdbUnlockRepoMeta(pRepo); // Write to memtable action - int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0; - int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table); - int tlen = tlen1 + tlen2; - void *buf = tsdbAllocBytes(pRepo, tlen); - if (buf == NULL) { - goto _err; - } - + // TODO: refactor duplicate codes + int tlen = 0; + void *pBuf = NULL; if (newSuper) { - void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, super); - ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1); - buf = pBuf; + tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, super); + pBuf = tsdbAllocBytes(pRepo, tlen); + if (pBuf == NULL) goto _err; + void *tBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, pBuf, super); + ASSERT(POINTER_DISTANCE(tBuf, pBuf) == tlen); } - tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, table); + tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, table); + pBuf = tsdbAllocBytes(pRepo, tlen); + if (pBuf == NULL) goto _err; + void *tBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, pBuf, table); + ASSERT(POINTER_DISTANCE(tBuf, pBuf) == tlen); + + if (tsdbCheckCommit(pRepo) < 0) return -1; return 0; @@ -182,6 +185,8 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid); free(tbname); + if (tsdbCheckCommit(pRepo) < 0) goto _err; + return 0; _err: @@ -405,6 +410,8 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { } tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); + if (tsdbCheckCommit(pRepo) < 0) return -1; + return 0; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index ccc631fb58..a34216d2fb 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -343,7 +343,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); assert(node != NULL); - SDataRow row = SL_GET_NODE_DATA(node); + SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); @@ -356,7 +356,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); assert(node != NULL); - SDataRow row = SL_GET_NODE_DATA(node); + SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node); TSKEY key = dataRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo); @@ -378,14 +378,14 @@ SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order) { if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { - rmem = SL_GET_NODE_DATA(node); + rmem = *(SDataRow *)SL_GET_NODE_DATA(node); } } if (pCheckInfo->iiter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { - rimem = SL_GET_NODE_DATA(node); + rimem = *(SDataRow *)SL_GET_NODE_DATA(node); } } @@ -1184,8 +1184,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* * copy them all to result buffer, since it may be overlapped with file data block. */ if (node == NULL || - ((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || - ((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { + ((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || + ((dataRowKey(*(SDataRow *)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { // no data in cache or data in cache is greater than the ekey of time window, load data from file block if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index e504109cec..605586515b 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -35,7 +35,7 @@ static int insertData(SInsertInfo *pInfo) { for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) { memset((void *)pMsg, 0, sizeof(SSubmitMsg)); - SSubmitBlk *pBlock = pMsg->blocks; + SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks; pBlock->uid = pInfo->uid; pBlock->tid = pInfo->tid; pBlock->sversion = pInfo->sversion; diff --git a/tests/script/general/db/delete.sim b/tests/script/general/db/delete.sim index 059ab2e353..e4d40bf5da 100644 --- a/tests/script/general/db/delete.sim +++ b/tests/script/general/db/delete.sim @@ -10,7 +10,7 @@ sleep 3000 sql connect print ======== step1 -sql create database db blocks 2 +sql create database db blocks 3 sql create table db.mt (ts timestamp, tbcol int) TAGS(tgcol int) $tbPrefix = db.t diff --git a/tests/script/general/db/vnodes.sim b/tests/script/general/db/vnodes.sim index ef505b25c1..b01e94206f 100644 --- a/tests/script/general/db/vnodes.sim +++ b/tests/script/general/db/vnodes.sim @@ -17,7 +17,7 @@ print ========== prepare data system sh/exec.sh -n dnode1 -s start sleep 3000 sql connect -sql create database db blocks 2 cache 1 +sql create database db blocks 3 cache 1 sql use db print ========== step1 diff --git a/tests/script/unique/big/maxvnodes.sim b/tests/script/unique/big/maxvnodes.sim index a4959aab3f..eb6e0b3b53 100644 --- a/tests/script/unique/big/maxvnodes.sim +++ b/tests/script/unique/big/maxvnodes.sim @@ -18,7 +18,7 @@ print ========== prepare data system sh/exec.sh -n dnode1 -s start sleep 3000 sql connect -sql create database db blocks 2 cache 1 maxTables $maxTables +sql create database db blocks 3 cache 1 maxTables $maxTables sql use db print ========== step1 diff --git a/tests/script/unique/big/restartSpeed.sim b/tests/script/unique/big/restartSpeed.sim index ef32cec585..9d490fcb8b 100644 --- a/tests/script/unique/big/restartSpeed.sim +++ b/tests/script/unique/big/restartSpeed.sim @@ -16,7 +16,7 @@ print ========== prepare data system sh/exec.sh -n dnode1 -s start sleep 3000 sql connect -sql create database db blocks 2 cache 1 maxTables $maxTables +sql create database db blocks 3 cache 1 maxTables $maxTables sql use db print ========== step1 diff --git a/tests/script/unique/db/delete.sim b/tests/script/unique/db/delete.sim index 49732cdf91..f89588a8ac 100644 --- a/tests/script/unique/db/delete.sim +++ b/tests/script/unique/db/delete.sim @@ -26,7 +26,7 @@ sql create dnode $hostname3 system sh/exec.sh -n dnode3 -s start print ======== step1 -sql create database db replica 3 blocks 2 +sql create database db replica 3 blocks 3 sql create table db.mt (ts timestamp, tbcol int) TAGS(tgcol int) $tbPrefix = db.t