Python 3を使用してマルチプロセスを実行する際に、どのようにログを記録すべきですか?

PYTHON3 チュートリアル

markdown

Python 3でのマルチプロセスとログ記録の方法

Pythonでマルチプロセスを扱う際、複数のプロセスからのログを適切に記録することは、デバッグやパフォーマンスの分析において非常に重要です。Pythonの標準ライブラリである`multiprocessing`を使用することで、簡単にマルチプロセスを実装できます。しかし、複数のプロセスが同時にログを書き込むと、ログが混在してしまうことがあります。この記事では、Python 3を使用してマルチプロセスを実行しながら、ログを適切に記録する方法を解説します。

マルチプロセスの基本設定

まず、Pythonの`multiprocessing`モジュールを使って、基本的なマルチプロセスを設定する方法を見ていきましょう。以下のサンプルコードは、複数のプロセスを作成し、それぞれが独立して動作する例です。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import multiprocessing
import time
def worker(num):
"""Worker function that performs a simple task."""
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
if __name__ == '__main__':
processes = []
for i in range(5):
process = multiprocessing.Process(target=worker, args=(i,))
processes.append(process)
process.start()
for process in processes:
process.join()
import multiprocessing import time def worker(num): """Worker function that performs a simple task.""" print(f'Worker {num} starting') time.sleep(2) print(f'Worker {num} finished') if __name__ == '__main__': processes = [] for i in range(5): process = multiprocessing.Process(target=worker, args=(i,)) processes.append(process) process.start() for process in processes: process.join()
import multiprocessing
import time

def worker(num):
    """Worker function that performs a simple task."""
    print(f'Worker {num} starting')
    time.sleep(2)
    print(f'Worker {num} finished')

if __name__ == '__main__':
    processes = []
    for i in range(5):
        process = multiprocessing.Process(target=worker, args=(i,))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

このコードは、5つのプロセスを作成し、それぞれが2秒間スリープした後に終了するシンプルな例です。しかし、このままでは各プロセスのログが標準出力に出力されるだけで、後から確認することが難しくなります。

ログ記録の設定

次に、Pythonの`logging`モジュールを使用して、各プロセスからのログをファイルに記録する方法を見ていきます。`multiprocessing`と`logging`を組み合わせることで、プロセス間でのログの混在を防ぎます。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import multiprocessing
import logging
import time
def worker(num):
logger = multiprocessing.get_logger()
logger.info(f'Worker {num} starting')
time.sleep(2)
logger.info(f'Worker {num} finished')
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(processName)s %(levelname)s: %(message)s',
handlers=[logging.FileHandler("processes.log"), logging.StreamHandler()]
)
multiprocessing.set_start_method('spawn')
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
processes = []
for i in range(5):
process = multiprocessing.Process(target=worker, args=(i,))
processes.append(process)
process.start()
for process in processes:
process.join()
import multiprocessing import logging import time def worker(num): logger = multiprocessing.get_logger() logger.info(f'Worker {num} starting') time.sleep(2) logger.info(f'Worker {num} finished') if __name__ == '__main__': logging.basicConfig( level=logging.INFO, format='%(asctime)s %(processName)s %(levelname)s: %(message)s', handlers=[logging.FileHandler("processes.log"), logging.StreamHandler()] ) multiprocessing.set_start_method('spawn') logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) processes = [] for i in range(5): process = multiprocessing.Process(target=worker, args=(i,)) processes.append(process) process.start() for process in processes: process.join()
import multiprocessing
import logging
import time

def worker(num):
    logger = multiprocessing.get_logger()
    logger.info(f'Worker {num} starting')
    time.sleep(2)
    logger.info(f'Worker {num} finished')

if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(processName)s %(levelname)s: %(message)s',
        handlers=[logging.FileHandler("processes.log"), logging.StreamHandler()]
    )
    multiprocessing.set_start_method('spawn')
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    processes = []
    for i in range(5):
        process = multiprocessing.Process(target=worker, args=(i,))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

このコードでは、`logging.basicConfig`を使用してログのフォーマットや出力先を設定しています。`multiprocessing.get_logger()`を使用することで、各プロセスが同じログ設定を共有し、ログをファイルに記録することができます。

キューを使ったログの管理

