TD-521
This commit is contained in:
parent
4913ce5897
commit
6632c4160f
|
@ -251,7 +251,7 @@ void tdFreeDataCols(SDataCols *pCols);
|
||||||
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
|
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols);
|
||||||
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
|
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
|
||||||
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
|
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
|
||||||
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
|
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows);
|
||||||
|
|
||||||
// ----------------- K-V data row structure
|
// ----------------- K-V data row structure
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -450,7 +450,8 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
||||||
|
|
||||||
int iter1 = 0;
|
int iter1 = 0;
|
||||||
int iter2 = 0;
|
int iter2 = 0;
|
||||||
tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge);
|
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, &iter2, source->numOfRows,
|
||||||
|
pTarget->numOfRows + rowsToMerge);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdFreeDataCols(pTarget);
|
tdFreeDataCols(pTarget);
|
||||||
|
@ -461,15 +462,15 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
|
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows) {
|
||||||
// TODO: add resolve duplicate key here
|
|
||||||
tdResetDataCols(target);
|
tdResetDataCols(target);
|
||||||
|
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
|
||||||
|
|
||||||
while (target->numOfRows < tRows) {
|
while (target->numOfRows < tRows) {
|
||||||
if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break;
|
if (*iter1 >= limit1 && *iter2 >= limit2) break;
|
||||||
|
|
||||||
TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
|
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
|
||||||
TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
|
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
|
||||||
|
|
||||||
if (key1 <= key2) {
|
if (key1 <= key2) {
|
||||||
for (int i = 0; i < src1->numOfCols; i++) {
|
for (int i = 0; i < src1->numOfCols; i++) {
|
||||||
|
|
|
@ -977,7 +977,8 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
||||||
// tdResetDataCols(pHelper->pDataCols[1]);
|
// tdResetDataCols(pHelper->pDataCols[1]);
|
||||||
while (true) {
|
while (true) {
|
||||||
if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
|
if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
|
||||||
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5);
|
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
|
||||||
|
pDataCols, &iter2, rowsWritten, pHelper->config.maxRowsPerFileBlock * 4 / 5);
|
||||||
ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
|
ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
|
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
|
||||||
pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
||||||
|
@ -989,54 +990,6 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
||||||
}
|
}
|
||||||
round++;
|
round++;
|
||||||
blkIdx++;
|
blkIdx++;
|
||||||
// TODO: the blkIdx here is not correct
|
|
||||||
|
|
||||||
// if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) {
|
|
||||||
// if (pHelper->pDataCols[1]->numOfRows > 0) {
|
|
||||||
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1],
|
|
||||||
// pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
|
|
||||||
// goto _err;
|
|
||||||
// // TODO: the blkIdx here is not correct
|
|
||||||
// tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfRows);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfRows
|
|
||||||
// ? INT64_MAX
|
|
||||||
// : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1];
|
|
||||||
// TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2];
|
|
||||||
|
|
||||||
// if (key1 < key2) {
|
|
||||||
// for (int i = 0; i < pDataCols->numOfCols; i++) {
|
|
||||||
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
|
|
||||||
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
|
|
||||||
// ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1),
|
|
||||||
// TYPE_BYTES[pDataCol->type]);
|
|
||||||
// }
|
|
||||||
// pHelper->pDataCols[1]->numOfRows++;
|
|
||||||
// iter1++;
|
|
||||||
// } else if (key1 == key2) {
|
|
||||||
// // TODO: think about duplicate key cases
|
|
||||||
// ASSERT(false);
|
|
||||||
// } else {
|
|
||||||
// for (int i = 0; i < pDataCols->numOfCols; i++) {
|
|
||||||
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
|
|
||||||
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
|
|
||||||
// ((char *)pDataCols->cols[i].pData +
|
|
||||||
// TYPE_BYTES[pDataCol->type] * iter2),
|
|
||||||
// TYPE_BYTES[pDataCol->type]);
|
|
||||||
// }
|
|
||||||
// pHelper->pDataCols[1]->numOfRows++;
|
|
||||||
// iter2++;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
|
|
||||||
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err;
|
|
||||||
// // TODO: blkIdx here is not correct, fix it
|
|
||||||
// tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
|
|
||||||
|
|
||||||
// tdResetDataCols(pHelper->pDataCols[1]);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue