Dockerfile (2 changes: 1 addition & 1 deletion)

@@ -17,7 +17,7 @@ RUN apt-get update && \
     libsnappy1v5 libsnappy-dev libonig2 make gcc g++ curl openjdk-8-jre && \
     curl https://bootstrap.pypa.io/get-pip.py | python3 && \
     pip3 install --no-cache-dir PyStemmer bblfsh py4j==0.10.4 modelforge parquet jinja2 libMHCUDA datasketch cassandra_driver python-igraph numpy humanize pygments && \
-    apt-get remove -y python3-dev libxml2-dev libsnappy-dev zlib1g-dev make gcc g++ curl && \
+    apt-get remove -y python3-dev libxml2-dev libsnappy-dev zlib1g-dev make gcc g++ && \
     apt-get remove -y *-doc *-man >/dev/null && \
     apt-get autoremove -y && \
     apt-get clean && \
apollo/hasher.py (94 changes: 51 additions & 43 deletions)

@@ -99,37 +99,44 @@ def __call__(self, record):
                       value=bytearray(wmh[hti * self.band_size:(hti + 1) * self.band_size].data))


-def modify_feature_weights(batches, arguments, **kwargs):
-    log = logging.getLogger("hash")
-    extractors = {}
-    for ex in __extractors__.values():
-        if "%s_weight" % ex.NAME in dir(arguments) and \
-                getattr(arguments, "%s_weight" % ex.NAME) != 1:
-            extractors[ex.NAME] = (ex.NAMESPACE, getattr(arguments, "%s_weight" % ex.NAME))
-
-    if not extractors:
-        return batches
-
-    err = "You must specify location of docfreq file to modify weights of features"
-    assert arguments.docfreq is not None, err
-    assert os.path.isfile(arguments.docfreq), "docfreq should be a file"
-
-    model = OrderedDocumentFrequencies().load(arguments.docfreq)
-    feature_mapping = model.order
-
-    voc_size = batches[0].matrix.shape[-1]
-    weights = numpy.ones((voc_size,))
-
-    for ext in extractors:
-        namespace = extractors[ext][0]
-        ind = [feature_mapping[k] for k in feature_mapping if k.startswith(namespace)]
-        weights[ind] = extractors[ext][1]
-
-    for batch in batches:
-        # hack to modify attribute in namedtuple
-        batch.matrix.data = batch.matrix.multiply(weights).tocsr().data.astype(numpy.float32)
-
-    return batches
+class Reweighter:
+    def __init__(self, arguments):
+        log = logging.getLogger("reweighter")
+        self.extractors = {}
+        for ex in __extractors__.values():
+            if "%s_weight" % ex.NAME in dir(arguments) and \
+                    getattr(arguments, "%s_weight" % ex.NAME) != 1:
+                self.extractors[ex.NAME] = (ex.NAMESPACE, getattr(arguments,
+                                                                  "%s_weight" % ex.NAME))
+        if not self.extractors:
+            log.info("No extractors found, reweighting will be skipped")
+            return
+
+        err = "You must specify location of docfreq file to modify weights of features"
+        assert arguments.docfreq is not None, err
+        assert os.path.isfile(arguments.docfreq), "docfreq should be a file"
+
+        model = OrderedDocumentFrequencies().load(arguments.docfreq)
+        self.feature_mapping = model.order
+
+        self.weights = numpy.ones((len(self.feature_mapping),))
+
+        for ext in self.extractors:
+            namespace = self.extractors[ext][0]
+            ind = [self.feature_mapping[k] for k in self.feature_mapping
+                   if k.startswith(namespace)]
+            self.weights[ind] = self.extractors[ext][1]
+
+    def reweight(self, batch):
+        """
+        Modify batch according to weights of extractors.
+        :param batch: batch from BOWLoader.
+        :return: batch with modified weights.
+        """
+        if not self.extractors:
+            return batch
+        batch.matrix.data = batch.matrix.multiply(self.weights).tocsr().data.astype(numpy.float32)
+        return batch


 def hash_batches(args):
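
A note on the trick Reweighter.reweight() keeps from the old function: the batch is a namedtuple, so its matrix attribute cannot be reassigned; instead the CSR data array is overwritten in place after multiply() broadcasts the 1-D weight vector across rows, scaling each vocabulary column. A minimal self-contained sketch of the same idea, assuming only numpy and scipy (the shapes and weight values are made up):

    import numpy
    from scipy.sparse import csr_matrix

    # Two documents over a three-feature vocabulary.
    matrix = csr_matrix(numpy.array([[1., 0., 2.],
                                     [0., 3., 0.]]))
    # One weight per vocabulary column: boost feature 1, damp feature 2.
    weights = numpy.array([1., 10., 0.5])

    # multiply() broadcasts the 1-D weights across rows, scaling columns;
    # the result is not CSR, so convert back before taking .data.
    scaled = matrix.multiply(weights).tocsr()

    # Overwrite .data in place: same sparsity pattern, new values.
    matrix.data = scaled.data.astype(numpy.float32)
    print(matrix.toarray())  # [[ 1.  0.  1.]
                             #  [ 0. 30.  0.]]

This stays aligned only because the broadcast multiply preserves the sparsity pattern, so the CSR data ordering survives the round-trip for a matrix in canonical form.
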
@@ -153,15 +160,15 @@ def hash_batches(args):
     tables = args.tables
     gen = voc_size = None
     try:
+        reweighter = Reweighter(args)
         for i, bow in enumerate(loader):
             if voc_size is None:
                 voc_size = bow.matrix.shape[-1]
                 log.info("Initializing the generator")
                 deferred = os.path.isfile(args.params)
                 gen = libMHCUDA.minhash_cuda_init(
                     voc_size, args.size, seed=args.seed, devices=args.devices,
-                    verbosity=args.mhc_verbosity,
-                    deferred=deferred)
+                    verbosity=args.mhc_verbosity, deferred=deferred)
                 if deferred:
                     model = WeightedMinHashParameters().load(args.params)
                     libMHCUDA.minhash_cuda_assign_vars(gen, model.rs, model.ln_cs, model.betas)
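
For context, the generator set up here comes from libMHCUDA; deferred=True skips sampling the random variables at init time so they can be assigned afterwards from a previously saved WeightedMinHashParameters model. A rough sketch of the plain, non-deferred lifecycle, assuming a CUDA-capable machine with libMHCUDA installed and arbitrary sizes:

    import libMHCUDA
    import numpy
    from scipy.sparse import csr_matrix

    # Two documents over an 8-feature vocabulary; libMHCUDA takes float32 CSR input.
    bow = csr_matrix(numpy.random.rand(2, 8).astype(numpy.float32))
    gen = libMHCUDA.minhash_cuda_init(bow.shape[-1], 128, seed=7, verbosity=1)
    try:
        # Weighted MinHash: one (k, r) pair per sample and document.
        hashes = libMHCUDA.minhash_cuda_calc(gen, bow)
        print(hashes.shape)  # expected (2, 128, 2)
    finally:
        libMHCUDA.minhash_cuda_fini(gen)
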
@@ -173,15 +180,15 @@ def hash_batches(args):
                     raise ValueError("The vocabulary sizes do not match: %d != %d"
                                      % (bow.matrix.shape[-1], voc_size))
             log.info("Processing batch %d / %d", i + 1, len(loader))
-            # Modify features if needed
-            # TODO(vmarkovtsev): port to the new structure
-            # batches = modify_feature_weights(batches, args)
+            reweighter.reweight(bow)
             hashes = libMHCUDA.minhash_cuda_calc(gen, bow.matrix)
             job = [(k, h) for k, h in zip(bow.documents, hashes)]
             log.info("Saving the hashtables")
-            df = spark.parallelize(job).flatMap(HashExploder(htnum, band_size)) \
-                .coalesce(args.partitions, args.shuffle) \
-                .toDF()
+            df = spark.parallelize(job).flatMap(HashExploder(htnum, band_size))
+            if args.partitions is not None:
+                df = df.coalesce(args.partitions, args.shuffle)
+            df = df.toDF()
+
             df.write \
                 .format("org.apache.spark.sql.cassandra") \
                 .mode("append") \
@@ -193,10 +200,11 @@ def hash_batches(args):
                 .options(table=tables["hashtables2"], keyspace=args.keyspace) \
                 .save()
             log.info("Saving the hashes")
-            spark.parallelize(job) \
-                .map(lambda x: Row(sha1=x[0], value=bytearray(x[1].data))) \
-                .coalesce(args.partitions, args.shuffle) \
-                .toDF() \
+            to_save = spark.parallelize(job) \
+                .map(lambda x: Row(sha1=x[0], value=bytearray(x[1].data)))
+            if args.partitions is not None:
+                to_save = to_save.coalesce(args.partitions, args.shuffle)
+            to_save.toDF() \
                 .write \
                 .format("org.apache.spark.sql.cassandra") \
                 .mode("append") \
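
Both Cassandra save paths now guard the coalesce() call, so the partitions argument becomes genuinely optional: when it is unset, the RDD keeps whatever partitioning Spark chose instead of coalesce() being handed None. The guarded pattern in isolation, assuming a live SparkContext and stand-in values for args.partitions / args.shuffle:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()
    partitions, shuffle = None, False  # stand-ins for args.partitions / args.shuffle

    rdd = sc.parallelize(range(100))
    if partitions is not None:
        # Repartition only on explicit request; otherwise keep Spark's default.
        rdd = rdd.coalesce(partitions, shuffle)
    print(rdd.getNumPartitions())
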