diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9e6188e9d9..4f5198acc0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1168,8 +1168,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { } } - SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId)); - for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); if (p == NULL) { @@ -1181,23 +1179,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { continue; } - if (pEntry->status == TASK_STATUS__STOP) { - for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { - STaskId *pId = taosArrayGet(pInvalidList, j); - if (pId == NULL) { - continue; - } - - if (pEntry->id.streamId == pId->streamId) { - void *px = taosArrayPush(pInvalidList, &pEntry->id); - if (px == NULL) { - mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - } - break; - } - } - } - if (pEntry->status != TASK_STATUS__READY) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); @@ -1215,9 +1196,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { } } - removeTasksInBuf(pInvalidList, &execInfo); - taosArrayDestroy(pInvalidList); - streamMutexUnlock(&execInfo.lock); return ready ? 0 : -1; } @@ -1258,6 +1236,30 @@ static int32_t streamWaitComparFn(const void *p1, const void *p2) { return pInt1->duration > pInt2->duration ? -1 : 1; } +// all tasks of this stream should be ready, otherwise do nothing +static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) { + bool ready = false; + + streamMutexLock(&execInfo.lock); + + int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); + if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) { + if (lastReadyTs != -1) { + mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold", + pStream->uid, lastReadyTs, now - lastReadyTs); + } else { + mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid); + } + + ready = false; + } else { + ready = true; + } + + streamMutexUnlock(&execInfo.lock); + return ready; +} + static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1284,20 +1286,17 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { continue; } - streamMutexLock(&execInfo.lock); - int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); - if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) { - streamMutexUnlock(&execInfo.lock); + bool ready = isStreamReadyHelp(now, pStream); + if (!ready) { sdbRelease(pSdb, pStream); continue; } - streamMutexUnlock(&execInfo.lock); SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration}; void *p = taosArrayPush(pList, &in); if (p) { int32_t currentSize = taosArrayGetSize(pList); - mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64 + mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64 "s), concurrently launch threshold:%d", pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000, tsMaxConcurrentCheckpoint); diff --git a/tests/perf-test/stream.py b/tests/perf-test/stream.py index a34fe5381b..90ca773184 100644 --- a/tests/perf-test/stream.py +++ b/tests/perf-test/stream.py @@ -1,6 +1,6 @@ import json import subprocess - +import threading import psutil import time import taos @@ -21,31 +21,86 @@ class MonitorSystemLoad: def get_proc_status(self): process = psutil.Process(self.pid) + with open('/tmp/pref.txt', 'w+') as f: + while True: + cpu_percent = process.cpu_percent(interval=1) + + memory_info = process.memory_info() + memory_percent = process.memory_percent() + + io_counters = process.io_counters() + sys_load = psutil.getloadavg() + + s = "load: %.2f, CPU:%s, Mem:%.2fMiB, %.2f%%, Read: %.2fMiB, %d, Write: %.2fMib, %d" % ( + sys_load[0], cpu_percent, memory_info.rss / 1048576.0, + memory_percent, io_counters.read_bytes / 1048576.0, io_counters.read_count, + io_counters.write_bytes / 1048576.0, io_counters.write_count) + + print(s) + f.write(s + '\n') + f.flush() + + time.sleep(1) + + self.count -= 1 + if self.count <= 0: + break + + +def do_monitor(): + print("start monitor threads") + loader = MonitorSystemLoad('taosd', 80000) + loader.get_proc_status() + +def get_table_list(cursor): + cursor.execute('use stream_test') + + sql = "select table_name from information_schema.ins_tables where db_name = 'stream_test' and stable_name='stb' order by table_name" + cursor.execute(sql) + + res = cursor.fetchall() + return res + +def do_multi_insert(index, total, host, user, passwd, conf, tz): + conn = taos.connect( + host=host, user=user, password=passwd, config=conf, timezone=tz + ) + + cursor = conn.cursor() + cursor.execute('use stream_test') + + start_ts = 1609430400000 + step = 5 + + cursor.execute("create stable if not exists stb_result(wstart timestamp, minx float, maxx float, countx bigint) tags(gid bigint unsigned)") + + list = get_table_list(cursor) + + list = list[index*total: (index+1)*total] + + print("there are %d tables" % len(list)) + + for index, n in enumerate(list): + cursor.execute(f"create table if not exists {n[0]}_1 using stb_result tags(1)") + count = 1 while True: - cpu_percent = process.cpu_percent(interval=1) + sql = (f"select cast({start_ts + step * 1000 * (count - 1)} as timestamp), min(c1), max(c2), count(c3) from stream_test.{n[0]} " + f"where ts >= {start_ts + step * 1000 * (count - 1)} and ts < {start_ts + step * 1000 * count}") + cursor.execute(sql) - memory_info = process.memory_info() - memory_percent = process.memory_percent() - - io_counters = process.io_counters() - sys_load = psutil.getloadavg() - - print("load: %s, CPU:%s, Mem:%.2f MiB(%.2f%%), Read: %.2fMiB(%d), Write: %.2fMib (%d)" % ( - sys_load, cpu_percent, memory_info.rss / 1048576.0, - memory_percent, io_counters.read_bytes / 1048576.0, io_counters.read_count, - io_counters.write_bytes / 1048576.0, io_counters.write_count)) - - time.sleep(1) - self.count -= 1 - - if self.count <= 0: + res = cursor.fetchall() + if res[0][3] == 0: break + insert = f"insert into {n[0]}_1 values ({start_ts + step * 1000 * (count - 1)}, {res[0][1]}, {res[0][2]}, {res[0][3]})" + cursor.execute(insert) + count += 1 + conn.close() class StreamStarter: def __init__(self) -> None: self.sql = None - self.host='127.0.0.1' + self.host='ubuntu' self.user = 'root' self.passwd = 'taosdata' self.conf = '/etc/taos/taos.cfg' @@ -55,18 +110,18 @@ class StreamStarter: json_data = { "filetype": "insert", "cfgdir": "/etc/taos/cfg", - "host": "127.0.0.1", + "host": "ubuntu", "port": 6030, "rest_port": 6041, "user": "root", "password": "taosdata", - "thread_count": 20, - "create_table_thread_count": 40, + "thread_count": 5, + "create_table_thread_count": 5, "result_file": "/tmp/taosBenchmark_result.log", "confirm_parameter_prompt": "no", - "insert_interval": 0, - "num_of_records_per_req": 10000, - "max_sql_len": 1024000, + "insert_interval": 1000, + "num_of_records_per_req": 1000, + "max_sql_len": 102400, "databases": [ { "dbinfo": { @@ -96,7 +151,7 @@ class StreamStarter: "insert_mode": "taosc", "interlace_rows": 400, "tcp_transfer": "no", - "insert_rows": 10000, + "insert_rows": 1000, "partial_col_num": 0, "childtable_limit": 0, "childtable_offset": 0, @@ -281,6 +336,78 @@ class StreamStarter: loader = MonitorSystemLoad('taosd', 80) loader.get_proc_status() + def do_query_then_insert(self): + self.prepare_data() + + try: + subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True) + except subprocess.CalledProcessError as e: + print(f"Error running Bash command: {e}") + + time.sleep(50) + + conn = taos.connect( + host=self.host, user=self.user, password=self.passwd, config=self.conf, timezone=self.tz + ) + + cursor = conn.cursor() + cursor.execute('use stream_test') + + start_ts = 1609430400000 + step = 5 + + cursor.execute("create stable if not exists stb_result(wstart timestamp, minx float, maxx float, countx bigint) tags(gid bigint unsigned)") + + try: + t = threading.Thread(target=do_monitor) + t.start() + except Exception as e: + print("Error: unable to start thread, %s" % e) + + print("start to query") + + list = get_table_list(cursor) + print("there are %d tables" % len(list)) + + for index, n in enumerate(list): + cursor.execute(f"create table if not exists {n[0]}_1 using stb_result tags(1)") + count = 1 + while True: + sql = (f"select cast({start_ts + step * 1000 * (count - 1)} as timestamp), min(c1), max(c2), count(c3) from stream_test.{n[0]} " + f"where ts >= {start_ts + step * 1000 * (count - 1)} and ts < {start_ts + step * 1000 * count}") + cursor.execute(sql) + + res = cursor.fetchall() + if res[0][3] == 0: + break + + insert = f"insert into {n[0]}_1 values ({start_ts + step * 1000 * (count - 1)}, {res[0][1]}, {res[0][2]}, {res[0][3]})" + cursor.execute(insert) + count += 1 + conn.close() + + def multi_insert(self): + self.prepare_data() + + try: + subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True) + except subprocess.CalledProcessError as e: + print(f"Error running Bash command: {e}") + + time.sleep(10) + + for n in range(5): + try: + print(f"start query_insert thread {n}") + t = threading.Thread(target=do_multi_insert, args=(n, 100, self.host, self.user, self.passwd, self.conf, self.tz)) + t.start() + except Exception as e: + print("Error: unable to start thread, %s" % e) + + loader = MonitorSystemLoad('taosd', 80) + loader.get_proc_status() if __name__ == "__main__": - StreamStarter().do_start() + # StreamStarter().do_start() + # StreamStarter().do_query_then_insert() + StreamStarter().multi_insert() \ No newline at end of file