Source code for mandb

#!/usr/bin/env python
#
# Copyright 2016 Kehr<kehr.china@gmail.com>
# Reference: torndb, DBUtils

"""`Mandb` is a lightweight wrapper around `MySQLdb` and `sqlite3`.

    This lib is inspired by `torndb` and `DBUtils`. It supports DBUtils to manage your exists
    connection. If you has any good ideas, please contact me <kehr.china@gmail.com>

"""

import sqlite3
import MySQLdb
import threading

version = '0.1.4'
version_info = (0, 1, 4, 0)


[docs]class MandbEception(Exception): """Base exception for mandb""" pass
[docs]class Row(dict): """A dict that allows for object-like property access syntax.""" def __getattr__(self, name): try: return self[name] except KeyError: raise AttributeError(name)
[docs]class Database(object): """This class provide a series of base database operations. It can manage your connection, if you already has one. Example:: import MySQLdb from mandb import Database from DBUtils.PooledDB import PooledDB pdb = PooledDB(MySQLdb, host='localhost', port=3306, db='test_db', user='root', passwd='passwd', mincached=5, charset='utf8') db = Database(pdb.connection()) db.query('SELECT ...') db.insert('INSERT INTO ...') db.update('UPDATE ...') db.delete('DELETE ...') ... Otherwise, please use `MySQLDatabase` or `SqliteDatabase` to create a new connection. """ def __init__(self, connection=None, **kwargs): """ Args: :connection: Specify an exists database connection. :kwargs: Connection parameters. """ self.connection = connection self.kwargs = kwargs self._closed = True self._conn_lock = threading.Lock() self.connect() def __del__(self): self.close()
[docs] def connect(self): """Get this database connection""" with self._conn_lock: self._closed = False if self.connection is None: self.connection = self._connect() # Ensure auto commit if hasattr(self.connection, 'autocommit'): self.connection.autocommit(True) # Autocommit setting for DBUtils elif hasattr(self.connection, '_con') and hasattr(self.connection._con, '_con'): self.connection._con._con.autocommit(True)
[docs] def close(self): """Closes this database connection""" with self._conn_lock: if self.connection is not None: self.connection.close() self.connection = None self._closed = True
[docs] def is_closed(self): """Return if connnection is closed""" return self._closed
[docs] def iter(self, sql, *args, **kwargs): """Returns an iterator for the given query and parameters.""" cursor = self._cursor() try: self._execute(cursor, sql, args, kwargs) names = [d[0] for d in cursor.description] for row in cursor: yield Row(zip(names, row)) finally: cursor.close()
[docs] def query(self, sql, *args, **kwargs): """Returns a row list for the given query and parameters.""" cursor = self._cursor() try: self._execute(cursor, sql, args, kwargs) names = [d[0] for d in cursor.description] return [Row(zip(names, row)) for row in cursor] finally: cursor.close()
[docs] def get(self, sql, *args, **kwargs): """Returns the (singular) row returned by the given query. If the query has no results, returns None. If it has more than one result, raises an exception. """ rows = self.query(sql, *args, **kwargs) if not rows: return None elif len(rows) > 1: raise MandbEception('Multiple rows returned for Database.get() query') else: return rows[0]
[docs] def execute(self, sql, *args, **kwargs): """Executes the given sql, returning the lastrowid.""" return self.execute_lastrowid(sql, *args, **kwargs)
[docs] def rollback(self): """Rolls backs the current transaction""" self.connection.rollback()
[docs] def execute_lastrowid(self, sql, *args, **kwargs): """Executes the given sql, returning the lastrowid.""" cursor = self._cursor() try: self._execute(cursor, sql, args, kwargs) return cursor.lastrowid finally: cursor.close()
[docs] def execute_rowcount(self, sql, *args, **kwargs): """Executes the given query, returning the rowcount.""" cursor = self._cursor() try: self._execute(cursor, sql, args, kwargs) return cursor.rowcount finally: cursor.close()
[docs] def executemany(self, sql, args): """Executes the given query against all the given param sequences.""" return self.executemany_lastrowid(sql, args)
[docs] def executemany_lastrowid(self, sql, args): """Executes the given query against all the given param sequences.""" cursor = self._cursor() try: cursor.executemany(sql, args) return cursor.lastrowid finally: cursor.close()
[docs] def executemany_rowcount(self, sql, args): """Executes the given query against all the given param sequences.""" cursor = self._cursor() try: cursor.executemany(sql, args) return cursor.rowcount finally: cursor.close()
update = delete = execute_rowcount updatemany = executemany_rowcount insert = execute_lastrowid insertmany = executemany_lastrowid def _connect(self): """Connect to Database(eg. mysql, sqlite). Need rewrite""" raise NotImplementedError def _cursor(self): """Get the cursor of connection. Default use DB-API standard. """ return self.connection.cursor() def _execute(self, cursor, sql, args, kwargs): """execute sql by cursor. Default use DB-API standard. """ return cursor.execute(sql, kwargs or args or None)
[docs]class SqliteDatabase(Database): """Subclass of `Database`, wrapper for Sqlite3 usage:: from mandb import SqliteDatabase db = SqliteDatabase(db='test.db') db.query('SELECT ...') db.insert('INSERT INTO ...') db.update('UPDATE ...') db.delete('DELETE ...') ... """ def __init__(self, db, *args, **kwargs): """ Args: :db: The sqlite database file. """ self.database = db super(SqliteDatabase, self).__init__(*args, **kwargs) def _connect(self): conn = sqlite3.connect(self.database, **self.kwargs) # This setting means `autocommit` conn.isolation_level = None return conn
[docs]class MySQLDatabase(Database): """Subclass of `Database`, wrapper for MySQL usage:: from mandb import MySQLDatabase db = MySQLDatabase(host='localhost', port=3306, db='test', user='root', passwd='123456', charset='utf8') db.query('SELECT ...') db.insert('INSERT INTO ...') db.update('UPDATE ...') db.delete('DELETE ...') ... """ def __init__(self, *args, **kwargs): super(MySQLDatabase, self).__init__(*args, **kwargs) def _connect(self): return MySQLdb.connect(**self.kwargs)