Reaxial Update – On Stages And Actors

Since I last wrote about Reaxial we’ve come up with some new abstractions that make it easier to write reactive handlers, and have been busy transitioning our code to use the new architecture. I thought I’d take this opportunity to share our progress with you.

As we started transitioning to Reaxial, we realized that creating an entire service for each reactive component was a bit of overkill. Many features we have implemented with reactive components run sporadically and are not particularly time sensitive, and typically there are a number of features that depend on the same updates. Having a separate process and a separate connection to Kafka is wasteful and inefficient in these cases. However, other features have to react in a timely fashion, so for those we do want a dedicated process with its own Kafka connection.

To accommodate these different use cases, we came up with the concept of a “stage” service that can host one or more “actors”. An “actor” is our basic building block for reactive components. Each actor is a python class that derives from this abstract base class:

from logging import getLogger

class Actor(object):
    def topics(self):
        """ Return a list of the topic(s) this actor cares about. """
        raise NotImplementedError

    def interval(self):
        """ Return the batching interval for this actor. This is the maximum
        interval. If another actor on the same stage has a shorter interval,
        then the batching interval will match that interval.
        """
        return 30

    def process(self, topic, messages):
        """ Called periodically for this actor to process messages that have been
        received since the last batching interval. If messages for multiple
        different topics have been received, then this method will be called
        once for each different topic. The messages will be passed as an array
        of tuples (offset, message).
        """
        raise NotImplementedError

    @property
    def log(self):
        return getLogger(self.__module__)

All that is required for an actor class to override is topics() and process(). The topics() method simply returns a list of Kafka topics that the actor wants to handle, and the process() method is then called periodically by the stage service with a set of messages from one of these topics. The stage service works by collecting a batch of messages (1000 by default) across all the topics that all the actors within that stage care about, and then invoking each actor’s process() method with the messages in the topics that that actor cares about. If the batching interval expires while the stage is collecting messages, then the messages that have already been collected are processed immediately.
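
For illustration only (the topic and class names here are hypothetical, not part of Reaxial), a minimal actor might look like this:

import json

class WelcomeEmailActor(Actor):
    def topics(self):
        # hypothetical Kafka topic name
        return ['user-signups']

    def process(self, topic, messages):
        # messages is a list of (offset, message) tuples for a single topic
        for offset, message in messages:
            payload = json.loads(message)
            self.send_welcome_email(payload['user_id'])

    def send_welcome_email(self, user_id):
        self.log.info('sending welcome email to user %s', user_id)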

Once an actor is defined, it has to be configured to run within a specific stage. We are using a simple INI-style config file using betterconfig to define the various stages. Each stage is a section in the config file and the actors are specified by adding the python dotted path to the actor class to a list inside the section. In addition, the batch size for the stage can be changed here too.
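
As a sketch (the stage, module and option names are made up, and the exact keys may differ from our real configs), a stage definition might look something like this:

# /etc/reaxial/stages.cfg -- hypothetical example
[notifications]
batch_size = 500
actors     = [ 'reaxial.actors.welcome_email.WelcomeEmailActor',
               'reaxial.actors.digest.DigestActor' ]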

We are still in the middle of the process of converting the functionality in our legacy platform to Reaxial, but we have already defined 30 actors running on 7 different stages. Having the infrastructure to easily decompose a feature into reactive components like actors improves the modularity and reliability of our system, and also improves testability. We can very easily write unit tests that pass specific messages to an actor and by mocking out the methods that the actor calls, we can test arbitrary scenarios without having to set up anything in the database. Plus, because actors only implement one feature, or one piece of a feature, they are straightforward unit testing targets.
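
As a sketch of what such a test can look like for the hypothetical WelcomeEmailActor above (using the mock library to stub out the side effect):

from mock import Mock

def test_welcome_email_actor():
    actor = WelcomeEmailActor()
    actor.send_welcome_email = Mock()   # no email, no database
    actor.process('user-signups', [(0, '{"user_id": 42}')])
    actor.send_welcome_email.assert_called_once_with(42)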

One obvious area for improvement is to enhance the stage service so that it dynamically decides which actors to run on which stages by observing their behavior. This has always been in our plans, but because it is a complicated optimization problem and carries significant risks if not implemented properly, we decided to stick with the manual stage configuration for now, coupled with monitoring of the stages to ensure that time-sensitive messages are being handled within the expected time. So far this is working well, and as we improve this system we’ll keep you updated on our progress.

Preventing errant git pushes with a pre-push hook

At Axial, we use a centralized repository workflow; everyone works off the main git remote, making branches and pull requests as needed. Broken code is allowed on topic branches, but the latest code on master is expected to always work.

However, since master is just an ordinary git branch and the git command-line interface has a lot of different ways to do the same thing, we’ve had a few incidents where a developer intends to push a topic branch to the remote, creating a new branch there, but instead pushes the topic branch to master. Luckily, git 1.8.2 added support for a pre-push hook.

Git hooks are small scripts that can interact with different parts of the git operation. For example, a pre-commit hook can check that your code is properly formatted, or a post-receive hook can be responsible for kicking off a build. In our case, pre-push will run before any push and has the ability to abort it.

The pre-push hook is an executable script named .git/hooks/pre-push. It will be called with two command-line arguments (the name and url of the remote) and provided a list of refs being pushed and the corresponding ref, if any, on the remote side. See the description in githooks(5) for full details.

The first part of the hook assembles data about the push. We obtain the current branch using git symbolic-ref HEAD and check for a force push by inspecting the command-line arguments of the parent process (the git push in question). We also split out the branch names from the refs in the commits when possible. (A git push origin HEAD unfortunately won’t have the branch name associated with it, which is why we get the current branch.)

import os
import shutil
import subprocess
import tempfile
from collections import namedtuple

# `parser` is the argparse parser for the hook's two command-line arguments
# (remote name and url); parser.exit() aborts the push with a message.

Push = namedtuple('Push', ['commits', 'remote_name', 'remote_url',
                           'current_branch', 'removing_remote', 'forcing'])
Commit = namedtuple('Commit', ['local_ref', 'local_sha1', 'remote_ref', 'remote_sha1',
                               'local_branch', 'remote_branch'])

def assemble_push(args, lines):
    commits = []
    for line in lines:
        split = line.split()
        if len(split) != 4:
            parser.exit(status=1,
                        message="Could not parse commit from '{}'\n".format(line))
        # local_branch
        local_branch = split[0].split('/')[-1] if '/' in split[0] else None
        split.append(local_branch)
        # remote_branch
        remote_branch = split[2].split('/')[-1] if '/' in split[2] else None
        split.append(remote_branch)
        commits.append(Commit(*split))
    current_ref = subprocess.check_output(['git', 'symbolic-ref', 'HEAD']).rstrip()
    current_branch = current_ref.split('/')[-1]
    pid = os.getppid()
    push_command = subprocess.check_output(['ps', '-ocommand=', '-p', str(pid)])
    forcing = ('--force' in push_command or '-f' in push_command)
    removing_remote = set()
    for commit in commits:
        if commit.local_ref == "(delete)":
            removing_remote.add(commit.remote_branch)
    return Push(commits=commits,
                remote_name=args.remote_name, remote_url=args.remote_url,
                current_branch=current_branch, removing_remote=removing_remote,
                forcing=forcing)

Now that we’ve assembled all the info, we can check the push for things we want to prohibit.

Push(commits=[Commit(local_ref='refs/heads/topicbranch',
                     local_sha1='6eadeac2dade6347e87c0d24fd455feffa7069f0',
                     remote_ref='refs/heads/master',
                     remote_sha1='f1d2d2f924e986ac86fdf7b36c94bcdf32beec15',
                     local_branch='topicbranch',
                     remote_branch='master')],
     remote_name='origin',
     remote_url='git@github.com:axialmarket/repo.git',
     current_branch='topicbranch',
     removing_remote=set([]),
     forcing=False)

In this push, you can see that I’m attempting to push topicbranch on the local repo to master on the remote; this is the first category of mistake we want to prevent.

def check_unmerged(push):
    for commit in push.commits:
        compare = commit.local_branch
        if commit.local_ref == 'HEAD':
            compare = push.current_branch
        if commit.remote_branch in PROTECTED and \
                compare != commit.remote_branch:
            msg = ("You cannot push the local branch '{}' to the protected "
                   "remote branch '{}'\n".format(
                       compare, commit.remote_branch))
            parser.exit(1, message=msg)

PROTECTED is a set of branch names, currently master and qa. commit.local_branch will be None if the local ref doesn’t carry a branch name. We have a special case to handle HEAD, where we use the current branch instead, but otherwise we err on the side of rejecting the push. This function prohibits accidentally pushing a topic branch directly to master or qa. We have a similar check for deleting a protected branch, based on the removing_remote property on the push object; a sketch of it follows.
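
In simplified form (the real check in our hook may differ slightly):

def check_delete(push):
    for branch in push.removing_remote:
        if branch in PROTECTED:
            msg = ("You cannot delete the protected remote branch "
                   "'{}'\n".format(branch))
            parser.exit(1, message=msg)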

We use alembic for database migrations. When two people create migrations in their topic branches and merge them together, this creates a branched database history and alembic will refuse to apply migrations until you resolve the branch. We have some scripts to warn you when you get an alembic branch after a merge, but people have sometimes forgotten to fix that and pushed the branch to qa or even master. So here’s a check for that! It’s a bit more complicated than the last one.

def check_alembic_branch(push):
    for commit in push.commits:
        if commit.remote_branch not in PROTECTED:
            continue
        awd = tempfile.mkdtemp(prefix='alembic.')
        treeish = "{}:share/migrations".format(commit.local_sha1)
        tar = subprocess.Popen(['git', 'archive', '--format=tar', treeish],
                               stdout=subprocess.PIPE)
        extract = subprocess.Popen(['tar', 'xf', '-'], stdin=tar.stdout, cwd=awd)
        tar.stdout.close()  # Allows extract to receive a SIGPIPE if tar exits.
        extract.communicate()
        if extract.returncode != 0:
            parser.exit(1, message="unable to check for alembic branches\n")
        branches = subprocess.Popen(['alembic', 'branches'], cwd=awd,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output = branches.communicate()
        if branches.returncode != 0:
            parser.exit(1, message="unable to check for alembic branches\n")
        # cleanup awd! otherwise we pollute /tmp
        shutil.rmtree(awd)
        if len(output[0]) > 0:
            msg = ("Alembic migration conflict!\n{0}\n"
                   "Fix this first!\n")
            parser.exit(1, message=msg.format(output[0]))

The good news is alembic has a command to let you know if a branch exists. The bad news is alembic expects to be able to work on an actual file tree on disk. The working tree will often correspond to the tree being pushed, but we can’t guarantee that will be the case. So, we make a temporary directory and use git archive to extract the migrations directory for the appropriate refs. This is actually pretty fast.

There are a lot more checks we can add following these examples; we can prevent force-pushing to master, ensure Jenkins has successfully built the topic branch first, and so on. This addresses our immediate pain points while being easy to extend in the future.

[image via git-scm.com]

Transparent Exceptions Over JSON-RPC

At Axial, we’re moving to a service-oriented architecture, building smaller discrete services, primarily in Flask. In our server-client implementation, exceptions were obfuscated by the JSON-RPC library we were using. Many of our services use a patched version of jsonrpclib, which does a very good job of making exceptions confusing. On the server side, jsonrpclib coerces exceptions into a custom Fault class, which has an error code and a message. Unhandled exceptions are assigned an arbitrary error code, which is then coerced to a ProtocolError on the client side. A majority of our exception handling began to look like this:

from jsonrpclib import ProtocolError
from axial.api import ApiManager

try:
    ApiManager.user.get_info(1)
except ProtocolError as e:
    code = e.args[0][0]
    if code == -1:
        raise Exception('No user')
    if code in (-2, -3, -4):
        # These are ok codes
        pass

Gross. The grossness compounds when APIs call other APIs. You might be tempted to go the route of creating one set of error codes to rule them all. It seems like a good idea, but managing a large set of error codes is cumbersome and ties discrete APIs together too tightly. It will also shoe-horn you into arbitrary error code ranges. Even worse, exposing these codes to a third-party consumer may encourage similarly bad patterns. We needed a clear way to expose exceptions across services that were tied to an API’s namespace.

The solution is rpc_exceptions. Here’s the main class:

from functools import wraps
from .base_exceptions import Fault

class RPCExceptionHandler(object):
    def __init__(self, exceptions):
        if isinstance(exceptions, type):
            exceptions = [exceptions]
        self.exceptions = exceptions
        self.wrapped_exceptions = tuple(e for e in exceptions)
        self.unwrapped_exceptions = dict(((e.code, e.api_name), e) for e in self.wrapped_exceptions)

    def get_exception_instance(self, code, api_name, message=None, data=None):
        ''' Coerce a wrapped exception into its original exception instance '''
        return self.get_exception(code, api_name)(remote_message=message, remote_data=data)

    def get_exception(self, code, api_name):
        ''' Coerce a wrapped exception into its original exception type '''
        return self.unwrapped_exceptions[(code, api_name)]

    def wrap_rpc_exception(self, f):
        ''' Catch an exception in self.wrapped_exceptions and coerce it to a jsonrpclib Fault '''
        @wraps(f)
        def wrapped(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except self.wrapped_exceptions as e:
                return Fault(e.code, e.msg)
        return wrapped

Define a base exception that inherits from WrappedRPCError and has the api namespace we want to create exceptions for, and then implement the exceptions for that API:

#wrapped_rpc/exceptions.py
from rpc_exceptions import RPCExceptionHandler, WrappedRPCError

class TestError(WrappedRPCError):                                 
    api_name = 'test'                                             
                                                                  
class PEBCAKError(TestError):                                     
    code = -1                                                     
    _default_message = 'Problem exists between chair and keyboard'
                                                                  
class FUBARError(TestError):                                      
    code = -2                                                     
    _default_message = 'FUBARed'                                  
                                                                  
error_handler = RPCExceptionHandler([ PEBCAKError, FUBARError ])  

This simple server will wrap an exposed function with a decorator and handle exceptions with the proper error codes. Consider the server below:

#wrapped_server/server.py
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
from wrapped_rpc.exceptions import error_handler, PEBCAKError, FUBARError
                                                                         
server = SimpleJSONRPCServer(('0.0.0.0', 1234))                          
                                                                         
def exposed(fn):                                                         
    server.register_function(fn)                                         
    return fn                                                            
                                                                         
@exposed                                                                 
@error_handler.wrap_rpc_exception                                        
def fail1(arg):                                                          
    raise PEBCAKError                                                    
                                                                         
@exposed                                                                 
@error_handler.wrap_rpc_exception                                        
def fail2(arg):                                                          
    raise FUBARError                                                     
                                                                         
@exposed                                                                 
@error_handler.wrap_rpc_exception                                        
def fail3(arg):                                                          
    raise Exception                                                      
                                                                         
server.serve_forever()                                                   

As you can see below, I’ve implemented the client using tinyrpc, which is more extensible than jsonrpclib. Hat tip to ebaxm for introducing this library to Axial.

#wrapped_server/client.py
from wrapped_rpc.exceptions import error_handler, PEBCAKError, FUBARError 
from tinyrpc import RPCClient, RPCError                                   
from tinyrpc.protocols.jsonrpc import JSONRPCProtocol                     
from tinyrpc.transports.http import HttpPostClientTransport               
                                                                          
class WrappedRPCClient(RPCClient):                                        
    def __init__(self, protocol, transport, api_name):                    
        self.protocol = protocol                                          
        self.transport = transport                                        
        self.api_name = api_name                                          
                                                                          
    def _send_and_handle_reply(self, req):                                
        reply = self.transport.send_message(req.serialize())              
        response = self.protocol.parse_reply(reply)                       
        if hasattr(response, 'error'):                                    
            try:                                                          
                raise error_handler.get_exception_instance(               
                         response._jsonrpc_error_code,                    
                         self.api_name, response.error)                   
            except KeyError:                                              
                raise RPCError('Error calling remote procedure: %s' %\    
                               response.error)                            
        return response                                                   
                                                                          
rpc_client = WrappedRPCClient(                                            
    JSONRPCProtocol(),                                                    
    HttpPostClientTransport('http://localhost:1234'),                     
    'test'                                                                
)                                                                         
server = rpc_client.get_proxy()                                           
                                                                          
try:                                                                      
    server.fail1('foo')                                                   
except PEBCAKError as e:                                                  
    print 'code: %s, message: %s' % (e.code, e.msg)                       
                                                                          
try:                                                                      
    server.fail2('bar')                                                   
except FUBARError as e:                                                   
    print 'code: %s, message: %s' % (e.code, e.msg)                       
                                                                          
server.fail3('baz')                                                                                                                                                                            

The client class handles wrapping all known errors and cleanly implements namespaced exceptions.

$ python wrapped_server/client.py                                                                           
code: -1, message: Problem exists between chair and keyboard
code: -2, message: FUBARed
Traceback (most recent call last):
  File "wrapped_server/client.py", line 42, in 
    server.fail3('baz')
  File "build/bdist.linux-x86_64/egg/tinyrpc/client.py", line 89, in 
    
  File "build/bdist.linux-x86_64/egg/tinyrpc/client.py", line 44, in call
    
  File "wrapped_server/client.py", line 22, in _send_and_handle_reply
    response.error)
tinyrpc.exc.RPCError: Error calling remote procedure: Server error:   File "wrapped_server/server.py", line 23, in fail3 | Exception

Download rpc_exceptions:

pip install rpc_exceptions

and take a look at the git repository.

unicode ^ str

Perhaps the nicest thing you could say about Python 2’s attempt at unicode and str interoperability through implicit coercion is that it forces programmers to come to terms with the difference between unicode code-point strings and unicode character set encoded byte strings. Take the following example:

>>> u'中国'.decode('utf8')
Traceback (most recent call last): 
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)

As decode exists to turn encoded byte str objects to code-point unicode objects, calling decode on a unicode object should ostensibly be a noop. In reality what happens is that Python coerces our unicode object to a string (using encode), opting to ignore the passed decode codec (utf8) in favor of the default codec (ascii), raising a UnicodeEncodeError and causing a lot of confusion in the process. The good(?) news is that had Python been able to encode with ascii, it would have decoded using utf8.
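
In other words, the failing call above behaves roughly like this explicit chain:

>>> # roughly what Python 2 does under the hood: coerce to str with the
>>> # default codec first, then decode
>>> u'中国'.encode('ascii').decode('utf8')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)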

Just the Beginning

You might be asking yourself: Why would I ever call decode on a unicode object in the first place?

>>> def an_innocuous_method(args, delimiter=u'_'):
...     '''Any unicode object will cause join to return a unicode object,
...        implicitly `decoding` as needed.'''
...     return delimiter.join(args)
...
>>> # unicode objects work just fine ...
>>> an_innocuous_method([u'中国', ''])
u'中国_'
>>> # a str fails with a decode error
>>> an_innocuous_method(['中国', ''])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in an_innocuous_method
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 0: ordinal not in range(128)

At this point we’ve learned the danger of mixing unicode objects and str objects in Python: to avoid it, we need to use str objects or unicode objects exclusively. In attempting to follow through on this strategy you might be tempted to decode all of your arguments, which would fix the UnicodeDecodeError but land us right back at our original UnicodeEncodeError. A safe implementation looks more like this:

>>> def an_innocuous_method(args, delimiter=u'_', encoding='utf8'):
...     '''Any unicode object will cause join to return a unicode object,
...        implicitly `decoding` as needed.'''
...     return delimiter.join([i.decode(encoding) if isinstance(i, str) else i for i in args])
>>> an_innocuous_method(['中国', ''])
u'中国_'

But the combination of type-checking and decoding on anything that could possibly be a str is time-consuming for you, ugly and inefficient for your code, and prone to PEBKAC errors which are difficult to catch.

Make it Easy to Do the Right Way

If you choose to go unicode for everything, you want to start by converting all of your string literals to unicode literals:

# in each module
>>> from __future__ import unicode_literals
>>> foo = ''
>>> foo
u''

For each method you then want to be able to safely decode any str objects passed to your methods to unicode objects (or vice-versa), for which a unicodify (or stringify) decorator would work great:

@unicodify(charset='utf8')
def a_truly_innocuous_method(args, delimiter='_'):
    return delimiter.join(args)

The code for these decorators looks something like this:

'''Decorators to convert all arguments passed to a function or method to
   unicode or str, including default arguments'''
import sys
import functools
import inspect

def _convert_arg(arg, from_, conv, enc):
    '''Safely convert unicode to str or str to unicode. Note that in Python 2,
       str.decode and unicode.encode take the codec positionally, not as a keyword.'''
    return getattr(arg, conv)(enc) if isinstance(arg, from_) else arg

def _wrap_convert(from_type, fn, encoding=None):
    '''Decorate a function converting all str arguments to unicode or
       vice-versa'''
    conv = 'decode' if from_type is str else 'encode'
    encoding = encoding or sys.getdefaultencoding()

    # override string defaults using partial
    aspec, dflts = inspect.getargspec(fn), {}
    if aspec.defaults:
        for k,v in zip(aspec.args[-len(aspec.defaults):],aspec.defaults):
            dflts[k] = _convert_arg(v, from_type, conv, encoding)
        fn = functools.partial(fn, **dflts)

    @functools.wraps(fn.func if isinstance(fn, functools.partial) else fn)
    def converted(*args, **kwargs):
        args = [_convert_arg(a, from_type, conv, encoding) for a in args]
        for k,v in kwargs.iteritems():
            kwargs[k] = _convert_arg(v, from_type, conv, encoding)
        return fn(*args, **kwargs)

    return converted

def unicodify(fn=None, encoding=None):
    '''Convert all str arguments to unicode'''
    if fn is None:
        return functools.partial(unicodify, encoding=encoding)
    return _wrap_convert(str, fn, encoding=encoding)

def stringify(fn=None, encoding=None):
    '''Convert all unicode arguments to str'''
    if fn is None:
        return functools.partial(stringify, encoding=encoding)
    return _wrap_convert(unicode, fn, encoding=encoding)

__all__ = ['unicodify', 'stringify']

View Gist on Github

Default to Tuple


Much has been written on why you shouldn’t default to mutable values when defining functions in Python, often leading to code that looks like this:

def iter_dflt_none(iter_=None):
    '''Default to None is one common pattern'''
    iter_ = iter_ or []
    return iter_

null = object()
def iter_dflt_sentinel(iter_=null):
    '''Default to null sentinel to allow None to be passed as a valid value'''
    iter_ = [] if iter_ is null else iter_
    return iter_

While there is nothing wrong with either of these battle-tested patterns, both involve a highly repetitive statement at the top of each such function to turn the non-iterable default value back into an empty iterable (typically an empty list), when we could avoid this code entirely by defaulting to list’s immutable cousin, tuple:

def iter_dflt_tuple(iter_=tuple()):
    '''Just default tuple, and it's already iterable!'''
    return iter_
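
For contrast, here is the classic pitfall all three patterns guard against: a mutable default is created once, at definition time, and shared across calls.

def iter_dflt_list(iter_=[]):
    '''Danger: the very same list object is reused on every call'''
    iter_.append('oops')
    return iter_

iter_dflt_list()   # ['oops']
iter_dflt_list()   # ['oops', 'oops'] -- the default has mutated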

Introspection in SQLAlchemy: Reflections on the Magnum Opus


The layering of orthogonal concepts within SQLAlchemy lends itself to deep introspection. These capabilities can be used for a variety of purposes including debugging and concise expression of programmatic intent. The detailed introspection API added in version 0.8 can be very useful in several scenarios. Previously, while these introspection capabilities were available, they were mostly undocumented and without official support. We’ll cover some deeper parts of this API through the investigation of an application bug and the addition of a common feature. First, though, it might be best to glance at the surface.

Transmutation Ingredients

SQLAlchemy is a comprehensive database interface tool that is split into several components. The most obvious distinction is between ‘SQLAlchemy Core’ and the ‘SQLAlchemy ORM’. Both the Core and ORM themselves are greatly subdivided into several layers, though the primary focus in this article is the ORM’s internals. In addition, it’s important to note the separation of the ORM from the declarative extension. The declarative extension adds the declarative base class and other niceties, but ultimately it is just another layer.

Session

A primary focal point of SQLAlchemy is the fabled “db session”. This object is the key to interacting with the ORM (during model usage, rather than creation) since nearly all of the heavy lifting is done in a way that is rooted to a single Session. This Session does several things that are mostly behind the scenes, but all ORM object instances ultimately hold a reference back to it.

The Session object is responsible for storing and synchronizing the in-memory Python object instances with the current state of the database. One important shortcut (normally) taken by SQLAlchemy is to assume that all interaction with the session takes place in the context of a transaction. This allows SQLAlchemy to batch updates, maintain its identity map, and issue queries that return accurate results while only communicating with the database when needed.

Flushing

In common use of SQLAlchemy, communication with the database is delayed until “needed”. In particular, this means that

inst = db_session.query(MyObj).get(1)
inst.first_attr = "hello"

does not execute an UPDATE statement. Instead, the new value for ‘first_attr’ is stored as pending, “needs to be flushed” state, and only sent in an UPDATE statement when a flush occurs. These flushes are either explicit (session.flush()) or automatic (run before each query, including SELECT queries). In addition, a flush is always executed before a commit. Autoflush exists to ensure that changing an object and then querying for it returns the correct result, since before the flush the database is unaware of in-memory modifications. In other words, if one ran the above code and then ran:

db_session.query(MyObj).filter_by(first_attr="hello")

with autoflush=False, it would not be returned, but with autoflush=True, a .flush() call would be executed first, allowing the DB to notice that this object meets the criteria.
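
Putting it together (using the MyObj example above):

inst = db_session.query(MyObj).get(1)
inst.first_attr = "hello"
# with autoflush=True (the default), the pending change is flushed before the
# SELECT is issued, so the query sees it
results = db_session.query(MyObj).filter_by(first_attr="hello").all()
assert inst in results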

InstanceState

Every model instance has an associated InstanceState instance, which is the actual store for these values. In particular, the current (possibly unflushed) values are stored on the InstanceState’s .dict attribute, and the original, last-committed values for any modified attributes (if they were loaded) are stored on .committed_state (which is a somewhat confusing name). The official API to access this data, however, is the History interface. This interface shows the old value and the new value in a much more convenient way, and is reached via the inspection API.

istate = inspect(inst) returns an InstanceState. istate.attrs returns a “namespace” (dict-like object) of attribute names mapped to AttributeState instances. These AttributeState instances contain the ‘history’ attribute, which returns the History object, and is the “official” interface to the old and new pre-flush values.

Alchemical Calcination 1

In resolving bugs, one must first investigate and determine their cause. In a bug I resolved recently, a logically unchanged object was causing SQLAlchemy to emit an UPDATE clause, which caused the database to update a recently changed timestamp. In this case, an application of inspect(), InstanceState, AttributeState, and History used just before db_session.commit() was very useful in spotting the issue:

>>> dict([(k, v.history) for k, v in inspect(model_instance).attrs.items() if v.history.has_changes()])
{u'location_id': History(added=['2'], unchanged=(), deleted=[2L])}

Given a model instance, we inspect() it, which returns an InstanceState instance. This tells us about the state of the object in its session (pending, detached, etc.), and has details about its attributes. Accessing the attrs attribute returns a “namespace”, which behaves more or less like a dict. Its keys are the names of persisted attributes for our instance, and its values are AttributeState objects. The AttributeState object’s history attribute gives us access to a History object, which records changes not yet persisted to the database. In particular, it is these History objects that contain the details of state that is pending but not yet persisted to the database via a flush operation.

It is worthwhile to note that this history API is generally only useful pre-flush, because it is during flush that an UPDATE or INSERT statement can be issued. That being said, the above could integrate quite nicely with a session before_flush listener (or simple breakpoint).

Alchemical Multiplication 2

Serialization is a common function added to many declarative base object implementations. Often it will take the name of .as_dict(), .as_json(), or even .__getstate__() for Base classes that would like to support the pickle protocol. Unfortunately, several implementations fall short of achieving various desired outcomes. For example, one may want to serialize an object to json for display on the frontend. However, as soon as different users have different logical “attribute level” permissions to view fields (eg, ‘owner’, ‘salary’, or ‘home_address’), this one size fits all approach can fall short. In addition, there are several other decisions to make – often an object has dependent children (say, a user has multiple phone numbers). In the json representation, it may be convenient to return the attribute ‘phones’ as a list of numbers rather than deal with an entirely separate UserPhone object on the frontend. In short, there’s no one size fits all solution.

That being said, here’s my one size fits all solution. It inspects an object instance and returns a serialized dict. The function is recursive by default, though that can be disabled. Many to many relationships are followed and returned as dicts or as a list of ids (depending on arguments). In addition, it takes a filter_func that is called twice per dumped object: once with a dict of attributes (before hitting the database) that can whitelist or add additional attributes to return, and then a second time with the loaded attribute values. This allows a clean logical dump with appropriate filtering based on where it’s called.

>>> dump(model_instance)
{'id': 1, 'attr_a': 'a', 'attr_b': 'b'}

>>> dump(model_instance, include_relationships=True)
{'id': 1, 'attr_a': 'a', 'attr_b': 'b', 'foos': [{'id': 1, 'bar': 123}, {'id': 2, 'bar': 456}]}
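
A minimal sketch of such a dump helper, built on the inspection API (this is illustrative; the real implementation also supports recursion control and the filter_func described above):

from sqlalchemy import inspect

def dump(obj, include_relationships=False):
    mapper = inspect(obj).mapper
    # plain column attributes
    result = {attr.key: getattr(obj, attr.key)
              for attr in mapper.column_attrs}
    if include_relationships:
        for rel in mapper.relationships:
            value = getattr(obj, rel.key)
            if rel.uselist:
                result[rel.key] = [dump(child) for child in value]
            else:
                result[rel.key] = dump(value) if value is not None else None
    return result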

betterconfig for Better Configs


Just want code? Go here.

Configuration languages have to be one of the most frequently reinvented wheels in programming. It starts simple: you use a standard format for your configs and then special-case here and there. After enough special-casing, you start to become convinced that your domain has outgrown the capabilities of a generic format. Next thing you know, you’re writing a parser (or, worse yet, faking a parser with regular expressions).

Anytime I realize I’m heading down this path I challenge myself to remember the cautionary tale of ESR and CML2 and try to use an existing language to get the job done.

ConfigParser vs. Module as Config

There are 2 approaches you’ll see in the Python world:

  1. ConfigParser — A simple (but limited) declarative section-based mini-language that comes with or without interpolation.
  2. settings.py — Just use a python module to define all the config you need, and use importlib.import_module to get at it. It’s as extensible and flexible as Python, but that means your config can (un)intentionally do a lot more than you might want it to.

Whenever we’ve been given the choice, we’ve tended towards using ConfigParser, and modifying as needed. After augmenting ConfigParser 3 or 4 times, resulting in 3 or 4 different implementations, we think we’ve found the core weaknesses with ConfigParser:

  1. Types: ConfigParser has no type support, which means you end up with lots of type mapping boilerplate in your codebase.
  2. Includes: One of the nicer aspects of the module-as-config approach is that you can import other modules, ConfigParser has no built-in ability to import additional config files, meaning your configs can grow pretty unwieldy.
  3. Sections Required: While depth is nice, sometimes you just want to define a “top-level” variable in your config, and ConfigParser won’t do that for you.
  4. Non-Expressive Interface: The interface to ConfigParser, while extensive, tends to assume you’re only going to care about a single section in a big config. This assumption is one that turns out to be incorrect a majority of the time.

While there are independent solutions out there for typing, sectionless configs and a better interface, there’s no one solution that gives you typing, includes and sectionless support wrapped up in a nice interface … until betterconfig.

Type Coercion Boilerplate Sucks

It was probably about the fifth time in as many months that I was building a service with a ConfigParser config that looked like this:

[server]
host    = localhost
port    = 8888
workers = 2
debug   = False

Which invariably resulted in me writing a mapper to unpack the string values into the types I really wanted:

from ConfigParser import RawConfigParser
MAP = {
    'server': {
        'host': str,
        'port': int,
        'workers': int,
        'debug': lambda x: (x.strip() == "True"),
    }
}
# ...
c = RawConfigParser()
c.read('./app.cfg')
settings = {}
for sect in c.sections():
    settings[sect] = {k: MAP[sect].get(k, str)(v) for k, v in c.items(sect)}

A quick google for “ConfigParser typing” led me to StackOverflow. By using ast.literal_eval, and with a slight modification to my config file:

[server]
host    = "localhost"
port    = 8888
workers = 2
debug   = False
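
With the string values quoted, ast.literal_eval can infer every type for us:

>>> from ast import literal_eval
>>> literal_eval('"localhost"'), literal_eval('8888'), literal_eval('False')
('localhost', 8888, False)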

I was able to drop the mapping once and for all. Now, without the mapping requirement, I thought we could simplify this by providing an interface that takes a config file and returns a dict:

import betterconfig
config = betterconfig.load('./app.cfg')
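
For the server config above, the result is an ordinary dict of typed values, roughly:

>>> config
{'server': {'host': 'localhost', 'port': 8888, 'workers': 2, 'debug': False}}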

More Flexibility in Config, Less Config by Module

After this first change, Ben asked me if the config for his spoke project could include other configs, to allow each spoke to install its own config independent of all other spokes. So we built another config parser that worked with configs like this:

[spoke]
path    = "/var/spoke/"
include = [ "/etc/spoke/*.cfg" ]

[backbone]
# ...

While this is nice (especially thanks to the glob module), it relies on a section named “spoke”, and really it shouldn’t be in any section at all. Keeping this in mind, we made betterconfig work with configs like this:

include   = [ "/etc/include.cfg", "/etc/includes/*.cfg" ]
foo       = "you can set other vars outside of sections too ..."

[config]
section = "yup, we support sections along with non-sections"

The flexibility of includes, combined with typing, ends up being enough (most of the time) to avoid having to use modules to configure your applications. Which means that this:

import importlib
settings = importlib.import_module('settings')

is replaced by the much safer and nearly as flexible:

import betterconfig
settings = betterconfig.load('./settings.cfg')

# and if you're really in love with '.' notation ...
globals().update(settings)

Start Using betterconfig For Your Project

You can start using betterconfig today by downloading it from github, or by installing it through pip.

Coding Stupid – the High Cost of Being a Smarty-Pants

Smarties Biting The Dust

Much has been said about how to program the right way, whether on making the most of the Object-Oriented or Functional paradigms, maximizing cohesion and minimizing coupling, using an MVC to separate concerns, or on any other paradigm, framework, metric or best practice that’s supposed to make you code better. I think you can get a very long way just by focusing on the pragmatic concerns of the problem at hand, and by “coding stupid”.

Don’t Be a Smarty-Pants

When I first learned how to program, I was fond of using the “advanced” features of a language. In Python I used nested list comprehensions and lambdas, in Javascript I would often loop backwards just to save me a single machine instruction on a criteria comparison. I would also factor and refactor out every shared line of code into a MixIn or new function, resulting in code with high levels of indirection and low levels of readability.

Then I came across a quote from one of the greats, Brian Kernighan, co-author of C:

Everyone knows that debugging is twice as hard as writing a program in the
first place. So if you’re as clever as you can be when you write it, how will
you ever debug it?

And I was forced to look back at my code considering the cost of readability and maintenance:

ceiling = 10
requested_floors = [3, 5, 7, 11]
stop_at_floors = filter(lambda x: x < ceiling, requested_floors)

map, reduce, filter and lambda are all techniques in Python that lead to what I call “smarty-pants code”: the kind of code that makes you feel clever when you write it, but is difficult to understand, modify or reuse down the road. It would be much easier to deal with this code if it were “coded stupid”.

The “coding stupid” version of the same logic looks like this:

ceiling = 10
requested_floors = [3, 5, 7, 11]
stop_at_floors = []
for floor in requested_floors:
    if floor < ceiling:
        stop_at_floors.append(floor)

At a glance this stupid code seems too basic, and it has 3 more lines than the smarty-pants version above. But it is also much easier to understand, and is actually more efficient: it makes one fewer function call and len(requested_floors) fewer lambda calls.

What’s the Big Deal?

It may not seem like a big deal when the example is as simple as the one above, but let’s extrapolate this to a larger program that handles more aspects of running an elevator than a simple logical check to see if the requested floor is actually in the building:

floors = 15
cur_floor = 2
cur_dir = 'up'
requests = { 3: { 'dirs': {'up'}, 'doors': {'side'} },
             12: { 'dirs': {'down'}, 'doors': {'front', 'side'} }}
stop_at_floors = map(lambda (k,v): k, filter(lambda (k,v): k <= floors and cur_dir in v['dirs'] and (
                                                 (cur_dir == 'up' and k > cur_floor) or \
                                                 (cur_dir == 'down' and k < cur_floor)),
                                             requests.iteritems()))

While this is extremely compact and has no code repetition whatsoever, I prefer looking at, and maintaining a stupider version:

floors = 15
cur_floor = 2
cur_dir = 'up'
requests = { 3: { 'dirs': {'up'}, 'doors': {'side'} },
             12: { 'dirs': {'down'}, 'doors': {'front', 'side'} }}
stop_at_floors = []
for floor, floor_data in requests.iteritems():
    if floor <= floors and cur_dir in floor_data['dirs']:
        if cur_dir == 'up' and floor > cur_floor:
            stop_at_floors.append(floor)
        elif cur_dir == 'down' and floor < cur_floor:
            stop_at_floors.append(floor)

While this stupider version duplicates an instruction (stop_at_floors.append(floor)) and has more branching logic (and more lines) it’s simpler to maintain, simpler to extend and easier on the eyes.

Imagine what happens as the complexity of our elevator example continues to grow. Say we want the elevator to go to the highest floor or lowest floor even if those floors call for the opposite directional action, or that we have more than one car that can handle requests. In these cases, I’d prefer refactoring the stupid one, rather than the fancy one, and I think you would too.

[image: shutterstock.com]

SPOKES: Towards Reusable Front-End Components

Hub and Bespoke

For some time here at Axial we’ve been migrating a large monolithic app to a set of small and simple services. One challenge that has come up in this process is how to share front-end components without unnecessarily coupling services together, and without imposing too many restrictions on how these front-end components can be implemented. The solution we’re evolving involves an abstraction we call a “spoke”.

What is a spoke?

A spoke is an executable javascript file that can contain javascript, CSS and HTML templates. In addition, a spoke can have dependencies on other spokes, which allows us to partition front-end components into small discrete chunks and at build time create a single loadable javascript file for a page that includes just the functionality we need. Now, you may be wondering how we embed CSS and HTML templates into an executable javascript file. We’re using a somewhat unsophisticated approach; we URI-encode the CSS or HTML content into a string and embed it in some simple javascript that decodes the content and dynamically adds it to the DOM.
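
The wrapping itself is just a few lines of Python; here is a rough sketch of the idea (illustrative, not the actual spokec source):

import urllib

def wrap_css(css_source):
    '''Emit javascript that re-inserts a CSS file into the page head.'''
    encoded = urllib.quote(css_source)
    return ('$("<style type=\'text/css\'>").appendTo("head")'
            '.text(decodeURIComponent("%s"));' % encoded)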

A simple example

Let’s create a spoke for rendering a user’s name. This perhaps sounds like it’s too simple a task, but there could be some complexity to the logic required:

  • To save space, if the user’s full name would be more than 20 characters, we will render just their first initial followed by their last name.
  • If the user is an internal user, we want to annotate their name with (internal).
  • If the user is an internal user masquerading as a regular user, we want to annotate their name with (masq).

For this example, we will use a Backbone model and view, and an Underscore template, but these are implementation choices and not imposed on us just because we are creating a spoke.

Here is the Backbone model we will use:

var UsernameModel = Backbone.Model.extend({
    defaults: { first_name: "",
                last_name: "",
                is_internal: false,
                is_masq: false }
});

The view is pretty straightforward:

var UsernameView = Backbone.View.extend({
    className: 'username',
    render: function() {
        this.$el.html(this.template(this.model.attributes));
        return this;
    },
    template: _.template($('#username-template').html())
 });

We will store the Underscore template in a <script> tag with type “text/template”:

<script id="username-template" type="text/template">
    <% if (first_name.length + last_name.length >= 20) { %>
        <%= first_name.substr(0,1) %>.
    <% } else { %>
        <%= first_name %>
    <% } %>
    <%= last_name %>
    <% if (is_internal) { %>(internal)<% } else if (is_masq) { %>(masq)<% } %>
</script>

In addition, we have a CSS file to control the styling of the username:

.username {
    font-size: 18px;
    color: #333;
    white-space: nowrap;
}

To turn this into a spoke, all we have to do is store these source files in the spoke source tree:

js/models/Username.js
js/views/Username.js
html/username.html.tpl
css/username.css

Then we add a definition for this spoke (which we will call, surprise, surprise, “username”) to a spoke config in /etc/spoke/, for use by the “spoke compiler”, which is a python script spokec:

    # /etc/spoke/username.cfg
    [username]
    js     = [ 'models/Username.js', 'views/Username.js' ]
    html   = 'username.html.tpl'
    css    = 'username.css'
    spokes = 'backbone'

Spokes do not need to have all of these types of files; a spoke might contain only CSS or only javascript content. Note, also, that we have made the “username” spoke dependent on the “backbone” spoke. The definition of the “backbone” spoke in turn references the “underscore” spoke. When we use spokec to generate a spoke, these dependencies are followed and included in the output. As you probably anticipate, if a spoke is referenced multiple times, it only gets included in the output once.

Now that we’ve defined this spoke, here’s how we would call spokec to generate it:

spokec username [additional spokes] path/to/output.js

Each invocation of spokec generates a single executable javascript file containing all of the specified spokes and their dependencies. So typically a service will create a single spoke file for all of its pages, or sometimes a few different spoke files if the pages that service provides are significantly different. Currently we apply minification and fingerprinting to the spokes after generating them, but we will probably add this functionality directly to spokec soon.

Now, because we specified that “backbone” is a requirement for the “username” spoke, the resulting output is somewhat too large to paste here, but spokec has a feature that allows you to exclude specific dependencies from the generated spoke file by specifying them on the command-line prefixed with a ‘-‘. So, for example,

spokec username -backbone path/to/output.js

would create a spoke file with *only* the “username” spoke in it, which looks like this:

$("<style type='text/css'>").appendTo("head").text(decodeURIComponent(".username%20%7B%0A%20%20%20%20font-size%3A%2018px%3B%0A%20%20%20%20color%3A%20%23333%3B%0A%20%20%20%20white-space%3A%20nowrap%3B%0A%7D%0A"));
$("<div style='display: none'>").appendTo("body").html(decodeURIComponent("%3Cscript%20id%3D%22username-template%22%20type%3D%22text/template%22%3E%0A%3C%25%20if%20%28first_name.length%20%2B%20last_name.length%20%3E%3D%2020%29%20%7B%20%25%3E%0A%3C%25%3D%20first_name.substr%280%2C1%29%20%25%3E.%0A%3C%25%20%7D%20else%20%7B%20%25%3E%0A%3C%25%3D%20first_name%20%25%3E%0A%3C%25%20%7D%20%3E%0A%3C%25%3D%20last_name%20%25%3E%0A%3C%25%20if%20%28is_internal%29%20%7B%20%25%3E%28internal%29%3C%25%20%7D%20%25%3E%0A%3C%25%20else%20if%20%28is_masq%29%20%7B%20%25%3E%28masq%29%3C%25%20%7D%20%25%3E%20%0A%3C/script%3E%0A"));
var UsernameModel = Backbone.Model.extend({
   defaults: {
       first_name: "",
       last_name: "",
       is_internal: false,
       is_masq: false }
});

;
var UsernameView = Backbone.View.extend({
   className: 'username',
   render: function() {
       this.$el.html(this.template(this.model.attributes));
       return this;
   },
   template: _.template($('#username-template').html())
});
;

As you can see, the implementation of spokec assumes that jQuery is already included on the page, which for us is a practical assumption but would be easy to change if we wanted to. It should also be clear that the spoke abstraction makes very few other assumptions about how a specific spoke is implemented, as long as they can be represented as a series of javascript, CSS and HTML files. This allows us the flexibility to change the tools and libraries we are using but maintain a consistent and logical structure to our reusable components.

Getting Started

To start using spoke today, install it using pip:

sudo pip install spoke

The Future of Spokes

One type of content that we do not yet support in spokes is images; thus far we have just been using data URLs when we’ve needed to include images, but particularly for larger images this may become somewhat impractical. Another future enhancement we’ve been considering would make it much safer and easier to create reusable components by providing a way to automatically prefix CSS/HTML classes (and perhaps IDs) with a spoke identifier so that we can create very generic CSS class names without fear of a conflict with CSS classes in a different spoke. Doing this for CSS and HTML content is relatively straightforward, but to do so in javascript is a little trickier, although we have some ideas. Look for a future blog post once we’ve got solutions to this problem that we’re happy with!

Pytest – Testing Axial-Style Part 3

Squashing Bugs

Selenium and the Page Object Model serve as building blocks for our testing suite at Axial. But we’re missing the glue that connects it all together: pytest. Pytest is a popular alternative to python’s builtin unittest framework offering many highly useful features unavailable in unittest. Our favorite features include:

  • Test collection and execution
  • Fixtures and hooks (setup/teardown on steroids)
  • Reporting

To demonstrate pytest we’ll take a break from selenium and focus on sharing resources among test cases. In this demo we’ll write a pytest plugin for a hypothetical testing suite where the majority of test cases connect to our AMS (Axial Messaging Service) API. Wouldn’t it be nice if all our tests made use of the same session key and API connection? Not always, but for this demo the answer is yes.

Pytest Hooks

Our first step is to get an Axial session key that can be shared among all tests. Our plugin can do this by specifying a pytest_sessionstart hook, which is a function that will be called at the beginning of the testing session (i.e. when py.test is invoked). A full list of pytest hooks can be found here. Our sessionstart hook will generate a UUID and add it as a property to pytest’s session object.

from uuid import uuid4
def pytest_sessionstart(session):
    session.key = uuid4()

From here on any tests that make use of pytest’s request fixture can access our session key using request.session.key. More on fixtures in the next section.
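
For example, a trivial test can read it straight off the request fixture:

def test_has_session_key(request):
    # 'request' here is pytest's built-in fixture
    assert request.session.key is not None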

Pytest Fixtures

At this point all tests can access the same session key, but we still want a convenient way for each test to get a connection to the AMS API. To solve this problem we will define a custom fixture. As the pytest documentation explains, the purpose of a fixture is to “provide a fixed baseline upon which tests can reliably and repeatedly execute”. Let’s dive right in and define a fixture that provides an API connection to be shared among all test cases:

@pytest.fixture(scope="session")
def ams_api(request):
    ''' Return a connection to the AMS API '''
    return AxlClientApiManager(
        AMS_HOST,
        AMS_PORT,
        AMS_API_PATH,
        session_key=request.session.key
    ).api

The decorator tells pytest this is a fixture, and the scope="session" kwarg means the returned connection will be shared among all tests. The request argument is an optional instance of the FixtureRequest object, which tells us everything there is to know about the requesting test case, including the session key we set in our sessionstart hook. In fact, the request instance here is itself a fixture, made available as a function argument. This process of exposing fixtures through function arguments is a prime example of dependency injection. Our new fixture can be used much like the request fixture: as a function argument. For example, to test the echo function of the AMS API all we need to do is:

def test_echo(ams_api):
    ams_api.echo('Hello world!')

Pytest Plugins

In order to make pytest aware of these fixtures and hooks we need to define a plugin. There are many ways to define a plugin but for our little test suite the most convenient option is to create a conftest.py file that contains our fixtures/hooks and place it in the directory that contains our tests. As a result, any tests run within this directory or any subdirectories will be able to oh-so-magically access the same session key and API connection as described above.
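
A typical layout might look like this (names are illustrative):

tests/
    conftest.py      # pytest_sessionstart hook and ams_api fixture live here
    test_echo.py     # anything under tests/ can request the ams_api fixture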

Pytest Collection and Execution

The last step is actually running our test suite. The most common way to execute tests is with the py.test command. Tests are collected by recursively searching the specified directory for modules that begin with test_*, classes that begin with Test* and functions that begin with test_*. Use the --collectonly argument to see which tests will be collected without actually running them.

A rich set of arguments gives control over test execution and configuration. My py.test invocations usually contain the following options. Note that I can define a pytest.ini file with these options in the same directory as conftest.py and they will be used as default options.

-v
Show more detail when reporting; looks much nicer
--tb=short
Show shorter tracebacks
--capture=no
Don't capture test stdout. My tests rarely write to stdout and when they do I want to see it right away
--pdb
Open an interactive pdb session when an unhandled exception is raised
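
Those defaults might live in a pytest.ini that looks something like this:

# pytest.ini, next to conftest.py
[pytest]
addopts = -v --tb=short --capture=no --pdb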

In part 4 we’ll see how Selenium, the Page Object Model and pytest can be combined to yield a stupid simple web testing framework.

[image credit: shutterstock]