ククログ

株式会社クリアコード > ククログ > メールフィルタープラグインであるmilterをPythonで簡単実装! - milterを実装できるようになろう編

メールフィルタープラグインであるmilterをPythonで簡単実装! - milterを実装できるようになろう編

以前の記事で、milter managerというメールフィルタを管理するための自由ソフトウェアを、GObject Introspectionに対応させてバインディングを生成することについて紹介しました。

milter managerは従来からRubyでmilterを作るためのライブラリーを提供してきましたが、今回のGObject Introspection対応によって、Pythonでmilterを作るためのライブラリーも提供するようになりました。

前回の記事では、実際にPythonで書いたmilterを動かしてみることについて紹介しています。

今回は、milter managerの機能を使ってPythonでmilterを作る方法について詳しく紹介します。

なお本記事の内容は、milter manager v2.2.5についての説明になります。

milter実装の基本的な構成例

下記が、Pythonによるmilter実装の基本的な構成例の1つになります。

#!/usr/bin/env python3

import milter.client

# milter.client.Sessionを継承したクラスを実装する。
class MilterTemplate(milter.client.Session):
    # contextの後の引数は、milter.client.Clientのregisterメソッドで渡す
    # 2番目以降の引数に対応する。
    # (client.register(MilterTemplate, options))
    def __init__(self, context, options):
        super().__init__(context)

    # 1つのSMTPセッションの中で複数メールが送信される場合に、その切れ目で呼ばれる。
    # 初期化(super().__init__())時にも呼ばれる。
    # 1メール毎にセッションを分けるか、1セッションの中で複数メールを送るかは
    # クライアント次第。
    def reset(self):
        pass

    # 各処理でのエラー発生時に呼ばれる。
    def on_error(self, event, exception):
        pass

    # 以下の関数は、SMTPプロトコルの各段階と対応している。

    # MAIL FROM
    def envelope_from(self, from_):
        pass

    # RCPT TO
    def envelope_recipient(self, to):
        pass

    # DATA
    # MAIL FROMとRCPT TOが揃った時点であるため、初期化処理を行うのに良いタイミング。
    def data(self):
        pass

    # ヘッダー1行毎に複数回呼ばれる。
    def header(self, name, value):
        pass

    # ヘッダーを全て処理した時点で呼ばれる。
    def end_of_header(self):
        pass

    # 本文。ある程度大きい場合は、チャンクに分かれて複数回呼ばれる。
    def body(self, chunk):
        pass

    # 本文を全て処理した時点で呼ばれる。
    def end_of_message(self):
        pass


# コマンドラインをパースして、milter.client.Clientインスタンスを生成する。
# Clientインスタンスは、SMTPのセッション毎に
# milter.client.Sessionインスタンスを立てて処理を行わせる。
command_line = milter.client.CommandLine()
with command_line.run() as (client, options):
    # このClientインスタンスが使うSessionクラスを登録する。
    # SMTPのセッションが開始されると、ここで登録したクラスを
    # コンストラクトして処理をさせることになる。
    # そのためmilter開発の基本的な流れは、
    # milter.client.Sessionを継承したクラスを実装して、ここで登録することになる。
    client.register(MilterTemplate, options)

要点は次の通りです。

  • milter.client.Sessionクラスを継承したクラスを実装します。
    • 各コールバック関数に必要な実装を行います。
  • このクラスを、milter.client.Clientのインスタンスにregister()メソッドで登録します。

重要なポイントは、milter.client.ClientインスタンスがMTAとのやり取りを管理し、SMTPのセッション毎にmilter.client.Sessionクラスを使って処理を行う、ということです。 よってこのSessionの実装が、milterの主な実装になるわけです。 SMTPのセッションが新たに開始されるたびに、実装したクラスがコンストラクトされて使われます。 そのSMTPセッションの各段階で各メソッドがコールバックされるので、必要な部分に必要な処理を実装することになります。

基本的な構成がわかったところでよくある処理をどのように実装するかを紹介していきます。

ヘッダーや本文を更新する

メールフィルタープラグインであるmilterをPythonで簡単実装! - milterを動かしてみよう編 で紹介した、特定のワードを置換するmilterを例に説明します。 この記事で、次の実装のmilterを動かして実際にヘッダーと本文のworldという単語がClearCodeという単語に置換されていることを確認しました。

