Source code for multicorn

# -*- coding: utf-8 -*-
"""
Base multicorn module.

This module contains all the python code needed by the multicorn C extension
to postgresql.

You should install it in the python path available to the user running
postgresql (usually, the system wide python installation).

"""

import sys
from collections import namedtuple
try:
    from collections import OrderedDict
except ImportError:
    import collections
    from ordereddict import OrderedDict
    collections.OrderedDict = OrderedDict

# Package version string. The literal '__VERSION__' looks like a placeholder
# that is presumably substituted at build/release time -- TODO confirm.
__version__ = '__VERSION__'

# Unique sentinel objects, meant to be compared by identity.
# ANY and ALL are the values returned by Qual.list_any_or_all to distinguish
# "op ANY(...)" list qualifiers from "op ALL(...)" ones.
ANY = object()
ALL = object()
# NOTE(review): UNBOUND is not referenced anywhere in this module; presumably
# it is used by the C extension or by FDW implementations -- verify.
UNBOUND = object()


# Immutable record describing the sort requested on a single column; the
# fields are, in order: attname, attnum, is_reversed, nulls_first, collate.
SortKey = namedtuple(
    "SortKey",
    "attname attnum is_reversed nulls_first collate")

"""
A SortKey describes the sort of one column an SQL query requested.

A query can request the sort of zero, one or multiple columns. Therefore, a list
of SortKey is provided to the ForeignDataWrapper, containing zero, one or more
SortKey.

Attributes:
    attname(str):       The name of the column to sort as defined in the postgresql
        table.
    attnum(int):        The position of the column to sort as defined in the
        postgresql table.
    is_reversed(bool):  True if the query requested a DESC order.
    nulls_first(bool):  If True, NULL values must appear at the beginning.
        Otherwise, they must appear at the end.
    collate(str):       The collation name to use to sort the data, as appearing
        in the postgresql cluster.
"""

class Qual(object):
    """A Qual describes a postgresql qualifier.

    A qualifier is here defined as an expression of the type::

        col_name operator value

    For example::

        mycolumn > 3
        mycolumn = ANY(1,2,3)
        mycolumn ~~ ALL('A%','AB%', '%C')

    Attributes:
        field_name (str): The name of the column as defined in the postgresql
            table.
        operator (str or tuple): The name of the operator if a string.
            Example: =, <=, ~~ (for a like clause)

            If it is a tuple, then the tuple is of the form
            (operator name, ANY or ALL). A plain truthy/falsy flag is also
            accepted as second item (truthy meaning ANY, falsy meaning ALL).

            The tuple represents a comparison of the form
            WHERE field = ANY(1, 2, 3), which is the internal representation
            of WHERE field IN (1, 2, 3)
        value (object): The constant value on the right side
    """

    def __init__(self, field_name, operator, value):
        """Constructs a qual object.

        Instantiated from the C extension with the field name, operator and
        value extracted from the postgresql where clause.

        Accepts every field from the qual.
        """
        self.field_name = field_name
        self.operator = operator
        self.value = value

    @property
    def is_list_operator(self):
        """
        Returns:
            True if this qual represents an array expr, False otherwise
        """
        return isinstance(self.operator, tuple)

    @property
    def list_any_or_all(self):
        """
        Returns:
            ANY if and only if:
                - this qual is a list operator
                - the operator applies as an 'ANY' clause
                  (eg, = ANY(1,2,3))
            ALL if and only if:
                - this is a list operator
                - the operator applies as an 'ALL' clause
                  (eg, > ALL(1, 2, 3))
            None if this is not a list operator.
        """
        if not self.is_list_operator:
            return None
        marker = self.operator[1]
        # The ANY / ALL sentinels are plain objects and therefore always
        # truthy: they have to be recognised by identity, otherwise an ALL
        # marker would be reported as ANY.
        if marker is ALL:
            return ALL
        if marker is ANY:
            return ANY
        # Anything else is treated as a boolean flag: truthy means ANY.
        return ANY if marker else ALL

    def __repr__(self):
        if self.is_list_operator:
            value = '%s(%s)' % (
                'ANY' if self.list_any_or_all is ANY else 'ALL',
                self.value)
            operator = self.operator[0]
        else:
            value = self.value
            operator = self.operator
        return ("%s %s %s" % (self.field_name, operator, value))

    def __eq__(self, other):
        if isinstance(other, Qual):
            return (self.field_name == other.field_name and
                    self.operator == other.operator and
                    self.value == other.value)
        return False

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``.
        return not self.__eq__(other)

    @staticmethod
    def _hashable(value):
        """Return a hashable stand-in for ``value``.

        Lists, sets and dicts (at any nesting depth) are converted to their
        immutable counterparts; every other value is returned unchanged, so
        values that were already hashable keep the very same hash.
        """
        if isinstance(value, (list, tuple)):
            return tuple(Qual._hashable(item) for item in value)
        if isinstance(value, (set, frozenset)):
            return frozenset(Qual._hashable(item) for item in value)
        if isinstance(value, dict):
            return frozenset((key, Qual._hashable(item))
                             for key, item in value.items())
        return value

    def __hash__(self):
        # The value of a list qualifier is a (mutable) list, which cannot be
        # hashed directly: freeze it first so that such quals can be stored
        # in sets or used as dict keys. Equal quals still hash equally.
        return hash((self.field_name,
                     self._hashable(self.operator),
                     self._hashable(self.value)))
