small improvements, fix potential bug for inserting
This commit is contained in:
parent
e45b047e5f
commit
9e2188b196
|
|
@ -702,11 +702,12 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//row1 has higher priority
|
//row1 has higher priority
|
||||||
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, STSchema **ppSchema1, STSchema **ppSchema2, STable* pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) {
|
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo,
|
||||||
|
STSchema **ppSchema1, STSchema **ppSchema2,
|
||||||
|
STable* pTable, int32_t* pPoints, SMemRow* pLastRow) {
|
||||||
|
|
||||||
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
|
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
|
||||||
if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
|
if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
|
||||||
(*pAffectedRows)++;
|
|
||||||
(*pPoints)++;
|
(*pPoints)++;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
@ -715,7 +716,6 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
|
||||||
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1));
|
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1));
|
||||||
if(pMem == NULL) return NULL;
|
if(pMem == NULL) return NULL;
|
||||||
memRowCpy(pMem, row1);
|
memRowCpy(pMem, row1);
|
||||||
(*pAffectedRows)++;
|
|
||||||
(*pPoints)++;
|
(*pPoints)++;
|
||||||
*pLastRow = pMem;
|
*pLastRow = pMem;
|
||||||
return pMem;
|
return pMem;
|
||||||
|
|
@ -750,7 +750,6 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
|
||||||
if(pMem == NULL) return NULL;
|
if(pMem == NULL) return NULL;
|
||||||
memRowCpy(pMem, tmp);
|
memRowCpy(pMem, tmp);
|
||||||
|
|
||||||
(*pAffectedRows)++;
|
|
||||||
(*pPoints)++;
|
(*pPoints)++;
|
||||||
|
|
||||||
*pLastRow = pMem;
|
*pLastRow = pMem;
|
||||||
|
|
@ -758,10 +757,10 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* tsdbInsertDupKeyMergePacked(void** args) {
|
static void* tsdbInsertDupKeyMergePacked(void** args) {
|
||||||
return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7], args[8]);
|
return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7]);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) {
|
static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, SMemRow* pLastRow) {
|
||||||
|
|
||||||
if(pSkipList->insertHandleFn == NULL) {
|
if(pSkipList->insertHandleFn == NULL) {
|
||||||
tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9);
|
tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9);
|
||||||
|
|
@ -769,17 +768,16 @@ static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STa
|
||||||
dupHandleSavedFunc->args[3] = NULL;
|
dupHandleSavedFunc->args[3] = NULL;
|
||||||
dupHandleSavedFunc->args[4] = NULL;
|
dupHandleSavedFunc->args[4] = NULL;
|
||||||
dupHandleSavedFunc->args[5] = pTable;
|
dupHandleSavedFunc->args[5] = pTable;
|
||||||
dupHandleSavedFunc->args[6] = pAffectedRows;
|
|
||||||
dupHandleSavedFunc->args[7] = pPoints;
|
|
||||||
dupHandleSavedFunc->args[8] = pLastRow;
|
|
||||||
pSkipList->insertHandleFn = dupHandleSavedFunc;
|
pSkipList->insertHandleFn = dupHandleSavedFunc;
|
||||||
}
|
}
|
||||||
|
pSkipList->insertHandleFn->args[6] = pPoints;
|
||||||
|
pSkipList->insertHandleFn->args[7] = pLastRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) {
|
static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) {
|
||||||
|
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
int64_t points = 0;
|
int32_t points = 0;
|
||||||
STable *pTable = NULL;
|
STable *pTable = NULL;
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
SMemTable *pMemTable = NULL;
|
SMemTable *pMemTable = NULL;
|
||||||
|
|
@ -830,9 +828,10 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
|
||||||
|
|
||||||
SMemRow lastRow = NULL;
|
SMemRow lastRow = NULL;
|
||||||
int64_t osize = SL_SIZE(pTableData->pData);
|
int64_t osize = SL_SIZE(pTableData->pData);
|
||||||
tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, pAffectedRows, &points, &lastRow);
|
tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow);
|
||||||
tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
|
tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
|
||||||
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
|
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
|
||||||
|
(*pAffectedRows) += points;
|
||||||
|
|
||||||
|
|
||||||
if(lastRow != NULL) {
|
if(lastRow != NULL) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue