more work

This commit is contained in:
Hongze Cheng 2022-07-02 10:04:51 +00:00
parent 62519ae4bd
commit 529394a011
1 changed files with 53 additions and 22 deletions

View File

@ -14,6 +14,11 @@
*/ */
#include "tsdb.h" #include "tsdb.h"
typedef struct {
int64_t suid;
int64_t uid;
STSchema *pTSchema;
} SSkmInfo;
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
@ -38,9 +43,8 @@ typedef struct {
SArray *aBlockIdxN; // SArray<SBlockIdx> SArray *aBlockIdxN; // SArray<SBlockIdx>
SMapData nBlockMap; // SMapData<SBlock> SMapData nBlockMap; // SMapData<SBlock>
SBlockData nBlockData; SBlockData nBlockData;
int64_t suid; SSkmInfo skmTable;
int64_t uid; SSkmInfo skmRow;
STSchema *pTSchema;
/* commit del */ /* commit del */
SDelFReader *pDelFReader; SDelFReader *pDelFReader;
SDelFWriter *pDelFWriter; SDelFWriter *pDelFWriter;
@ -307,24 +311,49 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitterUpdateSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0; int32_t code = 0;
if (pCommitter->pTSchema) { if (pCommitter->skmTable.pTSchema) {
if (pCommitter->suid == suid) { if (pCommitter->skmTable.suid == suid) {
if (suid == 0) { if (suid == 0) {
if (pCommitter->uid == uid && sver == pCommitter->pTSchema->version) goto _exit; if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit;
} else { } else {
if (sver == pCommitter->pTSchema->version) goto _exit; if (sver == pCommitter->skmTable.pTSchema->version) goto _exit;
} }
} }
} }
pCommitter->suid = suid; pCommitter->skmTable.suid = suid;
pCommitter->uid = uid; pCommitter->skmTable.uid = uid;
tTSchemaDestroy(pCommitter->pTSchema); tTSchemaDestroy(pCommitter->skmTable.pTSchema);
pCommitter->pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver); pCommitter->skmTable.pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);
if (pCommitter->pTSchema == NULL) { if (pCommitter->skmTable.pTSchema == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
_exit:
return code;
}
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0;
if (pCommitter->skmRow.pTSchema) {
if (pCommitter->skmRow.suid == suid) {
if (suid == 0) {
if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
} else {
if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
}
}
}
pCommitter->skmRow.suid = suid;
pCommitter->skmRow.uid = uid;
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
pCommitter->skmRow.pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);
if (pCommitter->skmRow.pTSchema == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
@ -377,7 +406,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0); *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0); ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err; if (code) goto _err;
tBlockReset(pBlock); tBlockReset(pBlock);
@ -407,14 +436,14 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
} }
_append_mem_row: _append_mem_row:
code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->pTSchema); code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->skmRow.pTSchema);
if (code) goto _err; if (code) goto _err;
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
pRow1 = tsdbTbDataIterGet(pIter); pRow1 = tsdbTbDataIterGet(pIter);
if (pRow1) { if (pRow1) {
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) { if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
if (code) goto _err; if (code) goto _err;
} else { } else {
pRow1 = NULL; pRow1 = NULL;
@ -481,11 +510,11 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
} }
// update schema // update schema
code = tsdbCommitterUpdateSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow)); code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow));
if (code) goto _err; if (code) goto _err;
// append // append
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
if (code) goto _err; if (code) goto _err;
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
@ -589,11 +618,11 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow)); code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err; if (code) goto _err;
while (true) { while (true) {
if (pRow) break; if (pRow) break;
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
if (code) goto _err; if (code) goto _err;
tsdbTbDataIterNext(pIter); tsdbTbDataIterNext(pIter);
@ -602,7 +631,8 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S
int32_t c = tBlockCmprFn(&(SBlock){}, pBlock); int32_t c = tBlockCmprFn(&(SBlock){}, pBlock);
if (c == 0) { if (c == 0) {
code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); code =
tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
if (code) goto _err; if (code) goto _err;
} else if (c > 0) { } else if (c > 0) {
pRow = NULL; pRow = NULL;
@ -955,7 +985,8 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->aBlockIdxN); taosArrayDestroy(pCommitter->aBlockIdxN);
tMapDataClear(&pCommitter->nBlockMap); tMapDataClear(&pCommitter->nBlockMap);
tBlockDataClear(&pCommitter->nBlockData); tBlockDataClear(&pCommitter->nBlockData);
tTSchemaDestroy(pCommitter->pTSchema); tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
} }
static int32_t tsdbCommitData(SCommitter *pCommitter) { static int32_t tsdbCommitData(SCommitter *pCommitter) {