diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 5001a99c2a..6e344dfdb4 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -57,6 +57,8 @@ typedef enum { TD_ROW_PARTIAL_UPDATE = 2, } TDUpdateConfig; +#define TD_SUPPORT_UPDATE(u) ((u) > 0) + typedef enum { TSDB_STATIS_OK = 0, // statis part exist and load successfully TSDB_STATIS_NONE = 1, // statis part not exist diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 8e07d4e669..8a87a1bd2e 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -482,7 +482,7 @@ void tdResetDataCols(SDataCols *pCols); int32_t tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdFreeDataCols(SDataCols *pCols); -int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull); +int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull, TDRowVerT maxVer); // ----------------- K-V data row structure /* |<-------------------------------------- len -------------------------------------------->| diff --git a/include/common/trow.h b/include/common/trow.h index b7ffcd14c6..539bda078d 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -230,7 +230,7 @@ static FORCE_INLINE int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowVa static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset, col_id_t colId); -int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull); +int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols); /** * @brief diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 37d688a0ef..5f1cfc030a 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -27,6 +27,7 @@ extern "C" { typedef int32_t VarDataOffsetT; typedef uint32_t TDRowLenT; typedef uint8_t TDRowValT; +typedef uint64_t TDRowVerT; typedef int16_t col_id_t; typedef int8_t col_type_t; typedef int32_t col_bytes_t; diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f6614633fb..f2a5a98d96 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -411,6 +411,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { #endif pCols->numOfRows = 0; + pCols->bitmapMode = 0; pCols->numOfCols = schemaNCols(pSchema); for (i = 0; i < schemaNCols(pSchema); ++i) { diff --git a/source/common/src/trow.c b/source/common/src/trow.c index d9c48bfe76..7392b0bf8b 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -385,7 +385,6 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int pCol->len += pCol->bytes; } #ifdef TD_SUPPORT_BITMAP - tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode); #endif return 0; @@ -486,7 +485,7 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols * @param pCols * @param forceSetNull */ -int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { +int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) { if (TD_IS_TP_ROW(pRow)) { return tdAppendTpRowToDataCol(pRow, pSchema, pCols); } else if (TD_IS_KV_ROW(pRow)) { @@ -497,7 +496,7 @@ int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCol return TSDB_CODE_SUCCESS; } -int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) { +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull, TDRowVerT maxVer) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); int offset = 0; @@ -510,6 +509,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); + // TODO: filter the maxVer for (int i = 0; i < rowsToMerge; i++) { for (int j = 0; j < source->numOfCols; j++) { if (source->cols[j].len > 0 || target->cols[j].len > 0) { @@ -555,9 +555,9 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i // TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2); ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1))); - + // TODO: filter the maxVer if (key1 < key2) { - for (int i = 0; i < src1->numOfCols; i++) { + for (int i = 0; i < src1->numOfCols; ++i) { ASSERT(target->cols[i].type == src1->cols[i].type); if (src1->cols[i].len > 0 || target->cols[i].len > 0) { SCellVal sVal = {0}; @@ -568,12 +568,12 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i } } - target->numOfRows++; - (*iter1)++; + ++target->numOfRows; + ++(*iter1); } else if (key1 >= key2) { - // if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) { - if ((key1 > key2) || (key1 == key2)) { - for (int i = 0; i < src2->numOfCols; i++) { + // TODO: filter the maxVer + if ((key1 > key2) || ((key1 == key2) && !TKEY_IS_DELETED(key2))) { + for (int i = 0; i < src2->numOfCols; ++i) { SCellVal sVal = {0}; ASSERT(target->cols[i].type == src2->cols[i].type); if (tdGetColDataOfRow(&sVal, src2->cols + i, *iter2, src2->bitmapMode) < 0) { @@ -590,11 +590,11 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i dataColSetNullAt(&target->cols[i], target->numOfRows, true, target->bitmapMode); } } - target->numOfRows++; + ++target->numOfRows; } - (*iter2)++; - if (key1 == key2) (*iter1)++; + ++(*iter2); + if (key1 == key2) ++(*iter1); } ASSERT(target->numOfRows <= target->maxPoints); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 21ad1ec076..67c7973206 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1678,10 +1678,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ASSERT(pSchema != NULL); } - tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); + tdAppendSTSRowToDataCol(row, pSchema, pTarget); tSkipListIterNext(pCommitIter->pIter); } else { +#if 0 if (update != TD_ROW_OVERWRITE_UPDATE) { // copy disk data for (int i = 0; i < pDataCols->numOfCols; ++i) { @@ -1706,6 +1707,43 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt } ++(*iter); tSkipListIterNext(pCommitIter->pIter); +#endif + // copy disk data + for (int i = 0; i < pDataCols->numOfCols; ++i) { + SCellVal sVal = {0}; + if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) { + TASSERT(0); + } + // TODO: tdAppendValToDataCol may fail + tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, + pTarget->bitmapMode); + } + + if (TD_SUPPORT_UPDATE(update)) { + // copy mem data(Multi-Version) + if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); + ASSERT(pSchema != NULL); + } + + // TODO: merge with Multi-Version + STSRow *curRow = row; + + ++(*iter); + tSkipListIterNext(pCommitIter->pIter); + STSRow *nextRow = tsdbNextIterRow(pCommitIter->pIter); + + if (key2 < TD_ROW_KEY(nextRow)) { + tdAppendSTSRowToDataCol(row, pSchema, pTarget); + } else { + tdAppendSTSRowToDataCol(row, pSchema, pTarget); + } + // TODO: merge with Multi-Version + } else { + ++pTarget->numOfRows; + ++(*iter); + tSkipListIterNext(pCommitIter->pIter); + } } if (pTarget->numOfRows >= maxRows) break; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index c657b2e947..54115003cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -475,7 +475,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * } } - tdAppendSTSRowToDataCol(row, *ppSchema, pCols, true); + tdAppendSTSRowToDataCol(row, *ppSchema, pCols); } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5677bb0615..e308c9c2b9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -761,6 +761,7 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, TSKEY r2 = TD_ROW_KEY(rimem); if (r1 == r2) { +#if 0 if(update == TD_ROW_DISCARD_UPDATE){ pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; tSkipListIterNext(pCheckInfo->iter); @@ -771,6 +772,13 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, } else { pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; } +#endif + if (TD_SUPPORT_UPDATE(update)) { + pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; + } else { + pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; + tSkipListIterNext(pCheckInfo->iter); + } return r1; } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) { pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index 87459593b5..e31ede09cc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -263,8 +263,9 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { for (int i = 1; i < pBlock->numOfSubBlocks; i++) { iBlock++; if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; + // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, - update != TD_ROW_PARTIAL_UPDATE) < 0) + update != TD_ROW_PARTIAL_UPDATE, UINT64_MAX) < 0) return -1; } @@ -293,8 +294,9 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, for (int i = 1; i < pBlock->numOfSubBlocks; i++) { iBlock++; if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; + // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL, - update != TD_ROW_PARTIAL_UPDATE) < 0) + update != TD_ROW_PARTIAL_UPDATE, UINT64_MAX) < 0) return -1; } @@ -304,6 +306,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, if (pDataCol->bitmap) { ASSERT(pDataCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID); tdMergeBitmap(pDataCol->pBitmap, TD_BITMAP_BYTES(pReadh->pDCols[0]->numOfRows), pDataCol->pBitmap); + tdDataColsSetBitmapI(pReadh->pDCols[0]); } } }