tsdbCache: first round tsRowFromTsdbRow
This commit is contained in:
parent
b278180254
commit
6ece070911
|
@ -295,7 +295,7 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile
|
|||
} else if (pIMemRow != NULL) {
|
||||
} else {
|
||||
if (!tsdbKeyDeleted(key, pSkyline)) {
|
||||
*ppLastRow = buildTsrowFromTsdbrow(&row);
|
||||
code = buildTsrowFromTsdbrow(&row, ppLastRow);
|
||||
goto _done;
|
||||
} else {
|
||||
continue;
|
||||
|
@ -475,15 +475,34 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static STSRow *tsRowFromTsdbRow(TSDBROW *pRow) {
|
||||
// TODO: new tsrow from tsdbrow
|
||||
STSRow *ret = NULL;
|
||||
static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
if (pRow->type == 0) {
|
||||
return tdRowDup(pRow->pTSRow);
|
||||
*ppRow = tdRowDup(pRow->pTSRow);
|
||||
} else {
|
||||
SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
|
||||
if (pArray == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
if (taosArrayPush(pArray, pColVal) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
code = tdSTSRowNew(pArray, pTSchema, ppRow);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
return ret;
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
|
||||
|
@ -584,6 +603,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
input[1].next = true;
|
||||
}
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
|
||||
do {
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
if (input[i].next && !input[i].stop) {
|
||||
|
@ -640,13 +661,12 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
// merge if nMerge > 1
|
||||
if (nMerge > 0) {
|
||||
if (nMerge == 1) {
|
||||
*ppRow = tsRowFromTsdbRow(merge[nMerge]);
|
||||
code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
// merge 2 or 3 rows
|
||||
SRowMerger merger = {0};
|
||||
|
||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
|
||||
|
||||
tRowMergerInit(&merger, merge[0], pTSchema);
|
||||
for (int i = 1; i < nMerge; ++i) {
|
||||
tRowMerge(&merger, merge[i]);
|
||||
|
@ -657,9 +677,11 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
|||
}
|
||||
} while (*ppRow == NULL);
|
||||
|
||||
return code;
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
|
||||
return code;
|
||||
_err:
|
||||
taosMemoryFreeClear(pTSchema);
|
||||
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -749,8 +749,9 @@ _exit:
|
|||
|
||||
int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
|
||||
code = tdSTSRowNew(pMerger->pArray, pMerger->pTSchema, ppRow);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1278,4 +1279,4 @@ int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) {
|
|||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue