from pywb.warcserver.index.indexsource import FileIndexSource
from pywb.warcserver.index.aggregator import DirectoryIndexSource, CacheDirectoryMixin
from pywb.warcserver.index.aggregator import SimpleAggregator
from pywb.warcserver.index.cdxobject import CDXObject
from pywb.utils.binsearch import search
from pywb.utils.merge import merge
from warcio.timeutils import timestamp_to_datetime
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import os
# ============================================================================
class FileAccessIndexSource(FileIndexSource):
    """A file index source for access control list (ACL) files."""

    @staticmethod
    def rev_cmp(a, b):
        """Compare two values in reverse order.

        The sign of the result is the opposite of what the removed
        builtin ``cmp`` would give for the same arguments.

        :param a: A value to be compared
        :param b: A value to be compared
        :return: 1 if ``a < b``, -1 if ``a > b``, otherwise 0
        :rtype: int
        """
        ascending = a < b
        descending = a > b
        return ascending - descending

    def _do_iter(self, fh, params):
        """Search an access control list for the key given in params.

        When ``params`` carries a truthy ``exact_match_suffix`` it is
        appended to ``params['key']`` before searching. The search is run
        with :meth:`rev_cmp` as the comparison function.

        :param fh: The file handle to an access control list
        :param dict params: The query params; must contain ``key`` and
            may contain ``exact_match_suffix``
        :return: A generator yielding the lines produced by the search
        """
        suffix = params.get('exact_match_suffix')
        search_key = params['key']
        if suffix:
            search_key += suffix

        matches = search(fh, search_key, prev_size=1,
                         compare_func=self.rev_cmp)
        for entry in matches:
            yield entry
# ============================================================================
class ReverseMergeMixin(object):
    """A mixin that provides reversed merge functionality."""

    def _merge(self, iter_list):
        """Merge the supplied iterators with ``reverse=True``.

        :param iter_list: The list of iterators to be merged
        :return: An iterator that yields the results of the reverse merge
        """
        return merge(*iter_list, reverse=True)
# ============================================================================
class AccessRulesAggregator(ReverseMergeMixin, SimpleAggregator):
    """An aggregator specific to access control.

    Combines :class:`SimpleAggregator` with the reverse merge supplied by
    :class:`ReverseMergeMixin`; it adds no behavior of its own.
    """
# ============================================================================
class DirectoryAccessSource(ReverseMergeMixin, DirectoryIndexSource):
    """A directory index source specific to access control."""

    # Pairs the ``.aclj`` suffix with the source class for such files;
    # presumably consumed by DirectoryIndexSource when scanning a directory.
    INDEX_SOURCES = [('.aclj', FileAccessIndexSource)]  # type: list[tuple]
# ============================================================================
class CacheDirectoryAccessSource(CacheDirectoryMixin, DirectoryAccessSource):
    """A cache directory index source specific to access control.

    Combines :class:`DirectoryAccessSource` with
    :class:`CacheDirectoryMixin`; it adds no behavior of its own.
    """