#!/usr/bin/env python3
#
# Copyright (C) 2022  Sutou Kouhei <kou@clear-code.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import re

import milter.client

class MilterReplace(milter.client.Session):
    def __init__(self, context, patterns):
        super().__init__(context)
        self._patterns = patterns

    def header(self, name, value):
        self._headers.append([name, value])

    def body(self, chunk):
        self._body += chunk

    def end_of_message(self):
        header_indexes = {}
        for name, value in self._headers:
            if name not in header_indexes:
                header_indexes[name] = 0
            header_indexes[name] += 1
            for pattern, replaced in self._patterns.items():
                replaced_value, _ = pattern.subn(replaced, value)
                if value != replaced_value:
                    self._change_header(name,
                                        header_indexes[name],
                                        replaced_value)
                    break

        for pattern, replaced in self._patterns.items():
            body = self._body.decode("utf-8")
            replaced_body, _ = pattern.subn(replaced, body)
            if body != replaced_body:
                self._replace_body(replaced_body)

    def reset(self):
        self._headers = []
        self._body = b""

command_line = milter.client.CommandLine()
with command_line.run() as (client, options):
    # "world" を "ClearCode" に置換する!
    patterns = {
        re.compile("world", re.IGNORECASE): "ClearCode",
    }
    client.register(MilterReplace, patterns)

この実装では_change_header()メソッドと_replace_body()メソッドを使うことで、ヘッダーと本文を更新しています。 それぞれ使い方を説明します。

ヘッダーを更新する

_change_header()メソッドを使うことで、ヘッダーを更新することができます。 end_of_message()時点でのみ使用可能です。

この関数は次の3つの引数を取ります。

  • 第1引数: 更新対象のヘッダー名です。
  • 第2引数: そのヘッダー名のインデックスです。
  • 第3引数: ここに指定した値で、更新対象のヘッダーの値を更新します。

第2引数が分かりにくいですが、重複したヘッダー名が存在しない場合は1になります。 重複したヘッダー名が存在する場合は、ヘッダー名だけではどれを更新するのか区別できないため、 そのヘッダー名の中で何番目か、というインデックス情報が必要となります。 基本的に、重複したヘッダー名が存在するかもしれないことを考慮して実装するべきです。

単純な例として、Subjectというヘッダーの値をnew subjectに更新したい場合は、end_of_message()を次のように実装します。

def end_of_message(self):
    self._change_header("Subject", 1, "new subject")

しかしこれではSubjectというヘッダーが複数あった場合に、1つめのヘッダーしか更新できません。 またメールが持っているヘッダーを知るには、header()時点でメールのヘッダーを把握しておく必要があります。

先ほどの単語を置換する実装例では、次のようにヘッダーを更新しています。 わかりやすさのため、順番を変えてコメントを付けています。

# 1つのSMTPセッションの中で複数メールが送信される場合に、その切れ目で呼ばれる。
# 初期化(super().__init__())時にも呼ばれる。
# 1メール毎にセッションを分けるか、1セッションの中で複数メールを送るかは
# クライアント次第。
def reset(self):
    # ヘッダーを管理するためのメンバー変数を初期化する。
    self._headers = []

# ヘッダー1行毎に複数回呼ばれる。
def header(self, name, value):
    # ヘッダーの情報を追加する。
    self._headers.append([name, value])

# 本文を全て処理した時点で呼ばれる。
def end_of_message(self):
    header_indexes = {}

    # 各ヘッダーに順番に置換処理を行う。
    for name, value in self._headers:
        # ヘッダーのインデックス(何個目の同じ名前のヘッダーか)をカウントする。
        if name not in header_indexes:
            header_indexes[name] = 0
        header_indexes[name] += 1

        for pattern, replaced in self._patterns.items():
            replaced_value, _ = pattern.subn(replaced, value)
            # 置換した結果が元の値と異なるのであれば、ヘッダーを更新する。
            if value != replaced_value:
                self._change_header(name,
                                    header_indexes[name],
                                    replaced_value)
                break

ポイントは次です。

  • header()時点で、メンバー変数にヘッダーの情報を追加する。
  • 同じ名前のヘッダーが複数登場しても大丈夫なように、そのヘッダー名が何個目かをカウントする。

本文を更新する

_replace_body()メソッドを使うことで、本文を更新することができます。 end_of_message()時点でのみ使用可能です。

この関数は次の1つの引数を取ります。

  • 第1引数: ここに指定した値で、本文を更新します。

紛らわしいかもしれませんが、最初にこのメソッドを使った場合は、元の本文を完全に置き換えます。 しかし、その後繰り返しこのメソッドを使った場合は、以前置き換えた内容に追記をしていく動作になります。 大きなデータを扱う場合などに、複数回に分けてこのメソッドを使用することができます。

先ほどの単語を置換する実装例では、次のように本文を更新しています。 わかりやすさのため、順番を変えてコメントを付けています。

# 1つのSMTPセッションの中で複数メールが送信される場合に、その切れ目で呼ばれる。
# 初期化(super().__init__())時にも呼ばれる。
# 1メール毎にセッションを分けるか、1セッションの中で複数メールを送るかは
# クライアント次第。
def reset(self):
    # 本文を管理するためのメンバー変数を初期化する。
    self._body = b""

# 本文。ある程度大きい場合は、チャンクに分かれて複数回呼ばれる。
def body(self, chunk):
    # 本文を追加する。
    self._body += chunk

# 本文を全て処理した時点で呼ばれる。
def end_of_message(self):
    for pattern, replaced in self._patterns.items():
        body = self._body.decode("utf-8")
        replaced_body, _ = pattern.subn(replaced, body)
        # 置換した結果が元の値と異なるのであれば、本文を更新する。
        if body != replaced_body:
            self._replace_body(replaced_body)

body()時点でメンバー変数に本文の情報を追加しておき、それを元にend_of_message()で更新処理を行っています。

メールの通過や拒否

前章ではメールの内容を編集する方法を紹介しました。 milterはその他にも、メールの通過、拒否、再送要求、隔離、などを行うことができます。 本章ではそのような機能と使い方を紹介します。

後続の処理をスキップしてメールを通過させる

_accept()メソッドを使うことで、後続のフィルタリング処理をスキップしてメールを通過させることができます。

この関数の引数はありません。

先ほどの本文の更新処理に、「特定のヘッダーがある場合は更新処理をスキップしてメールを即座に通過させる」という動作を加えてみましょう。

# 1つのSMTPセッションの中で複数メールが送信される場合に、その切れ目で呼ばれる。
# 初期化(super().__init__())時にも呼ばれる。
# 1メール毎にセッションを分けるか、1セッションの中で複数メールを送るかは
# クライアント次第。
def reset(self):
    self._body = b""

# ヘッダー1行毎に複数回呼ばれる。
def header(self, name, value):
    # Fromに"@clear-code.com"で終わるメールアドレスがあった場合に
    # 後続の処理をスキップしてメールを通過させる。
    if name.lower() == "From".lower():
        if value.endswith("@clear-code.com"):
            self._accept()

# 本文。ある程度大きい場合は、チャンクに分かれて複数回呼ばれる。
def body(self, chunk):
    self._body += chunk

# 本文を全て処理した時点で呼ばれる。
def end_of_message(self):
    for pattern, replaced in self._patterns.items():
        body = self._body.decode("utf-8")
        replaced_body, _ = pattern.subn(replaced, body)
        if body != replaced_body:
            self._replace_body(replaced_body)

header()の時点で特定の条件を満たしたら_accept()を行うようにしました。 これによりその条件を満たすメールは後続の処理を行わずに配信されます。

メールを拒否する

_reject()メソッドを使うことで、そのメールを拒否することができます。

この関数の引数はありません。

先ほどの_accept()_reject()に変えてみましょう。

# ヘッダー1行毎に複数回呼ばれる。
def header(self, name, value):
    # Fromに"@clear-code.com"で終わるメールアドレスがあった場合に
    # このメールを拒否する。
    if name.lower() == "From".lower():
        if value.endswith("@clear-code.com"):
            self._reject()

header()の時点で特定の条件を満たしたら_reject()を行うようにしました。 これによりその条件を満たすメールは後続の処理を行わずに拒否され、配信されません。

メールを通過させずに一時的な失敗にする

_temporary_failure()メソッドを使うことで、そのメールを通過させずに一時的な失敗とすることができます。 一時的な失敗にすると、送信元は時間をおいて何度かリトライします。

この関数の引数はありません。

先ほどの_accept()_reject()と同様に使えます。 例えば、何かの処理が一時的に失敗したので後でもう一度送信してほしい、という場合に使います。

メールを隔離する

