Source code for pgextras
# -*- coding: utf-8 -*-
import re
from collections import namedtuple
import psycopg2
import psycopg2.extras
from . import sql_constants as sql
__author__ = 'Scott Woodall'
__email__ = 'scott.woodall@gmail.com'
__version__ = '0.2.0'
[docs]class PgExtras(object):
def __init__(self, dsn=None):
self.dsn = dsn
self._pg_stat_statement = None
self._cursor = None
self._conn = None
self._is_pg_at_least_nine_two = None
def __enter__(self):
"""
The context manager convention is preferred so that if there are ever
any exceptions the database cursor/connection will always be closed.
"""
return self
def __exit__(self, type, value, trace):
self.close_db_connection()
@property
[docs] def cursor(self):
if self._cursor is None:
self._conn = psycopg2.connect(
self.dsn,
cursor_factory=psycopg2.extras.NamedTupleCursor
)
self._cursor = self._conn.cursor()
return self._cursor
@property
[docs] def query_column(self):
"""
PG9.2 changed column names.
:returns: str
"""
if self.is_pg_at_least_nine_two():
return 'query'
else:
return 'current_query'
@property
[docs] def pid_column(self):
"""
PG9.2 changed column names.
:returns: str
"""
if self.is_pg_at_least_nine_two():
return 'pid'
else:
return 'procpid'
[docs] def pg_stat_statement(self):
"""
Some queries require the pg_stat_statement module to be installed.
http://www.postgresql.org/docs/current/static/pgstatstatements.html
:returns: boolean
"""
if self._pg_stat_statement is None:
results = self.execute(sql.PG_STAT_STATEMENT)
is_available = results[0].available
if is_available:
self._pg_stat_statement = True
else:
self._pg_stat_statement = False
return self._pg_stat_statement
[docs] def get_missing_pg_stat_statement_error(self):
Record = namedtuple('Record', 'error')
error = """
pg_stat_statements extension needs to be installed in the
public schema first. This extension is only available on
Postgres versions 9.2 or greater. You can install it by
adding pg_stat_statements to shared_preload_libraries in
postgresql.conf, restarting postgres and then running the
following sql statement in your database:
CREATE EXTENSION pg_stat_statements;
"""
return Record(error)
[docs] def is_pg_at_least_nine_two(self):
"""
Some queries have different syntax depending what version of postgres
we are querying against.
:returns: boolean
"""
if self._is_pg_at_least_nine_two is None:
results = self.version()
regex = re.compile("PostgreSQL (\d+\.\d+\.\d+) on")
matches = regex.match(results[0].version)
version = matches.groups()[0]
if version > '9.2.0':
self._is_pg_at_least_nine_two = True
else:
self._is_pg_at_least_nine_two = False
return self._is_pg_at_least_nine_two
[docs] def close_db_connection(self):
if self._cursor is not None:
self._cursor.close()
if self._conn is not None:
self._conn.close()
[docs] def execute(self, statement):
"""
Execute the given sql statement.
:param statement: sql statement to run
:returns: list
"""
# Make the sql statement easier to read in case some of the queries we
# run end up in the output
sql = statement.replace('\n', '')
sql = ' '.join(sql.split())
self.cursor.execute(sql)
return self.cursor.fetchall()
[docs] def cache_hit(self):
"""
Calculates your cache hit rate (effective databases are at 99% and up).
Record(
name='index hit rate',
ratio=Decimal('0.99994503346970922117')
)
:returns: list of Records
"""
return self.execute(sql.CACHE_HIT)
[docs] def index_usage(self):
"""
Calculates your index hit rate (effective databases are at 99% and up).
Record(
relname='pgbench_history',
percent_of_times_index_used=None,
rows_in_table=249976
)
:returns: list of Records
"""
return self.execute(sql.INDEX_USAGE)
[docs] def calls(self, truncate=False):
"""
Show 10 most frequently called queries. Requires the pg_stat_statements
Postgres module to be installed.
Record(
query='BEGIN;',
exec_time=datetime.timedelta(0, 0, 288174),
prop_exec_time='0.0%',
ncalls='845590',
sync_io_time=datetime.timedelta(0)
)
:param truncate: trim the Record.query output if greater than 40 chars
:returns: list of Records
"""
if self.pg_stat_statement():
if truncate:
select = """
SELECT CASE
WHEN length(query) < 40
THEN query
ELSE substr(query, 0, 38) || '..'
END AS qry,
"""
else:
select = 'SELECT query,'
return self.execute(sql.CALLS.format(select=select))
else:
return [self.get_missing_pg_stat_statement_error()]
[docs] def blocking(self):
"""
Display queries holding locks other queries are waiting to be
released.
Record(
pid=40821,
source='',
running_for=datetime.timedelta(0, 0, 2857),
waiting=False,
query='SELECT pg_sleep(10);'
)
:returns: list of Records
"""
return self.execute(
sql.BLOCKING.format(
query_column=self.query_column,
pid_column=self.pid_column
)
)
[docs] def outliers(self, truncate=False):
"""
Show 10 queries that have longest execution time in aggregate. Requires
the pg_stat_statments Postgres module to be installed.
Record(
qry='UPDATE pgbench_tellers SET tbalance = tbalance + ?;',
exec_time=datetime.timedelta(0, 19944, 993099),
prop_exec_time='67.1%',
ncalls='845589',
sync_io_time=datetime.timedelta(0)
)
:param truncate: trim the Record.qry output if greater than 40 chars
:returns: list of Records
"""
if self.pg_stat_statement():
if truncate:
query = """
CASE WHEN length(query) < 40
THEN query
ELSE substr(query, 0, 38) || '..'
END
"""
else:
query = 'query'
return self.execute(sql.OUTLIERS.format(query=query))
else:
return [self.get_missing_pg_stat_statement_error()]
[docs] def vacuum_stats(self):
"""
Show dead rows and whether an automatic vacuum is expected to be
triggered.
Record(
schema='public',
table='pgbench_tellers',
last_vacuum='2014-04-29 14:45',
last_autovacuum='2014-04-29 14:45',
rowcount='10',
dead_rowcount='0',
autovacuum_threshold='52',
expect_autovacuum=None
)
:returns: list of Records
"""
return self.execute(sql.VACUUM_STATS)
[docs] def bloat(self):
"""
Table and index bloat in your database ordered by most wasteful.
Record(
type='index',
schemaname='public',
object_name='pgbench_accounts::pgbench_accounts_pkey',
bloat=Decimal('0.2'),
waste='0 bytes'
)
:returns: list of Records
"""
return self.execute(sql.BLOAT)
[docs] def long_running_queries(self):
"""
Show all queries longer than five minutes by descending duration.
Record(
pid=19578,
duration=datetime.timedelta(0, 19944, 993099),
query='SELECT * FROM students'
)
:returns: list of Records
"""
if self.is_pg_at_least_nine_two():
idle = "AND state <> 'idle'"
else:
idle = "AND current_query <> '<IDLE>'"
return self.execute(
sql.LONG_RUNNING_QUERIES.format(
pid_column=self.pid_column,
query_column=self.query_column,
idle=idle
)
)
[docs] def seq_scans(self):
"""
Show the count of sequential scans by table descending by order.
Record(
name='pgbench_branches',
count=237
)
:returns: list of Records
"""
return self.execute(sql.SEQ_SCANS)
[docs] def unused_indexes(self):
"""
Show unused and almost unused indexes, ordered by their size relative
to the number of index scans. Exclude indexes of very small tables
(less than 5 pages), where the planner will almost invariably select
a sequential scan, but may not in the future as the table grows.
Record(
table='public.grade_levels',
index='index_placement_attempts_on_grade_level_id',
index_size='97 MB',
index_scans=0
)
:returns: list of Records
"""
return self.execute(sql.UNUSED_INDEXES)
[docs] def total_table_size(self):
"""
Show the size of the tables (including indexes), descending by size.
Record(
name='pgbench_accounts',
size='15 MB'
)
:returns: list of Records
"""
return self.execute(sql.TOTAL_TABLE_SIZE)
[docs] def total_indexes_size(self):
"""
Show the total size of all the indexes on each table, descending by
size.
Record(
table='pgbench_accounts',
index_size='2208 kB'
)
:returns: list of Records
"""
return self.execute(sql.TOTAL_INDEXES_SIZE)
[docs] def table_size(self):
"""
Show the size of the tables (excluding indexes), descending by size.
:returns: list
"""
return self.execute(sql.TABLE_SIZE)
[docs] def index_size(self):
"""
Show the size of indexes, descending by size.
:returns: list
"""
return self.execute(sql.INDEX_SIZE)
[docs] def total_index_size(self):
"""
Show the total size of all indexes.
Record(
size='2240 kB'
)
:returns: list of Records
"""
return self.execute(sql.TOTAL_INDEX_SIZE)
[docs] def locks(self):
"""
Display queries with active locks.
Record(
procpid=31776,
relname=None,
transactionid=None,
granted=True,
query_snippet='select * from hello;',
age=datetime.timedelta(0, 0, 288174),
)
:returns: list of Records
"""
return self.execute(
sql.LOCKS.format(
pid_column=self.pid_column,
query_column=self.query_column
)
)
[docs] def table_indexes_size(self):
"""
Show the total size of all the indexes on each table, descending by
size.
Record(
table='pgbench_accounts',
index_size='2208 kB'
)
:returns: list of Records
"""
return self.execute(sql.TABLE_INDEXES_SIZE)
[docs] def ps(self):
"""
View active queries with execution time.
Record(
pid=28023,
source='pgbench',
running_for=datetime.timedelta(0, 0, 288174),
waiting=0,
query='UPDATE pgbench_accounts SET abalance = abalance + 423;'
)
:returns: list of Records
"""
if self.is_pg_at_least_nine_two():
idle = "AND state <> 'idle'"
else:
idle = "AND current_query <> '<IDLE>'"
return self.execute(
sql.PS.format(
pid_column=self.pid_column,
query_column=self.query_column,
idle=idle
)
)
[docs] def version(self):
"""
Get the Postgres server version.
Record(
version='PostgreSQL 9.3.3 on x86_64-apple-darwin13.0.0'
)
:returns: list of Records
"""
return self.execute(sql.VERSION)