# ============================================================================
class AccessChecker(object):
    """Applies access control rules and optional embargo settings to
    cdx results.

    Rules are looked up through an access aggregator; when no rule matches,
    a default rule carrying ``default_access`` is used.
    """

    EXACT_SUFFIX = '###'  # type: str
    EXACT_SUFFIX_B = b'###'  # type: bytes

    # rules in the ACL file are followed by a white space (U+0020):
    # for searching we need a match suffix which sorts/compares after
    # (resp. before because we use the rev_cmp function). Simply add
    # another '#' (U+0023 > U+0020)
    EXACT_SUFFIX_SEARCH_B = b'####'  # type: bytes

    def __init__(self, access_source, default_access='allow', embargo=None):
        """Initialize a new AccessChecker

        :param str|list[str]|AccessRulesAggregator access_source: An access
            source: a single file/directory name, a list of such names, or
            an already constructed aggregator (used as is)
        :param str default_access: The default access action (allow)
        :param dict embargo: A dict specifying optional embargo setting
        """
        if isinstance(access_source, str):
            self.aggregator = self.create_access_aggregator([access_source])
        elif isinstance(access_source, list):
            self.aggregator = self.create_access_aggregator(access_source)
        else:
            self.aggregator = access_source

        # Rule returned by find_access_rule when nothing in the ACL matches.
        self.default_rule = CDXObject()
        self.default_rule['urlkey'] = ''
        self.default_rule['timestamp'] = '-'
        self.default_rule['access'] = default_access
        self.default_rule['default'] = 'true'

        self.embargo = self.parse_embargo(embargo)

    @staticmethod
    def _parse_delta(value):
        """Builds a relativedelta from an embargo duration mapping

        :param dict value: A mapping with optional ``years``, ``months``,
            ``weeks`` and ``days`` entries (each defaults to 0)
        :return: The corresponding relative delta
        :rtype: relativedelta
        """
        return relativedelta(
            years=value.get('years', 0),
            months=value.get('months', 0),
            weeks=value.get('weeks', 0),
            days=value.get('days', 0))

    def parse_embargo(self, embargo):
        """Parses the raw embargo settings into comparable values

        ``before`` / ``after`` values are passed through
        ``timestamp_to_datetime`` and ``older`` / ``newer`` mappings are
        turned into ``relativedelta`` instances. All other entries
        (e.g. ``access``) are kept unchanged.

        :param dict|None embargo: The raw embargo settings, if any
        :return: A new dict holding the parsed settings, or None if no
            embargo settings were supplied
        :rtype: dict|None
        """
        if not embargo:
            return None

        # Work on a shallow copy: parsed values replace the raw ones, and
        # writing them back into the caller's dict would prevent the same
        # configuration from being parsed a second time.
        embargo = dict(embargo)

        for name in ('before', 'after'):
            value = embargo.get(name)
            if value:
                embargo[name] = timestamp_to_datetime(str(value))

        for name in ('older', 'newer'):
            value = embargo.get(name)
            if value:
                embargo[name] = self._parse_delta(value)

        return embargo

    def check_embargo(self, url, ts):
        """Determines whether a capture falls under the configured embargo

        Only the first configured setting is evaluated, in the order
        ``before``, ``after``, ``newer``, ``older``.

        :param str url: The URL of the capture (not used)
        :param str ts: The timestamp of the capture
        :return: The embargo access value (``access`` setting, defaulting to
            ``exclude``) if the capture is embargoed, otherwise None
        :rtype: str|None
        """
        if not self.embargo:
            return None

        dt = timestamp_to_datetime(ts)
        access = self.embargo.get('access', 'exclude')

        # embargo before
        before = self.embargo.get('before')
        if before:
            return access if dt < before else None

        # embargo after
        after = self.embargo.get('after')
        if after:
            return access if dt > after else None

        # embargo if newer than
        newer = self.embargo.get('newer')
        if newer:
            actual = datetime.utcnow() - newer
            return access if actual < dt else None

        # embargo if older than
        older = self.embargo.get('older')
        if older:
            actual = datetime.utcnow() - older
            return access if actual > dt else None

        return None

    def create_access_aggregator(self, source_files):
        """Creates a new AccessRulesAggregator using the supplied list
        of access control file names

        :param list[str] source_files: The list of access control file names
        :return: The created AccessRulesAggregator
        :rtype: AccessRulesAggregator
        """
        sources = {filename: self.create_access_source(filename)
                   for filename in source_files}
        return AccessRulesAggregator(sources)

    def create_access_source(self, filename):
        """Creates a new access source for the supplied filename.

        If the filename is for a directory a CacheDirectoryAccessSource
        instance is returned, otherwise a FileAccessIndexSource instance

        :param str filename: The name of a file/directory
        :return: An instance of CacheDirectoryAccessSource or
            FileAccessIndexSource depending on if the supplied filename
            is for a directory or file
        :rtype: CacheDirectoryAccessSource|FileAccessIndexSource
        :raises Exception: Indicates an invalid access source was supplied
        """
        if os.path.isdir(filename):
            return CacheDirectoryAccessSource(filename)
        elif os.path.isfile(filename):
            return FileAccessIndexSource(filename)
        else:
            raise Exception('Invalid Access Source: ' + filename)

    def find_access_rule(self, url, ts=None, urlkey=None, collection=None,
                         acl_user=None):
        """Attempts to find the access control rule for the
        supplied URL otherwise returns the default rule

        A rule whose ``user`` equals ``acl_user`` is returned immediately.
        A rule without a user is remembered and returned once the iteration
        moves on to a different rule key (or ends).

        :param str url: The URL for the rule to be found
        :param str|None ts: A timestamp (not used)
        :param str|None urlkey: The access control url key
        :param str|None collection: The collection, if any
        :param str|None acl_user: The access control user, if any
        :return: The access control rule for the supplied URL
            if one exists otherwise the default rule
        :rtype: CDXObject
        """
        params = {'url': url,
                  'urlkey': urlkey,
                  'nosource': 'true',
                  'exact_match_suffix': self.EXACT_SUFFIX_SEARCH_B
                  }

        if collection:
            params['param.coll'] = collection

        acl_iter, errs = self.aggregator(params)
        if errs:
            print(errs)

        # 'key' is not set above; it is expected to have been added to
        # params by the aggregator call.
        key = params['key']
        key_exact = key + self.EXACT_SUFFIX_B

        tld = key.split(b',')[0]

        last_obj = None
        last_key = None

        for acl in acl_iter:
            # skip empty/invalid lines
            if not acl:
                continue

            acl_key = acl.split(b' ')[0]

            # moved past the key of the remembered user-less rule:
            # no user-specific rule for that key can follow anymore
            if acl_key != last_key and last_obj:
                return last_obj

            # exact-match rule (key + '###') or prefix rule
            if acl_key == key_exact or key.startswith(acl_key):
                acl_obj = CDXObject(acl)
                user = acl_obj.get('user')
                if user == acl_user:
                    return acl_obj
                elif not user:
                    last_key = acl_key
                    last_obj = acl_obj

            # if acl key already less than first tld,
            # no match can be found
            if acl_key < tld:
                break

        return last_obj if last_obj else self.default_rule

    def __call__(self, res, acl_user):
        """Wraps the cdx iter in the supplied tuple, returning the
        wrapped cdx iter and the other member of the supplied
        tuple in the same order

        :param tuple res: The result tuple ``(cdx_iter, errs)``
        :param str acl_user: The user associated with this request (optional)
        :return: A tuple ``(wrapped_cdx_iter, errs)``
        """
        cdx_iter, errs = res
        return self.wrap_iter(cdx_iter, acl_user), errs

    def wrap_iter(self, cdx_iter, acl_user):
        """Wraps the supplied cdx iter and yields cdx objects
        that contain the access control results for the cdx object
        being yielded

        Objects without a ``url`` are passed through untouched; objects
        whose resulting access is ``exclude`` are dropped.

        :param cdx_iter: The cdx object iterator to be wrapped
        :param str acl_user: The user associated with this request (optional)
        :return: The wrapped cdx object iterator
        """
        last_rule = None
        last_url = None
        last_user = None
        rule = None

        for cdx in cdx_iter:
            url = cdx.get('url')
            timestamp = cdx.get('timestamp')

            # if no url, possible idx or other object,
            # don't apply any checks and pass through
            if not url:
                yield cdx
                continue

            access = None
            if self.aggregator:
                # TODO: optimization until date range support is included
                if url == last_url and acl_user == last_user:
                    rule = last_rule
                else:
                    rule = self.find_access_rule(url, timestamp,
                                                 cdx.get('urlkey'),
                                                 cdx.get('source-coll'),
                                                 acl_user)

                access = rule.get('access', 'exclude')

            # the embargo can only restrict access: it is skipped for rules
            # that ignore it or already exclude, and an embargo value of
            # 'allow' never overrides the rule
            if access != 'allow_ignore_embargo' and access != 'exclude':
                embargo_access = self.check_embargo(url, timestamp)
                if embargo_access and embargo_access != 'allow':
                    access = embargo_access

            if access == 'exclude':
                continue

            if not access:
                access = self.default_rule['access']

            if access == 'allow_ignore_embargo':
                access = 'allow'

            cdx['access'] = access
            yield cdx

            last_rule = rule
            last_url = url
            last_user = acl_user