Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -650,3 +650,19 @@ For CSV functionality:
*/


===============================================================================
For dev/sparktestsupport/toposort.py:

Copyright 2014 True Blade Systems, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
25 changes: 15 additions & 10 deletions dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from sparktestsupport import SPARK_HOME, USER_HOME, ERROR_CODES
from sparktestsupport.shellutils import exit_from_command_with_retcode, run_cmd, rm_r, which
from sparktestsupport.toposort import toposort_flatten, toposort
import sparktestsupport.modules as modules


Expand All @@ -43,7 +44,7 @@ def determine_modules_for_files(filenames):
If a file is not associated with a more specific submodule, then this method will consider that
file to belong to the 'root' module.

>>> sorted(x.name for x in determine_modules_for_files(["python/pyspark/a.py", "sql/test/foo"]))
>>> sorted(x.name for x in determine_modules_for_files(["python/pyspark/a.py", "sql/core/foo"]))
['pyspark-core', 'sql']
>>> [x.name for x in determine_modules_for_files(["file_not_matched_by_any_subproject"])]
['root']
Expand Down Expand Up @@ -99,14 +100,16 @@ def determine_modules_to_test(changed_modules):
Given a set of modules that have changed, compute the transitive closure of those modules'
dependent modules in order to determine the set of modules that should be tested.

>>> sorted(x.name for x in determine_modules_to_test([modules.root]))
Returns a topologically-sorted list of modules (ties are broken by sorting on module names).

