commit
cd8aef28de
|
@ -251,7 +251,7 @@ void tdFreeDataCols(SDataCols *pCols);
|
|||
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
|
||||
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
|
||||
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
|
||||
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows);
|
||||
|
||||
// ----------------- K-V data row structure
|
||||
/*
|
||||
|
|
|
@ -450,7 +450,8 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
|||
|
||||
int iter1 = 0;
|
||||
int iter2 = 0;
|
||||
tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge);
|
||||
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows,
|
||||
pTarget->numOfRows + rowsToMerge);
|
||||
}
|
||||
|
||||
tdFreeDataCols(pTarget);
|
||||
|
@ -461,15 +462,15 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
|
||||
// TODO: add resolve duplicate key here
|
||||
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) {
|
||||
tdResetDataCols(target);
|
||||
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
|
||||
|
||||
while (target->numOfRows < tRows) {
|
||||
if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break;
|
||||
if (*iter1 >= limit1 && *iter2 >= limit2) break;
|
||||
|
||||
TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
|
||||
TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
|
||||
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
|
||||
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
|
||||
|
||||
if (key1 <= key2) {
|
||||
for (int i = 0; i < src1->numOfCols; i++) {
|
||||
|
|
|
@ -977,7 +977,8 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
// tdResetDataCols(pHelper->pDataCols[1]);
|
||||
while (true) {
|
||||
if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
|
||||
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5);
|
||||
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
|
||||
pDataCols, &iter2, rowsWritten, pHelper->config.maxRowsPerFileBlock * 4 / 5);
|
||||
ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
|
||||
pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
||||
|
@ -989,54 +990,6 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
}
|
||||
round++;
|
||||
blkIdx++;
|
||||
// TODO: the blkIdx here is not correct
|
||||
|
||||
// if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) {
|
||||
// if (pHelper->pDataCols[1]->numOfRows > 0) {
|
||||
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1],
|
||||
// pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
||||
// goto _err;
|
||||
// // TODO: the blkIdx here is not correct
|
||||
// tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfRows);
|
||||
// }
|
||||
// }
|
||||
|
||||
// TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfRows
|
||||
// ? INT64_MAX
|
||||
// : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1];
|
||||
// TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2];
|
||||
|
||||
// if (key1 < key2) {
|
||||
// for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
|
||||
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
|
||||
// ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1),
|
||||
// TYPE_BYTES[pDataCol->type]);
|
||||
// }
|
||||
// pHelper->pDataCols[1]->numOfRows++;
|
||||
// iter1++;
|
||||
// } else if (key1 == key2) {
|
||||
// // TODO: think about duplicate key cases
|
||||
// ASSERT(false);
|
||||
// } else {
|
||||
// for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
|
||||
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
|
||||
// ((char *)pDataCols->cols[i].pData +
|
||||
// TYPE_BYTES[pDataCol->type] * iter2),
|
||||
// TYPE_BYTES[pDataCol->type]);
|
||||
// }
|
||||
// pHelper->pDataCols[1]->numOfRows++;
|
||||
// iter2++;
|
||||
// }
|
||||
|
||||
// if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
|
||||
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err;
|
||||
// // TODO: blkIdx here is not correct, fix it
|
||||
// tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
|
||||
|
||||
// tdResetDataCols(pHelper->pDataCols[1]);
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue