シリアル通信したい(pyserial

 1import serial
 2
 3try:
 4    with serial.Serial(port="/dev/ttyUSB0", baudrate=9600, timeout=1) as com:
 5        while True:
 6            if com.in_waiting > 0:
 7                line = com.readline().decode("utf-8").strip()
 8                if line:
 9                    print(f"Received: {line}")
10except serial.SerialException as e:
11    print(f"Serial error: {e}")

pyserialパッケージ(serialモジュール)でシリアル通信できます。 Serialオブジェクトを生成するときに、ポート番号(/dev/ttyUSB0COMなど)を指定します。 また、オプションで通信速度(baudrate)やタイムアウト時間(timeout)などを設定できます。

Serialオブジェクトはコンテクストマネージャーに対応しているためwith...as構文を使って 安全にポートを開閉できます。 接続に失敗したときや、通信中にエラーが起きた場合は、serial.SerialExceptionで例外を受け取れます。

参考

シリアル通信は、データを1ビットずつ連続的(=シリアル)に送る通信方式です。 身近な例としては、パソコンや周辺機器をつなぐUSBがあります。

IoT機器やマイコンでは、よりシンプルなUART通信がよく利用されます。 UARTでは、1文字(通常8ビッド=1バイト)を送る際に、 スタートビット → データビット(通常8ビット) → パリティビット(任意) → ストップビット の順で送信します。

送信側は、クロックのリズムに合わせて流しそうめんのように電気信号を送り出します。 受信側は、この信号を順番どおりに受け取り、1バイト単位のデータに組み立てます。 そのため、送信側と受信側で通信速度(ボーレート)をそろえることが重要です。

注釈

高速なシリアル通信が普及する以前は、パラレル通信が主流でした。 パラレル通信は、複数ビットを同時に送る通信方式で、短距離であれば非常に高速です。 一度に大量のデータを送ることができる反面、 配線ケーブルが多くなる、 長距離では信号の到着時がずれて誤動作しやすい、 といった課題があります。

現在では、こうした制約を避けるために 高速なシリアル通信を複数本並列に使う方式が一般的です。

インストールしたい

uv pip install pyserial

モジュール名はserialですが、パッケージ名はpyserialです。

受信したい(readline

1# 受信(パソコン ← デバイス)
2# バイト列から文字列に変換
3line = com.readline().decode("utf-8").strip()

readlineでデバイスからデータを読み出します。 データはバイト列で送られてくるので、文字列への変換(デコード)が必要です。 strip()で改行や余白を削除することが多いです。

1if com.in_waiting > 0:
2    print("Data available to read")

in_waitingで、現在バッファーに溜まっているデータのバイト数を確認できます。 つまり、0より大きいということは受信待ちデータがあるということです。

送信したい(write

1# 送信(パソコン → デバイス)
2# バイト列を送信
3com.write(b"HELLO\n")
4
5# 文字列をバイト列に変換して送信
6com.write("HELLO\n".encode("utf-8"))
7
8# 送信時のブロッキング
9com.flush()

writeでデバイスにデータを書き込みます。 書き込むデータはバイト列への変換(エンコード)が必要です。 改行コード(\n\r\n)を区切り文字として利用することが多いです。

1if com.out_waiting > 0:
2    print("Still sending data")

out_waitingでバッファーに残っているデータのバイト数を確認できます。 つまり、0より大きい場合は未送信データが残っているということです。

ファイルに出力したい

 1import serial
 2import csv
 3import time
 4from pathlib import Path
 5from datetime import datetime
 6from typing import Optional, List
 7
 8# 設定
 9port_name = "/dev/ttyUSB0"
10baud = 9600
11
12# 保存先
13now =datetime.now()
14ymd = now.strftime("%Y%m%d")
15default_filename = now.strftime("%Y%m%d_%Hh%Mm%Ss.csv")
16csv_path = Path.cwd() / "data" / ymd / default_filename
17
18def read_line(
19    com: serial.Serial,
20    *,
21    encoding: str = "utf-8"
22    ) -> Optional[str]:
23    """Read one decoded line from the serial port."""
24    if com.in_waiting == 0:
25        return None
26    raw = com.readline()
27    if not raw:
28        return None
29    line = raw.decode(encoding, errors="ignore").strip()
30    return line or None
31
32def write_row(
33    writer: csv.writer,
34    line: str
35    ) -> List[str]:
36    """Write a CSV row to the file."""
37    parts = line.split()
38    timestamp = datetime.now().isoformat(timespec="seconds")
39    row = [timestamp] + parts + [len(parts)]
40    writer.writerow(row)
41    return row
42
43try:
44    with serial.Serial(port=port_name, baudrate=baud, timeout=1) as com:
45        print(f"[{com.port}] Opened")
46
47        # 接続が成功したときに保存先ディレクトリを作成
48        csv_path.parent.mkdir(parents=True, exist_ok=True)
49
50        with csv_path.open("a", newline="", encoding="utf-8") as f:
51            writer = csv.writer(f)
52
53            print(f"[{com.port}] Listening...")
54            while True:
55                line = read_line(com)
56                if line is None:
57                    time.sleep(0.01)
58                    continue
59
60                row = write_row(writer, line)
61                f.flush()
62                print(f"Added: {row}")
63
64except serial.SerialException as e:
65    print(f"Serial error: {e}")
66except KeyboardInterrupt:
67    print(f"Stopped by user.")

シリアル通信で取得したデータをCSV形式で保存するサンプルコードです。 ポート接続を確認してから、ファイルを作成する手順になっています。

read_line関数では、受信したデータを確認しています。 write_row関数では、データを整形してCSVファイルに出力しています。

ポートを確認したい(serial.tools.list_ports

1from serial.tools import list_ports
2
3ports = list_ports.comports()
4for p in ports:
5    print(f"{p.device} - {p.description}")

serial.tools.list_portsで利用可能なポートを確認できます。 Linuxは/dev/tty*、 WindowsはCOM*、 macOSは/dev/cu.usbserial-* という名前で表示されます。

 1from typing import List, Dict, Any
 2from serial.tools import list_ports
 3
 4def find_serial_ports() -> List[Dict[str, Any]]:
 5    """List available serial ports with rich metadata."""
 6    ports = list_ports.comports()
 7
 8    if not ports:
 9        raise RuntimeError("No ports found.")
10
11    results: List[Dict[str, Any]] = []
12    for p in ports:
13        results.append(
14            {
15                "device": p.device,
16                "name": getattr(p, "name", None),
17                "description": p.description,
18                "hwid": p.hwid,
19                "vid": p.vid,
20                "pid": p.pid,
21                "manufacturer": p.manufacturer,
22                "product": p.product,
23                "serial_number": p.serial_number,
24                "location": p.location,
25                "interface": p.interface,
26            }
27        )
28    return results

このような関数を定義して、ポート情報を辞書型に変換しておくと利便性があがります。

自動検出したい

 1def auto_detect_serial_port(ports: List[Dict[str, Any]]) -> str:
 2    """Return a single 'best guess' serial device path from find_serial_ports() results.
 3
 4    Preference order (first match wins):
 5      1. /dev/ttyACM*         Linux/WSL2: CDC ACM, Arduino-like
 6      2. /dev/ttyUSB*         Linux/WSL2: USB-serial adapters
 7      3. /dev/cu.usbmodem*    macOS: CDC ACM
 8      4. /dev/cu.usbserial*   macOS: USB-serial adapters
 9      5. COM*                 Windows: COM ports
10      6. /dev/ttyS*           WSL2: often mapped COM in WLS
11    """
12
13    if not ports:
14        raise RuntimeError("No ports provided to auto-detect from.")
15
16    prefixes = [
17        "/dev/ttyACM",
18        "/dev/ttyUSB",
19        "/dev/cu.usbmodem",
20        "/dev/cu.usbserial",
21        "COM",
22        r"\\.\COM",
23        "/dev/ttyS",
24    ]
25
26    for prefix in prefixes:
27        for p in ports:
28            device = (p.get("device") or "").strip()
29            if not device:
30                continue
31            if device.lower().startswith(prefix.lower()):
32                return device
33
34    # Fallback: return the very first device if present
35    first = (ports[0].get("device") or "").strip()
36    if first:
37        return first
38
39    raise RuntimeError("No available device path found in the provided port list.")

OSごとに使用されているポート名を自動検出できるようにした関数です。 前述のfind_serial_portsとセットで利用することを想定しています。

複数デバイスしたい(threading

 1import serial
 2import threading
 3import time
 4
 5def read_loop(
 6    port_name: str,
 7    baudrate: int,
 8    stop: threading.Event
 9    ) -> None:
10    """Continuously read lines from a serial port and print them."""
11    try:
12        with serial.Serial(port_name, baudrate, timeout=1) as com:
13            print(f"[{port_name}] Opened")
14            while not stop.is_set():
15                raw = com.readline()
16                if not raw:
17                    # Avoid busy loop when data is not available
18                    time.sleep(0.01)
19                    continue
20                line = raw.decode("utf-8", errors="replace").strip()
21                if line:
22                    print(f"[{port_name}] {line}")
23    except serial.SerialException as e:
24        print(f"[{port_name}] Serial error: {e}")
25    finally:
26        print(f"[{port_name}] Closed")
27
28
29def wait_for_interrupt(
30    stop: threading.Event,
31    interval: float = 0.2
32    ) -> None:
33    """Block the main thread until Ctrl-C is pressed."""
34    try:
35        print("Press Ctrl-C to stop.")
36        while not stop.is_set():
37            # Avoid busy loop
38            time.sleep(interval)
39    except KeyboardInterrupt:
40        print("\nStopping all readers...")
41        stop.set()
42
43
44if __name__ == "__main__":
45    # 停止フラグ
46    stop_event = threading.Event()
47
48    # デバイスごとのスレッド
49    threads = [
50        threading.Thread(target=read_loop, args=("/dev/ttyUSB0", 9600, stop_event)),
51        threading.Thread(target=read_loop, args=("/dev/ttyUSB1", 115200, stop_event))
52    ]
53
54    # スレッドを起動
55    for t in threads:
56        t.start()
57
58    # 中断(Ctrl-C)待ち
59    wait_for_interrupt(stop_event)
60    for t in threads:
61        t.join()      # サブスレッドの終了を待つ
62
63    print("All readers stopped.")

threadingモジュールと組み合わせて、 複数のデバイスから、同時にシリアル通信でデータを読み出すことができます。

Ctrl-Cによる中断はメインスレッドにしか送られないため、 サブスレッド(threading.Thread)の中では受け取れません。 そのため、 except KeyboardInterruptread_loop関数の中ではなく、 メインスレッドの処理として記述します。

その際に、停止フラグ用のthreading.Event()を作成しておき、 KeyboardInterruptを検知したときに有効にすることで、 サブスレッドを順番かつ安全に停止できます。

複数デバイスしたい(concurrent.futures.ThreadPoolExecutor

 1import time
 2import threading
 3from concurrent.futures import ThreadPoolExecutor, wait
 4import serial
 5
 6def read_loop(
 7    port_name: str,
 8    baudrate: int,
 9    stop: threading.Event
10    ) -> None:
11    """Continuously read lines from a serial port and print them."""
12    try:
13        with serial.Serial(port_name, baudrate, timeout=1) as com:
14            print(f"[{port_name}] Opened")
15            while not stop.is_set():
16                raw = com.readline()
17                if not raw:
18                    time.sleep(0.01)
19                    continue
20                line = raw.decode("utf-8", errors="replace").strip()
21                if line:
22                    print(f"[{port_name}] {line}")
23    except serial.SerialException as e:
24        print(f"[{port_name}] Serial error: {e}")
25    finally:
26        print(f"[{port_name}] Closed")
27
28def wait_for_interrupt(
29    stop: threading.Event,
30    interval: float = 0.2
31    )-> None:
32    """Block the main thread until Ctrl-C is pressed"""
33    try:
34        print("Press Ctrl-C to stop.")
35        while not stop.is_set():
36            time.sleep(interval)
37    except KeyboardInterrupt:
38        print("\nStopping all readers...")
39        stop.set()
40
41if __name__ == "__main__":
42    # 停止フラグ
43    stop_event = threading.Event()
44
45    # ポートの定義
46    ports = [
47        ("/dev/ttyUSB0", 9600),
48        ("/dev/ttyUSB1", 115200),
49    ]
50
51    # スレッドプールで実行
52    with ThreadPoolExecutor(max_workers=len(ports)) as ex:
53        futures = [ex.submit(read_loop, port, baud, stop_event) for port, baud in ports]
54
55        # 状態確認
56        # for f in futures:
57        #   print(f.done(), f.running())
58
59        try:
60            wait_for_interrupt(stop_event)  # Ctrl-C 待ち
61        finally:
62            stop_event.set()
63            wait(futures)
64
65    print("All readers stopped.")

ThreadPoolExecutorを使って、複数デバイスの処理を書き換えてみました。 read_loop関数とwait_for_interrupt関数の内容は同じです。

スレッドの生成と管理をThreadPoolExecutorに任せ、 ex.submitでジョブを投入しています。 submit(fn, /, *args, **kwargs)というシグネチャを持つため、 実行したい関数(fn)と 位置引数(*args)、 キーワード引数(**kwargs)を そのまま指定すればOKです。

変数futuresには、非同期処理の結果(concurrent.futures.Futureオブジェクト)が入っており、実行結果や例外を後で取得できるようになっています。