_quarantine()メソッドを使うことで、そのメールを隔離することができます。 先ほどまでの通過や拒否とは少し異なり、end_of_message()時点でのみ使用可能な機能です。

隔離されたメールは、Postfixでは/var/spool/postfix/hold/配下に保管されます。 また、$ postqueue -pでキューに残っていることを確認できます。 隔離されたメールの内容を確認するには、postcatコマンドを使用します。

この関数は次の1つの引数を取ります。

  • 第1引数: 隔離理由のメッセージ

例えば特定のヘッダーがあった場合にメールを隔離するには、次のように実装します。

# 1つのSMTPセッションの中で複数メールが送信される場合に、その切れ目で呼ばれる。
# 初期化(super().__init__())時にも呼ばれる。
# 1メール毎にセッションを分けるか、1セッションの中で複数メールを送るかは
# クライアント次第。
def reset(self):
    self._headers = []

# ヘッダー1行毎に複数回呼ばれる。
def header(self, name, value):
    self._headers.append([name, value])

# 本文を全て処理した時点で呼ばれる。
def end_of_message(self):
    for name, value in self._headers:
        # Fromに"@clear-code.com"で終わるメールアドレスがあった場合に
        # このメールを隔離する。
        if name.lower() != "From".lower():
            continue
        if not value.endswith("@clear-code.com"):
            continue
        self._quarantine("This mail is quarantined " +
                         "since it has the header " +
                         f"{name}:{value}")
        break

条件を満たして隔離されたメールは配信されません。 _reject()と異なるのは、これは動作としては_accept()であり、メールを受信した上で配信せずに隔離する、という点です。

Postfixを使っている場合は、次のようにpostqueueコマンドを使って隔離されたメールを確認できます。

$ postqueue -p
-Queue ID-  --Size-- ----Arrival Time---- -Sender/Recipient-------
38BFCA4F9!      371 Tue Feb  7 13:55:38  root@localhost
                                         root@localhost

メールの中身などを確認するには、次のようにpostcatコマンドを使います。

$ postcat -q 38BFCA4F9
*** ENVELOPE RECORDS hold/38BFCA4F9 ***
...

フォールバックステータスをセットする

オプションでfallback-statusを設定することで、エラー発生時の挙動を設定することができます(デフォルトはacceptです)。 次の値を指定できます。

  • accept
  • temporary-failure
  • reject
  • discard

Postfixの場合は、Postfix側のmilter_default_actionも同じ設定にした方が好ましいです。

例えば、次のようにmilterを起動することでエラー発生時にそのメールを拒否することができます。

$ python3 milter_replace.py --fallback-status reject

設定ファイルを使う

コマンドライン引数で様々なオプションを指定できますが、それらを設定ファイルから読み込ませることもできます。

milter_setting.ini

[basic]
environment = production

[milter]
connection_spec = inet:20025@0.0.0.0
fallback_status = temporary-failure

[log]
log_level = +info
log_path = milter_template.log

[milter_setting]
option_name = value

milter_setting.py

#!/usr/bin/env python3

import milter.client

class MilterSetting(milter.client.Session):
    def __init__(self, context, config):
        super().__init__(context)
        milter_setting_section = config["milter_setting"]
        self._option = milter_setting_section["option_name"]


command_line = milter.client.CommandLine()
with command_line.run() as (client, options):
    client.register(MilterSetting, options.config)

以上のように、milter managerが提供しているオプションをファイルで設定できますし、独自の設定を持たせることもできます。 現時点(milter manager v2.2.5)で設定可能なオプションについては、次の実装などをご覧ください。

次のように、--configurationオプションに設定ファイルのパスを指定して実行します。

$ python3 milter_setting.py --configuration milter_setting.ini

ロギング

次のように、milter.coreをimportして、milter.core.Logger.defaultからロガーを取得することで、任意のログをロギングすることができます。

#!/usr/bin/env python3

import milter.core
import milter.client

class MilterLogging(milter.client.Session):
    def __init__(self, context, config):
        super().__init__(context)
        self._logger = milter.core.Logger.default

    def body(self, chunk):
        self._logger.info(f"body: {chunk}")


command_line = milter.client.CommandLine()
with command_line.run() as (client, options):
    client.register(MilterLogging, options.config)

外部コマンドを実行する

milterの動作と外部コマンドを連携したい場合は、少々工夫が必要です。

milter manager v2.2.5に、外部コマンドを実行する例が梱包されています。 これが基本的な方法になります。

