commit
16a73dcb28
|
@ -80,6 +80,7 @@ extern short tsNumOfVnodesPerCore;
|
||||||
extern short tsNumOfTotalVnodes;
|
extern short tsNumOfTotalVnodes;
|
||||||
extern short tsCheckHeaderFile;
|
extern short tsCheckHeaderFile;
|
||||||
extern uint32_t tsPublicIpInt;
|
extern uint32_t tsPublicIpInt;
|
||||||
|
extern short tsAffectedRowsMod;
|
||||||
|
|
||||||
extern int tsSessionsPerVnode;
|
extern int tsSessionsPerVnode;
|
||||||
extern int tsAverageCacheBlocks;
|
extern int tsAverageCacheBlocks;
|
||||||
|
|
|
@ -67,7 +67,10 @@ bool restProcessSqlRequest(HttpContext* pContext, int timestampFmt) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// for async test
|
|
||||||
|
/*
|
||||||
|
* for async test
|
||||||
|
* /
|
||||||
/*
|
/*
|
||||||
if (httpCheckUsedbSql(sql)) {
|
if (httpCheckUsedbSql(sql)) {
|
||||||
httpSendErrorResp(pContext, HTTP_NO_EXEC_USEDB);
|
httpSendErrorResp(pContext, HTTP_NO_EXEC_USEDB);
|
||||||
|
|
|
@ -911,6 +911,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
|
||||||
blockIter.nextKey = maxFileKey + 1;
|
blockIter.nextKey = maxFileKey + 1;
|
||||||
} else { // Case 3. need to search the block for slot and pos
|
} else { // Case 3. need to search the block for slot and pos
|
||||||
if (key == minKey || key == maxKey) {
|
if (key == minKey || key == maxKey) {
|
||||||
|
if (tsAffectedRowsMod) pointsImported++;
|
||||||
payloadIter++;
|
payloadIter++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -939,6 +940,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
|
||||||
} while (left < right);
|
} while (left < right);
|
||||||
|
|
||||||
if (key == blockMinKey || key == blockMaxKey) { // duplicate key
|
if (key == blockMinKey || key == blockMaxKey) { // duplicate key
|
||||||
|
if (tsAffectedRowsMod) pointsImported++;
|
||||||
payloadIter++;
|
payloadIter++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -955,6 +957,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
|
||||||
|
|
||||||
if (key == importHandle.pBlocks[blockIter.slot].keyFirst ||
|
if (key == importHandle.pBlocks[blockIter.slot].keyFirst ||
|
||||||
key == importHandle.pBlocks[blockIter.slot].keyLast) {
|
key == importHandle.pBlocks[blockIter.slot].keyLast) {
|
||||||
|
if (tsAffectedRowsMod) pointsImported++;
|
||||||
payloadIter++;
|
payloadIter++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -976,6 +979,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
|
||||||
importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC);
|
importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC);
|
||||||
assert(pos != 0);
|
assert(pos != 0);
|
||||||
if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) {
|
if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) {
|
||||||
|
if (tsAffectedRowsMod) pointsImported++;
|
||||||
payloadIter++;
|
payloadIter++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1106,6 +1110,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
|
||||||
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) ==
|
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) ==
|
||||||
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
|
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
|
||||||
blockIter.pos)) { // duplicate key
|
blockIter.pos)) { // duplicate key
|
||||||
|
if (tsAffectedRowsMod) pointsImported++;
|
||||||
payloadIter++;
|
payloadIter++;
|
||||||
continue;
|
continue;
|
||||||
} else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
|
} else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
|
||||||
|
@ -1320,7 +1325,10 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
|
||||||
pImport->lastKey = lastKey;
|
pImport->lastKey = lastKey;
|
||||||
for (payloadIter = 0; payloadIter < rows; payloadIter++) {
|
for (payloadIter = 0; payloadIter < rows; payloadIter++) {
|
||||||
TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
|
TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
|
||||||
if (key == pObj->lastKey) continue;
|
if (key == pObj->lastKey) {
|
||||||
|
if (tsAffectedRowsMod) rowsImported++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (key > pObj->lastKey) { // Just as insert
|
if (key > pObj->lastKey) { // Just as insert
|
||||||
pImport->slot = pInfo->currentSlot;
|
pImport->slot = pInfo->currentSlot;
|
||||||
pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints;
|
pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints;
|
||||||
|
@ -1333,11 +1341,12 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pImport->firstKey != pImport->key) break;
|
if (pImport->firstKey != pImport->key) break;
|
||||||
|
if (tsAffectedRowsMod) rowsImported++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (payloadIter == rows) {
|
if (payloadIter == rows) {
|
||||||
pImport->importedRows = 0;
|
pImport->importedRows += rowsImported;
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -1470,6 +1479,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
|
||||||
payloadIter++;
|
payloadIter++;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
if (tsAffectedRowsMod) rowsImported++;
|
||||||
payloadIter++;
|
payloadIter++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,12 @@ short tsCheckHeaderFile = 0;
|
||||||
int tsSessionsPerVnode = 1000;
|
int tsSessionsPerVnode = 1000;
|
||||||
int tsCacheBlockSize = 16384; // 256 columns
|
int tsCacheBlockSize = 16384; // 256 columns
|
||||||
int tsAverageCacheBlocks = TSDB_DEFAULT_AVG_BLOCKS;
|
int tsAverageCacheBlocks = TSDB_DEFAULT_AVG_BLOCKS;
|
||||||
|
/**
|
||||||
|
* Change the meaning of affected rows:
|
||||||
|
* 0: affected rows not include those duplicate records
|
||||||
|
* 1: affected rows include those duplicate records
|
||||||
|
*/
|
||||||
|
short tsAffectedRowsMod = 0;
|
||||||
|
|
||||||
int tsRowsInFileBlock = 4096;
|
int tsRowsInFileBlock = 4096;
|
||||||
float tsFileBlockMinPercent = 0.05;
|
float tsFileBlockMinPercent = 0.05;
|
||||||
|
@ -535,6 +541,9 @@ static void doInitGlobalConfig() {
|
||||||
tsInitConfigOption(cfg++, "alternativeRole", &tsAlternativeRole, TSDB_CFG_VTYPE_INT,
|
tsInitConfigOption(cfg++, "alternativeRole", &tsAlternativeRole, TSDB_CFG_VTYPE_INT,
|
||||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLUSTER,
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLUSTER,
|
||||||
0, 2, 0, TSDB_CFG_UTYPE_NONE);
|
0, 2, 0, TSDB_CFG_UTYPE_NONE);
|
||||||
|
tsInitConfigOption(cfg++, "affectedRowsMod", &tsAffectedRowsMod, TSDB_CFG_VTYPE_SHORT,
|
||||||
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT,
|
||||||
|
0, 1, 0, TSDB_CFG_UTYPE_NONE);
|
||||||
// 0-any, 1-mgmt, 2-dnode
|
// 0-any, 1-mgmt, 2-dnode
|
||||||
|
|
||||||
// timer
|
// timer
|
||||||
|
|
Loading…
Reference in New Issue