Merge pull request #29517 from taosdata/fix/tag
fix(stream): check the stream task last ready timestamp before start the checkpoint procedure.
This commit is contained in:
commit
fe4555f060
|
@ -1168,8 +1168,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
||||||
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -1181,23 +1179,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEntry->status == TASK_STATUS__STOP) {
|
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
|
|
||||||
STaskId *pId = taosArrayGet(pInvalidList, j);
|
|
||||||
if (pId == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pEntry->id.streamId == pId->streamId) {
|
|
||||||
void *px = taosArrayPush(pInvalidList, &pEntry->id);
|
|
||||||
if (px == NULL) {
|
|
||||||
mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pEntry->status != TASK_STATUS__READY) {
|
if (pEntry->status != TASK_STATUS__READY) {
|
||||||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
|
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
|
||||||
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
||||||
|
@ -1215,9 +1196,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
removeTasksInBuf(pInvalidList, &execInfo);
|
|
||||||
taosArrayDestroy(pInvalidList);
|
|
||||||
|
|
||||||
streamMutexUnlock(&execInfo.lock);
|
streamMutexUnlock(&execInfo.lock);
|
||||||
return ready ? 0 : -1;
|
return ready ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
@ -1258,6 +1236,30 @@ static int32_t streamWaitComparFn(const void *p1, const void *p2) {
|
||||||
return pInt1->duration > pInt2->duration ? -1 : 1;
|
return pInt1->duration > pInt2->duration ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all tasks of this stream should be ready, otherwise do nothing
|
||||||
|
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
|
||||||
|
bool ready = false;
|
||||||
|
|
||||||
|
streamMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
|
int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
|
||||||
|
if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
|
||||||
|
if (lastReadyTs != -1) {
|
||||||
|
mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold",
|
||||||
|
pStream->uid, lastReadyTs, now - lastReadyTs);
|
||||||
|
} else {
|
||||||
|
mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
ready = false;
|
||||||
|
} else {
|
||||||
|
ready = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
return ready;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -1284,20 +1286,17 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexLock(&execInfo.lock);
|
bool ready = isStreamReadyHelp(now, pStream);
|
||||||
int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
|
if (!ready) {
|
||||||
if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
|
|
||||||
streamMutexUnlock(&execInfo.lock);
|
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
streamMutexUnlock(&execInfo.lock);
|
|
||||||
|
|
||||||
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
|
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
|
||||||
void *p = taosArrayPush(pList, &in);
|
void *p = taosArrayPush(pList, &in);
|
||||||
if (p) {
|
if (p) {
|
||||||
int32_t currentSize = taosArrayGetSize(pList);
|
int32_t currentSize = taosArrayGetSize(pList);
|
||||||
mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
|
mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
|
||||||
"s), concurrently launch threshold:%d",
|
"s), concurrently launch threshold:%d",
|
||||||
pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
|
pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
|
||||||
tsMaxConcurrentCheckpoint);
|
tsMaxConcurrentCheckpoint);
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import json
|
import json
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import threading
|
||||||
import psutil
|
import psutil
|
||||||
import time
|
import time
|
||||||
import taos
|
import taos
|
||||||
|
@ -21,6 +21,7 @@ class MonitorSystemLoad:
|
||||||
def get_proc_status(self):
|
def get_proc_status(self):
|
||||||
process = psutil.Process(self.pid)
|
process = psutil.Process(self.pid)
|
||||||
|
|
||||||
|
with open('/tmp/pref.txt', 'w+') as f:
|
||||||
while True:
|
while True:
|
||||||
cpu_percent = process.cpu_percent(interval=1)
|
cpu_percent = process.cpu_percent(interval=1)
|
||||||
|
|
||||||
|
@ -30,22 +31,76 @@ class MonitorSystemLoad:
|
||||||
io_counters = process.io_counters()
|
io_counters = process.io_counters()
|
||||||
sys_load = psutil.getloadavg()
|
sys_load = psutil.getloadavg()
|
||||||
|
|
||||||
print("load: %s, CPU:%s, Mem:%.2f MiB(%.2f%%), Read: %.2fMiB(%d), Write: %.2fMib (%d)" % (
|
s = "load: %.2f, CPU:%s, Mem:%.2fMiB, %.2f%%, Read: %.2fMiB, %d, Write: %.2fMib, %d" % (
|
||||||
sys_load, cpu_percent, memory_info.rss / 1048576.0,
|
sys_load[0], cpu_percent, memory_info.rss / 1048576.0,
|
||||||
memory_percent, io_counters.read_bytes / 1048576.0, io_counters.read_count,
|
memory_percent, io_counters.read_bytes / 1048576.0, io_counters.read_count,
|
||||||
io_counters.write_bytes / 1048576.0, io_counters.write_count))
|
io_counters.write_bytes / 1048576.0, io_counters.write_count)
|
||||||
|
|
||||||
|
print(s)
|
||||||
|
f.write(s + '\n')
|
||||||
|
f.flush()
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
self.count -= 1
|
|
||||||
|
|
||||||
|
self.count -= 1
|
||||||
if self.count <= 0:
|
if self.count <= 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
|
def do_monitor():
|
||||||
|
print("start monitor threads")
|
||||||
|
loader = MonitorSystemLoad('taosd', 80000)
|
||||||
|
loader.get_proc_status()
|
||||||
|
|
||||||
|
def get_table_list(cursor):
|
||||||
|
cursor.execute('use stream_test')
|
||||||
|
|
||||||
|
sql = "select table_name from information_schema.ins_tables where db_name = 'stream_test' and stable_name='stb' order by table_name"
|
||||||
|
cursor.execute(sql)
|
||||||
|
|
||||||
|
res = cursor.fetchall()
|
||||||
|
return res
|
||||||
|
|
||||||
|
def do_multi_insert(index, total, host, user, passwd, conf, tz):
|
||||||
|
conn = taos.connect(
|
||||||
|
host=host, user=user, password=passwd, config=conf, timezone=tz
|
||||||
|
)
|
||||||
|
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute('use stream_test')
|
||||||
|
|
||||||
|
start_ts = 1609430400000
|
||||||
|
step = 5
|
||||||
|
|
||||||
|
cursor.execute("create stable if not exists stb_result(wstart timestamp, minx float, maxx float, countx bigint) tags(gid bigint unsigned)")
|
||||||
|
|
||||||
|
list = get_table_list(cursor)
|
||||||
|
|
||||||
|
list = list[index*total: (index+1)*total]
|
||||||
|
|
||||||
|
print("there are %d tables" % len(list))
|
||||||
|
|
||||||
|
for index, n in enumerate(list):
|
||||||
|
cursor.execute(f"create table if not exists {n[0]}_1 using stb_result tags(1)")
|
||||||
|
count = 1
|
||||||
|
while True:
|
||||||
|
sql = (f"select cast({start_ts + step * 1000 * (count - 1)} as timestamp), min(c1), max(c2), count(c3) from stream_test.{n[0]} "
|
||||||
|
f"where ts >= {start_ts + step * 1000 * (count - 1)} and ts < {start_ts + step * 1000 * count}")
|
||||||
|
cursor.execute(sql)
|
||||||
|
|
||||||
|
res = cursor.fetchall()
|
||||||
|
if res[0][3] == 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
insert = f"insert into {n[0]}_1 values ({start_ts + step * 1000 * (count - 1)}, {res[0][1]}, {res[0][2]}, {res[0][3]})"
|
||||||
|
cursor.execute(insert)
|
||||||
|
count += 1
|
||||||
|
conn.close()
|
||||||
|
|
||||||
class StreamStarter:
|
class StreamStarter:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.sql = None
|
self.sql = None
|
||||||
self.host='127.0.0.1'
|
self.host='ubuntu'
|
||||||
self.user = 'root'
|
self.user = 'root'
|
||||||
self.passwd = 'taosdata'
|
self.passwd = 'taosdata'
|
||||||
self.conf = '/etc/taos/taos.cfg'
|
self.conf = '/etc/taos/taos.cfg'
|
||||||
|
@ -55,18 +110,18 @@ class StreamStarter:
|
||||||
json_data = {
|
json_data = {
|
||||||
"filetype": "insert",
|
"filetype": "insert",
|
||||||
"cfgdir": "/etc/taos/cfg",
|
"cfgdir": "/etc/taos/cfg",
|
||||||
"host": "127.0.0.1",
|
"host": "ubuntu",
|
||||||
"port": 6030,
|
"port": 6030,
|
||||||
"rest_port": 6041,
|
"rest_port": 6041,
|
||||||
"user": "root",
|
"user": "root",
|
||||||
"password": "taosdata",
|
"password": "taosdata",
|
||||||
"thread_count": 20,
|
"thread_count": 5,
|
||||||
"create_table_thread_count": 40,
|
"create_table_thread_count": 5,
|
||||||
"result_file": "/tmp/taosBenchmark_result.log",
|
"result_file": "/tmp/taosBenchmark_result.log",
|
||||||
"confirm_parameter_prompt": "no",
|
"confirm_parameter_prompt": "no",
|
||||||
"insert_interval": 0,
|
"insert_interval": 1000,
|
||||||
"num_of_records_per_req": 10000,
|
"num_of_records_per_req": 1000,
|
||||||
"max_sql_len": 1024000,
|
"max_sql_len": 102400,
|
||||||
"databases": [
|
"databases": [
|
||||||
{
|
{
|
||||||
"dbinfo": {
|
"dbinfo": {
|
||||||
|
@ -96,7 +151,7 @@ class StreamStarter:
|
||||||
"insert_mode": "taosc",
|
"insert_mode": "taosc",
|
||||||
"interlace_rows": 400,
|
"interlace_rows": 400,
|
||||||
"tcp_transfer": "no",
|
"tcp_transfer": "no",
|
||||||
"insert_rows": 10000,
|
"insert_rows": 1000,
|
||||||
"partial_col_num": 0,
|
"partial_col_num": 0,
|
||||||
"childtable_limit": 0,
|
"childtable_limit": 0,
|
||||||
"childtable_offset": 0,
|
"childtable_offset": 0,
|
||||||
|
@ -281,6 +336,78 @@ class StreamStarter:
|
||||||
loader = MonitorSystemLoad('taosd', 80)
|
loader = MonitorSystemLoad('taosd', 80)
|
||||||
loader.get_proc_status()
|
loader.get_proc_status()
|
||||||
|
|
||||||
|
def do_query_then_insert(self):
|
||||||
|
self.prepare_data()
|
||||||
|
|
||||||
|
try:
|
||||||
|
subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True)
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
print(f"Error running Bash command: {e}")
|
||||||
|
|
||||||
|
time.sleep(50)
|
||||||
|
|
||||||
|
conn = taos.connect(
|
||||||
|
host=self.host, user=self.user, password=self.passwd, config=self.conf, timezone=self.tz
|
||||||
|
)
|
||||||
|
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute('use stream_test')
|
||||||
|
|
||||||
|
start_ts = 1609430400000
|
||||||
|
step = 5
|
||||||
|
|
||||||
|
cursor.execute("create stable if not exists stb_result(wstart timestamp, minx float, maxx float, countx bigint) tags(gid bigint unsigned)")
|
||||||
|
|
||||||
|
try:
|
||||||
|
t = threading.Thread(target=do_monitor)
|
||||||
|
t.start()
|
||||||
|
except Exception as e:
|
||||||
|
print("Error: unable to start thread, %s" % e)
|
||||||
|
|
||||||
|
print("start to query")
|
||||||
|
|
||||||
|
list = get_table_list(cursor)
|
||||||
|
print("there are %d tables" % len(list))
|
||||||
|
|
||||||
|
for index, n in enumerate(list):
|
||||||
|
cursor.execute(f"create table if not exists {n[0]}_1 using stb_result tags(1)")
|
||||||
|
count = 1
|
||||||
|
while True:
|
||||||
|
sql = (f"select cast({start_ts + step * 1000 * (count - 1)} as timestamp), min(c1), max(c2), count(c3) from stream_test.{n[0]} "
|
||||||
|
f"where ts >= {start_ts + step * 1000 * (count - 1)} and ts < {start_ts + step * 1000 * count}")
|
||||||
|
cursor.execute(sql)
|
||||||
|
|
||||||
|
res = cursor.fetchall()
|
||||||
|
if res[0][3] == 0:
|
||||||
|
break
|
||||||
|
|
||||||
|
insert = f"insert into {n[0]}_1 values ({start_ts + step * 1000 * (count - 1)}, {res[0][1]}, {res[0][2]}, {res[0][3]})"
|
||||||
|
cursor.execute(insert)
|
||||||
|
count += 1
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def multi_insert(self):
|
||||||
|
self.prepare_data()
|
||||||
|
|
||||||
|
try:
|
||||||
|
subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True)
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
print(f"Error running Bash command: {e}")
|
||||||
|
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
|
for n in range(5):
|
||||||
|
try:
|
||||||
|
print(f"start query_insert thread {n}")
|
||||||
|
t = threading.Thread(target=do_multi_insert, args=(n, 100, self.host, self.user, self.passwd, self.conf, self.tz))
|
||||||
|
t.start()
|
||||||
|
except Exception as e:
|
||||||
|
print("Error: unable to start thread, %s" % e)
|
||||||
|
|
||||||
|
loader = MonitorSystemLoad('taosd', 80)
|
||||||
|
loader.get_proc_status()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
StreamStarter().do_start()
|
# StreamStarter().do_start()
|
||||||
|
# StreamStarter().do_query_then_insert()
|
||||||
|
StreamStarter().multi_insert()
|
Loading…
Reference in New Issue