From 0c7f46e4861dce053a7a153640b50ccbd0b58869 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 29 Jun 2021 20:10:30 +0800 Subject: [PATCH] utilize SMemRow --- src/client/inc/tscUtil.h | 52 +++++++++ src/client/src/tscUtil.c | 157 ++++++++++++++++++++++++- src/common/inc/tdataformat.h | 219 ++++++++++++++++++++++++++++++----- src/common/src/tdataformat.c | 178 +++++++++++++++++++++++----- src/cq/src/cqMain.c | 10 +- src/inc/ttype.h | 2 +- src/tsdb/inc/tsdbMemTable.h | 12 +- src/tsdb/inc/tsdbMeta.h | 4 +- src/tsdb/src/tsdbCommit.c | 23 ++-- src/tsdb/src/tsdbMain.c | 16 +-- src/tsdb/src/tsdbMemTable.c | 119 +++++++++---------- src/tsdb/src/tsdbRead.c | 83 ++++++------- 12 files changed, 683 insertions(+), 192 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 2c4d711520..9d9eacb332 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -339,6 +339,58 @@ char* strdup_throw(const char* str); bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src); SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg); +typedef struct { + // for SDataRow + STSchema* pTSchema; + SSchema* pSchema; + int16_t sversion; + int32_t flen; + // for SKVRow + int16_t tCols; + int16_t nCols; + SColIdx* pColIdx; + uint16_t alloc; + uint16_t size; + void* buf; + + void* pDataBlock; + SSubmitBlk* pSubmitBlk; +} SMemRowBuilder; + +int tdInitMemRowBuilder(SMemRowBuilder* pBuilder); +void tdDestroyMemRowBuilder(SMemRowBuilder* pBuilder); +void tdResetMemRowBuilder(SMemRowBuilder* pBuilder); +SMemRow tdGetMemRowFromBuilder(SMemRowBuilder* pBuilder); + +static FORCE_INLINE int tdAddColToMemRow(SMemRowBuilder* pBuilder, int16_t colId, int8_t type, void* value) { + // TODO + + if (pBuilder->nCols >= pBuilder->tCols) { + pBuilder->tCols *= 2; + pBuilder->pColIdx = (SColIdx*)realloc((void*)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); + if (pBuilder->pColIdx == NULL) return -1; + } + + pBuilder->pColIdx[pBuilder->nCols].colId = colId; + pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size; + + pBuilder->nCols++; + + int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; + if (tlen > pBuilder->alloc - pBuilder->size) { + while (tlen > pBuilder->alloc - pBuilder->size) { + pBuilder->alloc *= 2; + } + pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc); + if (pBuilder->buf == NULL) return -1; + } + + memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen); + pBuilder->size += tlen; + + return 0; +} + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 0b9867971a..c7241860bc 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -14,7 +14,7 @@ */ #include "tscUtil.h" -#include "hash.h" +#include "hash.h" #include "os.h" #include "taosmsg.h" #include "texpr.h" @@ -1636,6 +1636,136 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i return TSDB_CODE_SUCCESS; } +int tdInitMemRowBuilder(SMemRowBuilder* pBuilder) { + pBuilder->pSchema = NULL; + pBuilder->sversion = 0; + pBuilder->tCols = 128; + pBuilder->nCols = 0; + pBuilder->pColIdx = (SColIdx*)malloc(sizeof(SColIdx) * pBuilder->tCols); + if (pBuilder->pColIdx == NULL) return -1; + pBuilder->alloc = 1024; + pBuilder->size = 0; + pBuilder->buf = malloc(pBuilder->alloc); + if (pBuilder->buf == NULL) { + free(pBuilder->pColIdx); + return -1; + } + return 0; +} + +void tdDestroyMemRowBuilder(SMemRowBuilder* pBuilder) { + tfree(pBuilder->pColIdx); + tfree(pBuilder->buf); +} + +void tdResetMemRowBuilder(SMemRowBuilder* pBuilder) { + pBuilder->nCols = 0; + pBuilder->size = 0; +} + +#define KvRowNullColRatio 0.75 // If nullable column ratio larger than 0.75, utilize SKVRow, otherwise SDataRow. +#define KvRowNColsThresh 4096 // default value: 32 + +static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen, + uint16_t* nColsNotNull) { + ASSERT(pData != NULL); + if (nCols < KvRowNColsThresh) { + return SMEM_ROW_DATA; + } + int32_t dataRowLen = flen; + int32_t kvRowLen = 0; + + uint16_t nColsNull = 0; + char* p = (char*)pData; + for (int i = 0; i < nCols; ++i) { + if (IS_VAR_DATA_TYPE(pSchema[i].type)) { + dataRowLen += varDataTLen(p); + if (!isNull(p, pSchema[i].type)) { + kvRowLen += sizeof(SColIdx) + varDataTLen(p); + } else { + ++nColsNull; + } + } else { + if (!isNull(p, pSchema[i].type)) { + kvRowLen += sizeof(SColIdx) + varDataTLen(p); + } else { + ++nColsNull; + } + } + + // next column + p += pSchema[i].bytes; + } + + tscDebug("prop:nColsNull %d, nCols: %d, kvRowLen: %d, dataRowLen: %d", nColsNull, nCols, kvRowLen, dataRowLen); + + if (kvRowLen < dataRowLen) { + if (nColsNotNull) { + *nColsNotNull = nCols - nColsNull; + } + return SMEM_ROW_KV; + } + + return SMEM_ROW_DATA; +} + +SMemRow tdGetMemRowFromBuilder(SMemRowBuilder* pBuilder) { + SSchema* pSchema = pBuilder->pSchema; + char* p = (char*)pBuilder->buf; + + uint16_t nColsNotNull = 0; + uint8_t memRowType = tdRowTypeJudger(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull); + tscDebug("prop:memType is %d", memRowType); + + memRowType = SMEM_ROW_DATA; + SMemRow* memRow = (SMemRow)pBuilder->pDataBlock; + memRowSetType(memRow, memRowType); + + if (memRowType == SMEM_ROW_DATA) { + int toffset = 0; + SDataRow trow = (SDataRow)memRowBody(memRow); + dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen)); + dataRowSetVersion(trow, pBuilder->sversion); + + p = (char*)pBuilder->buf; + for (int32_t j = 0; j < pBuilder->nCols; ++j) { + tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); + toffset += TYPE_BYTES[pSchema[j].type]; + p += pSchema[j].bytes; + } + pBuilder->buf = p; + } else { + uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull + pBuilder->size; + SKVRow row = (SKVRow)pBuilder->pDataBlock; + + kvRowSetNCols(row, nColsNotNull); + kvRowSetLen(row, tlen); + + memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); + memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); + } + + pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + memRowTLen(memRow); // next row + pBuilder->pSubmitBlk->dataLen += memRowTLen(memRow); + + // int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; + // if (tlen == 0) return NULL; + + // tlen += TD_KV_ROW_HEAD_SIZE; + + // SKVRow row = malloc(tlen); + // if (row == NULL) return NULL; + + // kvRowSetNCols(row, pBuilder->nCols); + // kvRowSetLen(row, tlen); + + // memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); + // memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); + + return NULL; +} + +// Erase the empty space reserved for binary data static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) { // TODO: optimize this function, handle the case while binary is not presented STableMeta* pTableMeta = pTableDataBlock->pTableMeta; @@ -1675,12 +1805,24 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); pBlock->dataLen = 0; int32_t numOfRows = htons(pBlock->numOfRows); - + + SMemRowBuilder mRowBuilder; + mRowBuilder.pSchema = pSchema; + mRowBuilder.sversion = pTableMeta->sversion; + mRowBuilder.flen = flen; + mRowBuilder.nCols = tinfo.numOfColumns; + mRowBuilder.pDataBlock = pDataBlock; + mRowBuilder.pSubmitBlk = pBlock; + mRowBuilder.buf = p; + mRowBuilder.size = 0; + for (int32_t i = 0; i < numOfRows; ++i) { - SDataRow trow = (SDataRow) pDataBlock; +#if 0 + SDataRow trow = (SDataRow)pDataBlock; // generate each SDataRow one by one dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen)); dataRowSetVersion(trow, pTableMeta->sversion); + // scan each column data and generate the data row int toffset = 0; for (int32_t j = 0; j < tinfo.numOfColumns; j++) { tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); @@ -1688,8 +1830,10 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo p += pSchema[j].bytes; } - pDataBlock = (char*)pDataBlock + dataRowLen(trow); - pBlock->dataLen += dataRowLen(trow); + pDataBlock = (char*)pDataBlock + dataRowLen(trow); // next SDataRow + pBlock->dataLen += dataRowLen(trow); // SSubmitBlk data length +#endif + tdGetMemRowFromBuilder(&mRowBuilder); } int32_t len = pBlock->dataLen + pBlock->schemaLen; @@ -1701,13 +1845,14 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo static int32_t getRowExpandSize(STableMeta* pTableMeta) { int32_t result = TD_DATA_ROW_HEAD_SIZE; - int32_t columns = tscGetNumOfColumns(pTableMeta); + int32_t columns = tscGetNumOfColumns(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta); for(int32_t i = 0; i < columns; i++) { if (IS_VAR_DATA_TYPE((pSchema + i)->type)) { result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; } } + result += TD_MEM_ROW_TYPE_SIZE; // add len of SMemRow flag return result; } diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 8ee7329156..294d8b2cf8 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -24,6 +24,31 @@ extern "C" { #endif +typedef struct { + VarDataLenT len; + uint8_t data; +} SBinaryNullT; + +typedef struct { + VarDataLenT len; + uint32_t data; +} SNCharNullT; + +extern const uint8_t BoolNull; +extern const uint8_t TinyintNull; +extern const uint16_t SmallintNull; +extern const uint32_t IntNull; +extern const uint64_t BigintNull; +extern const uint64_t TimestampNull; +extern const uint8_t UTinyintNull; +extern const uint16_t USmallintNull; +extern const uint32_t UIntNull; +extern const uint64_t UBigintNull; +extern const uint32_t FloatNull; +extern const uint64_t DoubleNull; +extern const SBinaryNullT BinaryNull; +extern const SNCharNullT NcharNull; + #define STR_TO_VARSTR(x, str) \ do { \ VarDataLenT __len = (VarDataLenT)strlen(str); \ @@ -45,10 +70,10 @@ extern "C" { // ----------------- TSDB COLUMN DEFINITION typedef struct { - int8_t type; // Column type - int16_t colId; // column ID - int16_t bytes; // column bytes - int16_t offset; // point offset in SDataRow after the header part + int8_t type; // Column type + int16_t colId; // column ID + uint16_t bytes; // column bytes + uint16_t offset; // point offset in SDataRow/SKVRow after the header part. } STColumn; #define colType(col) ((col)->type) @@ -159,9 +184,9 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { return 0; } } -// ----------------- Data row structure +// ----------------- Sequential Data row structure -/* A data row, the format is like below: +/* A sequential data row, the format is like below: * |<--------------------+--------------------------- len ---------------------------------->| * |<-- Head -->|<--------- flen -------------->| | * +---------------------+---------------------------------+---------------------------------+ @@ -173,6 +198,18 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { * NOTE: timestamp in this row structure is TKEY instead of TSKEY */ typedef void *SDataRow; +/* A memory data row, the format is like below: + *|---------+---------------------+--------------------------- len ---------------------------------->| + *|<- type->|<-- Head -->|<--------- flen -------------->| | + *|---------+---------------------+---------------------------------+---------------------------------+ + *| uint8_t | uint16_t | int16_t | | | + *|---------+----------+----------+---------------------------------+---------------------------------+ + *| flag | len | sversion | First part | Second part | + *|---------+----------+----------+---------------------------------+---------------------------------+ + * + * NOTE: timestamp in this row structure is TKEY instead of TSKEY + */ +typedef void *SMemRow; #define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) @@ -187,13 +224,14 @@ typedef void *SDataRow; #define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) #define dataRowDeleted(r) TKEY_IS_DELETED(dataRowTKey(r)) -SDataRow tdNewDataRowFromSchema(STSchema *pSchema); -void tdFreeDataRow(SDataRow row); +// SDataRow tdNewDataRowFromSchema(STSchema *pSchema); +// void tdFreeDataRow(SDataRow row); void tdInitDataRow(SDataRow row, STSchema *pSchema); -SDataRow tdDataRowDup(SDataRow row); +// SDataRow tdDataRowDup(SDataRow row); +SMemRow tdMemRowDup(SMemRow row); // offset here not include dataRow header length -static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { +static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t bytes, int32_t offset) { ASSERT(value != NULL); int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); @@ -215,15 +253,6 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i return 0; } -// NOTE: offset here including the header size -static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) { - if (IS_VAR_DATA_TYPE(type)) { - return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset)); - } else { - return POINTER_SHIFT(row, offset); - } -} - // ----------------- Data column structure typedef struct SDataCol { int8_t type; // column type @@ -237,17 +266,56 @@ typedef struct SDataCol { TSKEY ts; // only used in last NULL column } SDataCol; +#define isAllRowOfColNull(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); -void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints); +void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); +static const void *tdGetNullVal(int8_t type) { + switch (type) { + case TSDB_DATA_TYPE_BOOL: + return &BoolNull; + case TSDB_DATA_TYPE_TINYINT: + return &TinyintNull; + case TSDB_DATA_TYPE_SMALLINT: + return &SmallintNull; + case TSDB_DATA_TYPE_INT: + return &IntNull; + case TSDB_DATA_TYPE_BIGINT: + return &BigintNull; + case TSDB_DATA_TYPE_FLOAT: + return &FloatNull; + case TSDB_DATA_TYPE_DOUBLE: + return &DoubleNull; + case TSDB_DATA_TYPE_BINARY: + return &BinaryNull; + case TSDB_DATA_TYPE_TIMESTAMP: + return &TimestampNull; + case TSDB_DATA_TYPE_NCHAR: + return &NcharNull; + case TSDB_DATA_TYPE_UTINYINT: + return &UTinyintNull; + case TSDB_DATA_TYPE_USMALLINT: + return &USmallintNull; + case TSDB_DATA_TYPE_UINT: + return &UIntNull; + case TSDB_DATA_TYPE_UBIGINT: + return &UBigintNull; + default: + ASSERT(0); + return NULL; + } +} // Get the data pointer from a column-wised data -static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { +static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) { + if (isAllRowOfColNull(pCol)) { + return tdGetNullVal(pCol->type); + } if (IS_VAR_DATA_TYPE(pCol->type)) { return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); } else { @@ -279,7 +347,7 @@ typedef struct { } SDataCols; #define keyCol(pCols) (&((pCols)->cols[0])) // Key column -#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] +#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data #define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx)) static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) { if (pCols->numOfRows) { @@ -318,13 +386,13 @@ void tdResetDataCols(SDataCols *pCols); int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdFreeDataCols(SDataCols *pCols); -void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); +void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols); int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset); // ----------------- K-V data row structure /* * +----------+----------+---------------------------------+---------------------------------+ - * | int16_t | int16_t | | | + * | uint16_t | int16_t | | | * +----------+----------+---------------------------------+---------------------------------+ * | len | ncols | cols index | data part | * +----------+----------+---------------------------------+---------------------------------+ @@ -332,13 +400,13 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge typedef void *SKVRow; typedef struct { - int16_t colId; - int16_t offset; + int16_t colId; + uint16_t offset; } SColIdx; #define TD_KV_ROW_HEAD_SIZE (2 * sizeof(int16_t)) -#define kvRowLen(r) (*(int16_t *)(r)) +#define kvRowLen(r) (*(uint16_t *)(r)) #define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) #define kvRowSetLen(r, len) kvRowLen(r) = (len) #define kvRowSetNCols(r, n) kvRowNCols(r) = (n) @@ -349,6 +417,11 @@ typedef struct { #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) #define kvRowFree(r) tfree(r) #define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r)) +#define kvRowVersion(r) (-1) + +#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r))) +#define kvRowKey(r) tdGetKey(kvRowTKey(r)) +#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r)) SKVRow tdKVRowDup(SKVRow row); int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value); @@ -377,8 +450,8 @@ typedef struct { int16_t tCols; int16_t nCols; SColIdx *pColIdx; - int16_t alloc; - int16_t size; + uint16_t alloc; + uint16_t size; void * buf; } SKVRowBuilder; @@ -414,6 +487,94 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, return 0; } +// ----------------- Data row structure + +// ----------------- Sequential Data row structure +/* A sequential data row, the format is like below: + * |<--------------------+--------------------------- len ---------------------------------->| + * |<-- Head -->|<--------- flen -------------->| | + * +---------------------+---------------------------------+---------------------------------+ + * | uint16_t | int16_t | | | + * +----------+----------+---------------------------------+---------------------------------+ + * | len | sversion | First part | Second part | + * +----------+----------+---------------------------------+---------------------------------+ + * + * NOTE: timestamp in this row structure is TKEY instead of TSKEY + */ +// ----------------- K-V data row structure +/* + * +----------+----------+---------------------------------+---------------------------------+ + * | int16_t | int16_t | | | + * +----------+----------+---------------------------------+---------------------------------+ + * | len | ncols | cols index | data part | + * +----------+----------+---------------------------------+---------------------------------+ + */ + +#define TD_MEM_ROW_TYPE_SIZE sizeof(uint8_t) +#define TD_MEM_ROW_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + sizeof(uint16_t) + sizeof(int16_t)) + +#define SMEM_ROW_DATA 0U // SDataRow +#define SMEM_ROW_KV 1U // SKVRow +#define TD_DO_NOTHING \ + do { \ + } while (0) + +#define memRowType(r) (*(uint8_t *)(r)) +#define memRowBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) +#define memRowLen(r) (*(uint16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) +#define memRowTLen(r) (*(uint16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) + (uint16_t)TD_MEM_ROW_TYPE_SIZE) + +#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r)) +#define isKvRow(r) (SMEM_ROW_KV == memRowType(r)) +#define memRowVersion(r) (isDataRow(r) ? dataRowVersion(memRowBody(r)) : kvRowVersion(r)) // schema version + +#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowBody(r)) : kvRowValues(memRowBody(r))) + +#define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowBody(r)) : kvRowTKey(memRowBody(r))) +#define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowBody(r)) : kvRowKey(memRowBody(r))) + +#define memRowSetType(r, t) (memRowType(r) = (t)) +#define memRowSetLen(r, l) (memRowLen(r) = (l)) +#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(r, v) : TD_DO_NOTHING) +#define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r)) +#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_HEAD_SIZE) +#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) + +#define memRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE + sizeof(int16_t))) // for SKVRow +#define memRowSetNCols(r, n) memRowNCols(r) = (n) // for SKVRow +#define memRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE) // for SKVRow +#define memRowValues(r) POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE + sizeof(SColIdx) * memRowNCols(r)) // for SKVRow + +// NOTE: offset here including the header size +static FORCE_INLINE void *tdGetRowDataOfCol(void *row, int8_t type, int32_t offset) { + if (IS_VAR_DATA_TYPE(type)) { + return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset)); + } else { + return POINTER_SHIFT(row, offset); + } + return NULL; +} +static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int8_t type, int32_t offset) { + return POINTER_SHIFT(row, offset); +} + +static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t offset) { + if (isDataRow(row)) { + return tdGetRowDataOfCol(row, type, offset); + } else if (isKvRow(row)) { + return tdGetKvRowDataOfCol(row, type, offset); + } else { + ASSERT(0); + } + return NULL; +} + +// #define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r)) +// #define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset) +// #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) +// #define kvRowFree(r) tfree(r) +// #define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r)) + #ifdef __cplusplus } #endif diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 7ae34d532c..a5819ac858 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -18,6 +18,21 @@ #include "tcoding.h" #include "wchar.h" +const uint8_t BoolNull = TSDB_DATA_BOOL_NULL; +const uint8_t TinyintNull = TSDB_DATA_TINYINT_NULL; +const uint16_t SmallintNull = TSDB_DATA_SMALLINT_NULL; +const uint32_t IntNull = TSDB_DATA_INT_NULL; +const uint64_t BigintNull = TSDB_DATA_BIGINT_NULL; +const uint64_t TimestampNull = TSDB_DATA_BIGINT_NULL; +const uint8_t UTinyintNull = TSDB_DATA_UTINYINT_NULL; +const uint16_t USmallintNull = TSDB_DATA_USMALLINT_NULL; +const uint32_t UIntNull = TSDB_DATA_UINT_NULL; +const uint64_t UBigintNull = TSDB_DATA_UBIGINT_NULL; +const uint32_t FloatNull = TSDB_DATA_FLOAT_NULL; +const uint64_t DoubleNull = TSDB_DATA_DOUBLE_NULL; +const SBinaryNullT BinaryNull = {1, TSDB_DATA_BINARY_NULL}; +const SNCharNullT NcharNull = {4, TSDB_DATA_NCHAR_NULL}; + static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows); @@ -173,36 +188,43 @@ void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetVersion(row, schemaVersion(pSchema)); } -SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { - int32_t size = dataRowMaxBytesFromSchema(pSchema); +// SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { +// int32_t size = dataRowMaxBytesFromSchema(pSchema); - SDataRow row = malloc(size); - if (row == NULL) return NULL; +// SDataRow row = malloc(size); +// if (row == NULL) return NULL; - tdInitDataRow(row, pSchema); - return row; -} +// tdInitDataRow(row, pSchema); +// return row; +// } /** * Free the SDataRow object */ -void tdFreeDataRow(SDataRow row) { - if (row) free(row); -} +// void tdFreeDataRow(SDataRow row) { +// if (row) free(row); +// } -SDataRow tdDataRowDup(SDataRow row) { - SDataRow trow = malloc(dataRowLen(row)); +// SDataRow tdDataRowDup(SDataRow row) { +// SDataRow trow = malloc(dataRowLen(row)); +// if (trow == NULL) return NULL; + +// dataRowCpy(trow, row); +// return trow; +// } + +SMemRow tdMemRowDup(SMemRow row) { + SMemRow trow = malloc(memRowTLen(row)); if (trow == NULL) return NULL; - dataRowCpy(trow, row); + memRowCpy(trow, row); return trow; } - void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) { pDataCol->type = colType(pCol); pDataCol->colId = colColId(pCol); pDataCol->bytes = colBytes(pCol); - pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; + pDataCol->offset = colOffset(pCol) + TD_MEM_ROW_HEAD_SIZE; pDataCol->len = 0; if (IS_VAR_DATA_TYPE(pDataCol->type)) { @@ -219,9 +241,21 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) } // value from timestamp should be TKEY here instead of TSKEY -void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { +void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { ASSERT(pCol != NULL && value != NULL); + if (pCol->len == 0) { + if (isNull(value, pCol->type)) { + // all null value yet, just return + return; + } + + if (numOfRows > 0) { + // Find the first not null value, fill all previous values as NULL + dataColSetNEleNull(pCol, numOfRows, maxPoints); + } + } + if (IS_VAR_DATA_TYPE(pCol->type)) { // set offset pCol->dataOff[numOfRows] = pCol->len; @@ -399,11 +433,10 @@ void tdResetDataCols(SDataCols *pCols) { } } } - -void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) { +static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) { ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row)); - int rcol = 0; + int rcol = 0; // rowCol int dcol = 0; if (dataRowDeleted(row)) { @@ -419,7 +452,7 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) while (dcol < pCols->numOfCols) { SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= schemaNCols(pSchema)) { - dataColSetNullAt(pDataCol, pCols->numOfRows); + dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dcol++; continue; } @@ -433,7 +466,7 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) } else if (pRowCol->colId < pDataCol->colId) { rcol++; } else { - dataColSetNullAt(pDataCol, pCols->numOfRows); + dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dcol++; } } @@ -441,6 +474,98 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) pCols->numOfRows++; } +static void tdGetKVRowColInfo(const STSchema *pSchema, SColIdx *pColIdx, int nRowCols, STColumn *pSTColumn, + int *nColMatched) { + int nSchema = schemaNCols(pSchema); + int iCol = 0; + int iSchema = 0; + int nColMatch = 0; + SColIdx * pIdx = pColIdx; + const STColumn *pColumn = NULL; + + while (iCol < nRowCols && iSchema < nSchema) { + pColumn = &pSchema->columns[iSchema]; + if (pIdx->colId == pColumn->colId) { + pSTColumn[nColMatch].colId = pIdx->colId; + pSTColumn[nColMatch].type = pColumn->type; + pSTColumn[nColMatch].bytes = pColumn->bytes; + pSTColumn[nColMatch].offset = pIdx->offset; + + pIdx += sizeof(SColIdx); + + ++iCol; + ++iSchema; + ++nColMatch; + } else if (pIdx->colId > pColumn->colId) { + ++iSchema; + } else { + pIdx += sizeof(SColIdx); + ++iCol; + } + } + *nColMatched = nColMatch; +} + +static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols) { + ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row)); + + int rcol = 0; + int dcol = 0; + + if (kvRowDeleted(row)) { + for (; dcol < pCols->numOfCols; dcol++) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (dcol == 0) { + dataColAppendVal(pDataCol, kvRowValues(row), pCols->numOfRows, pCols->maxPoints); + } else { + dataColSetNullAt(pDataCol, pCols->numOfRows); + } + } + } else { + int nRowCols = kvRowNCols(row); + int nRowColsMatched = 0; + STColumn stColumn[nRowCols]; + tdGetKVRowColInfo(pSchema, kvRowColIdx(row), nRowCols, stColumn, &nRowColsMatched); + uDebug("kvRow: nRowCols=%d, nRowColsMatched=%d, nSchemaCols=%d", nRowCols, nRowColsMatched, schemaNCols(pSchema)); + + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= nRowColsMatched || rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dcol++; + continue; + } + + SColIdx *colIdx = kvRowColIdxAt(row, rcol); + + if (colIdx->colId == pDataCol->colId) { + ASSERT(pDataCol->type == stColumn[rcol].type); + + void *value = tdGetKvRowDataOfCol(row, pDataCol->type, stColumn[rcol].offset + TD_KV_ROW_HEAD_SIZE); + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + dcol++; + rcol++; + } else if (colIdx->colId < pDataCol->colId) { + rcol++; + } else { + dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dcol++; + } + } + } + pCols->numOfRows++; +} + +void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols) { + if (isDataRow(row)) { + tdAppendDataRowToDataCol(memRowBody(row), pSchema, pCols); + } else if (isKvRow(row)) { + tdAppendKvRowToDataCol(memRowBody(row), pSchema, pCols); + } else { + ASSERT(0); + } +} + int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); @@ -559,11 +684,11 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) { void * ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE); if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) { // need to add a column value to the row - int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; + uint16_t diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff); if (nrow == NULL) return -1; - kvRowSetLen(nrow, kvRowLen(row) + (int16_t)sizeof(SColIdx) + diff); + kvRowSetLen(nrow, kvRowLen(row) + (uint16_t)sizeof(SColIdx) + diff); kvRowSetNCols(nrow, kvRowNCols(row) + 1); if (ptr == NULL) { @@ -605,8 +730,8 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) { if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place memcpy(pOldVal, value, varDataTLen(value)); } else { // need to reallocate the memory - int16_t diff = varDataTLen(value) - varDataTLen(pOldVal); - int16_t nlen = kvRowLen(row) + diff; + uint16_t diff = varDataTLen(value) - varDataTLen(pOldVal); + uint16_t nlen = kvRowLen(row) + diff; ASSERT(nlen > 0); nrow = malloc(nlen); if (nrow == NULL) return -1; @@ -693,7 +818,7 @@ void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) { } SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { - int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; + uint16_t tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; if (tlen == 0) return NULL; tlen += TD_KV_ROW_HEAD_SIZE; @@ -709,3 +834,4 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { return row; } + diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index f539e77253..a460b1d619 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -483,8 +483,8 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead)); SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); - SDataRow trow = (SDataRow)pBlk->data; - tdInitDataRow(trow, pSchema); + SMemRow trow = (SMemRow)pBlk->data; + tdInitDataRow(POINTER_SHIFT(trow, TD_MEM_ROW_TYPE_SIZE), pSchema); for (int32_t i = 0; i < pSchema->numOfCols; i++) { STColumn *c = pSchema->columns + i; @@ -500,9 +500,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { memcpy((char *)val + sizeof(VarDataLenT), buf, len); varDataLen(val) = len; } - tdAppendColVal(trow, val, c->type, c->bytes, c->offset); + tdAppendColVal(POINTER_SHIFT(trow, TD_MEM_ROW_TYPE_SIZE), val, c->type, c->bytes, c->offset); } - pBlk->dataLen = htonl(dataRowLen(trow)); + pBlk->dataLen = htonl(memRowTLen(trow)); pBlk->schemaLen = 0; pBlk->uid = htobe64(pObj->uid); @@ -511,7 +511,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { pBlk->sversion = htonl(pSchema->version); pBlk->padding = 0; - pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowLen(trow); + pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowTLen(trow); pMsg->header.vgId = htonl(pContext->vgId); pMsg->header.contLen = htonl(pHead->len); diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 9949f31c59..0308379509 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -11,7 +11,7 @@ extern "C" { // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR typedef int32_t VarDataOffsetT; -typedef int16_t VarDataLenT; +typedef int16_t VarDataLenT; // maxDataLen: 32767 typedef struct tstr { VarDataLenT len; diff --git a/src/tsdb/inc/tsdbMemTable.h b/src/tsdb/inc/tsdbMemTable.h index babb7024b2..67e9976c70 100644 --- a/src/tsdb/inc/tsdbMemTable.h +++ b/src/tsdb/inc/tsdbMemTable.h @@ -71,27 +71,27 @@ int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxK TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); void* tsdbCommitData(STsdbRepo* pRepo); -static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { +static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) { if (pIter == NULL) return NULL; SSkipListNode* node = tSkipListIterGet(pIter); if (node == NULL) return NULL; - return (SDataRow)SL_GET_NODE_DATA(node); + return (SMemRow)SL_GET_NODE_DATA(node); } static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { - SDataRow row = tsdbNextIterRow(pIter); + SMemRow row = tsdbNextIterRow(pIter); if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; - return dataRowKey(row); + return memRowKey(row); } static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { - SDataRow row = tsdbNextIterRow(pIter); + SMemRow row = tsdbNextIterRow(pIter); if (row == NULL) return TKEY_NULL; - return dataRowTKey(row); + return memRowTKey(row); } #endif /* _TD_TSDB_MEMTABLE_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index 45bbd5a7c6..9a8de01f71 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -32,7 +32,7 @@ typedef struct STable { void* eventHandler; // TODO void* streamHandler; // TODO TSKEY lastKey; - SDataRow lastRow; + SMemRow lastRow; char* sql; void* cqhandle; SRWLatch latch; // TODO: implementa latch functions @@ -148,7 +148,7 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { } static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) { - ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow)); + ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == memRowKey(pTable->lastRow))); return pTable->lastKey; } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 82cc6f07f7..d0bad840e0 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -920,7 +920,8 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo SDataCol * pDataCol = pDataCols->cols + ncol; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; - if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it + // if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it + if (isAllRowOfColNull(pDataCol)) { // all data to commit are NULL, just ignore it continue; } @@ -1264,12 +1265,12 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt while (true) { key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); bool isRowDel = false; - SDataRow row = tsdbNextIterRow(pCommitIter->pIter); - if (row == NULL || dataRowKey(row) > maxKey) { + SMemRow row = tsdbNextIterRow(pCommitIter->pIter); + if (row == NULL || memRowKey(row) > maxKey) { key2 = INT64_MAX; } else { - key2 = dataRowKey(row); - isRowDel = dataRowDeleted(row); + key2 = memRowKey(row); + isRowDel = memRowDeleted(row); } if (key1 == INT64_MAX && key2 == INT64_MAX) break; @@ -1284,24 +1285,24 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt (*iter)++; } else if (key1 > key2) { if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); ASSERT(pSchema != NULL); } - tdAppendDataRowToDataCol(row, pSchema, pTarget); + tdAppendMemRowToDataCol(row, pSchema, pTarget); } tSkipListIterNext(pCommitIter->pIter); } else { if (update) { if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); ASSERT(pSchema != NULL); } - tdAppendDataRowToDataCol(row, pSchema, pTarget); + tdAppendMemRowToDataCol(row, pSchema, pTarget); } } else { ASSERT(!isRowDel); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 0cbabb8909..c0474f3b07 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -639,7 +639,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea int numColumns; int32_t blockIdx; SDataStatis* pBlockStatis = NULL; - SDataRow row = NULL; + SMemRow row = NULL; // restore last column data with last schema int err = 0; @@ -655,13 +655,13 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea } } - row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + row = taosTMalloc(memRowMaxBytesFromSchema(pSchema)); if (row == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; err = -1; goto out; } - tdInitDataRow(row, pSchema); + tdInitDataRow(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), pSchema); // first load block index info if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { @@ -718,9 +718,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // OK,let's load row from backward to get not-null column for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; - tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); + tdAppendColVal(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), tdGetColDataOfRow(pDataCol, rowId), pCol->type, + pCol->bytes, pCol->offset); //SDataCol *pDataCol = readh.pDCols[0]->cols + j; - void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); + void *value = tdGetMemRowDataOfCol(row, (int8_t)pCol->type, TD_MEM_ROW_HEAD_SIZE + pCol->offset); if (isNull(value, pCol->type)) { continue; } @@ -740,8 +741,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // save row ts(in column 0) pDataCol = pReadh->pDCols[0]->cols + 0; pCol = schemaColAt(pSchema, 0); - tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); - pLastCol->ts = dataRowKey(row); + tdAppendColVal(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), tdGetColDataOfRow(pDataCol, rowId), pCol->type, + pCol->bytes, pCol->offset); + pLastCol->ts = memRowKey(row); pTable->restoreColumnNum += 1; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 9d8b1ca7f2..94bac68296 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -21,7 +21,7 @@ typedef struct { int32_t totalLen; int32_t len; - SDataRow row; + SMemRow row; } SSubmitBlkIter; typedef struct { @@ -36,20 +36,19 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row); +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); -static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); +static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); -static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow); +static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void **ppRow); static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter); static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter); -static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row); - -static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, +static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row); +static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, TSKEY now); int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { @@ -354,7 +353,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey TSKEY fKey = 0; bool isRowDel = false; int filterIter = 0; - SDataRow row = NULL; + SMemRow row = NULL; SMergeInfo mInfo; if (pMergeInfo == NULL) pMergeInfo = &mInfo; @@ -365,12 +364,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey if (pCols) tdResetDataCols(pCols); row = tsdbNextIterRow(pIter); - if (row == NULL || dataRowKey(row) > maxKey) { + if (row == NULL || memRowKey(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = dataRowKey(row); - isRowDel = dataRowDeleted(row); + rowKey = memRowKey(row); + isRowDel = memRowDeleted(row); } if (filterIter >= nFilterKeys) { @@ -407,12 +406,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey tSkipListIterNext(pIter); row = tsdbNextIterRow(pIter); - if (row == NULL || dataRowKey(row) > maxKey) { + if (row == NULL || memRowKey(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = dataRowKey(row); - isRowDel = dataRowDeleted(row); + rowKey = memRowKey(row); + isRowDel = memRowDeleted(row); } } else { if (isRowDel) { @@ -437,12 +436,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey tSkipListIterNext(pIter); row = tsdbNextIterRow(pIter); - if (row == NULL || dataRowKey(row) > maxKey) { + if (row == NULL || memRowKey(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = dataRowKey(row); - isRowDel = dataRowDeleted(row); + rowKey = memRowKey(row); + isRowDel = memRowDeleted(row); } filterIter++; @@ -548,7 +547,7 @@ static void tsdbFreeTableData(STableData *pTableData) { } } -static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); } +static char *tsdbGetTsTupleKey(const void *data) { return memRowTuple((SMemRow)data); } static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { ASSERT(pMemTable->maxTables < maxTables); @@ -572,17 +571,17 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { return 0; } -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row) { +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) { if (pCols) { - if (*ppSchema == NULL || schemaVersion(*ppSchema) != dataRowVersion(row)) { - *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); + if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) { + *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row)); if (*ppSchema == NULL) { ASSERT(false); return -1; } } - tdAppendDataRowToDataCol(row, *ppSchema, pCols); + tdAppendMemRowToDataCol(row, *ppSchema, pCols); } return 0; @@ -592,31 +591,32 @@ static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { if (pBlock->dataLen <= 0) return -1; pIter->totalLen = pBlock->dataLen; pIter->len = 0; - pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen); + pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen); return 0; } -static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { - SDataRow row = pIter->row; +static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { + SMemRow row = pIter->row; // firstly, get current row if (row == NULL) return NULL; - pIter->len += dataRowLen(row); - if (pIter->len >= pIter->totalLen) { + pIter->len += memRowTLen(row); + if (pIter->len >= pIter->totalLen) { // reach the end pIter->row = NULL; } else { - pIter->row = (char *)row + dataRowLen(row); + pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row } return row; } -static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, +static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, TSKEY now) { - if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) { + TSKEY rowKey = memRowKey(row); + if (rowKey < minKey || rowKey > maxKey) { tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 " maxKey %" PRId64 " row key %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey, - dataRowKey(row)); + rowKey); terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; return -1; } @@ -630,7 +630,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { SSubmitMsgIter msgIter = {0}; SSubmitBlk * pBlock = NULL; SSubmitBlkIter blkIter = {0}; - SDataRow row = NULL; + SMemRow row = NULL; TSKEY now = taosGetTimestamp(pRepo->config.precision); TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep; TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; @@ -698,7 +698,7 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * int64_t points = 0; STable * pTable = NULL; SSubmitBlkIter blkIter = {0}; - SDataRow row = NULL; + SMemRow row = NULL; void * rows[TSDB_MAX_INSERT_BATCH] = {0}; int rowCounter = 0; @@ -744,10 +744,10 @@ _err: return -1; } -static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow) { +static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void **ppRow) { STsdbCfg * pCfg = &pRepo->config; - TKEY tkey = dataRowTKey(row); - TSKEY key = dataRowKey(row); + TKEY tkey = memRowTKey(row); + TSKEY key = memRowKey(row); bool isRowDelete = TKEY_IS_DELETED(tkey); if (isRowDelete) { @@ -765,15 +765,15 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void } } - void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); + void *pRow = tsdbAllocBytes(pRepo, memRowTLen(row)); if (pRow == NULL) { tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", - REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); + REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), memRowTLen(row), tstrerror(terrno)); return -1; } - dataRowCpy(pRow, row); - ppRow[0] = pRow; + memRowCpy(pRow, row); + ppRow[0] = pRow; // save the memory address of data rows tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), @@ -954,8 +954,8 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { STsdbBufPool *pBufPool = pRepo->pPool; for (int i = rowCounter - 1; i >= 0; --i) { - SDataRow row = (SDataRow)rows[i]; - int bytes = (int)dataRowLen(row); + SMemRow row = (SMemRow)rows[i]; + int bytes = (int)memRowTLen(row); if (pRepo->mem->extraBuffList == NULL) { STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); @@ -988,15 +988,16 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { } } -static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { - tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); +static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow row) { + tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, + memRowVersion(row)); STSchema* pSchema = tsdbGetTableLatestSchema(pTable); if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { return; } - pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); if (pSchema == NULL) { return; } @@ -1010,8 +1011,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r if (idx == -1) { continue; } - - void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + + void *value = tdGetMemRowDataOfCol(row, (int8_t)pTCol->type, TD_MEM_ROW_HEAD_SIZE + pSchema->columns[j].offset); if (isNull(value, pTCol->type)) { continue; } @@ -1027,11 +1028,11 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r memcpy(pDataCol->pData, value, pDataCol->bytes); //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); - pDataCol->ts = dataRowKey(row); + pDataCol->ts = memRowKey(row); } } -static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) { +static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row) { STsdbCfg *pCfg = &pRepo->config; // if cacheLastRow config has been reset, free the lastRow @@ -1042,31 +1043,31 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow TSDB_WUNLOCK_TABLE(pTable); } - if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) { + if (tsdbGetTableLastKeyImpl(pTable) < memRowKey(row)) { if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) { - SDataRow nrow = pTable->lastRow; - if (taosTSizeof(nrow) < dataRowLen(row)) { - SDataRow orow = nrow; - nrow = taosTMalloc(dataRowLen(row)); + SMemRow nrow = pTable->lastRow; + if (taosTSizeof(nrow) < memRowTLen(row)) { + SMemRow orow = nrow; + nrow = taosTMalloc(memRowTLen(row)); if (nrow == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } - dataRowCpy(nrow, row); + memRowCpy(nrow, row); TSDB_WLOCK_TABLE(pTable); - pTable->lastKey = dataRowKey(row); + pTable->lastKey = memRowKey(row); pTable->lastRow = nrow; TSDB_WUNLOCK_TABLE(pTable); taosTZfree(orow); } else { TSDB_WLOCK_TABLE(pTable); - pTable->lastKey = dataRowKey(row); - dataRowCpy(nrow, row); + pTable->lastKey = memRowKey(row); + memRowCpy(nrow, row); TSDB_WUNLOCK_TABLE(pTable); } } else { - pTable->lastKey = dataRowKey(row); + pTable->lastKey = memRowKey(row); } if (CACHE_LAST_NULL_COLUMN(pCfg)) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 972c3c4e10..32232e6815 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -139,7 +139,7 @@ typedef struct STableGroupSupporter { static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); -static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey); +static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); @@ -669,8 +669,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); assert(node != NULL); - SDataRow row = (SDataRow)SL_GET_NODE_DATA(node); - TSKEY key = dataRowKey(row); // first timestamp in buffer + SMemRow row = (SMemRow)SL_GET_NODE_DATA(node); + TSKEY key = memRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pMem->keyFirst, pMem->keyLast, @@ -691,8 +691,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); assert(node != NULL); - SDataRow row = (SDataRow)SL_GET_NODE_DATA(node); - TSKEY key = dataRowKey(row); // first timestamp in buffer + SMemRow row = (SMemRow)SL_GET_NODE_DATA(node); + TSKEY key = memRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pIMem->keyFirst, pIMem->keyLast, @@ -716,19 +716,19 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { tSkipListDestroyIter(pCheckInfo->iiter); } -static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) { - SDataRow rmem = NULL, rimem = NULL; +static SMemRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) { + SMemRow rmem = NULL, rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { - rmem = (SDataRow)SL_GET_NODE_DATA(node); + rmem = (SMemRow)SL_GET_NODE_DATA(node); } } if (pCheckInfo->iiter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { - rimem = (SDataRow)SL_GET_NODE_DATA(node); + rimem = (SMemRow)SL_GET_NODE_DATA(node); } } @@ -746,8 +746,8 @@ static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order return rimem; } - TSKEY r1 = dataRowKey(rmem); - TSKEY r2 = dataRowKey(rimem); + TSKEY r1 = memRowKey(rmem); + TSKEY r2 = memRowKey(rimem); if (r1 == r2) { // data ts are duplicated, ignore the data in mem if (!update) { @@ -826,12 +826,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { initTableMemIterator(pHandle, pCheckInfo); } - SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order, pCfg->update); + SMemRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order, pCfg->update); if (row == NULL) { return false; } - pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer + pCheckInfo->lastKey = memRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, 0x%"PRIx64, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qId); @@ -1082,11 +1082,11 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p int32_t code = TSDB_CODE_SUCCESS; /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); - SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); + SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); assert(cur->pos >= 0 && cur->pos <= binfo.rows); - TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; + TSKEY key = (row != NULL) ? memRowKey(row) : TSKEY_INITIAL_VAL; if (key != TSKEY_INITIAL_VAL) { tsdbDebug("%p key in mem:%"PRId64", 0x%"PRIx64, pQueryHandle, key, pQueryHandle->qId); } else { @@ -1327,7 +1327,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity // todo refactor, only copy one-by-one for (int32_t k = start; k < num + start; ++k) { - char* p = tdGetColDataOfRow(src, k); + const char* p = tdGetColDataOfRow(src, k); memcpy(dst, p, varDataTLen(p)); dst += bytes; } @@ -1378,14 +1378,14 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity return numOfRows + num; } -static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, +static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SMemRow row, int32_t numOfCols, STable* pTable, STSchema* pSchema) { char* pData = NULL; // the schema version info is embeded in SDataRow int32_t numOfRowCols = 0; if (pSchema == NULL) { - pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); numOfRowCols = schemaNCols(pSchema); } else { numOfRowCols = schemaNCols(pSchema); @@ -1406,7 +1406,8 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, } if (pSchema->columns[j].colId == pColInfo->info.colId) { - void* value = tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + void* value = + tdGetMemRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_MEM_ROW_HEAD_SIZE + pSchema->columns[j].offset); switch (pColInfo->info.type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: @@ -1656,12 +1657,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; do { - SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); + SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); if (row == NULL) { break; } - TSKEY key = dataRowKey(row); + TSKEY key = memRowKey(row); if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { break; @@ -1674,11 +1675,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - if (rv != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); - rv = dataRowVersion(row); - } - + if (rv != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); + rv = memRowVersion(row); + } + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { @@ -1692,11 +1693,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* moveToNextRowInMem(pCheckInfo); } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it if (pCfg->update) { - if (rv != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); - rv = dataRowVersion(row); + if (rv != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); + rv = memRowVersion(row); } - + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { @@ -1746,8 +1747,10 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* * copy them all to result buffer, since it may be overlapped with file data block. */ if (node == NULL || - ((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || - ((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { + ((memRowKey((SMemRow)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && + ASCENDING_TRAVERSE(pQueryHandle->order)) || + ((memRowKey((SMemRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && + !ASCENDING_TRAVERSE(pQueryHandle->order))) { // no data in cache or data in cache is greater than the ekey of time window, load data from file block if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; @@ -2333,12 +2336,12 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int STSchema* pSchema = NULL; do { - SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); + SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); if (row == NULL) { break; } - TSKEY key = dataRowKey(row); + TSKEY key = memRowKey(row); if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, pQueryHandle->window.ekey); @@ -2351,9 +2354,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } win->ekey = key; - if (rv != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); - rv = dataRowVersion(row); + if (rv != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); + rv = memRowVersion(row); } copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable, pSchema); @@ -2470,7 +2473,7 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { SQueryFilePos* cur = &pQueryHandle->cur; - SDataRow pRow = NULL; + SMemRow pRow = NULL; TSKEY key = TSKEY_INITIAL_VAL; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; @@ -2873,7 +2876,7 @@ bool tsdbGetExternalRow(TsdbQueryHandleT pHandle) { * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW * else set pRes and return TSDB_CODE_SUCCESS and save lastKey */ -int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { +int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey) { int32_t code = TSDB_CODE_SUCCESS; TSDB_RLOCK_TABLE(pTable); @@ -2884,7 +2887,7 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { } if (pRes) { - *pRes = tdDataRowDup(pTable->lastRow); + *pRes = tdMemRowDup(pTable->lastRow); if (*pRes == NULL) { code = TSDB_CODE_TDB_OUT_OF_MEMORY; }