The website of a web programmer, footie fan, rubbish photographer, and jack of literally some trades.
At long last, the greatest google image search term has been found: http://t.co/HDsN2bWU ("dansband")
May 22, 2012, 6:07 p.m. via web
@jessemcnelis yeah, not a fan of this, but like I said, conforming to community expectations is more important than my personal preferences
May 15, 2012, 9:38 p.m. via web
@jessemcnelis but the best choice is to follow the community conventions, even if we don't like them; in golang that's \t, in python spaces
May 15, 2012, 7:57 p.m. via web
@jessemcnelis spaces get you alignment (readability) w/o invisible rules; adjustable indent w/ \t is a false promise, or inferior, in practice
May 15, 2012, 7:56 p.m. via web
@jessemcnelis in python code, there are few C-style for's; nearly everything is iteration. go's range could do this with no new semantics
May 15, 2012, 7:51 p.m. via web
@jacobian think of rpython as a pythonic C flavor with neat compile-time optimizations; pypy is written in that + a jit for user python code
May 15, 2012, 4:42 p.m. via web
OH: "@rays: Well, `fix` is a strong word."
May 8, 2012, 6:25 p.m. via web
commit #217
to
johnny-cache
May 7, 2012, 5:30 p.m.
Fixed NameError introduced in 4b35eb1009d7.
| johnny/utils.py |
Interesting #GSOC project, #pypi to #debian repos conversion: http://t.co/MPECaSME
April 24, 2012, 4:24 p.m. via web
commit 198fec2f
to
arachne
April 23, 2012, 5:46 p.m.
remove some unnecessary debug logging
-
@@ -237,17 +237,12 @@ def wrapped(*a, **kw): ignore_errors = kw.pop('ignore_errors', True) is_json = kw.pop('json', False) - logger.debug(" >> get %s" % (a[0][:100])) response = func(*a, **kw) # pre-load the content with a read content = response.content - logger.debug(" << get %s" % (a[0][:100])) # parse json if response.headers['content-type'].split(';')[0] in json_types or is_json: - t0 = time() response.json = json.loads(response.content) if response.content else {} - td = time() - t0 - logger.debug(" jj %s in %0.2f" % (naturalsize(len(response.content)), td)) # if an error occured and we waned to raise an exception, do it; we # can still take the response off of this error if response.status_code > 400 and not ignore_errors:
commit 558819f1
to
arachne
April 23, 2012, 6:07 a.m.
fix a serious performance issue in requests, which I will blog about because it's cost me an entire weekend of my life
-
@@ -35,6 +35,59 @@ 'text/json', ) +from requests import models + +import zlib + +def decompress(content, mode='gzip'): + """Decode a string. A copy of requests' stream_decompress.""" + if mode not in ("gzip", "deflate"): + raise ValueError("decompress mode must be gzip or deflate") + zlib_mode = 16 + zlib.MAX_WBITS if mode == 'gzip' else -zlib.MAX_WBITS + return zlib.decompress(content, zlib_mode) + +def untransfer(content, resp): + if "gzip" in resp.headers.get("content-encoding", ""): + content = decompress(content, "gzip") + elif "deflate" in resp.headers.get("content-encoding", ""): + content = decompress(content, "deflate") + return content + +class Response(models.Response): + def __init__(self): + super(Response, self).__init__() + # some versions set to False, some to None; we require False + self._content = False + + def iter_content(self, chunk_size=1, decode_unicode=False): + """Request's default response object will read 1 (one) byte at a time + from the raw response. This normally doesn't make a huge lot of + difference for a gevent application, but it does if some C library + (like ujson, or lxml) has the GIL and will not allow any other python + code while gevent's hub goes loco. This behavior absolutely kills our + spider's performance, so here we read the entire response in, decode, + and return. 
Requests was also calling str.join a needless amount.""" + content = untransfer(self.raw.read(), self) + return content + + @property + def content(self): + if self._content is False: + if self._content_consumed: + raise RuntimeError("The content for this response was already consumed.") + try: + if self.status_code is 0: + self._content = None + else: + self._content = self.iter_content() + except AttributeError: + self._content = None + self._content_consumed = True + return self._content + +if models.Response != Response: + models.Response = Response + class HttpError(Exception): def __init__(self, response): self.message = "%d encountered getting \"%s\"" % (response.status_code, response.url) @@ -186,6 +239,8 @@ def wrapped(*a, **kw): logger.debug(" >> get %s" % (a[0][:100])) response = func(*a, **kw) + # pre-load the content with a read + content = response.content logger.debug(" << get %s" % (a[0][:100])) # parse json if response.headers['content-type'].split(';')[0] in json_types or is_json:
@stephaniebeth Avignon was papal home for over a century! Chateauneuf du pape (prestigious wine), means "new chateau of the pope"
April 22, 2012, 9 p.m. via web
AMQP.get is a complete waste of time (literally). Set your phasers to "consume". #tmyk #psa
April 21, 2012, 11:30 p.m. via web
commit fb9d9e78
to
arachne
April 21, 2012, 10:37 p.m.
push some debug logging for the workerserver
-
@@ -19,6 +19,8 @@ from arachne.conf import merge, settings, require from arachne.utils import encode, decode +from humanize.filesize import naturalsize + # OAuth v1.0a support from requests-oauth from oauth_hook import OAuthHook @@ -182,10 +184,15 @@ def wrapped(*a, **kw): ignore_errors = kw.pop('ignore_errors', True) is_json = kw.pop('json', False) + logger.debug(" >> get %s" % (a[0][:100])) response = func(*a, **kw) + logger.debug(" << get %s" % (a[0][:100])) # parse json if response.headers['content-type'].split(';')[0] in json_types or is_json: + t0 = time() response.json = json.loads(response.content) if response.content else {} + td = time() - t0 + logger.debug(" jj %s in %0.2f" % (naturalsize(len(response.content)), td)) # if an error occured and we waned to raise an exception, do it; we # can still take the response off of this error if response.status_code > 400 and not ignore_errors:
commit f546808f
to
arachne
April 21, 2012, 10:28 p.m.
add support for consume/cancel and a basic channel consumer which plays well with gevent (much, much better than polling with get)
-
@@ -4,6 +4,7 @@ """AMQP adapters for the scheduler.""" from functools import wraps +import logging from gevent import queue, sleep, getcurrent from time import time @@ -18,6 +19,8 @@ "poolsize": 5, } +logger = logging.getLogger(__name__) + def autoreconnect(func): @wraps(func) def wrapper(self, *a, **kw): @@ -40,7 +43,10 @@ def new_connection(self): con = AmqpClient(**c) return con -class Amqp(object): +class AmqpPool(object): + """A pooled Amqp client. Multiple connections are made and passed out + on demand, so they cannot be used by two different greenlets at once. It + might be better to use a single amqp connection with a Consumer.""" def __init__(self, **kw): config = merge(defaults, settings.like("amqp"), kw) required = ("port", "username", "password", "host", "vhost", "exchange", "queue") @@ -69,10 +75,21 @@ def poll(self, *a, **kw): with self.pool.connection() as client: return client.poll(*a, **kw) -class AmqpClient(object): + def consume(self, *a, **kw): + with self.pool.connection() as client: + return client.consume(*a, **kw) + + def cancel(self, *a, **kw): + with self.pool.connection() as client: + return client.cancel(*a, **kw) + +class Amqp(object): def __init__(self, **kw): - self.__dict__.update(kw) - self.config = kw + config = merge(defaults, settings.like("amqp"), kw) + required = ("port", "username", "password", "host", "vhost", "exchange", "queue") + require(self, config, required) + self.__dict__.update(config) + self.config = config self.reconnect() def reconnect(self): @@ -121,11 +138,43 @@ def poll(self, queue=None, timeout=None, every=0.1): m = self.get(queue) return m -# FIXME: a joinable queue? -class Queue(queue.Queue): - - def fill(self, client, queue=None): + def consume(self, callback, queue=None, no_ack=True): + """Start consuming messages from a channel. Returns the channel. 
+ Use AmqpClient.cancel() to cancel this consuming.""" + self.tag = self.channel.basic_consume(queue or self.queue, callback=callback, no_ack=no_ack) + return self.channel + + def cancel(self, tag=None): + """Cancel consuming.""" + self.channel.basic_cancel(tag or self.tag) + +class Consumer(object): + """A queue consumer. This queue will consume a channel and fill up a local + synchronized queue which can then be polled by many greenlets. The consume + should be much lower impact than issuing a storm of failing gets.""" + def __init__(self, client=None, size=100): + self.greenlets = [] + self.messages = queue.Queue(int(size)) + self.client = client if client else Amqp() + + def start(self): + channel = self.client.consume(callback=self.fill) + while 1: + try: + channel.wait() + except Exception, e: + self.client.cancel() + logger.error("Error occured while waiting on channel: %s" % e) + self.client.reconnect() + channel = self.client.consume(callback=self.fill) + logger.error("leaving impossible-to-leave loop") + + def stop(self): + logger.debug("Stopping consumer") + self.client.cancel() + + def fill(self, message): """Fill a local gevent-synced queue with items from a client.""" - # FIXME: write this + self.messages.put(message)
@Imposeren not sure it's an easy fix; the cache key is based on the query. similar issue: orm queries using datetime.now() never cache
April 20, 2012, 3:40 p.m. via web
@Imposeren afraid so. Order by RANDOM() is pretty scary, though...
commit 778bca66
to
arachne
April 20, 2012, 6:01 p.m.
guard around header cache a bit differently
-
@@ -205,19 +205,22 @@ def wrapper(*a, **kw): return func(*a, **kw) url = requests_url(*a, **kw) - ch = header_cache.get(url) - if "expires" in ch and ch["expires"] > utcnow(): - raise CacheHit("Expires in the future.") - kw.setdefault("headers", {}).update(ch) + if settings.enable_header_cache: + ch = header_cache.get(url) + if ch: + if "expires" in ch and ch["expires"] > utcnow(): + raise CacheHit("Expires in the future.") + kw.setdefault("headers", {}).update(ch) response = func(*a, **kw) if response.status_code == 304: raise CacheHit("304 status code.") # set cache control headers if available - ch = cache_headers(response.headers) - if ch: - header_cache.set(url, cache_headers(response.headers)) + if settings.enable_header_cache: + ch = cache_headers(response.headers) + if ch: + header_cache.set(url, cache_headers(response.headers)) return response return wrapper @@ -254,19 +257,20 @@ def cache_headers(headers): def disable_header_cache(): """Utility function to disable the header cache.""" global header_cache + settings.enable_header_cache = False header_cache = DummyHeaderCache() def enable_header_cache(**kw): """Utility function to enable the header cache with arguments.""" global header_cache + settings.enable_header_cache = True header_cache = HeaderCache(**kw) class HeaderCache(object): """Keeps a cache of url headers.""" def __init__(self, **kw): # use specialized cache if available, else default cache location - self.config = merge(settings.like("header_cache"), kw) - self.config = merge(settings.like("memcached"), self.config) + self.config = merge(settings.like("memcached"), settings.like("header_cache"), kw) self.client = memcached.Memcached(**self.config) def get(self, url): @@ -288,6 +292,7 @@ def set(self, url, header): return # -- static modifications -- header_cache = HeaderCache() if settings.enable_header_cache else DummyHeaderCache() + get = wrapget(requests.get) post = wrapget(requests.post) head = wrapget(requests.head)
A fast event loop won't help resource-starved green threads. Proper scheduling is still really important, and non-trivial.
April 20, 2012, 12:16 a.m. via web
commit 2c03968d
to
arachne
April 19, 2012, 3:16 p.m.
apply qos to the queue channel
-
@@ -13,8 +13,9 @@ defaults = { "port": 5672, - "prefetch_count": 50, + "prefetch_count": 20, "queue_size": 100, + "poolsize": 5, } def autoreconnect(func): @@ -83,6 +84,7 @@ def reconnect(self): ) qa = dict(durable=False, auto_delete=False) self.channel = self.connection.channel() + self.channel.basic_qos(0, self.prefetch_count, False) self.channel.queue_declare(queue=self.queue,exclusive=False, **qa) self.channel.exchange_declare(self.exchange, type="fanout", **qa) self.channel.queue_bind(queue=self.queue, exchange=self.exchange)
commit a5477ba3
to
arachne
April 19, 2012, 2:52 p.m.
add timing utils to arachne.utils
-
@@ -5,8 +5,11 @@ import re import inspect +import time import zlib import ujson as json +import logging +from functools import wraps from collections import defaultdict class Registry(defaultdict): @@ -108,3 +111,43 @@ def __iter__(self): def __len__(self): return len(self.items) +def timer(f, threshold=0.5): + """Simple timing of a whole function. Does not take into consideration time + this greenlet has spent sleeping.""" + logger = logging.getLogger("%s.%s" % (f.__module__, f.__name__)) + @wraps(f) + def wrapper(*a, **kw): + t0 = time.time() + r = f(*a,**kw) + td = time.time() - t0 + if td > threshold: + logger.info("took %0.2fs (threshold: %0.2f)" % (td, threshold)) + return r + return wrapper + +class Stopwatch(object): + """A timer that allows you to make named ticks and can print a simple + breakdown of the time between ticks after it's stopped.""" + def __init__(self, name='Stopwatch'): + self.name = name + self.start = time.time() + self.ticks = [] + + def tick(self, name): + self.ticks.append((name, time.time())) + + def stop(self): + self.stop = time.time() + + def summary(self): + """Return a summary of timing information.""" + self.stop() + total = self.stop - self.start + s = "%s duration: %0.2f\n" % (self.name, total) + prev = ("start", self.start) + for tick in self.ticks: + s += (" %s => %s" % (prev[0], tick[0])).ljust(30) + "... %0.2fs\n" % (tick[1] - prev[1]) + prev = tick + s += (" %s => end" % (tick[0])).ljust(30) + "... %0.2fs" % (self.stop - tick[1]) + return s +
commit d6c90c73
to
arachne
April 19, 2012, 2:16 p.m.
pool changes, make argument aliasing safer against error
-
@@ -8,6 +8,7 @@ from time import time from arachne.conf import settings, merge, require +from arachne.utils import ConnectionPool from kombu.transport.amqplib import Connection, amqp defaults = { @@ -27,6 +28,17 @@ def wrapper(self, *a, **kw): return ret return wrapper +class AmqpConnectionPool(ConnectionPool): + def __init__(self, config, maxsize=10): + maxsize = int(config.get("poolsize", maxsize)) + super(AmqpConnectionPool, self).__init__(maxsize) + self.config = config + + def new_connection(self): + c = self.config + con = AmqpClient(**c) + return con + class Amqp(object): def __init__(self, **kw): config = merge(defaults, settings.like("amqp"), kw) @@ -34,31 +46,27 @@ def __init__(self, **kw): require(self, config, required) self.__dict__.update(config) self.config = config - self.pool = {} - - def client(self): - current = getcurrent() - if current in self.pool: - return self.pool[current] - c = self.config - client = AmqpClient(**c) - self.pool[current] = client - return client + self.pool = AmqpConnectionPool(config) def reconnect(self): - return self.client().reconnect() + with self.pool.connection() as client: + return client.reconnect() def status(self, **kw): - return self.client().status(**kw) + with self.pool.connection() as client: + return client.status(**kw) def publish(self, *a, **kw): - return self.client().publish(*a, **kw) + with self.pool.connection() as client: + return client.publish(*a, **kw) def get(self, *a, **kw): - return self.client().get(*a, **kw) + with self.pool.connection() as client: + return client.get(*a, **kw) def poll(self, *a, **kw): - return self.client().poll(*a, **kw) + with self.pool.connection() as client: + return client.poll(*a, **kw) class AmqpClient(object): def __init__(self, **kw):
-
@@ -20,7 +20,7 @@ def __init__(self, config, maxsize=10): super(MysqlConnectionPool, self).__init__(maxsize) self.config = config - def connection(self): + def new_connection(self): c = self.config con = umysql.Connection() con.connect(c['host'], c['port'], c['username'], c['password'], c['database']) @@ -35,13 +35,10 @@ def __init__(self, **kw): def query(self, sql, args=None): """Return the results for a query.""" - c = self.pool.get() - try: + with self.pool.connection() as c: if args: return c.query(sql, args) return c.query(sql) - finally: - self.pool.put(c) def getone(self, sql, args=None): return self.query(sql, args)[0]
-
@@ -40,7 +40,7 @@ def argument_alias(method, args): if not hasattr(plugin, 'aliases'): return args for k,v in plugin.aliases.iteritems(): - args[k] = args[v] + args[k] = args.get(v, None) return args def interval(seconds, **kw):
-
@@ -44,6 +44,7 @@ def decode(data): return json.loads(zlib.decompress(data)) from Queue import Queue +import contextlib class ConnectionPool(object): """A simple connection pool which uses a queue to limit how many @@ -60,7 +61,7 @@ def get(self): return pool.get() self.size += 1 try: - con = self.connection() + con = self.new_connection() except: self.size -= 1 raise @@ -69,7 +70,15 @@ def get(self): def put(self, con): self.pool.put(con) - def connection(self, *a, **kw): + @contextlib.contextmanager + def connection(self): + con = self.get() + try: + yield con + finally: + self.put(con) + + def new_connection(self, *a, **kw): raise NotImplementedError
commit 68cd62ab
to
arachne
April 18, 2012, 2:06 p.m.
implement a simple connection pool and use that to limit the number of concurrent mysql connections
-
@@ -7,35 +7,41 @@ import umysql import gevent +from arachne.utils import ConnectionPool from arachne.conf import settings, merge, require defaults = { "port": 3306, } -class Mysql(object): - def __init__(self, **kw): - config = merge(defaults, settings.like("mysql"), kw) - require(self, config, ("host", "password", "username", "database")) +class MysqlConnectionPool(ConnectionPool): + def __init__(self, config, maxsize=10): + maxsize = int(config.get("poolsize", maxsize)) + super(MysqlConnectionPool, self).__init__(maxsize) self.config = config - self.pool = {} - def client(self): - current = gevent.getcurrent() - if current in self.pool: - return self.pool[current] + def connection(self): c = self.config con = umysql.Connection() con.connect(c['host'], c['port'], c['username'], c['password'], c['database']) - self.pool[current] = con return con +class Mysql(object): + def __init__(self, **kw): + config = merge(defaults, settings.like("mysql"), kw) + require(self, config, ("host", "password", "username", "database")) + self.config = config + self.pool = MysqlConnectionPool(config) + def query(self, sql, args=None): """Return the results for a query.""" - c = self.client() - if args: - return c.query(sql, args) - return c.query(sql) + c = self.pool.get() + try: + if args: + return c.query(sql, args) + return c.query(sql) + finally: + self.pool.put(c) def getone(self, sql, args=None): return self.query(sql, args)[0]
-
@@ -43,6 +43,36 @@ def decode(data): """Decode data coming out of storage.""" return json.loads(zlib.decompress(data)) +from Queue import Queue + +class ConnectionPool(object): + """A simple connection pool which uses a queue to limit how many + connections to a single resource are made. Override the `connection` + method to make new connections to your resource.""" + def __init__(self, maxsize=10): + self.maxsize = maxsize + self.pool = Queue() + self.size = 0 + + def get(self): + pool = self.pool + if self.size >= self.maxsize or pool.qsize(): + return pool.get() + self.size += 1 + try: + con = self.connection() + except: + self.size -= 1 + raise + return con + + def put(self, con): + self.pool.put(con) + + def connection(self, *a, **kw): + raise NotImplementedError + + from heapq import heappush, heappop, heapify, heapreplace class Heap(object):
Re-reading Tanenbaum & van Steen's Distributed Systems; embarrassing how many old wheels we in the web community proudly show off as new discoveries
April 18, 2012, 2:39 a.m. via web
public domain