diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 0962e7c2cb..a905f9803f 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -208,6 +208,18 @@ typedef struct { } SFileGroupIter; // ------------------ tsdbMain.c +typedef struct { + int32_t totalLen; + int32_t len; + SDataRow row; +} SSubmitBlkIter; + +typedef struct { + int32_t totalLen; + int32_t len; + void * pMsg; +} SSubmitMsgIter; + typedef struct { int8_t state; @@ -430,7 +442,6 @@ void tsdbCloseBufPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); // ------------------ tsdbMemTable.c -int tsdbUpdateRowInMem(STsdbRepo* pRepo, SDataRow row, STable* pTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 1efde49ed0..58ae2c3ae1 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -516,7 +516,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { SFile file; SFile * pFile = &file; - strncpy(pFile->fname, fname, TSDB_FILENAME_LEN); + strncpy(pFile->fname, fname, TSDB_FILENAME_LEN - 1); pFile->fd = -1; if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a18fa6d0ef..3a3a824afa 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -32,18 +32,6 @@ #define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP #define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) -typedef struct { - int32_t totalLen; - int32_t len; - SDataRow row; -} SSubmitBlkIter; - -typedef struct { - int32_t totalLen; - int32_t len; - void * pMsg; -} SSubmitMsgIter; - static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg); static int32_t tsdbUnsetRepoEnv(char *rootDir); @@ -52,20 +40,13 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg); static char * tsdbGetCfgFname(char *rootDir); static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg); static void tsdbFreeRepo(STsdbRepo *pRepo); -static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); -static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows); -static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); -static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static int tsdbRestoreInfo(STsdbRepo *pRepo); -static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression); static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); static int keyFGroupCompFunc(const void *key, const void *fgroup); static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); -static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); -static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo); @@ -177,40 +158,6 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { tsdbDebug("vgId:%d repository is closed", vgId); } -int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { - STsdbRepo * pRepo = (STsdbRepo *)repo; - SSubmitMsgIter msgIter = {0}; - - if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) { - if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { - tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); - } - return -1; - } - - if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) { - tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } - - SSubmitBlk *pBlock = NULL; - int32_t affectedrows = 0; - - TSKEY now = taosGetTimestamp(pRepo->config.precision); - while (true) { - tsdbGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; - if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) { - return -1; - } - } - - if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); - - if (tsdbCheckCommit(pRepo) < 0) return -1; - return 0; -} - uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { STsdbRepo *pRepo = (STsdbRepo *)repo; // STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -735,93 +682,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { } } -static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { - if (pMsg == NULL) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return -1; - } - - pIter->totalLen = pMsg->length; - pIter->len = 0; - pIter->pMsg = pMsg; - if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return -1; - } - - return 0; -} - -static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) { - STsdbMeta *pMeta = pRepo->tsdbMeta; - int64_t points = 0; - - ASSERT(pBlock->tid < pMeta->maxTables); - STable *pTable = pMeta->tables[pBlock->tid]; - ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); - - SSubmitBlkIter blkIter = {0}; - SDataRow row = NULL; - - TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep; - TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; - - tsdbInitSubmitBlkIter(pBlock, &blkIter); - while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { - if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) { - tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 - " maxKey %" PRId64, - REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey); - terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; - return -1; - } - - if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1; - - (*affectedrows)++; - points++; - } - - STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion); - pRepo->stat.pointsWritten += points * schemaNCols(pSchema); - pRepo->stat.totalStorage += points * schemaVLen(pSchema); - - return 0; -} - -static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { - if (pIter->len == 0) { - pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE; - } else { - SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); - pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); - } - - if (pIter->len > pIter->totalLen) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - *pPBlock = NULL; - return -1; - } - - *pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); - - return 0; -} - -static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { - SDataRow row = pIter->row; - if (row == NULL) return NULL; - - pIter->len += dataRowLen(row); - if (pIter->len >= pIter->totalLen) { - pIter->row = NULL; - } else { - pIter->row = (char *)row + dataRowLen(row); - } - - return row; -} - static int tsdbRestoreInfo(STsdbRepo *pRepo) { STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; @@ -855,14 +715,6 @@ _err: return -1; } -static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { - if (pBlock->dataLen <= 0) return -1; - pIter->totalLen = pBlock->dataLen; - pIter->len = 0; - pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen); - return 0; -} - static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) { int8_t ocompression = pRepo->config.compression; pRepo->config.compression = compression; @@ -959,134 +811,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { return buf; } -static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { - ASSERT(pTable != NULL); - - STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); - int sversion = schemaVersion(pSchema); - - if (pBlock->sversion == sversion) { - return 0; - } else { - if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; - return -1; - } - } - - if (pBlock->sversion > sversion) { // may need to update table schema - if (pBlock->schemaLen > 0) { - tsdbDebug( - "vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...", - REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion); - ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0); - int numOfCols = pBlock->schemaLen / sizeof(STColumn); - STColumn *pTCol = (STColumn *)pBlock->data; - - STSchemaBuilder schemaBuilder = {0}; - if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - tstrerror(terrno)); - return -1; - } - - for (int i = 0; i < numOfCols; i++) { - if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - tstrerror(terrno)); - tdDestroyTSchemaBuilder(&schemaBuilder); - return -1; - } - } - - STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder); - if (pNSchema == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tdDestroyTSchemaBuilder(&schemaBuilder); - return -1; - } - - tdDestroyTSchemaBuilder(&schemaBuilder); - tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true); - } else { - tsdbDebug( - "vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...", - REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion); - terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE; - return -1; - } - } else { - ASSERT(pBlock->sversion >= 0); - if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) { - tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo), - pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable)); - } - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; - return -1; - } - - return 0; -} - -static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { - ASSERT(pMsg != NULL); - STsdbMeta * pMeta = pRepo->tsdbMeta; - SSubmitMsgIter msgIter = {0}; - SSubmitBlk * pBlock = NULL; - - terrno = TSDB_CODE_SUCCESS; - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - - if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; - while (true) { - if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; - if (pBlock == NULL) break; - - pBlock->uid = htobe64(pBlock->uid); - pBlock->tid = htonl(pBlock->tid); - pBlock->sversion = htonl(pBlock->sversion); - pBlock->dataLen = htonl(pBlock->dataLen); - pBlock->schemaLen = htonl(pBlock->schemaLen); - pBlock->numOfRows = htons(pBlock->numOfRows); - - if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { - tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - STable *pTable = pMeta->tables[pBlock->tid]; - if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) { - tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); - terrno = TSDB_CODE_TDB_INVALID_ACTION; - return -1; - } - - // Check schema version and update schema if needed - if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) { - if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - continue; - } else { - return -1; - } - } - } - - if (terrno != TSDB_CODE_SUCCESS) return -1; - return 0; -} - static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { // TODO // STsdbCache *pCache = pRepo->tsdbCache; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 36b57a7dcd..9f9c52222e 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -18,7 +18,6 @@ #define TSDB_DATA_SKIPLIST_LEVEL 5 -static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes); static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); @@ -32,103 +31,49 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row); +static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); +static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); +static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); +static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); +static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow); +static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); +static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); +static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); +static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter); +static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter); -// ---------------- INTERNAL FUNCTIONS ---------------- -int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { - STsdbCfg * pCfg = &pRepo->config; - STsdbMeta * pMeta = pRepo->tsdbMeta; - TKEY tkey = dataRowTKey(row); - TSKEY key = dataRowKey(row); - SMemTable * pMemTable = pRepo->mem; - STableData *pTableData = NULL; - bool isRowDelete = TKEY_IS_DELETED(tkey); +static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, + TSKEY now); - if (isRowDelete) { - if (!pCfg->update) { - tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo)); - terrno = TSDB_CODE_TDB_INVALID_ACTION; - return -1; +int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { + STsdbRepo * pRepo = (STsdbRepo *)repo; + SSubmitMsgIter msgIter = {0}; + SSubmitBlk * pBlock = NULL; + int32_t affectedrows = 0; + + if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) { + if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { + tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); } - - if (key > TABLE_LASTKEY(pTable)) { - tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64, - REPO_ID(pRepo), key, TABLE_LASTKEY(pTable)); - return 0; - } - } - - void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); - if (pRow == NULL) { - tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", - REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); return -1; } - dataRowCpy(pRow, row); - - // Operations above may change pRepo->mem, retake those values - ASSERT(pRepo->mem != NULL); - pMemTable = pRepo->mem; - - if (TABLE_TID(pTable) >= pMemTable->maxTables) { - if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { - tsdbFreeBytes(pRepo, pRow, dataRowLen(row)); + tsdbInitSubmitMsgIter(pMsg, &msgIter); + while (true) { + tsdbGetSubmitMsgNext(&msgIter, &pBlock); + if (pBlock == NULL) break; + if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) { return -1; } } - pTableData = pMemTable->tData[TABLE_TID(pTable)]; - if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { - if (pTableData != NULL) { - taosWLockLatch(&(pMemTable->latch)); - pMemTable->tData[TABLE_TID(pTable)] = NULL; - tsdbFreeTableData(pTableData); - taosWUnLockLatch(&(pMemTable->latch)); - } - - pTableData = tsdbNewTableData(pCfg, pTable); - if (pTableData == NULL) { - tsdbError("vgId:%d failed to insert row with key %" PRId64 - " to table %s while create new table data object since %s", - REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno)); - tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); - return -1; - } - - pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; - } - - ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); - - int64_t oldSize = SL_SIZE(pTableData->pData); - if (tSkipListPut(pTableData->pData, pRow) == NULL) { - tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row)); - } else { - int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize; - if (isRowDelete) { - if (TABLE_LASTKEY(pTable) == key) { - // TODO: need to update table last key here (may from file) - } - } else { - if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key; - } - - if (pMemTable->keyFirst > key) pMemTable->keyFirst = key; - if (pMemTable->keyLast < key) pMemTable->keyLast = key; - pMemTable->numOfRows += deltaSize; - - if (pTableData->keyFirst > key) pTableData->keyFirst = key; - if (pTableData->keyLast < key) pTableData->keyLast = key; - pTableData->numOfRows += deltaSize; - } - - tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), - isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), - key); + if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); + if (tsdbCheckCommit(pRepo) < 0) return -1; return 0; } +// ---------------- INTERNAL FUNCTIONS ---------------- int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { if (pMemTable == NULL) return 0; int ref = T_REF_INC(pMemTable); @@ -152,7 +97,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { } int code = pthread_cond_signal(&pBufPool->poolNotEmpty); if (code != 0) { - tsdbUnlockRepo(pRepo); + if (tsdbUnlockRepo(pRepo) < 0) return -1; tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code)); terrno = TAOS_SYSTEM_ERROR(code); return -1; @@ -189,6 +134,8 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { } void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) { + tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); + if (pMem != NULL) { taosRUnLockLatch(&(pMem->latch)); tsdbUnRefMemTable(pRepo, pMem); @@ -197,8 +144,6 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) if (pIMem != NULL) { tsdbUnRefMemTable(pRepo, pIMem); } - - tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); } void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { @@ -230,6 +175,10 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ASSERT(pRepo->mem->extraBuffList != NULL); SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes); if (pNode == NULL) { + if (listNEles(pRepo->mem->extraBuffList) == 0) { + tdListFree(pRepo->mem->extraBuffList); + pRepo->mem->extraBuffList = NULL; + } terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } @@ -260,18 +209,18 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { } int tsdbAsyncCommit(STsdbRepo *pRepo) { + if (pRepo->mem == NULL) return 0; + SMemTable *pIMem = pRepo->imem; - if (pRepo->mem != NULL) { - sem_wait(&(pRepo->readyToCommit)); + sem_wait(&(pRepo->readyToCommit)); - if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); - if (tsdbLockRepo(pRepo) < 0) return -1; - pRepo->imem = pRepo->mem; - pRepo->mem = NULL; - tsdbScheduleCommit(pRepo); - if (tsdbUnlockRepo(pRepo) < 0) return -1; - } + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); + if (tsdbLockRepo(pRepo) < 0) return -1; + pRepo->imem = pRepo->mem; + pRepo->mem = NULL; + tsdbScheduleCommit(pRepo); + if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; @@ -469,25 +418,6 @@ _exit: } // ---------------- LOCAL FUNCTIONS ---------------- -static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { - ASSERT(pRepo->mem != NULL); - if (pRepo->mem->extraBuffList == NULL) { - STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); - ASSERT(pBufBlock != NULL); - pBufBlock->offset -= bytes; - pBufBlock->remain += bytes; - ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset)); - tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, - listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); - } else { - SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -(int)(sizeof(SListNode))); - ASSERT(listTail(pRepo->mem->extraBuffList) == pNode); - tdListPopNode(pRepo->mem->extraBuffList, pNode); - free(pNode); - tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes); - } -} - static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -848,4 +778,400 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * } return 0; +} + +static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { + if (pBlock->dataLen <= 0) return -1; + pIter->totalLen = pBlock->dataLen; + pIter->len = 0; + pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen); + return 0; +} + +static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { + SDataRow row = pIter->row; + if (row == NULL) return NULL; + + pIter->len += dataRowLen(row); + if (pIter->len >= pIter->totalLen) { + pIter->row = NULL; + } else { + pIter->row = (char *)row + dataRowLen(row); + } + + return row; +} + +static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, + TSKEY now) { + if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) { + tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 + " maxKey %" PRId64 " row key %" PRId64, + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey, + dataRowKey(row)); + terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + return -1; + } + + return 0; +} + +static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { + ASSERT(pMsg != NULL); + STsdbMeta * pMeta = pRepo->tsdbMeta; + SSubmitMsgIter msgIter = {0}; + SSubmitBlk * pBlock = NULL; + SSubmitBlkIter blkIter = {0}; + SDataRow row = NULL; + TSKEY now = taosGetTimestamp(pRepo->config.precision); + TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep; + TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; + + terrno = TSDB_CODE_SUCCESS; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + + if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + while (true) { + if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + pBlock->sversion = htonl(pBlock->sversion); + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); + pBlock->numOfRows = htons(pBlock->numOfRows); + + if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { + tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, + pBlock->tid); + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + + STable *pTable = pMeta->tables[pBlock->tid]; + if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) { + tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, + pBlock->tid); + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); + terrno = TSDB_CODE_TDB_INVALID_ACTION; + return -1; + } + + // Check schema version and update schema if needed + if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) { + if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + continue; + } else { + return -1; + } + } + + tsdbInitSubmitBlkIter(pBlock, &blkIter); + while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { + if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) { + return -1; + } + } + } + + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} + +static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows) { + STsdbMeta * pMeta = pRepo->tsdbMeta; + int64_t points = 0; + STable * pTable = NULL; + SSubmitBlkIter blkIter = {0}; + SDataRow row = NULL; + void ** rows = NULL; + int rowCounter = 0; + + ASSERT(pBlock->tid < pMeta->maxTables); + pTable = pMeta->tables[pBlock->tid]; + ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); + + rows = (void **)calloc(pBlock->numOfRows, sizeof(void *)); + if (rows == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + tsdbInitSubmitBlkIter(pBlock, &blkIter); + while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { + if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) { + tsdbFreeRows(pRepo, rows, rowCounter); + goto _err; + } + + (*affectedrows)++; + points++; + + if (rows[rowCounter] != NULL) { + rowCounter++; + } + } + + if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) { + goto _err; + } + + STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion); + pRepo->stat.pointsWritten += points * schemaNCols(pSchema); + pRepo->stat.totalStorage += points * schemaVLen(pSchema); + + free(rows); + return 0; + +_err: + free(rows); + return -1; +} + +static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow) { + STsdbCfg * pCfg = &pRepo->config; + TKEY tkey = dataRowTKey(row); + TSKEY key = dataRowKey(row); + bool isRowDelete = TKEY_IS_DELETED(tkey); + + if (isRowDelete) { + if (!pCfg->update) { + tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo)); + terrno = TSDB_CODE_TDB_INVALID_ACTION; + return -1; + } + + if (key > TABLE_LASTKEY(pTable)) { + tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64, + REPO_ID(pRepo), key, TABLE_LASTKEY(pTable)); + return 0; + } + } + + void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); + if (pRow == NULL) { + tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", + REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); + return -1; + } + + dataRowCpy(pRow, row); + ppRow[0] = pRow; + + tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), + isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), + key); + + return 0; +} + +static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { + if (pMsg == NULL) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + return -1; + } + + pIter->totalLen = pMsg->length; + pIter->len = 0; + pIter->pMsg = pMsg; + if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + return -1; + } + + return 0; +} + +static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { + if (pIter->len == 0) { + pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE; + } else { + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); + } + + if (pIter->len > pIter->totalLen) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + *pPBlock = NULL; + return -1; + } + + *pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + + return 0; +} + +static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { + ASSERT(pTable != NULL); + + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + int sversion = schemaVersion(pSchema); + + if (pBlock->sversion == sversion) { + return 0; + } else { + if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + return -1; + } + } + + if (pBlock->sversion > sversion) { // may need to update table schema + if (pBlock->schemaLen > 0) { + tsdbDebug( + "vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion); + ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0); + int numOfCols = pBlock->schemaLen / sizeof(STColumn); + STColumn *pTCol = (STColumn *)pBlock->data; + + STSchemaBuilder schemaBuilder = {0}; + if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + return -1; + } + + for (int i = 0; i < numOfCols; i++) { + if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + tdDestroyTSchemaBuilder(&schemaBuilder); + return -1; + } + } + + STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder); + if (pNSchema == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tdDestroyTSchemaBuilder(&schemaBuilder); + return -1; + } + + tdDestroyTSchemaBuilder(&schemaBuilder); + tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true); + } else { + tsdbDebug( + "vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion); + terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE; + return -1; + } + } else { + ASSERT(pBlock->sversion >= 0); + if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) { + tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo), + pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable)); + } + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + return -1; + } + + return 0; +} + +static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter) { + if (rowCounter < 1) return 0; + + SMemTable * pMemTable = NULL; + STableData *pTableData = NULL; + STsdbMeta * pMeta = pRepo->tsdbMeta; + STsdbCfg * pCfg = &(pRepo->config); + + ASSERT(pRepo->mem != NULL); + pMemTable = pRepo->mem; + + if (TABLE_TID(pTable) >= pMemTable->maxTables) { + if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { + tsdbFreeRows(pRepo, rows, rowCounter); + return -1; + } + } + pTableData = pMemTable->tData[TABLE_TID(pTable)]; + + if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) { + if (pTableData != NULL) { + taosWLockLatch(&(pMemTable->latch)); + pMemTable->tData[TABLE_TID(pTable)] = NULL; + tsdbFreeTableData(pTableData); + taosWUnLockLatch(&(pMemTable->latch)); + } + + pTableData = tsdbNewTableData(pCfg, pTable); + if (pTableData == NULL) { + tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo), + TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno)); + tsdbFreeRows(pRepo, rows, rowCounter); + return -1; + } + + pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; + } + + ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); + + int64_t osize = SL_SIZE(pTableData->pData); + tSkipListPutBatch(pTableData->pData, rows, rowCounter); + int64_t dsize = SL_SIZE(pTableData->pData) - osize; + + if (pMemTable->keyFirst > dataRowKey(rows[0])) pMemTable->keyFirst = dataRowKey(rows[0]); + if (pMemTable->keyLast < dataRowKey(rows[rowCounter - 1])) pMemTable->keyLast = dataRowKey(rows[rowCounter - 1]); + pMemTable->numOfRows += dsize; + + if (pTableData->keyFirst > dataRowKey(rows[0])) pTableData->keyFirst = dataRowKey(rows[0]); + if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]); + pTableData->numOfRows += dsize; + + // TODO: impl delete row thing + if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]); + + return 0; +} + +static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { + ASSERT(pRepo->mem != NULL); + STsdbBufPool *pBufPool = pRepo->pPool; + + for (int i = rowCounter - 1; i >= 0; --i) { + SDataRow row = (SDataRow)rows[i]; + int bytes = (int)dataRowLen(row); + + if (pRepo->mem->extraBuffList == NULL) { + STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); + ASSERT(pBufBlock != NULL && pBufBlock->offset >= bytes); + + pBufBlock->offset -= bytes; + pBufBlock->remain += bytes; + ASSERT(row == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset)); + tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes, + listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain); + + if (pBufBlock->offset == 0) { // return the block to buffer pool + tsdbLockRepo(pRepo); + SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList); + tdListPrependNode(pBufPool->bufBlockList, pNode); + tsdbUnlockRepo(pRepo); + } + } else { + ASSERT(listNEles(pRepo->mem->extraBuffList) > 0); + SListNode *pNode = tdListPopTail(pRepo->mem->extraBuffList); + ASSERT(row == pNode->data); + free(pNode); + tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes); + + if (listNEles(pRepo->mem->extraBuffList) == 0) { + tdListFree(pRepo->mem->extraBuffList); + pRepo->mem->extraBuffList = NULL; + } + } + } } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 98f815f78a..5b65b2185a 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -1595,7 +1595,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, tblkIdx++; } else if (oBlock.numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) { // Delete the block and do some stuff - ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN); + // ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN); if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1; *pCommitIter->pIter = slIter; if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 65b6482520..2c4d1a86ef 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -131,6 +131,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _ __sl_key_fn_t fn); void tSkipListDestroy(SSkipList *pSkipList); SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData); +void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata); SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList); diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 5ecc8ce119..4ae13bd7e5 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -24,10 +24,12 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); static void tSkipListCorrectLevel(SSkipList *pSkipList); static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); -static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode); -static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData); -static SSkipListNode * tSkipListNewNode(uint8_t level); +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward); +static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData); +static SSkipListNode *tSkipListNewNode(uint8_t level); #define tSkipListFreeNode(n) tfree((n)) +static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward, + bool hasDup); static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList); static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList); @@ -109,32 +111,87 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { if (pSkipList == NULL || pData == NULL) return NULL; SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; - uint8_t dupMode = SL_DUP_MODE(pSkipList); SSkipListNode *pNode = NULL; tSkipListWLock(pSkipList); bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); - - if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) { - if (dupMode == SL_UPDATE_DUP_KEY) { - pNode = SL_NODE_GET_BACKWARD_POINTER(backward[0], 0); - atomic_store_ptr(&(pNode->pData), pData); - } - } else { - pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList)); - if (pNode != NULL) { - pNode->pData = pData; - - tSkipListDoInsert(pSkipList, backward, pNode); - } - } + pNode = tSkipListPutImpl(pSkipList, pData, backward, false, hasDup); tSkipListUnlock(pSkipList); return pNode; } +// Put a batch of data into skiplist. The batch of data must be in ascending order +void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) { + SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; + SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + bool hasDup = false; + char * pKey = NULL; + char * pDataKey = NULL; + int compare = 0; + + tSkipListWLock(pSkipList); + + // backward to put the first data + hasDup = tSkipListGetPosToPut(pSkipList, backward, ppData[0]); + tSkipListPutImpl(pSkipList, ppData[0], backward, false, hasDup); + + for (int level = 0; level < pSkipList->maxLevel; level++) { + forward[level] = SL_NODE_GET_BACKWARD_POINTER(backward[level], level); + } + + // forward to put the rest of data + for (int idata = 1; idata < ndata; idata++) { + pDataKey = pSkipList->keyFn(ppData[idata]); + + // Compare max key + pKey = SL_GET_MAX_KEY(pSkipList); + compare = pSkipList->comparFn(pDataKey, pKey); + if (compare > 0) { + for (int i = 0; i < pSkipList->maxLevel; i++) { + forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i); + } + + hasDup = false; + } else { + SSkipListNode *px = pSkipList->pHead; + for (int i = pSkipList->maxLevel - 1; i >= 0; --i) { + if (i < pSkipList->level) { + // set new px + if (forward[i] != pSkipList->pHead) { + if (px == pSkipList->pHead || + pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, forward[i]), SL_GET_NODE_KEY(pSkipList, px)) > 0) { + px = forward[i]; + } + } + + SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i); + while (p != pSkipList->pTail) { + pKey = SL_GET_NODE_KEY(pSkipList, p); + + compare = pSkipList->comparFn(pKey, pDataKey); + if (compare >= 0) { + if (compare == 0) hasDup = true; + break; + } else { + px = p; + p = SL_NODE_GET_FORWARD_POINTER(px, i); + } + } + } + + forward[i] = px; + } + } + + tSkipListPutImpl(pSkipList, ppData[idata], forward, true, hasDup); + } + + tSkipListUnlock(pSkipList); +} + uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { uint32_t count = 0; @@ -310,22 +367,25 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { } } -static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode) { +static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) { for (int32_t i = 0; i < pNode->level; ++i) { - if (i >= pSkipList->level) { - SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; - SL_NODE_GET_BACKWARD_POINTER(pNode, i) = pSkipList->pHead; - SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; - SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode; + SSkipListNode *x = direction[i]; + if (isForward) { + SL_NODE_GET_BACKWARD_POINTER(pNode, i) = x; + + SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(x, i); + SL_NODE_GET_BACKWARD_POINTER(next, i) = pNode; + + SL_NODE_GET_FORWARD_POINTER(pNode, i) = next; + SL_NODE_GET_FORWARD_POINTER(x, i) = pNode; } else { - SSkipListNode *x = backward[i]; SL_NODE_GET_FORWARD_POINTER(pNode, i) = x; SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(x, i); SL_NODE_GET_FORWARD_POINTER(prev, i) = pNode; - SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode; SL_NODE_GET_BACKWARD_POINTER(pNode, i) = prev; + SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode; } } @@ -377,7 +437,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, char * pDataKey = pSkipList->keyFn(pData); if (pSkipList->size == 0) { - for (int i = 0; i < pSkipList->level; i++) { + for (int i = 0; i < pSkipList->maxLevel; i++) { backward[i] = pSkipList->pTail; } } else { @@ -387,7 +447,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, pKey = SL_GET_MAX_KEY(pSkipList); compare = pSkipList->comparFn(pDataKey, pKey); if (compare >= 0) { - for (int i = 0; i < pSkipList->level; i++) { + for (int i = 0; i < pSkipList->maxLevel; i++) { backward[i] = pSkipList->pTail; } @@ -398,7 +458,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, pKey = SL_GET_MIN_KEY(pSkipList); compare = pSkipList->comparFn(pDataKey, pKey); if (compare < 0) { - for (int i = 0; i < pSkipList->level; i++) { + for (int i = 0; i < pSkipList->maxLevel; i++) { backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i); } @@ -406,18 +466,20 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, } SSkipListNode *px = pSkipList->pTail; - for (int i = pSkipList->level - 1; i >= 0; --i) { - SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i); - while (p != pSkipList->pHead) { - pKey = SL_GET_NODE_KEY(pSkipList, p); + for (int i = pSkipList->maxLevel - 1; i >= 0; --i) { + if (i < pSkipList->level) { + SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i); + while (p != pSkipList->pHead) { + pKey = SL_GET_NODE_KEY(pSkipList, p); - compare = pSkipList->comparFn(pKey, pDataKey); - if (compare <= 0) { - if (compare == 0 && !hasDupKey) hasDupKey = true; - break; - } else { - px = p; - p = SL_NODE_GET_BACKWARD_POINTER(px, i); + compare = pSkipList->comparFn(pKey, pDataKey); + if (compare <= 0) { + if (compare == 0 && !hasDupKey) hasDupKey = true; + break; + } else { + px = p; + p = SL_NODE_GET_BACKWARD_POINTER(px, i); + } } } @@ -579,6 +641,32 @@ static SSkipListNode *tSkipListNewNode(uint8_t level) { return pNode; } +static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward, + bool hasDup) { + uint8_t dupMode = SL_DUP_MODE(pSkipList); + SSkipListNode *pNode = NULL; + + if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) { + if (dupMode == SL_UPDATE_DUP_KEY) { + if (isForward) { + pNode = SL_NODE_GET_FORWARD_POINTER(direction[0], 0); + } else { + pNode = SL_NODE_GET_BACKWARD_POINTER(direction[0], 0); + } + atomic_store_ptr(&(pNode->pData), pData); + } + } else { + pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList)); + if (pNode != NULL) { + pNode->pData = pData; + + tSkipListDoInsert(pSkipList, direction, pNode, isForward); + } + } + + return pNode; +} + // static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey, // int32_t cond, SSkipListNode ***pRes) { // pthread_rwlock_rdlock(&pSkipList->lock);