improve comments

This commit is contained in:
sheyanjie-qq 2025-03-24 14:05:58 +08:00
parent e9d539e7c5
commit 83e4c563d9
5 changed files with 11 additions and 15 deletions

View File

@ -76,9 +76,9 @@ class ConsumerTask implements Runnable, Stoppable {
long lastTimePolled = System.currentTimeMillis(); long lastTimePolled = System.currentTimeMillis();
while (active) { while (active) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) { for (ConsumerRecord<String, String> metersRecord : records) {
i++; i++;
Meters meters = Meters.fromString(record.value()); Meters meters = Meters.fromString(metersRecord.value());
pstmt.setString(1, meters.getTableName()); pstmt.setString(1, meters.getTableName());
pstmt.setTimestamp(2, meters.getTs()); pstmt.setTimestamp(2, meters.getTs());
pstmt.setFloat(3, meters.getCurrent()); pstmt.setFloat(3, meters.getCurrent());
@ -90,8 +90,8 @@ class ConsumerTask implements Runnable, Stoppable {
pstmt.executeBatch(); pstmt.executeBatch();
} }
if (i % (10L * batchSizeByRow) == 0){ if (i % (10L * batchSizeByRow) == 0){
//pstmt.executeUpdate(); pstmt.executeUpdate();
consumer.commitAsync(); consumer.commitSync();
} }
} }
@ -107,7 +107,6 @@ class ConsumerTask implements Runnable, Stoppable {
} catch (Exception e) { } catch (Exception e) {
logger.error("Consumer Task {} Error", taskId, e); logger.error("Consumer Task {} Error", taskId, e);
} finally { } finally {
// 关闭消费者
consumer.close(); consumer.close();
} }
} }

View File

@ -14,8 +14,6 @@ class CreateSubTableTask implements Runnable {
private final int subTableEndIndex; private final int subTableEndIndex;
private final String dbName; private final String dbName;
public CreateSubTableTask(int taskId, public CreateSubTableTask(int taskId,
int subTableStartIndex, int subTableStartIndex,
int subTableEndIndex, int subTableEndIndex,

View File

@ -14,13 +14,12 @@ import java.util.concurrent.atomic.AtomicInteger;
public class FastWriteExample { public class FastWriteExample {
final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class); static final Logger logger = LoggerFactory.getLogger(FastWriteExample.class);
final static DataBaseMonitor databaseMonitor = new DataBaseMonitor(); static final DataBaseMonitor databaseMonitor = new DataBaseMonitor();
static ThreadPoolExecutor writerThreads; static ThreadPoolExecutor writerThreads;
static ThreadPoolExecutor producerThreads; static ThreadPoolExecutor producerThreads;
final static ThreadPoolExecutor statThread = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); static final ThreadPoolExecutor statThread = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
static private final List<Stoppable> allTasks = new ArrayList<>();
private final static List<Stoppable> allTasks = new ArrayList<>();
private static int readThreadCount = 5; private static int readThreadCount = 5;
private static int writeThreadPerReadThread = 5; private static int writeThreadPerReadThread = 5;

View File

@ -33,7 +33,7 @@ class MockDataSource implements Iterator<Meters> {
@Override @Override
public Meters next() { public Meters next() {
// use interlace rows one to simulate the data distribution in real world // use interlace rows to simulate the data distribution in real world
if (index % (tableEndIndex - tableStartIndex + 1) == 0) { if (index % (tableEndIndex - tableStartIndex + 1) == 0) {
currentMs += 1000; currentMs += 1000;
} }

View File

@ -49,8 +49,8 @@ class ProducerTask implements Runnable, Stoppable {
// To avoid the data of a sub-table arriving out of order, we use the partition key to ensure the data of the same sub-table is sent to the same partition. // To avoid the data of a sub-table arriving out of order, we use the partition key to ensure the data of the same sub-table is sent to the same partition.
// Because efficient writing uses String hashcode, here we use another hash algorithm to calculate the partition key. // Because efficient writing uses String hashcode, here we use another hash algorithm to calculate the partition key.
long hashCode = Math.abs(ReqId.murmurHash32(key.getBytes(), 0)); long hashCode = Math.abs(ReqId.murmurHash32(key.getBytes(), 0));
ProducerRecord<String, String> record = new ProducerRecord<>(Util.getKafkaTopic(), (int)(hashCode % Util.getPartitionCount()), key, value); ProducerRecord<String, String> metersRecord = new ProducerRecord<>(Util.getKafkaTopic(), (int)(hashCode % Util.getPartitionCount()), key, value);
producer.send(record); producer.send(metersRecord);
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("task id {}, send message error: ", taskId, e); logger.error("task id {}, send message error: ", taskId, e);