Django多处理和数据库连接
背景:
我正在使用一个Postgres数据库使用Django的项目。 我们也使用mod_wsgi以防万一,因为我的一些网页search提到了它。 在Web表单提交中,Django视图开始了一个需要大量时间(比用户想要等待的时间更长)的工作,所以我们通过系统调用在后台启动工作。 现在正在运行的作业需要能够读取和写入数据库。 因为这个工作需要很长时间,所以我们使用多处理来并行运行其中的一部分。
问题:
顶层脚本有一个数据库连接,当它产生subprocess时,似乎父进程的连接对subprocess是可用的。 然后,在查询之前,必须调用SET TRANSACTION ISOLATION LEVEL的例外。 研究表明,这是由于尝试在多个进程中使用相同的数据库连接。 我发现一个线程build议在subprocess开始时调用connection.close(),以便Django在需要时自动创build一个新的连接,因此每个subprocess都将具有唯一的连接 – 即不共享。 这对我不起作用,因为在subprocess中调用connection.close()导致父进程抱怨连接丢失。
其他调查结果:
我读过的一些东西似乎表明你不能真正做到这一点,多处理,mod_wsgi和Django不能很好地协作。 我猜这似乎很难相信。
有人build议使用芹菜,这可能是一个长期的解决scheme,但目前我无法安装芹菜,等待一些审批过程,所以现在不是一个选项。
在SO和其他地方发现了几个有关持久数据库连接的参考文献,我认为这是一个不同的问题。
还发现引用psycopg2.pool和pgpool和一些关于保镖。 不可否认的是,我并不了解我读到的大部分内容,但当然并没有像我所期望的那样跳出来。
目前的“解决方法”:
现在,我已经回到了连续运行的事情,它的工作,但比我想要的慢。
任何有关如何使用多处理并行运行的build议? 似乎如果我可以让父母和两个孩子都有独立的数据库连接,事情会好起来的,但我似乎无法得到这种行为。
谢谢,抱歉的长度!
多进程复制进程之间的连接对象,因为它分叉进程,因此复制父进程的所有文件描述符。 也就是说,到SQL服务器的连接只是一个文件,你可以在linux下的/ proc / fd / ….中看到它,任何打开的文件将在分叉进程之间共享。 你可以在这里find更多关于分叉的信息 。
我的解决scheme就是在启动进程之前简单地closures数据库连接,每个进程在需要时自动重新创build连接(在django 1.4中testing):
from django import db db.connections.close_all() def db_worker(): some_paralell_code() Process(target = db_worker,args = ())
Pgbouncer / pgpool在多处理的意义上不与线程连接。 这是相当的解决scheme不closures每个请求连接=加速连接到postgres,而在高负载。
更新:
要完全删除数据库连接的问题,只需将所有与数据库连接的逻辑移动到db_worker – 我想传递QueryDict作为参数…更好的主意是简单地传递ID列表…请参阅QueryDict和values_list('id',flat =真的),不要忘记把它变成列表! 列表(QueryDict)传递给db_worker之前。 感谢那我们不复制模型数据库连接。
def db_worker(models_ids): obj = PartModelWorkerClass(model_ids) # here You do Model.objects.filter(id__in = model_ids) obj.run() model_ids = Model.objects.all().values_list('id', flat=True) model_ids = list(model_ids) # cast to list process_count = 5 delta = (len(model_ids) / process_count) + 1 # do all the db stuff here ... # here you can close db connection from django import db db.connections.close_all() for it in range(0:process_count): Process(target = db_worker,args = (model_ids[it*delta:(it+1)*delta]))
使用多个数据库时,应closures所有连接。
from django import db for connection_name in db.connections.databases: db.connections[connection_name].close()
编辑
请使用相同的@lechup提到closures所有连接(不知道自从哪个django版本添加此方法):
from django import db db.connections.close_all()
对于Python 3和Django 1.9来说,这对我来说是很有用的:
import multiprocessing import django django.setup() # Must call setup def db_worker(): for name, info in django.db.connections.databases.items(): # Close the DB connections django.db.connection.close() # Execute parallel code here if __name__ == '__main__': multiprocessing.Process(target=db_worker)
请注意,没有django.setup()我不能得到这个工作。 我猜测一些需要重新初始化的多处理。
嘿,我碰到这个问题,并能够解决它通过执行以下(我们正在实施一个有限的任务系统)
task.py
from django.db import connection def as_task(fn): """ this is a decorator that handles task duties, like setting up loggers, reporting on status...etc """ connection.close() # this is where i kill the database connection VERY IMPORTANT # This will force django to open a new unique connection, since on linux at least # Connections do not fare well when forked #...etc
ScheduledJob.py
from django.db import connection def run_task(request, job_id): """ Just a simple view that when hit with a specific job id kicks of said job """ # your logic goes here # ... processor = multiprocessing.Queue() multiprocessing.Process( target=call_command, # all of our tasks are setup as management commands in django args=[ job_info.management_command, ], kwargs= { 'web_processor': processor, }.items() + vars(options).items()).start() result = processor.get(timeout=10) # wait to get a response on a successful init # Result is a tuple of [TRUE|FALSE,<ErrorMessage>] if not result[0]: raise Exception(result[1]) else: # THE VERY VERY IMPORTANT PART HERE, notice that up to this point we haven't touched the db again, but now we absolutely have to call connection.close() connection.close() # we do some database accessing here to get the most recently updated job id in the database
老实说,为了防止竞争条件(同时有多个用户),最好在分叉进程之后尽快调用database.close()。 在你有机会刷新数据库之前,可能还有另一个用户在某处完全向db请求。
诚实地说,让fork不直接调用命令会更安全,更智能 ,而是调用操作系统上的脚本,以使生成的任务在其自己的django shell中运行!
(不是一个很好的解决scheme,但可能的解决方法)
如果你不能使用芹菜,也许你可以实现你自己的排队系统,基本上把任务添加到一些任务表,并有一个正常的cron,把它们挑选和处理? (通过pipe理命令)
您可以将更多资源提供给Postgre,您可以在Debian / Ubuntu中编辑:
nano /etc/postgresql/9.4/main/postgresql.conf
用你的postgre版本replace9.4。
这里有一些有用的行应该用示例值进行更新,名称为自己说话:
max_connections=100 shared_buffers = 3000MB temp_buffers = 800MB effective_io_concurrency = 300 max_worker_processes = 80
注意不要过多地提高这些参数,因为这可能会导致Postgre尝试获取比可用资源更多的错误。 上面的例子在装有4个内核的Debian 8GB Ram机器上运行良好。
如果所有你需要的是I / O并行和不处理并行,你可以通过把进程切换到线程来避免这个问题。 更换
from multiprocessing import Process
同
from threading import Thread
Thread
对象具有与Procsess
相同的接口