Source code for almir.meta
#import os.path
#import pickle
import logging
#import hashlib
#import sqlite3
from datetime import datetime
from pyramid.httpexceptions import exception_response
from sqlalchemy import engine_from_config
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from zope.sqlalchemy import ZopeTransactionExtension
from webhelpers.date import distance_of_time_in_words
from webhelpers.number import format_byte_size
from almir.lib.sqlalchemy_declarative_reflection import DeclarativeReflectedBase
from almir.lib.sqlalchemy_lowercase_inspector import LowerCaseInspector
from almir.lib.utils import timedelta_to_seconds, convert_timezone
log = logging.getLogger(__name__)
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
Base = declarative_base(cls=DeclarativeReflectedBase)
[docs]class ModelMixin(object):
query = DBSession.query_property()
@declared_attr
def __tablename__(cls):
if cls.metadata.bind.dialect.name == 'postgresql': # PRAGMA: no cover
return cls.__name__.lower()
else:
return cls.__name__
@classmethod
[docs] def get_list(cls, **kw):
return cls.query
@classmethod
[docs] def get_one(cls, id_=None, query=None):
if id_ is not None:
query = cls.query.get(int(id_))
if query == None:
raise exception_response(404)
else:
return query
@staticmethod
@staticmethod
[docs] def render_distance_of_time_in_words(dt_from, dt_to=None):
if not dt_from:
return
if dt_from.tzinfo is None:
dt_from = convert_timezone(dt_from)
if dt_to is None:
return {'text': distance_of_time_in_words(dt_from, convert_timezone(datetime.now())) + ' ago', 'data_numeric': dt_from.strftime('%s')}
else:
if dt_to.tzinfo is None:
dt_to = convert_timezone(dt_to)
return {'text': distance_of_time_in_words(dt_from, dt_to),
'data_numeric': -timedelta_to_seconds(dt_to - dt_from)}
# TODO: cache for 5min
[docs]def get_database_size(engine):
"""Returns human formatted SQL database size.
Example: 3.04 GB
"""
if engine.name == 'sqlite':
size_bytes = engine.execute('PRAGMA page_count;').scalar() * engine.execute('PRAGMA page_size;').scalar()
elif engine.name == 'mysql':
size_bytes = engine.execute('SELECT sum(ROUND((DATA_LENGTH + INDEX_LENGTH - DATA_FREE),2)) AS Size '
'FROM INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA like "%s";' % engine.url.database).scalar()
elif engine.name == 'postgresql':
size_bytes = engine.execute('SELECT pg_database_size(\'%s\');' % engine.url.database).scalar()
else:
raise NotImplemented
return format_byte_size(float(size_bytes))
[docs]def initialize_sql(settings):
# make sure metadata is populated
engine = engine_from_config(settings, prefix='sqlalchemy.', encoding='utf-8')
# monkey patch inspector to reflect lowercase tables/columns since sqlite/mysql have mixed case
# while postgres has lowercase tables/columns
engine.dialect.inspector = LowerCaseInspector
DBSession.configure(bind=engine)
DBSession.bind = engine
Base.metadata.bind = engine
import almir.models
# cache (pickle) metadata
Base.prepare(engine)
# hash engine paramters so we don't cache wrong metadata
#engine_hash = hashlib.md5(str(engine.url)).hexdigest()
# TODO: figure out a way for sqlalchemy_declarative_reflection to work with metadata caching
# TODO: configure with .ini
#cachefile = os.path.join(os.path.dirname(__name__), 'db.metada.cache.%s' % engine_hash)
#if os.path.isfile(cachefile):
# log.info('Loading database schema from cache file: %s', cachefile)
#with open(cachefile, 'r') as cache:
#Base.metadata = pickle.load(cache)
#else:
#log.info('Generating database schema cache file: %s', cachefile)
# with open(cachefile, 'w') as cache:
# pickle.dump(Base.metadata, cache)