この例を参考にして、外部コマンドを実行した後にmilter側で後処理を行うことも考慮した実装例が次になります。 この実装を事例に、仕組みを説明します。

#!/usr/bin/env python3
#
# Copyright (C) 2022  Sutou Kouhei <kou@clear-code.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import subprocess
import sys

import milter.client

class MilterExternal(milter.client.Session):
    def __init__(self, context, timeout):
        super().__init__(context)
        self._timeout = timeout

    def end_of_message(self):
        command_line = [
            sys.executable,
            "-c",
            f"import time; time.sleep({self._timeout})",
        ]
        # 外部コマンドを別プロセスで実行する。
        process = subprocess.Popen(command_line)
        # 外部コマンドが完了した時点で呼ばれるコールバック関数を定義する。
        def on_exit(pid, wait_status):
            # コールバック関数を登録したイベントを削除する。
            self._remove_source(self._source_id)
            try:
                # 外部コマンド完了後の後処理部分。
                # ここでエラーが発生した場合も、 _context.emit() を呼ぶ必要がある。
                self._process_after_external_command()
            except Exception as exception:
                # _context.fallback_status を使うことで、
                # 設定したフォールバックステータスが効くようにする。
                self._context.emit("end_of_message_response",
                                   self._context.fallback_status)
        # コールバック関数を登録する。
        self._source_id = self._watch_child(process.pid, on_exit)
        # 外部コマンドの完了を待つ。
        # _context.emit() を実行して end_of_message が完了したことを
        # 知らせるまで待機する。
        self._delay_response()

    def _process_after_external_command(self):
        # 外部コマンド完了後の後処理を行う部分。
        print("Do process after external command")
        # 後処理が正常に完了したら、accept状態にして _context.emit() を呼ぶ。
        self._accept()
        self._context.emit("end_of_message_response",
                           self._context.status)

    def abort(self, status):
        if self._source_id is not None:
            self._remove_source(self._source_id)

    def reset(self):
        self._source_id = None

command_line = milter.client.CommandLine()
with command_line.run() as (client, options):
    client.register(MilterExternal, 3)

外部コマンドを別プロセスで実行する場合、その完了を待つために_delay_response()を使います。 これを行わなければ、外部コマンドを完了を待たずにend_of_message()が完了して、milter側が先に処理を終えてしまう可能性があります。

_delay_response()をした場合は、_context.emit()により該当するresponseが返されるまで待機状態になります。 この例では、外部コマンド完了時点で_context.emit()によってend_of_message_responseを返すことで、end_of_message()を完了させています。 また、そのために_watch_child()を使って、外部コマンドプロセス完了時点で呼ばれるコールバック関数を登録しています。

_context.emit()は、第2引数にステータスを取ります。 _context.statusに現在のステータスが保持されているので、この例ではそれを利用しています。 後処理が完了した時点で_accept()を実行することで、_context.statusがaccept状態になっています。

注意が必要なポイントとして、後処理部分でエラーが発生する場合を考慮する必要があります。 _context.emit()によってend_of_message_responseを返すまで処理が完了しないので、エラー発生時にend_of_message()を抜けてしまわないようにtry-exceptを使い、エラー時も_context.emit()を行うようにしています。 さらに、設定されているフォールバックステータスで完了するようにするため、エラー時は_context.fallback_statusを使っています。

その他

現時点(milter manager v2.2.5)で使える機能については、次の実装などをご覧ください。

まとめ

これまでの記事で、milter managerというメールフィルタを管理するための自由ソフトウェアを、GObject Introspectionに対応させてバインディングを生成することや、それによってPythonで簡単に作れるようになったmilterを動かしてみることについて紹介をしてきました。

本記事ではこれらに続いて、milter managerの機能を使ってPythonでmilterを作る方法について詳しく紹介しました。 milter managerを使えば、PostfixやSendmailなどのMTAにおけるメールのフィルタリングをこんなに便利にできるんだ、と実感していただけたら幸いです。

クリアコードではmilter managerを始め、様々な自由ソフトウェアの開発・サポートを行っております。 詳しくは次をご覧いただき、こちらのお問い合わせフォームよりお気軽にお問い合わせください。

また、クリアコードではこのように業務の成果を公開することを重視しています。 業務の成果を公開する職場で働きたい人はクリアコードの採用情報をぜひご覧ください。