refact and add code
This commit is contained in:
parent
36ade4200e
commit
d084ec804b
|
@ -101,6 +101,15 @@ typedef struct {
|
|||
char data[];
|
||||
} SSubmitBlk;
|
||||
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
SDataRow row;
|
||||
} SSubmitBlkIter;
|
||||
|
||||
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||
SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
|
||||
|
||||
// Submit message for this TSDB
|
||||
typedef struct {
|
||||
int32_t length;
|
||||
|
@ -117,7 +126,7 @@ typedef struct {
|
|||
SSubmitBlk *pBlock;
|
||||
} SSubmitMsgIter;
|
||||
|
||||
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
|
||||
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
|
||||
SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter);
|
||||
|
||||
// the TSDB repository info
|
||||
|
|
|
@ -322,14 +322,15 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tableId) {
|
|||
|
||||
// TODO: need to return the number of data inserted
|
||||
int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
|
||||
SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks;
|
||||
SSubmitMsgIter msgIter;
|
||||
|
||||
// for (int i = 0; i < pMsg->numOfTables; i++) { // Loop to deal with the submit message
|
||||
// if (tsdbInsertDataToTable(repo, pBlock) < 0) {
|
||||
// return -1;
|
||||
// }
|
||||
// pBlock = (SSubmitBlk *)(((char *)pBlock) + sizeof(SSubmitBlk) + pBlock->len);
|
||||
// }
|
||||
tsdbInitSubmitMsgIter(pMsg, &msgIter);
|
||||
SSubmitBlk *pBlock;
|
||||
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
|
||||
if (tsdbInsertDataToTable(repo, pBlock) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -415,6 +416,28 @@ void tsdbClearTableCfg(STableCfg *config) {
|
|||
if (config->tagValues) tdFreeDataRow(config->tagValues);
|
||||
}
|
||||
|
||||
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||
if (pBlock->len <= 0) return -1;
|
||||
pIter->totalLen = pBlock->len;
|
||||
pIter->len = 0;
|
||||
pIter->row = (SDataRow)(pBlock->data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||
SDataRow row = pIter->row;
|
||||
if (row == NULL) return NULL;
|
||||
|
||||
pIter->len += dataRowLen(row);
|
||||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->row = NULL;
|
||||
} else {
|
||||
pIter->row = (char *)row + dataRowLen(row);
|
||||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
|
||||
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
|
||||
if (pMsg == NULL || pIter == NULL) return -1;
|
||||
|
||||
|
@ -433,11 +456,11 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
|
|||
SSubmitBlk *pBlock = pIter->pBlock;
|
||||
if (pBlock == NULL) return NULL;
|
||||
|
||||
pIter->len += pBlock->len;
|
||||
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len;
|
||||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->pBlock = NULL;
|
||||
} else {
|
||||
pIter->pBlock = (char *)pBlock + pBlock->len;
|
||||
pIter->pBlock = (char *)pBlock + pBlock->len + sizeof(SSubmitBlk);
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
|
@ -633,19 +656,15 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
|
|||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
|
||||
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId);
|
||||
if (pTable == NULL) {
|
||||
return -1;
|
||||
}
|
||||
if (pTable == NULL) return -1;
|
||||
|
||||
SDataRows rows = pBlock->data;
|
||||
SDataRowsIter rDataIter, *pIter;
|
||||
pIter = &rDataIter;
|
||||
SSubmitBlkIter blkIter;
|
||||
SDataRow row;
|
||||
|
||||
tdInitSDataRowsIter(rows, pIter);
|
||||
while ((row = tdDataRowsNext(pIter)) != NULL) {
|
||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (tdInsertRowToTable(pRepo, row, pTable) < 0) {
|
||||
// TODO: deal with the error here
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -54,11 +54,11 @@ TEST(TsdbTest, createRepo) {
|
|||
tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
|
||||
}
|
||||
|
||||
pBlock->len += dataRowLen(row);
|
||||
}
|
||||
pBlock->len += dataRowLen(row);
|
||||
|
||||
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
|
||||
}
|
||||
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
|
||||
|
||||
tsdbInsertData(pRepo, pMsg);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue