Source code for pywb.warcserver.index.cdxops

from pywb.warcserver.index.cdxobject import CDXObject, IDXObject
from pywb.warcserver.index.cdxobject import TIMESTAMP, STATUSCODE, MIMETYPE, DIGEST
from pywb.warcserver.index.cdxobject import OFFSET, LENGTH, FILENAME

from pywb.warcserver.index.query import CDXQuery

from warcio.timeutils import timestamp_to_sec, pad_timestamp
from warcio.timeutils import PAD_14_DOWN, PAD_14_UP

import bisect

from six.moves import zip, range, map
import re

from heapq import merge
from collections import deque


#=================================================================
[docs]def cdx_load(sources, query, process=True): """ merge text CDX lines from sources, return an iterator for filtered and access-checked sequence of CDX objects. :param sources: iterable for text CDX sources. :param process: bool, perform processing sorting/filtering/grouping ops """ cdx_iter = create_merged_cdx_gen(sources, query) # page count is a special case, no further processing if query.page_count: return cdx_iter cdx_iter = make_obj_iter(cdx_iter, query) if process and not query.secondary_index_only: cdx_iter = process_cdx(cdx_iter, query) custom_ops = query.custom_ops for op in custom_ops: cdx_iter = op(cdx_iter, query) if query.output == 'text': cdx_iter = cdx_to_text(cdx_iter, query.fields) elif query.output == 'json': cdx_iter = cdx_to_json(cdx_iter, query.fields) return cdx_iter
#=================================================================
[docs]def cdx_to_text(cdx_iter, fields): for cdx in cdx_iter: yield cdx.to_text(fields)
#=================================================================
[docs]def cdx_to_json(cdx_iter, fields): for cdx in cdx_iter: yield cdx.to_json(fields)
#=================================================================
[docs]def process_cdx(cdx_iter, query): if query.resolve_revisits: cdx_iter = cdx_resolve_revisits(cdx_iter) filters = query.filters if filters: cdx_iter = cdx_filter(cdx_iter, filters) if query.from_ts or query.to_ts: cdx_iter = cdx_clamp(cdx_iter, query.from_ts, query.to_ts) collapse_time = query.collapse_time if collapse_time: cdx_iter = cdx_collapse_time_status(cdx_iter, collapse_time) closest = query.closest reverse = query.reverse limit = query.limit if closest: cdx_iter = cdx_sort_closest(closest, cdx_iter, limit) elif reverse: cdx_iter = cdx_reverse(cdx_iter, limit) elif limit: cdx_iter = cdx_limit(cdx_iter, limit) return cdx_iter
#=================================================================
[docs]def create_merged_cdx_gen(sources, query): """ create a generator which loads and merges cdx streams ensures cdxs are lazy loaded """ # Optimize: no need to merge if just one input if len(sources) == 1: cdx_iter = sources[0].load_cdx(query) else: source_iters = map(lambda src: src.load_cdx(query), sources) cdx_iter = merge(*(source_iters)) for cdx in cdx_iter: yield cdx
#=================================================================
[docs]def make_obj_iter(text_iter, query): """ convert text cdx stream to CDXObject/IDXObject. """ if query.secondary_index_only: cls = IDXObject else: cls = CDXObject return (cls(line) for line in text_iter)
#=================================================================
[docs]def cdx_limit(cdx_iter, limit): """ limit cdx to at most `limit`. """ # for cdx, _ in itertools.izip(cdx_iter, xrange(limit)): # yield cdx return (cdx for cdx, _ in zip(cdx_iter, range(limit)))
#=================================================================
[docs]def cdx_reverse(cdx_iter, limit): """ return cdx records in reverse order. """ # optimize for single last if limit == 1: last = None for cdx in cdx_iter: last = cdx if not last: return yield last reverse_cdxs = deque(maxlen=limit) for cdx in cdx_iter: reverse_cdxs.appendleft(cdx) for cdx in reverse_cdxs: yield cdx
#=================================================================
[docs]class CDXFilter(object): def __init__(self, string): # invert filter self.invert = string.startswith('!') if self.invert: string = string[1:] # exact match if string.startswith('='): string = string[1:] self.compare_func = self.exact # regex match elif string.startswith('~'): string = string[1:] self.compare_func = self.rx_match # contains match else: self.compare_func = self.contains parts = string.split(':', 1) # no field set, apply filter to entire cdx if len(parts) == 1: self.field = '' # apply filter to cdx[field] else: self.field = parts[0] self.field = CDXObject.CDX_ALT_FIELDS.get(self.field, self.field) string = parts[1] # make regex if regex mode if self.compare_func == self.rx_match: self.regex = re.compile(string) else: self.filter_str = string def __call__(self, cdx): if not self.field: val = str(cdx) else: val = str(cdx.get(self.field, '')) matched = self.compare_func(val) return matched ^ self.invert
[docs] def exact(self, val): return (self.filter_str == val)
[docs] def contains(self, val): return (self.filter_str in val)
[docs] def rx_match(self, val): res = self.regex.match(val) return res is not None
#=================================================================
[docs]def cdx_filter(cdx_iter, filter_strings): """ filter CDX by regex if each filter is :samp:`{field}:{regex}` form, apply filter to :samp:`cdx[{field}]`. """ # Support single strings as well if isinstance(filter_strings, str): filter_strings = [filter_strings] filters = [CDXFilter(filter_str) for filter_str in filter_strings] for cdx in cdx_iter: if all(x(cdx) for x in filters): yield cdx
#=================================================================
[docs]def cdx_clamp(cdx_iter, from_ts, to_ts): """ Clamp by start and end ts """ if from_ts and len(from_ts) < 14: from_ts = pad_timestamp(from_ts, PAD_14_DOWN) if to_ts and len(to_ts) < 14: to_ts = pad_timestamp(to_ts, PAD_14_UP) for cdx in cdx_iter: if from_ts and cdx[TIMESTAMP] < from_ts: continue if to_ts and cdx[TIMESTAMP] > to_ts: continue yield cdx
#=================================================================
[docs]def cdx_collapse_time_status(cdx_iter, timelen=10): """ collapse by timestamp and status code. """ timelen = int(timelen) last_token = None for cdx in cdx_iter: curr_token = (cdx[TIMESTAMP][:timelen], cdx.get(STATUSCODE, '')) # yield if last_dedup_time is diff, otherwise skip if curr_token != last_token: last_token = curr_token yield cdx
#=================================================================
[docs]def cdx_sort_closest(closest, cdx_iter, limit=10): """ sort CDXCaptureResult by closest to timestamp. """ closest_cdx = [] closest_keys = [] closest_sec = timestamp_to_sec(closest) for cdx in cdx_iter: sec = timestamp_to_sec(cdx[TIMESTAMP]) key = abs(closest_sec - sec) # create tuple to sort by key #bisect.insort(closest_cdx, (key, cdx)) i = bisect.bisect_right(closest_keys, key) closest_keys.insert(i, key) closest_cdx.insert(i, cdx) if len(closest_cdx) == limit: # assuming cdx in ascending order and keys have started increasing if key > closest_keys[-1]: break if len(closest_cdx) > limit: closest_cdx.pop() for cdx in closest_cdx: yield cdx
#for cdx in map(lambda x: x[1], closest_cdx): # yield cdx #================================================================= # resolve revisits # Fields to append from cdx original to revisit ORIG_TUPLE = [LENGTH, OFFSET, FILENAME]
[docs]def cdx_resolve_revisits(cdx_iter): """ resolve revisits. this filter adds three fields to CDX: ``orig.length``, ``orig.offset``, and ``orig.filename``. for revisit records, these fields have corresponding field values in previous non-revisit (original) CDX record. They are all ``"-"`` for non-revisit records. """ originals = {} for cdx in cdx_iter: is_revisit = cdx.is_revisit() digest = cdx.get(DIGEST) original_cdx = None # only set if digest is valid, otherwise no way to resolve if digest: original_cdx = originals.get(digest) if not original_cdx and not is_revisit: originals[digest] = cdx if original_cdx and is_revisit: fill_orig = lambda field: original_cdx.get(field, '-') # Transfer mimetype and statuscode if MIMETYPE in cdx: cdx[MIMETYPE] = original_cdx.get(MIMETYPE, '') if STATUSCODE in cdx: cdx[STATUSCODE] = original_cdx.get(STATUSCODE, '') else: fill_orig = lambda field: '-' # Always add either the original or empty '- - -' for field in ORIG_TUPLE: cdx['orig.' + field] = fill_orig(field) yield cdx