Source code for apachelogs.parser

import re
import attr
from   .directives import format2regex
from   .errors     import InvalidEntryError
from   .util       import assemble_datetime

@attr.s
class LogParser:
    """
    A class for parsing Apache access log entries in a given log format.
    Instantiate with a log format string, and then use the
    `~LogParser.parse()` and/or `~LogParser.parse_lines()` methods to parse
    log entries in that format.

    :param str format: an Apache log format
    :param str encoding:
        The encoding to use for decoding certain strings in log entries (see
        :ref:`directives`); defaults to ``'iso-8859-1'``.  Set to ``'bytes'``
        to cause the strings to be returned as `bytes` values instead of
        `str`.
    :param str errors:
        the error handling scheme to use when decoding; defaults to `None`,
        which is treated as ``'strict'``
    :raises InvalidDirectiveError:
        if an invalid directive occurs in ``format``
    :raises UnknownDirectiveError:
        if an unknown directive occurs in ``format``
    """

    format = attr.ib()
    encoding = attr.ib(default='iso-8859-1')
    errors = attr.ib(default=None)

    def __attrs_post_init__(self):
        """
        Translate the log format into group definitions and a compiled
        regular expression, stored as ``_group_defs`` and ``_rgx``.
        """
        self._group_defs, pattern = format2regex(self.format)
        self._rgx = re.compile(pattern)

    def parse(self, entry):
        """
        Parse an access log entry according to the log format and return a
        `LogEntry` object.

        :param str entry: an access log entry to parse
        :rtype: LogEntry
        :raises InvalidEntryError: if ``entry`` does not match the log format
        """
        # Trailing line terminators are not part of the entry proper.
        entry = entry.rstrip('\r\n')
        m = self._rgx.fullmatch(entry)
        if not m:
            raise InvalidEntryError(entry, self.format)
        # The third item of each group definition is the converter that is
        # applied to the text captured by the corresponding regex group.
        groups = [
            conv(gr)
            for (_, _, conv), gr in zip(self._group_defs, m.groups())
        ]
        if self.encoding != 'bytes':
            # Any converted value that is still `bytes` gets decoded; an
            # unset ``errors`` falls back to 'strict'.
            error_scheme = self.errors or 'strict'
            groups = [
                gr.decode(self.encoding, error_scheme)
                if isinstance(gr, bytes) else gr
                for gr in groups
            ]
        return LogEntry(
            entry,
            self.format,
            # LogEntry only needs the first two items of each definition.
            [gdef[:2] for gdef in self._group_defs],
            groups,
        )

    def parse_lines(self, entries, ignore_invalid=False):
        r"""
        Parse the elements in an iterable of access log entries (e.g., an open
        text file handle) and return a generator of `LogEntry`\s.  If
        ``ignore_invalid`` is `True`, any entries that do not match the log
        format will be silently discarded; otherwise, such an entry will cause
        an `InvalidEntryError` to be raised.

        :param entries: an iterable of `str`
        :param bool ignore_invalid:
            whether to silently discard entries that do not match the log
            format
        :rtype: `LogEntry` generator
        :raises InvalidEntryError:
            if an element of ``entries`` does not match the log format and
            ``ignore_invalid`` is `False`
        """
        for e in entries:
            # Only the parse() call is guarded: keeping the ``yield`` outside
            # the ``try`` ensures that an InvalidEntryError thrown into the
            # generator by its consumer is never mistaken for a bad entry.
            try:
                parsed = self.parse(e)
            except InvalidEntryError:
                if ignore_invalid:
                    continue
                raise
            yield parsed
class LogEntry:
    """
    A parsed Apache access log entry.  The value associated with each
    directive in the log format is stored as an attribute on the `LogEntry`
    object; for example, if the log format contains a ``%s`` directive, the
    `LogEntry` for a parsed entry will have a ``status`` attribute containing
    the status value from the entry as an `int`.  See :ref:`directives` for
    the attribute names & types of each directive supported by this library.

    If the log format contains two or more directives that are stored in the
    same attribute (e.g., ``%D`` and ``%{us}T``), the given attribute will
    contain the first non-`None` directive value.

    The values of date & time directives are stored in a
    ``request_time_fields: dict`` attribute.  If this `dict` contains enough
    information to assemble a complete (possibly naïve) `datetime.datetime`,
    then the `LogEntry` will have a ``request_time`` attribute equal to that
    `datetime.datetime`.

    :param str entry: the original logfile entry
    :param str format: the entry's log format string
    :param group_names:
        an iterable of two-item sequences, one per parsed value; the first
        item is either an attribute name or a tuple of names giving a path
        through nested `dict` attributes, and the second item is not used
        here
    :param groups:
        the parsed values, in the same order as ``group_names``
    """

    def __init__(self, entry, format, group_names, groups):
        #: The original logfile entry with trailing newlines removed
        self.entry = entry
        #: The entry's log format string
        self.format = format
        for (key, _), value in zip(group_names, groups):
            target = self.__dict__
            if isinstance(key, tuple):
                # Walk (creating as needed) the nested dicts named by every
                # element but the last, which is the key for the value.
                for part in key[:-1]:
                    target = target.setdefault(part, {})
                key = key[-1]
            # First non-None value wins when directives share a destination.
            if target.get(key) is None:
                target[key] = value
            #else: Assume target[key] == value
        # For every populated ``*request_time_fields`` attribute, store the
        # result of assemble_datetime() under the matching ``*request_time``
        # name.
        for prefix in ('original_', '', 'final_'):
            for midfix in ('begin_', '', 'end_'):
                name = prefix + midfix + 'request_time'
                fields = getattr(self, name + '_fields', None)
                if fields:
                    setattr(self, name, assemble_datetime(fields))

    def __eq__(self, other):
        """
        Two entries are equal when they are of exactly the same type and all
        of their attributes are equal.  For objects of any other type,
        `NotImplemented` is returned so that Python can try the other
        operand's comparison before deciding the objects are unequal.
        """
        if type(self) is not type(other):
            return NotImplemented
        return vars(self) == vars(other)