シリアル通信したい(pyserial
)
1import serial
2
3try:
4 with serial.Serial(port="/dev/ttyUSB0", baudrate=9600, timeout=1) as com:
5 while True:
6 if com.in_waiting > 0:
7 line = com.readline().decode("utf-8").strip()
8 if line:
9 print(f"Received: {line}")
10except serial.SerialException as e:
11 print(f"Serial error: {e}")
pyserial
パッケージ(serial
モジュール)でシリアル通信できます。
Serial
オブジェクトを生成するときに、ポート番号(/dev/ttyUSB0
やCOM
など)を指定します。
また、オプションで通信速度(baudrate
)やタイムアウト時間(timeout
)などを設定できます。
Serial
オブジェクトはコンテクストマネージャーに対応しているためwith...as
構文を使って
安全にポートを開閉できます。
接続に失敗したときや、通信中にエラーが起きた場合は、serial.SerialException
で例外を受け取れます。
参考
シリアル通信は、データを1ビットずつ連続的(=シリアル)に送る通信方式です。 身近な例としては、パソコンや周辺機器をつなぐUSBがあります。
IoT機器やマイコンでは、よりシンプルなUART通信がよく利用されます。 UARTでは、1文字(通常8ビッド=1バイト)を送る際に、 スタートビット → データビット(通常8ビット) → パリティビット(任意) → ストップビット の順で送信します。
送信側は、クロックのリズムに合わせて流しそうめんのように電気信号を送り出します。 受信側は、この信号を順番どおりに受け取り、1バイト単位のデータに組み立てます。 そのため、送信側と受信側で通信速度(ボーレート)をそろえることが重要です。
注釈
高速なシリアル通信が普及する以前は、パラレル通信が主流でした。 パラレル通信は、複数ビットを同時に送る通信方式で、短距離であれば非常に高速です。 一度に大量のデータを送ることができる反面、 配線ケーブルが多くなる、 長距離では信号の到着時がずれて誤動作しやすい、 といった課題があります。
現在では、こうした制約を避けるために 高速なシリアル通信を複数本並列に使う方式が一般的です。
インストールしたい
uv pip install pyserial
モジュール名はserial
ですが、パッケージ名はpyserial
です。
受信したい(readline
)
1# 受信(パソコン ← デバイス)
2# バイト列から文字列に変換
3line = com.readline().decode("utf-8").strip()
readline
でデバイスからデータを読み出します。
データはバイト列で送られてくるので、文字列への変換(デコード)が必要です。
strip()
で改行や余白を削除することが多いです。
1if com.in_waiting > 0:
2 print("Data available to read")
in_waiting
で、現在バッファーに溜まっているデータのバイト数を確認できます。
つまり、0より大きいということは受信待ちデータがあるということです。
送信したい(write
)
1# 送信(パソコン → デバイス)
2# バイト列を送信
3com.write(b"HELLO\n")
4
5# 文字列をバイト列に変換して送信
6com.write("HELLO\n".encode("utf-8"))
7
8# 送信時のブロッキング
9com.flush()
write
でデバイスにデータを書き込みます。
書き込むデータはバイト列への変換(エンコード)が必要です。
改行コード(\n
や\r\n
)を区切り文字として利用することが多いです。
1if com.out_waiting > 0:
2 print("Still sending data")
out_waiting
でバッファーに残っているデータのバイト数を確認できます。
つまり、0より大きい場合は未送信データが残っているということです。
ファイルに出力したい
1import serial
2import csv
3import time
4from pathlib import Path
5from datetime import datetime
6from typing import Optional, List
7
8# 設定
9port_name = "/dev/ttyUSB0"
10baud = 9600
11
12# 保存先
13now =datetime.now()
14ymd = now.strftime("%Y%m%d")
15default_filename = now.strftime("%Y%m%d_%Hh%Mm%Ss.csv")
16csv_path = Path.cwd() / "data" / ymd / default_filename
17
18def read_line(
19 com: serial.Serial,
20 *,
21 encoding: str = "utf-8"
22 ) -> Optional[str]:
23 """Read one decoded line from the serial port."""
24 if com.in_waiting == 0:
25 return None
26 raw = com.readline()
27 if not raw:
28 return None
29 line = raw.decode(encoding, errors="ignore").strip()
30 return line or None
31
32def write_row(
33 writer: csv.writer,
34 line: str
35 ) -> List[str]:
36 """Write a CSV row to the file."""
37 parts = line.split()
38 timestamp = datetime.now().isoformat(timespec="seconds")
39 row = [timestamp] + parts + [len(parts)]
40 writer.writerow(row)
41 return row
42
43try:
44 with serial.Serial(port=port_name, baudrate=baud, timeout=1) as com:
45 print(f"[{com.port}] Opened")
46
47 # 接続が成功したときに保存先ディレクトリを作成
48 csv_path.parent.mkdir(parents=True, exist_ok=True)
49
50 with csv_path.open("a", newline="", encoding="utf-8") as f:
51 writer = csv.writer(f)
52
53 print(f"[{com.port}] Listening...")
54 while True:
55 line = read_line(com)
56 if line is None:
57 time.sleep(0.01)
58 continue
59
60 row = write_row(writer, line)
61 f.flush()
62 print(f"Added: {row}")
63
64except serial.SerialException as e:
65 print(f"Serial error: {e}")
66except KeyboardInterrupt:
67 print(f"Stopped by user.")
シリアル通信で取得したデータをCSV形式で保存するサンプルコードです。 ポート接続を確認してから、ファイルを作成する手順になっています。
read_line
関数では、受信したデータを確認しています。
write_row
関数では、データを整形してCSVファイルに出力しています。
ポートを確認したい(serial.tools.list_ports
)
1from serial.tools import list_ports
2
3ports = list_ports.comports()
4for p in ports:
5 print(f"{p.device} - {p.description}")
serial.tools.list_ports
で利用可能なポートを確認できます。
Linuxは/dev/tty*
、
WindowsはCOM*
、
macOSは/dev/cu.usbserial-*
という名前で表示されます。
1from typing import List, Dict, Any
2from serial.tools import list_ports
3
4def find_serial_ports() -> List[Dict[str, Any]]:
5 """List available serial ports with rich metadata."""
6 ports = list_ports.comports()
7
8 if not ports:
9 raise RuntimeError("No ports found.")
10
11 results: List[Dict[str, Any]] = []
12 for p in ports:
13 results.append(
14 {
15 "device": p.device,
16 "name": getattr(p, "name", None),
17 "description": p.description,
18 "hwid": p.hwid,
19 "vid": p.vid,
20 "pid": p.pid,
21 "manufacturer": p.manufacturer,
22 "product": p.product,
23 "serial_number": p.serial_number,
24 "location": p.location,
25 "interface": p.interface,
26 }
27 )
28 return results
このような関数を定義して、ポート情報を辞書型に変換しておくと利便性があがります。
自動検出したい
1def auto_detect_serial_port(ports: List[Dict[str, Any]]) -> str:
2 """Return a single 'best guess' serial device path from find_serial_ports() results.
3
4 Preference order (first match wins):
5 1. /dev/ttyACM* Linux/WSL2: CDC ACM, Arduino-like
6 2. /dev/ttyUSB* Linux/WSL2: USB-serial adapters
7 3. /dev/cu.usbmodem* macOS: CDC ACM
8 4. /dev/cu.usbserial* macOS: USB-serial adapters
9 5. COM* Windows: COM ports
10 6. /dev/ttyS* WSL2: often mapped COM in WLS
11 """
12
13 if not ports:
14 raise RuntimeError("No ports provided to auto-detect from.")
15
16 prefixes = [
17 "/dev/ttyACM",
18 "/dev/ttyUSB",
19 "/dev/cu.usbmodem",
20 "/dev/cu.usbserial",
21 "COM",
22 r"\\.\COM",
23 "/dev/ttyS",
24 ]
25
26 for prefix in prefixes:
27 for p in ports:
28 device = (p.get("device") or "").strip()
29 if not device:
30 continue
31 if device.lower().startswith(prefix.lower()):
32 return device
33
34 # Fallback: return the very first device if present
35 first = (ports[0].get("device") or "").strip()
36 if first:
37 return first
38
39 raise RuntimeError("No available device path found in the provided port list.")
OSごとに使用されているポート名を自動検出できるようにした関数です。
前述のfind_serial_ports
とセットで利用することを想定しています。
複数デバイスしたい(threading
)
1import serial
2import threading
3import time
4
5def read_loop(
6 port_name: str,
7 baudrate: int,
8 stop: threading.Event
9 ) -> None:
10 """Continuously read lines from a serial port and print them."""
11 try:
12 with serial.Serial(port_name, baudrate, timeout=1) as com:
13 print(f"[{port_name}] Opened")
14 while not stop.is_set():
15 raw = com.readline()
16 if not raw:
17 # Avoid busy loop when data is not available
18 time.sleep(0.01)
19 continue
20 line = raw.decode("utf-8", errors="replace").strip()
21 if line:
22 print(f"[{port_name}] {line}")
23 except serial.SerialException as e:
24 print(f"[{port_name}] Serial error: {e}")
25 finally:
26 print(f"[{port_name}] Closed")
27
28
29def wait_for_interrupt(
30 stop: threading.Event,
31 interval: float = 0.2
32 ) -> None:
33 """Block the main thread until Ctrl-C is pressed."""
34 try:
35 print("Press Ctrl-C to stop.")
36 while not stop.is_set():
37 # Avoid busy loop
38 time.sleep(interval)
39 except KeyboardInterrupt:
40 print("\nStopping all readers...")
41 stop.set()
42
43
44if __name__ == "__main__":
45 # 停止フラグ
46 stop_event = threading.Event()
47
48 # デバイスごとのスレッド
49 threads = [
50 threading.Thread(target=read_loop, args=("/dev/ttyUSB0", 9600, stop_event)),
51 threading.Thread(target=read_loop, args=("/dev/ttyUSB1", 115200, stop_event))
52 ]
53
54 # スレッドを起動
55 for t in threads:
56 t.start()
57
58 # 中断(Ctrl-C)待ち
59 wait_for_interrupt(stop_event)
60 for t in threads:
61 t.join() # サブスレッドの終了を待つ
62
63 print("All readers stopped.")
threading
モジュールと組み合わせて、
複数のデバイスから、同時にシリアル通信でデータを読み出すことができます。
Ctrl-C
による中断はメインスレッドにしか送られないため、
サブスレッド(threading.Thread
)の中では受け取れません。
そのため、
except KeyboardInterrupt
はread_loop
関数の中ではなく、
メインスレッドの処理として記述します。
その際に、停止フラグ用のthreading.Event()
を作成しておき、
KeyboardInterrupt
を検知したときに有効にすることで、
サブスレッドを順番かつ安全に停止できます。
複数デバイスしたい(concurrent.futures.ThreadPoolExecutor
)
1import time
2import threading
3from concurrent.futures import ThreadPoolExecutor, wait
4import serial
5
6def read_loop(
7 port_name: str,
8 baudrate: int,
9 stop: threading.Event
10 ) -> None:
11 """Continuously read lines from a serial port and print them."""
12 try:
13 with serial.Serial(port_name, baudrate, timeout=1) as com:
14 print(f"[{port_name}] Opened")
15 while not stop.is_set():
16 raw = com.readline()
17 if not raw:
18 time.sleep(0.01)
19 continue
20 line = raw.decode("utf-8", errors="replace").strip()
21 if line:
22 print(f"[{port_name}] {line}")
23 except serial.SerialException as e:
24 print(f"[{port_name}] Serial error: {e}")
25 finally:
26 print(f"[{port_name}] Closed")
27
28def wait_for_interrupt(
29 stop: threading.Event,
30 interval: float = 0.2
31 )-> None:
32 """Block the main thread until Ctrl-C is pressed"""
33 try:
34 print("Press Ctrl-C to stop.")
35 while not stop.is_set():
36 time.sleep(interval)
37 except KeyboardInterrupt:
38 print("\nStopping all readers...")
39 stop.set()
40
41if __name__ == "__main__":
42 # 停止フラグ
43 stop_event = threading.Event()
44
45 # ポートの定義
46 ports = [
47 ("/dev/ttyUSB0", 9600),
48 ("/dev/ttyUSB1", 115200),
49 ]
50
51 # スレッドプールで実行
52 with ThreadPoolExecutor(max_workers=len(ports)) as ex:
53 futures = [ex.submit(read_loop, port, baud, stop_event) for port, baud in ports]
54
55 # 状態確認
56 # for f in futures:
57 # print(f.done(), f.running())
58
59 try:
60 wait_for_interrupt(stop_event) # Ctrl-C 待ち
61 finally:
62 stop_event.set()
63 wait(futures)
64
65 print("All readers stopped.")
ThreadPoolExecutor
を使って、複数デバイスの処理を書き換えてみました。
read_loop
関数とwait_for_interrupt
関数の内容は同じです。
スレッドの生成と管理をThreadPoolExecutor
に任せ、
ex.submit
でジョブを投入しています。
submit(fn, /, *args, **kwargs)
というシグネチャを持つため、
実行したい関数(fn
)と
位置引数(*args
)、
キーワード引数(**kwargs
)を
そのまま指定すればOKです。
変数futures
には、非同期処理の結果(concurrent.futures.Future
オブジェクト)が入っており、実行結果や例外を後で取得できるようになっています。