modify alert sql

This commit is contained in:
menshibin 2024-08-12 20:28:31 +08:00
parent eb4a7213fd
commit 29df00a89d
3 changed files with 13 additions and 23 deletions

View File

@ -63,7 +63,7 @@ async function subscribe(consumer) {
console.log(`data: ${key} ${value}`); console.log(`data: ${key} ${value}`);
} }
consumer.commit(); consumer.commit();
console.log("commit offset manually successfully."); console.log("Commit offset manually successfully.");
} }
} catch (err) { } catch (err) {
console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message); console.error("Failed to poll data; err.code, ErrCode:" + err.code + "; ErrMessage: " + err.message);
@ -80,7 +80,7 @@ async function test() {
let consumer = await createConsumer() let consumer = await createConsumer()
await subscribe(consumer) await subscribe(consumer)
await consumer.unsubscribe(); await consumer.unsubscribe();
console.log("unsubscribe consumer successfully."); console.log("Consumer unsubscribed successfully.");
} }
catch (err) { catch (err) {
console.error("Failed to unsubscribe consume, ErrCode:" + err.code + "; ErrMessage: " + err.message); console.error("Failed to unsubscribe consume, ErrCode:" + err.code + "; ErrMessage: " + err.message);

View File

@ -79,20 +79,11 @@ async function test() {
} }
let assignment = await consumer.assignment(); let assignment = await consumer.assignment();
for (let i in assignment) {
console.log("seek before:", assignment[i]);
}
await consumer.seekToBeginning(assignment); await consumer.seekToBeginning(assignment);
console.log("assignment seek to beginning successfully"); console.log("Assignment seek to beginning successfully");
assignment = await consumer.assignment();
for (let i in assignment) {
console.log("seek after:", assignment[i]);
}
await consumer.unsubscribe();
} }
catch (err) { catch (err) {
console.error("seek example failed, ErrCode:" + err.code + "; ErrMessage: " + err.message); console.error("Seek example failed, ErrCode:" + err.code + "; ErrMessage: " + err.message);
} }
finally { finally {
if (consumer) { if (consumer) {

View File

@ -37,9 +37,9 @@ def prepareMeta():
VALUES (NOW + 1a, 10.30000, 218, 0.25000) VALUES (NOW + 1a, 10.30000, 218, 0.25000)
""" """
affectedRows = conn.execute(sql) affectedRows = conn.execute(sql)
print(f"inserted into {affectedRows} rows to power.meters successfully.") print(f"Inserted into {affectedRows} rows to power.meters successfully.")
except Exception as err: except Exception as err:
print(f"prepare meta ErrMessage:{err}") print(f"Prepare insert data error, ErrMessage:{err}")
raise err raise err
finally: finally:
if conn: if conn:
@ -80,13 +80,13 @@ def subscribe(consumer):
try: try:
# subscribe to the topics # subscribe to the topics
consumer.subscribe(["topic_meters"]) consumer.subscribe(["topic_meters"])
print("subscribe topics successfully") print("Subscribe topics successfully")
for i in range(50): for i in range(50):
records = consumer.poll(1) records = consumer.poll(1)
if records: if records:
err = records.error() err = records.error()
if err is not None: if err is not None:
print(f"poll data error, {err}") print(f"Poll data error, {err}")
raise err raise err
val = records.value() val = records.value()
@ -110,7 +110,7 @@ def commit_offset(consumer):
if records: if records:
err = records.error() err = records.error()
if err is not None: if err is not None:
print(f"poll data error, {err}") print(f"Poll data error, {err}")
raise err raise err
val = records.value() val = records.value()
@ -120,7 +120,7 @@ def commit_offset(consumer):
# after processing the data, commit the offset manually # after processing the data, commit the offset manually
consumer.commit(records) consumer.commit(records)
print("commit offset manually successfully."); print("Commit offset manually successfully.");
except Exception as err: except Exception as err:
print(f"Failed to poll data, ErrMessage:{err}") print(f"Failed to poll data, ErrMessage:{err}")
@ -134,12 +134,11 @@ def seek_offset(consumer):
assignments = consumer.assignment() assignments = consumer.assignment()
if assignments: if assignments:
for partition in assignments: for partition in assignments:
print(f"first data polled: {partition.offset}")
partition.offset = 0 partition.offset = 0
consumer.seek(partition) consumer.seek(partition)
print(f"assignment seek to beginning successfully"); print(f"Assignment seek to beginning successfully");
except Exception as err: except Exception as err:
print(f"seek example failed; ErrMessage:{err}") print(f"Seek example failed; ErrMessage:{err}")
raise err raise err
# ANCHOR_END: assignment # ANCHOR_END: assignment
@ -148,7 +147,7 @@ def seek_offset(consumer):
def unsubscribe(consumer): def unsubscribe(consumer):
try: try:
consumer.unsubscribe() consumer.unsubscribe()
print("unsubscribe consumer successfully."); print("Consumer unsubscribed successfully.");
except Exception as err: except Exception as err:
print(f"Failed to unsubscribe consumer. ErrMessage:{err}") print(f"Failed to unsubscribe consumer. ErrMessage:{err}")