Python的sqlite3和并发

我有一个使用“线程”模块的Python程序。 每秒一次,我的程序启动一个新的线程,从Web获取一些数据,并将这些数据存储到我的硬盘。 我想使用sqlite3来存储这些结果,但我不能得到它的工作。 这个问题似乎是关于以下的一行:

conn = sqlite3.connect("mydatabase.db") 
  • 如果我把这行代码放在每个线程中,我得到一个OperationalError告诉我数据库文件被locking。 我想这意味着另一个线程通过sqlite3连接打开mydatabase.db并locking它。
  • 如果我将这行代码放在主程序中,并将连接对象(conn)传递给每个线程,我会得到一个ProgrammingError,表示在一个线程中创build的SQLite对象只能在同一个线程中使用。

以前我将所有的结果存储在CSV文件中,并没有任何这些文件locking问题。 希望这将是可能的与SQLite。 有任何想法吗?

你可以使用消费者 – 生产者模式。 例如,您可以创build线程之间共享的队列。 第一个从Web获取数据的线程将这些数据排入共享队列。 另一个拥有数据库连接的线程从队列中取出数据并将其传递给数据库。

与stream行的观点相反,较新版本的sqlite3 确实支持来自多个线程的访问。

这可以通过可选的关键字参数check_same_thread启用:

 sqlite.connect(":memory:", check_same_thread=False) 

以下findmail.python.org.pipermail.1239789

我find了解决办法。 我不知道为什么python文档没有一个关于这个选项的单词。 所以我们必须为连接函数添加一个新的关键字参数,我们将能够在不同的线程中创build光标。 所以使用:

 sqlite.connect(":memory:", check_same_thread = False) 

为我完美的工作。 当然,从现在开始,我需要注意安全的multithreading访问数据库。 无论如何thx所有的尝试帮助。

你根本不应该使用线程。 这是一个微不足道的任务,这可能会让你显着进一步。

只使用一个线程,并且完成请求触发一个事件来执行写操作。

扭曲将照顾的调度,callback等…为你。 它会把整个结果作为一个string提交给你,或者你可以通过一个stream处理器(我有一个twitter API和一个friendfeed API ,当结果仍然被下载时把事件发送给调用者)来运行它。

根据你对数据的处理方式,你可以把完整的结果转储到sqlite中,完成后将其烧制并转储,或者在读取数据时进行烹饪,并在最后转储。

我有一个非常简单的应用程序,它接近你在github上想要的东西。 我称之为pfetch (并行获取)。 它按计划抓取各种页面,将结果stream式传输到一个文件,并在成功完成每个文件时select运行脚本。 它也有一些像条件GET一样的花哨的东西,但仍然可以为你做的任何一个很好的基础。

切换到多处理 。 它更好,扩展性好,可以使用多CPU来超越多核心的使用,并且接口与使用python线程模块相同。

或者,正如阿里所build议的,只需使用SQLAlchemy的线程池机制即可 。 它会自动处理所有的东西,并有许多额外的function,只是引用其中的一些:

  1. SQLAlchemy包括用于SQLite,Postgres,MySQL,Oracle,MS-SQL,Firebird,MaxDB,MS Access,Sybase和Informix的方言; IBM还发布了一个DB2驱动程序。 所以如果你决定离开SQLite,你不必重写你的应用程序。
  2. 作为SQLAlchemy的对象关系映射器(ORM)核心部分的Unit Of Work系统将未决的创build/插入/更新/删除操作组织到队列中,并将它们全部批量刷新。 为了实现这一点,它执行队列中所有修改项目的拓扑“依赖性sorting”,以兑现外键约束,并将冗余语句分组在一起,有时甚至可以进一步对其进行批处理。 这样可以提高最高效率和交易安全性,并将死锁的可能性降到最低。

或者如果你是懒惰的,像我一样,你可以使用SQLAlchemy 。 它将为你处理线程( 使用线程本地和一些连接池 ),它的方式甚至是可configuration的 。

为了获得额外的奖励,如果/当你意识到/决定为任何并发应用程序使用Sqlite将是一场灾难,你将不必改变你的代码使用MySQL,Postgres或其他任何东西。 你可以切换。

我喜欢Evgeny的回答 – 队列通常是实现线程间通信的最佳方式。 为了完整性,这里有一些其他的select:

  • 生成的线程完成使用后closures数据库连接。 这将修复您的OperationalError ,但是像这样打开和closures连接通常是一个No-No,由于性能开销。
  • 不要使用子线程。 如果每秒一次的任务是相当轻量级的,那么你可以逃避抓取和存储,然后睡觉,直到正确的时刻。 这是不可取的,因为读取和存储操作可能需要1秒以上,并且您失去了使用multithreading方法的多路复用资源的好处。

您需要为您的程序devise并发性。 SQLite有明确的限制,你需要遵守它们,请参阅常见问题 (也是下面的问题)。

Scrapy似乎是我的问题的一个潜在的答案。 它的主页描述了我的确切任务。 (虽然我不确定代码的稳定性。)

我会看看y_serial Python模块的数据持久性:http://yserial.sourceforge.net

它处理围绕单个SQLite数据库的死锁问题。 如果并发需求变大,可以很容易地设置多个数据库的Farm类来随机时间扩散负载。

希望这有助于您的项目…它应该很简单,在10分钟内实施。

使用threading.Lock()

您在locking数据库时遇到错误的最可能原因是您必须发出

 conn.commit() 

完成数据库操作后。 如果你不这样做,你的数据库将被写入locking,并保持这种状态。 等待写入的其他线程将在一段时间后超时(默认设置为5秒,有关详细信息,请参阅http://docs.python.org/2/library/sqlite3.html#sqlite3.connect ) 。

一个正确和并发插入的例子是这样的:

 import threading, sqlite3 class InsertionThread(threading.Thread): def __init__(self, number): super(InsertionThread, self).__init__() self.number = number def run(self): conn = sqlite3.connect('yourdb.db', timeout=5) conn.execute('CREATE TABLE IF NOT EXISTS threadcount (threadnum, count);') conn.commit() for i in range(1000): conn.execute("INSERT INTO threadcount VALUES (?, ?);", (self.number, i)) conn.commit() # create as many of these as you wish # but be careful to set the timeout value appropriately: thread switching in # python takes some time for i in range(2): t = InsertionThread(i) t.start() 

如果你喜欢SQLite,或者有其他工具可以与SQLite数据库一起工作,或者想用SQLite数据库文件replaceCSV文件,或者必须做一些像平台间IPC这样的罕见工具,那么SQLite是一个很好的工具,非常适合这个目的。 如果感觉不对劲,不要让自己受到压力,使用不同的解决scheme!