class ForeignDataWrapper(object):
    """Base class for all foreign data wrapper instances.

    Though not required, ForeignDataWrapper implementation should inherit from
    this class.
    """

    # NOTE(review): presumably read by the C extension as the planner startup
    # cost of a foreign scan -- it is not used anywhere in this module.
    _startup_cost = 20

    def __init__(self, fdw_options, fdw_columns):
        """The foreign data wrapper is initialized on the first query.

        Args:
            fdw_options (dict): The foreign data wrapper options. It is a
                dictionary mapping keys from the sql
                "CREATE FOREIGN TABLE" statement options. It is left to the
                implementor to decide what should be put in those options,
                and what to do with them.
            fdw_columns (dict): The foreign datawrapper columns. It is a
                dictionary mapping the column names to their
                ColumnDefinition.
        """
        pass

    def get_rel_size(self, quals, columns):
        """
        Method called from the planner to estimate the resulting relation
        size for a scan.

        It will help the planner in deciding between different types of
        plans, according to their costs.

        Args:
            quals (list): A list of Qual instances describing the filters
                applied to this scan.
            columns (list): The list of columns that must be returned.

        Returns:
            A tuple of the form (expected_number_of_rows, avg_row_width (in
            bytes)). The default implementation answers 100000000 rows of
            100 bytes per requested column.
        """
        return (100000000, len(columns) * 100)

    def can_sort(self, pathkeys):
        """
        Method called from the planner to ask the FDW what are the sorts it
        can enforce, to avoid PostgreSQL to sort the data after retrieving
        all the rows. These sorts can come from explicit ORDER BY clauses,
        but also GROUP BY and DISTINCT clauses.

        The FDW has to inspect every sort, and respond which one are handled.
        The sorts are cumulative. For example::

            col1 ASC
            col2 DESC

        means that the FDW must render the tuples sorted by col1 ascending
        and col2 descending.

        Args:
            pathkeys (list): A list of :class:`SortKey` representing all the
                sorts the query must enforce.

        Returns:
            The list of cumulative SortKey, for which the FDW can enforce
            the sort. The default implementation enforces none.
        """
        return []

    def get_path_keys(self):
        """
        Method called from the planner to add additional Path to the planner.
        By default, the planner generates an (unparameterized) path, which
        can be reasoned about like a SequentialScan, optionally filtered.

        This method allows the implementor to declare other Paths,
        corresponding to faster access methods for specific attributes.
        Such a parameterized path can be reasoned about like an IndexScan.

        For example, with the following query::

            select * from foreign_table inner join local_table using(id);

        where foreign_table is a foreign table containing 100000 rows, and
        local_table is a regular table containing 100 rows.

        The previous query would probably be transformed to a plan similar
        to this one::

            Hash Join  (cost=57.67..4021812.67 rows=615000 width=68)
              Hash Cond: (foreign_table.id = local_table.id)
              ->  Foreign Scan on foreign_table (cost=20.00..4000000.00 rows=100000 width=40)
              ->  Hash  (cost=22.30..22.30 rows=1230 width=36)
                    ->  Seq Scan on local_table (cost=0.00..22.30 rows=1230 width=36)

        But with a parameterized path declared on the id key, with the
        knowledge that this key is unique on the foreign side, the following
        plan might get chosen::

            Nested Loop  (cost=20.00..49234.60 rows=615000 width=68)
              ->  Seq Scan on local_table (cost=0.00..22.30 rows=1230 width=36)
              ->  Foreign Scan on foreign_table (cost=20.00..40.00 rows=1 width=40)
                    Filter: (id = local_table.id)

        Returns:
            A list of tuples of the form: (key_columns, expected_rows),
            where key_columns is a tuple containing the columns on which
            the path can be used, and expected_rows is the number of rows
            this path might return for a simple lookup.
            For example, the return value corresponding to the previous
            scenario would be::

                [(('id',), 1)]

            The default implementation declares no additional path.
        """
        return []

    def execute(self, quals, columns, pathkeys=None):
        """Execute a query in the foreign data wrapper.

        This method is called at the first iteration.
        This is where the actual remote query execution takes place.
        Multicorn makes no assumption about the particular behavior of a
        ForeignDataWrapper, and will NOT remove any qualifiers from the
        PostgreSQL quals list.
        That means the quals will be rechecked anyway.

        Typically, an implementation would:

            - initialize (or reuse) some sort of connection to the remote
              system
            - transform the quals and columns arguments to a representation
              suitable for the remote system
            - fetch the data according to this query
            - return it to the C-extension.

        Although any iterable can be returned, it is strongly advised to
        implement this method as a generator to prevent loading the whole
        dataset in memory.

        Args:
            quals (list): A list of :class:`Qual` instances, containing the
                basic where clauses in the query.
            columns (list): A list of columns that postgresql is going to
                need. You should return AT LEAST those columns when
                returning a dict. If returning a sequence, every column
                from the table should be in the sequence.
            pathkeys (list): A list of :class:`SortKey` that the FDW said it
                can enforce. ``None`` (the default) or an empty list means
                that no particular order has to be enforced.

        Returns:
            An iterable of python objects which can be converted back to
            PostgreSQL. Currently, such objects are:

                - sequences containing exactly as many columns as the
                  underlying tables
                - dictionaries mapping column names to their values.

            If the pathkeys wasn't empty, the FDW has to return the data in
            the expected order.
        """
        pass

    @property
    def rowid_column(self):
        """
        Returns:
            A column name which will act as a rowid column, for
            delete/update operations.

            One can think of it as a primary key.

            This can be either an existing column name, or a made-up one.
            This column name should be subsequently present in every
            returned resultset.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("This FDW does not support the writable API")

    def insert(self, values):
        """
        Insert a tuple defined by ``values`` in the foreign table.

        Args:
            values (dict): a dictionary mapping column names to column
                values

        Returns:
            A dictionary containing the new values. These values can differ
            from the ``values`` argument if any one of them was changed or
            inserted by the foreign side. For example, if a key is auto
            generated.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("This FDW does not support the writable API")

    def update(self, oldvalues, newvalues):
        """
        Update a tuple containing ``oldvalues`` to the ``newvalues``.

        Args:
            oldvalues (dict): a dictionary mapping from column names to
                previously known values for the tuple.
            newvalues (dict): a dictionary mapping from column names to new
                values for the tuple.

        Returns:
            A dictionary containing the new values. See :meth:`insert` for
            information about this return value.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("This FDW does not support the writable API")

    def delete(self, oldvalues):
        """
        Delete a tuple identified by ``oldvalues``

        Args:
            oldvalues (dict): a dictionary mapping from column names to
                previously known values for the tuple.

        Returns:
            None

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("This FDW does not support the writable API")

    def pre_commit(self):
        """
        Hook called just before a commit is issued, on PostgreSQL >=9.3.
        This is where the transaction should tentatively be committed.
        """
        pass

    def rollback(self):
        """
        Hook called when the transaction is rolled back.
        """
        pass

    def commit(self):
        """
        Hook called at commit time. On PostgreSQL >= 9.3, the pre_commit
        hook should be preferred.
        """
        pass

    def end_scan(self):
        """
        Hook called at the end of a foreign scan.
        """
        pass

    def end_modify(self):
        """
        Hook called at the end of a foreign modify (DML operations)
        """
        pass

    def begin(self, serializable):
        """
        Hook called at the beginning of a transaction.

        Args:
            serializable: NOTE(review): presumably tells whether the
                transaction runs at the serializable isolation level --
                confirm against the C extension.
        """
        pass

    def sub_begin(self, level):
        """
        Hook called at the beginning of a subtransaction.
        """
        pass

    def sub_rollback(self, level):
        """
        Hook called when a subtransaction is rolled back.
        """
        pass

    def sub_commit(self, level):
        """
        Hook called when a subtransaction is committed.
        """
        pass

    @classmethod
    def import_schema(cls, schema, srv_options, options, restriction_type,
                      restricts):
        """
        Hook called on an IMPORT FOREIGN SCHEMA command.

        Args:
            schema (str): the foreign schema to import
            srv_options (dict): options defined at the server level
            options (dict): options defined at the IMPORT FOREIGN SCHEMA
                statement level
            restriction_type (str): One of 'limit', 'except' or None
            restricts (list): a list of tables as passed to the LIMIT TO
                or EXCEPT clause

        Returns:
            list: a list of :class:`multicorn.TableDefinition`

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(
            "This FDW does not support IMPORT FOREIGN SCHEMA")
class TransactionAwareForeignDataWrapper(ForeignDataWrapper):
    """ForeignDataWrapper recording the write operations it receives.

    Every insert, update and delete is appended, in call order, to
    ``current_transaction_state`` as an ``(operation name, payload)`` tuple
    instead of being applied; ``rollback`` discards the recorded operations.

    NOTE(review): nothing in this class applies or clears the recorded
    operations at commit time; presumably subclasses do so in their
    ``pre_commit`` / ``commit`` hooks -- confirm.
    """

    def __init__(self, fdw_options, fdw_columns):
        super(TransactionAwareForeignDataWrapper, self).__init__(
            fdw_options, fdw_columns)
        self._init_transaction_state()

    def _init_transaction_state(self):
        """Start over with an empty list of pending operations."""
        self.current_transaction_state = []

    def insert(self, values):
        """Record an insert of ``values``."""
        self.current_transaction_state.append(('insert', values))

    def update(self, oldvalues, newvalues):
        """Record an update from ``oldvalues`` to ``newvalues``."""
        self.current_transaction_state.append(
            ('update', (oldvalues, newvalues)))

    def delete(self, oldvalues):
        """Record a delete of the tuple identified by ``oldvalues``."""
        self.current_transaction_state.append(('delete', oldvalues))

    def rollback(self):
        """Forget every operation recorded so far."""
        self._init_transaction_state()


# Code from python2.7 importlib.import_module.
# Backport of importlib.import_module from 3.x.
# While not critical (and in no way guaranteed!), it would be nice to keep this
# code compatible with Python 2.3.
def _resolve_name(name, package, level):
    """Return the absolute name of the module to be imported.

    Args:
        name (str): the module name, stripped of its leading dots.
        package (str): the package used as anchor of the relative import.
        level (int): the number of leading dots of the relative name.

    Raises:
        ValueError: if ``package`` is not a string, or if ``level`` goes
            beyond the top-level package.
    """
    if not hasattr(package, 'rindex'):
        raise ValueError("'package' not set to a string")
    dot = len(package)
    # One leading dot designates ``package`` itself; every additional dot
    # strips one trailing component from it.
    for _ in range(level, 1, -1):
        try:
            dot = package.rindex('.', 0, dot)
        except ValueError:
            raise ValueError("attempted relative import beyond top-level "
                             "package")
    return "%s.%s" % (package[:dot], name)


def import_module(name, package=None):
    """Import a module.

    The 'package' argument is required when performing a relative import. It
    specifies the package to use as the anchor point from which to resolve the
    relative import to an absolute import.

    Returns:
        The imported module, looked up in ``sys.modules``.

    Raises:
        TypeError: if ``name`` is relative and ``package`` is not given.
    """
    if name.startswith('.'):
        if not package:
            raise TypeError("relative imports require the 'package' argument")
        # Count the leading dots to know how far up the package to go.
        level = 0
        for character in name:
            if character != '.':
                break
            level += 1
        name = _resolve_name(name[level:], package, level)
    # __import__ returns the top-level package for dotted names: fetch the
    # requested (sub)module from sys.modules instead.
    __import__(name)
    return sys.modules[name]


def get_class(module_path):
    """
    Internal function called from c code to import a foreign data wrapper.

    Args:
        module_path (str): A fully qualified name for a class to import.

            Ex: multicorn.csvfdw.CsvFdw.

    Returns:
        the class designated by module_path.

    Raises:
        ValueError: if ``module_path`` does not contain a module part.
    """
    # Everything before the last dot is the module, the rest is the class.
    module_name, _, wrapper_class = module_path.rpartition(".")
    if not module_name:
        raise ValueError(
            "%r is not a fully qualified class name" % (module_path,))
    module = import_module(module_name)
    return getattr(module, wrapper_class)


def quote_identifier(value):
    """Return ``value`` wrapped in double quotes, with embedded double
    quotes doubled."""
    return '"' + value.replace('"', '""') + '"'


def quote_option(value):
    """Return ``value`` wrapped in single quotes, with embedded single
    quotes doubled."""
    return "'" + value.replace("'", "''") + "'"


def dict_to_optionstring(options):
    """Render ``options`` as ``key 'value'`` pairs, sorted by key and
    separated by a comma and a newline."""
    return ",\n".join(
        "%s %s" % (key, quote_option(value))
        for key, value in sorted(options.items()))
class ColumnDefinition(object):
    """
    Definition of Foreign Table Column.

    Attributes:
        column_name (str): the name of the column
        type_oid (int): the internal OID of the PostgreSQL type
        typmod (int): the type modifier (ex: VARCHAR(12))
        type_name (str): the formatted type name, with the modifier
                         (ex: VARCHAR(12))
        base_type_name (str): the base type name, without modifier
                              (ex: VARCHAR)
        options (dict): a mapping of option names to option values, as
                        strings.
    """

    def __init__(self, column_name, type_oid=0, typmod=0, type_name="",
                 base_type_name="", options=None):
        # A missing (or empty) options mapping is replaced by a fresh dict.
        if not options:
            options = {}
        self.options = options
        self.base_type_name = base_type_name
        self.type_name = type_name
        self.typmod = typmod
        self.type_oid = type_oid
        self.column_name = column_name

    def __repr__(self):
        # The options are only shown when there are some.
        if self.options:
            options_part = " options %s" % self.options
        else:
            options_part = ""
        fields = (self.__class__.__name__, self.column_name, self.type_oid,
                  self.type_name, options_part)
        return "%s(%s, %i, %s%s)" % fields

    def to_statement(self):
        """Return the column clause of a CREATE FOREIGN TABLE statement:
        the quoted column name, its type name and, when the column has
        options, an OPTIONS ( ... ) clause."""
        statement = "%s %s" % (
            quote_identifier(self.column_name), self.type_name)
        if not self.options:
            return statement
        return "%s OPTIONS ( %s )" % (
            statement, dict_to_optionstring(self.options))
class TableDefinition(object):
    """
    Definition of a Foreign Table.

    Attributes:
        table_name (str): the name of the table
        schema (str): the schema given at construction time, or None.
            It is only kept for reference: :meth:`to_statement` uses its own
            ``schema_name`` argument.
        columns (list): a list of :class:`ColumnDefinition` objects
        options (dict): a dictionary containing the table-level options.
    """

    def __init__(self, table_name, schema=None, columns=None, options=None):
        self.table_name = table_name
        # Keep the schema instead of silently discarding the argument.
        self.schema = schema
        self.columns = columns or []
        self.options = options or {}

    def to_statement(self, schema_name, server_name):
        """
        Generates the CREATE FOREIGN TABLE statement associated with this
        definition.

        Args:
            schema_name (str): the schema in which the table is created.
            server_name (str): the foreign server the table is attached to.

        Returns:
            str: the statement, with the schema, table, column and server
            names quoted as identifiers.
        """
        parts = [
            "CREATE FOREIGN TABLE %s.%s (" % (
                quote_identifier(schema_name),
                quote_identifier(self.table_name)),
            ",\n".join(col.to_statement() for col in self.columns),
            " \n ) SERVER %s " % quote_identifier(server_name),
        ]
        # The table-level OPTIONS clause is only emitted when needed.
        if self.options:
            parts.append(" OPTIONS (")
            parts.append(dict_to_optionstring(self.options))
            parts.append(")")
        return '\n'.join(parts)