さらに複雑なロギングが必要な場合、`Queue`を使用してログメッセージを1つのプロセスに集約し、ログを記録する方法があります。このアプローチでは、`QueueHandler`と`QueueListener`を使用します。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import multiprocessing
import logging
import logging.handlers
import time
def worker(num, queue):
logger = logging.getLogger(f'Worker{num}')
handler = logging.handlers.QueueHandler(queue)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info(f'Worker {num} starting')
time.sleep(2)
logger.info(f'Worker {num} finished')
def listener_configurer():
root = logging.getLogger()
handler = logging.FileHandler('processes_queue.log')
formatter = logging.Formatter('%(asctime)s %(processName)s %(levelname)s: %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
def listener_process(queue):
listener_configurer()
while True:
try:
record = queue.get()
if record is None: # We send None to signal termination
break
logger = logging.getLogger(record.name)
logger.handle(record)
except Exception:
import sys, traceback
print('Error in logging:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
if __name__ == '__main__':
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process, args=(queue,))
listener.start()
processes = []
for i in range(5):
process = multiprocessing.Process(target=worker, args=(i, queue))
processes.append(process)
process.start()
for process in processes:
process.join()
queue.put_nowait(None)
listener.join()
import multiprocessing import logging import logging.handlers import time def worker(num, queue): logger = logging.getLogger(f'Worker{num}') handler = logging.handlers.QueueHandler(queue) logger.addHandler(handler) logger.setLevel(logging.INFO) logger.info(f'Worker {num} starting') time.sleep(2) logger.info(f'Worker {num} finished') def listener_configurer(): root = logging.getLogger() handler = logging.FileHandler('processes_queue.log') formatter = logging.Formatter('%(asctime)s %(processName)s %(levelname)s: %(message)s') handler.setFormatter(formatter) root.addHandler(handler) def listener_process(queue): listener_configurer() while True: try: record = queue.get() if record is None: # We send None to signal termination break logger = logging.getLogger(record.name) logger.handle(record) except Exception: import sys, traceback print('Error in logging:', file=sys.stderr) traceback.print_exc(file=sys.stderr) if __name__ == '__main__': queue = multiprocessing.Queue(-1) listener = multiprocessing.Process(target=listener_process, args=(queue,)) listener.start() processes = [] for i in range(5): process = multiprocessing.Process(target=worker, args=(i, queue)) processes.append(process) process.start() for process in processes: process.join() queue.put_nowait(None) listener.join()
import multiprocessing
import logging
import logging.handlers
import time

def worker(num, queue):
    logger = logging.getLogger(f'Worker{num}')
    handler = logging.handlers.QueueHandler(queue)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info(f'Worker {num} starting')
    time.sleep(2)
    logger.info(f'Worker {num} finished')

def listener_configurer():
    root = logging.getLogger()
    handler = logging.FileHandler('processes_queue.log')
    formatter = logging.Formatter('%(asctime)s %(processName)s %(levelname)s: %(message)s')
    handler.setFormatter(formatter)
    root.addHandler(handler)

def listener_process(queue):
    listener_configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send None to signal termination
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except Exception:
            import sys, traceback
            print('Error in logging:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

if __name__ == '__main__':
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process, args=(queue,))
    listener.start()

    processes = []
    for i in range(5):
        process = multiprocessing.Process(target=worker, args=(i, queue))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    queue.put_nowait(None)
    listener.join()

このコードでは、`Queue`を使用してログメッセージを集約し、`listener_process`というプロセスでログをファイルに書き込みます。これにより、ログの順序が保証され、複数のプロセスからの出力が混在することを防ぎます。

以上のように、Python 3でマルチプロセスを使用しながらログを記録する方法にはいくつかのアプローチがあります。シンプルな場合は`logging`モジュールを直接使用し、より複雑な場合には`Queue`を活用することで、効率的なログ管理が可能です。

Python 3を使用してマルチプロセスを実行する際に、ログを記録する方法はいくつかあります。一般的な方法は、loggingモジュールを使用してログを記録することです。以下は、マルチプロセスでログを記録するための基本的な手順です。

1. loggingモジュールをインポートします。
“`python
import logging
“`

2. ロガーを設定します。マルチプロセスでログを記録する場合、マルチプロセス間でログが競合しないようにQueueHandlerを使用することが推奨されます。
“`python
logger = logging.getLogger()
handler = logging.handlers.QueueHandler()
logger.addHandler(handler)
“`

3. ログのフォーマットを設定します。ログのフォーマットには、日付、ログレベル、メッセージなどを含めることができます。
“`python
formatter = logging.Formatter(‘%(asctime)s – %(levelname)s – %(message)s’)
handler.setFormatter(formatter)
“`

4. ログを記録する箇所で、loggerを使用してログを記録します。
“`python
logger.info(‘This is an info message’)
logger.error(‘This is an error message’)
“`

これらの手順に従うことで、Python 3を使用してマルチプロセスを実行する際にログを記録することができます。

購読
通知


0 Comments
Inline Feedbacks
View all comments