>>> [x.name for x in determine_modules_to_test([modules.root])]
['root']
>>> sorted(x.name for x in determine_modules_to_test([modules.graphx]))
['examples', 'graphx']
>>> x = sorted(x.name for x in determine_modules_to_test([modules.sql]))
>>> [x.name for x in determine_modules_to_test([modules.graphx])]
['graphx', 'examples']
>>> x = [x.name for x in determine_modules_to_test([modules.sql])]
>>> x # doctest: +NORMALIZE_WHITESPACE
['examples', 'hive-thriftserver', 'mllib', 'pyspark-ml', \
'pyspark-mllib', 'pyspark-sql', 'sparkr', 'sql']
['sql', 'hive', 'mllib', 'examples', 'hive-thriftserver', 'pyspark-sql', 'sparkr',
'pyspark-mllib', 'pyspark-ml']
"""
# If we're going to have to run all of the tests, then we can just short-circuit
# and return 'root'. No module depends on root, so if it appears then it will be
Expand All @@ -116,7 +119,9 @@ def determine_modules_to_test(changed_modules):
modules_to_test = set()
for module in changed_modules:
modules_to_test = modules_to_test.union(determine_modules_to_test(module.dependent_modules))
return modules_to_test.union(set(changed_modules))
modules_to_test = modules_to_test.union(set(changed_modules))
return toposort_flatten(
{m: set(m.dependencies).intersection(modules_to_test) for m in modules_to_test}, sort=True)


def determine_tags_to_exclude(changed_modules):
Expand Down Expand Up @@ -377,12 +382,12 @@ def run_scala_tests_maven(test_profiles):

def run_scala_tests_sbt(test_modules, test_profiles):

sbt_test_goals = set(itertools.chain.from_iterable(m.sbt_test_goals for m in test_modules))
sbt_test_goals = list(itertools.chain.from_iterable(m.sbt_test_goals for m in test_modules))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK there aren't any duplicates here, so we might as well maintain the sorted module order.


if not sbt_test_goals:
return

profiles_and_goals = test_profiles + list(sbt_test_goals)
profiles_and_goals = test_profiles + sbt_test_goals

print("[info] Running Spark tests using SBT with these arguments: ",
" ".join(profiles_and_goals))
Expand Down
54 changes: 46 additions & 8 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# limitations under the License.
#

from functools import total_ordering
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only available in Python 2.7+, but that might be fine: AMPLab Jenkins runs a compatible Python version and I figure that people have control over their dev environments and can upgrade if necessary.

import itertools
import re

all_modules = []


@total_ordering
class Module(object):
"""
A module is the basic abstraction in our test runner script. Each module consists of a set of
Expand Down Expand Up @@ -75,20 +77,56 @@ def __init__(self, name, dependencies, source_file_regexes, build_profile_flags=
def contains_file(self, filename):
return any(re.match(p, filename) for p in self.source_file_prefixes)

def __repr__(self):
return "Module<%s>" % self.name

def __lt__(self, other):
return self.name < other.name

def __eq__(self, other):
return self.name == other.name

def __ne__(self, other):
return not (self.name == other.name)

def __hash__(self):
return hash(self.name)


catalyst = Module(
name="catalyst",
dependencies=[],
source_file_regexes=[
"sql/catalyst/",
],
sbt_test_goals=[
"catalyst/test",
],
)


sql = Module(
name="sql",
dependencies=[],
dependencies=[catalyst],
source_file_regexes=[
"sql/(?!hive-thriftserver)",
"sql/core/",
],
sbt_test_goals=[
"sql/test",
],
)

hive = Module(
name="hive",
dependencies=[sql],
source_file_regexes=[
"sql/hive/",
"bin/spark-sql",
],
build_profile_flags=[
"-Phive",
],
sbt_test_goals=[
"catalyst/test",
"sql/test",
"hive/test",
],
test_tags=[
Expand All @@ -99,7 +137,7 @@ def contains_file(self, filename):

hive_thriftserver = Module(
name="hive-thriftserver",
dependencies=[sql],
dependencies=[hive],
source_file_regexes=[
"sql/hive-thriftserver",
"sbin/start-thriftserver.sh",
Expand Down Expand Up @@ -282,7 +320,7 @@ def contains_file(self, filename):

examples = Module(
name="examples",
dependencies=[graphx, mllib, streaming, sql],
dependencies=[graphx, mllib, streaming, hive],
source_file_regexes=[
"examples/",
],
Expand Down Expand Up @@ -314,7 +352,7 @@ def contains_file(self, filename):

pyspark_sql = Module(
name="pyspark-sql",
dependencies=[pyspark_core, sql],
dependencies=[pyspark_core, hive],
source_file_regexes=[
"python/pyspark/sql"
],
Expand Down Expand Up @@ -404,7 +442,7 @@ def contains_file(self, filename):

sparkr = Module(
name="sparkr",
dependencies=[sql, mllib],
dependencies=[hive, mllib],
source_file_regexes=[
"R/",
],
Expand Down
85 changes: 85 additions & 0 deletions dev/sparktestsupport/toposort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#######################################################################
# Implements a topological sort algorithm.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen, I didn't want to write my own sort so I just copied this one from the toposort library. Do I need to update the NOTICE file in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping again. @srowen @pwendell, can you comment on the NOTICE file considerations here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I missed this. It looks like it's Apache licensed and you have correctly preserved the header. https://bitbucket.org/ericvsmith/toposort/src/25b5894c4229cb888f77cf0c077c05e2464446ac/LICENSE.txt?fileviewer=file-view-default

You'll need to put the NOTICE contents in our NOTICE:
https://bitbucket.org/ericvsmith/toposort/src/25b5894c4229cb888f77cf0c077c05e2464446ac/NOTICE?fileviewer=file-view-default

That should be all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I just append the entire contents of the notice verbatim? Is there a shorthand way of doing this (e.g. just including just the copyright)? I got a bit confused looking at the existing NOTICE file and since it hasn't been updated in a while I didn't spot any smaller patches / diffs to model my change on.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes just append. I don't know if we've got a strong format going, except to try to precede the text from Foo with a line like "Foo" or "For Foo:". Anything sane should be OK.

Technically you reproduce all relevant parts of the NOTICE and only relevant parts. All is relevant here. Really, not sure the project needed this file but hey. Practically you could argue you can abbreviate it, but given it's short, I'd just copy it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, updated the NOTICE to append it verbatim.

#
# Copyright 2014 True Blade Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Notes:
# Based on http://code.activestate.com/recipes/578272-topological-sort
# with these major changes:
# Added unittests.
# Deleted doctests (maybe not the best idea in the world, but it cleans
# up the docstring).
# Moved functools import to the top of the file.
# Changed assert to a ValueError.
# Changed iter[items|keys] to [items|keys], for python 3
# compatibility. I don't think it matters for python 2 these are
# now lists instead of iterables.
# Copy the input so as to leave it unmodified.
# Renamed function from toposort2 to toposort.
# Handle empty input.
# Switch tests to use set literals.
#
########################################################################

from functools import reduce as _reduce


__all__ = ['toposort', 'toposort_flatten']


def toposort(data):
"""Dependencies are expressed as a dictionary whose keys are items
and whose values are a set of dependent items. Output is a list of
sets in topological order. The first set consists of items with no
dependences, each subsequent set consists of items that depend upon
items in the preceeding sets.
"""

# Special case empty input.
if len(data) == 0:
return

# Copy the input so as to leave it unmodified.
data = data.copy()

# Ignore self dependencies.
for k, v in data.items():
v.discard(k)
# Find all items that don't depend on anything.
extra_items_in_deps = _reduce(set.union, data.values()) - set(data.keys())
# Add empty dependences where needed.
data.update({item: set() for item in extra_items_in_deps})
while True:
ordered = set(item for item, dep in data.items() if len(dep) == 0)
if not ordered:
break
yield ordered
data = {item: (dep - ordered)
for item, dep in data.items()
if item not in ordered}
if len(data) != 0:
raise ValueError('Cyclic dependencies exist among these items: {}'.format(
', '.join(repr(x) for x in data.items())))


def toposort_flatten(data, sort=True):
"""Returns a single list of dependencies. For any set returned by
toposort(), those items are sorted and appended to the result (just to
make the results deterministic)."""

result = []
for d in toposort(data):
result.extend((sorted if sort else list)(d))
return result