シリアル通信したい(pyserial

1import serial
2
3# ポート名と通信速度を指定して接続
4with serial.Serial(port="/dev/ttyUSB0", baudrate=9600, timeout=1) as device:
5    # デバイスからデータを読み込む
6    line = device.readline().decode("utf-8").strip()
7    print(line)

pyserialパッケージでパソコンとマイコンなどをUSBで接続して、データをやり取りできます。 ポート名(/dev/ttyUSB0COM1など)と通信速度(baudrate)を指定してセッションを開きます。 with構文を使うと、接続や切断が自動で管理されるので安全です。

注釈

シリアル通信について: シリアル通信は、データを1ビットずつ順番に送る方式です。マイコンやIoT機器との通信に使われます。

設定オプションについて:

  • baudrate(通信速度):1秒間に送信するビット数です。デバイスと同じ速度に合わせる必要があります。一般的な値は9600、115200などです。デバイスのドキュメントを確認して設定してください。

  • timeout(タイムアウト):データ受信を待つ最大時間(秒)です。timeout=1と設定すると、1秒間データがなければNoneを返します。受信ループで無限に待つのを防ぐために必要です。timeout=Noneにすると無限に待ちます。

  • parity(パリティ):データの誤り検出方式です。デフォルト値はPARITY_NONE(パリティなし)です。

  • stopbits(ストップビット):データフレームの終わりを示すビット数です。デフォルト値は1です。

  • bytesize(データビット):1フレームあたりのビット数です。デフォルト値は8です。

  • rtscts(フロー制御):ハードウェアフロー制御の有効/無効です。デフォルト値のFalseです。

  • dsrdtr(フロー制御):別のハードウェアフロー制御オプションです。デフォルト値のFalseです。

基本的にはデフォルト値でOK: ほとんどの場合、baudratetimeoutを指定するだけで十分です。その他のオプションはデバイスから特別な指定がある場合のみ変更してください。

インストールしたい(pyserial

  • uv pipでインストール

$ uv pip install pyserial

モジュール名はserialですが、パッケージ名はpyserialです。

  • pipでインストール

$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install pyserial

読み出したい(readline

1# 受信(パソコン ← デバイス)
2# バイト列から文字列に変換
3line = device.readline().decode("utf-8").strip()

readlineメソッドで接続先のデバイスからデータを読み出します。 データはバイト型で送られてくるので、文字列への変換(デコード)が必要です。 strip()で改行や余白を削除することが多いです。

1if device.in_waiting > 0:
2    print("Data available to read")

in_waitingプロパティで、現在バッファーに溜まっているデータのバイト数を確認できます。 つまり、0より大きいということは受信待ちデータがあるということです。

書き込みたい(write

1# 送信(パソコン → デバイス)
2# バイト列を送信
3device.write(b"HELLO\n")
4
5# 文字列をバイト列に変換して送信
6device.write("HELLO\n".encode("utf-8"))
7
8# 送信時のブロッキング
9device.flush()

writeメソッドでデバイスにデータを書き込みます。 書き込むデータはバイト型への変換(エンコード)が必要です。 改行コード(\n\r\n)を区切り文字として利用することが多いです。

1if device.out_waiting > 0:
2    print("Still sending data")

out_waitingプロパティで、現在のバッファーに残っているデータのバイト数を確認できます。 つまり、0より大きい場合は未送信データが残っているということです。

例外処理したい(serial.SerialException

1try:
2    with serial.Serial(port="dev/ttyUSB0") as device:
3        ...
4except serial.SerialException as err:
5    print(err)

serial.SerialExceptionでエラーを検出できます。

シリアル通信は、デバイスが接続されていなかったり、 通信に時間がかかりタイムアウトしたり、など 物理的な要因でエラーが発生することがあります。

例外処理は必ず実装しましょう。

連続で読み出したい

 1import serial
 2
 3try:
 4    # ポート名と通信速度を指定して接続
 5    with serial.Serial(port="/dev/ttyUSB0") as device:
 6        while True:
 7            # デバイスからデータを読み込む
 8            if device.in_waiting > 0:
 9                line = device.readline().decode("utf-8").strip()
10                print(line)
11except serial.SerialException as err:
12    print(f"Serial error: {err}")
13except KeyboardInterrupt:
14    print("Stopped by user")

whileループを使って、デバイスから連続でデータを読み出すことができます。in_waitingで受信データの有無を確認してから読み込むことで、効率的に処理できます。 KeyboardInterruptをキャッチしてCtrl-Cで中断できます。

ファイルに出力したい

 1import serial
 2import csv
 3import time
 4from pathlib import Path
 5from datetime import datetime
 6from typing import Optional, List
 7
 8# 設定
 9port_name = "/dev/ttyUSB0"
10baud = 9600
11
12# 保存先
13now =datetime.now()
14ymd = now.strftime("%Y%m%d")
15default_filename = now.strftime("%Y%m%d_%Hh%Mm%Ss.csv")
16csv_path = Path.cwd() / "data" / ymd / default_filename
17
18def read_line(
19    com: serial.Serial,
20    *,
21    encoding: str = "utf-8"
22    ) -> Optional[str]:
23    """Read one decoded line from the serial port."""
24    if com.in_waiting == 0:
25        return None
26    raw = com.readline()
27    if not raw:
28        return None
29    line = raw.decode(encoding, errors="ignore").strip()
30    return line or None
31
32def write_row(
33    writer: csv.writer,
34    line: str
35    ) -> List[str]:
36    """Write a CSV row to the file."""
37    parts = line.split()
38    timestamp = datetime.now().isoformat(timespec="seconds")
39    row = [timestamp] + parts + [len(parts)]
40    writer.writerow(row)
41    return row
42
43try:
44    with serial.Serial(port=port_name, baudrate=baud, timeout=1) as com:
45        print(f"[{com.port}] Opened")
46
47        # 接続が成功したときに保存先ディレクトリを作成
48        csv_path.parent.mkdir(parents=True, exist_ok=True)
49
50        with csv_path.open("a", newline="", encoding="utf-8") as f:
51            writer = csv.writer(f)
52
53            print(f"[{com.port}] Listening...")
54            while True:
55                line = read_line(com)
56                if line is None:
57                    time.sleep(0.01)
58                    continue
59
60                row = write_row(writer, line)
61                f.flush()
62                print(f"Added: {row}")
63
64except serial.SerialException as e:
65    print(f"Serial error: {e}")
66except KeyboardInterrupt:
67    print(f"Stopped by user.")

シリアル通信で取得したデータをCSV形式で保存するサンプルコードです。 ポート接続を確認してから、ファイルを作成する手順になっています。

read_line関数では、受信したデータを確認しています。 write_row関数では、データを整形してCSVファイルに出力しています。

ポートを確認したい(serial.tools.list_ports

1from serial.tools import list_ports
2
3ports = list_ports.comports()
4for p in ports:
5    print(f"{p.device} - {p.description}")

serial.tools.list_portsで利用可能なポートを確認できます。 Linuxは/dev/tty*、 WindowsはCOM*、 macOSは/dev/cu.usbserial-*という名前で表示されます。

複数デバイスしたい(threading

 1import serial
 2import threading
 3import time
 4
 5def read_loop(
 6    port_name: str,
 7    baudrate: int,
 8    stop: threading.Event
 9    ) -> None:
10    """Continuously read lines from a serial port and print them."""
11    try:
12        with serial.Serial(port_name, baudrate, timeout=1) as com:
13            print(f"[{port_name}] Opened")
14            while not stop.is_set():
15                raw = com.readline()
16                if not raw:
17                    # Avoid busy loop when data is not available
18                    time.sleep(0.01)
19                    continue
20                line = raw.decode("utf-8", errors="replace").strip()
21                if line:
22                    print(f"[{port_name}] {line}")
23    except serial.SerialException as e:
24        print(f"[{port_name}] Serial error: {e}")
25    finally:
26        print(f"[{port_name}] Closed")
27
28
29def wait_for_interrupt(
30    stop: threading.Event,
31    interval: float = 0.2
32    ) -> None:
33    """Block the main thread until Ctrl-C is pressed."""
34    try:
35        print("Press Ctrl-C to stop.")
36        while not stop.is_set():
37            # Avoid busy loop
38            time.sleep(interval)
39    except KeyboardInterrupt:
40        print("\nStopping all readers...")
41        stop.set()
42
43
44if __name__ == "__main__":
45    # 停止フラグ
46    stop_event = threading.Event()
47
48    # デバイスごとのスレッド
49    threads = [
50        threading.Thread(target=read_loop, args=("/dev/ttyUSB0", 9600, stop_event)),
51        threading.Thread(target=read_loop, args=("/dev/ttyUSB1", 115200, stop_event))
52    ]
53
54    # スレッドを起動
55    for t in threads:
56        t.start()
57
58    # 中断(Ctrl-C)待ち
59    wait_for_interrupt(stop_event)
60    for t in threads:
61        t.join()      # サブスレッドの終了を待つ
62
63    print("All readers stopped.")

threadingモジュールと組み合わせて、 複数のデバイスから、同時にシリアル通信でデータを読み出すことができます。

Ctrl-Cによる中断はメインスレッドにしか送られないため、 サブスレッド(threading.Thread)の中では受け取れません。 そのため、 except KeyboardInterruptread_loop関数の中ではなく、 メインスレッドの処理として記述します。

その際に、停止フラグ用のthreading.Event()を作成しておき、 KeyboardInterruptを検知したときに有効にすることで、 サブスレッドを順番かつ安全に停止できます。