Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 67 additions & 36 deletions sc2/bot_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def main_base_ramp(self) -> "Ramp":
ParaSite map has 5 upper points, and most other maps have 2 upper points at the main ramp. The map Acolyte has 4 upper points at the wrong ramp (which is closest to the start position) """
self.cached_main_base_ramp = min(
(ramp for ramp in self.game_info.map_ramps if len(ramp.upper) in {2, 5}),
key=lambda r: self.start_location._distance_squared(r.top_center),
key=lambda r: self.start_location.distance_to(r.top_center),
)
return self.cached_main_base_ramp

Expand All @@ -148,7 +148,7 @@ def expansion_locations(self) -> Dict[Point2, Units]:
# any resource in a group is closer than 6 to any resource of another group

# Distance we group resources by
RESOURCE_SPREAD_THRESHOLD = 36
RESOURCE_SPREAD_THRESHOLD = 8.5
minerals = self.state.mineral_field
geysers = self.state.vespene_geyser
all_resources = minerals | geysers
Expand All @@ -164,7 +164,7 @@ def expansion_locations(self) -> Dict[Point2, Units]:
for group_a, group_b in itertools.combinations(resource_groups, 2):
# Check if any pair of resource of these groups is closer than threshold together
if any(
resource_a.position._distance_squared(resource_b.position) <= RESOURCE_SPREAD_THRESHOLD
resource_a.distance_to(resource_b) <= RESOURCE_SPREAD_THRESHOLD
for resource_a, resource_b in itertools.product(group_a, group_b)
):
# Remove the single groups and add the merged group
Expand All @@ -179,7 +179,7 @@ def expansion_locations(self) -> Dict[Point2, Units]:
(x, y)
for x in range(-offset_range, offset_range + 1)
for y in range(-offset_range, offset_range + 1)
if 49 >= x ** 2 + y ** 2 >= 16
if 4 <= math.hypot(x, y) <= 7
]
# Dict we want to return
centers = {}
Expand All @@ -199,17 +199,11 @@ def expansion_locations(self) -> Dict[Point2, Units]:
# Check if point can be built on
if self._game_info.placement_grid[point.rounded] != 0
# Check if all resources have enough space to point
and all(
point._distance_squared(resource.position) >= (49 if resource in geysers else 36)
for resource in resources
)
and all(point.distance_to(resource) >= (7 if resource in geysers else 6) for resource in resources)
)
# Choose best fitting point
# TODO can we improve this by calculating the distance only one time?
result = min(
possible_points,
key=lambda point: sum(point._distance_squared(resource.position) for resource in resources),
)
result = min(possible_points, key=lambda point: sum(point.distance_to(resource) for resource in resources))
centers[result] = resources
return centers

Expand Down Expand Up @@ -271,7 +265,7 @@ async def get_next_expansion(self) -> Optional[Point2]:
for el in self.expansion_locations:

def is_near_to_expansion(t):
return t.position._distance_squared(el) < self.EXPANSION_GAP_THRESHOLD ** 2
return t.distance_to(el) < self.EXPANSION_GAP_THRESHOLD

if any(map(is_near_to_expansion, self.townhalls)):
# already taken
Expand All @@ -288,16 +282,27 @@ def is_near_to_expansion(t):

return closest

async def distribute_workers(self):
async def distribute_workers(self, resource_ratio: float = 2):
"""
Distributes workers across all the bases taken.
Keyword `resource_ratio` takes a float. If the current minerals to gas
ratio is bigger than `resource_ratio`, this function prefer filling geysers
first, if it is lower, it will prefer sending workers to minerals first.
This is only for workers that need to be moved anyways, it will NOT will
geysers on its own.

NOTE: This function is far from optimal, if you really want to have
refined worker control, you should write your own distribution function.
For example long distance mining control and moving workers if a base was killed
are not being handled.

WARNING: This is quite slow when there are lots of workers or multiple bases.
"""
if not self.state.mineral_field or not self.workers or not self.owned_expansions.ready:
if not self.state.mineral_field or not self.workers or not self.townhalls.ready:
return
actions = []
worker_pool = [worker for worker in self.workers.idle]
bases = self.owned_expansions.ready
bases = self.townhalls.ready
geysers = self.geysers.ready

# list of places that need more workers
Expand All @@ -308,53 +313,79 @@ async def distribute_workers(self):
# perfect amount of workers, skip mining place
if not difference:
continue
if mining_place.has_vespene:
if mining_place.is_vespene_geyser:
# get all workers that target the gas extraction site
# or are on their way back from it
local_workers = self.workers.filter(
lambda unit: unit.order_target == mining_place.tag
or (unit.is_carrying_vespene and unit.order_target == bases.closest_to(mining_place).tag)
)
else:
# get minerals around expansion
local_minerals = self.expansion_locations[mining_place.position].filter(
lambda resource: resource.has_minerals
)
# get tags of minerals around expansion
local_minerals_tags = {
mineral.tag for mineral in self.state.mineral_field if mineral.distance_to(mining_place) <= 8
}
# get all target tags a worker can have
# tags of the minerals he could mine at that base
# get workers that work at that gather site
local_workers = self.workers.filter(
lambda unit: unit.order_target in local_minerals.tags
lambda unit: unit.order_target in local_minerals_tags
or (unit.is_carrying_minerals and unit.order_target == mining_place.tag)
)
# too many workers
if difference > 0:
worker_pool.append(local_workers[:difference])
for worker in local_workers[:difference]:
worker_pool.append(worker)
# too few workers
# add mining place to deficit bases for every missing worker
else:
deficit_mining_places += [mining_place for _ in range(-difference)]

# prepare all minerals near a base if we have too many workers
# and need to send them to the closest patch
if len(worker_pool) > len(deficit_mining_places):
all_minerals_near_base = [
mineral
for mineral in self.state.mineral_field
if any(mineral.distance_to(base) <= 8 for base in self.townhalls.ready)
]
# distribute every worker in the pool
for worker in worker_pool:
# as long as have workers and mining places
if deficit_mining_places:
# remove current place from the list for next loop
current_place = deficit_mining_places.pop(0)
# choose only mineral fields first if current mineral to gas ratio is less than target ratio
if self.vespene and self.minerals / self.vespene < resource_ratio:
possible_mining_places = [place for place in deficit_mining_places if not place.vespene_contents]
# else prefer gas
else:
possible_mining_places = [place for place in deficit_mining_places if place.vespene_contents]
# if preferred type is not available any more, get all other places
if not possible_mining_places:
possible_mining_places = deficit_mining_places
# find closest mining place
current_place = min(deficit_mining_places, key=lambda place: place.distance_to(worker))
# remove it from the list
deficit_mining_places.remove(current_place)
# if current place is a gas extraction site, go there
if current_place.has_vespene:
if current_place.vespene_contents:
actions.append(worker.gather(current_place))
# if current place is a gas extraction site,
# go to the mineral field that is near and has the most minerals left
else:
local_minerals = self.expansion_locations[current_place.position].filter(
lambda resource: resource.has_minerals
)
local_minerals = [
mineral for mineral in self.state.mineral_field if mineral.distance_to(current_place) <= 8
]
target_mineral = max(local_minerals, key=lambda mineral: mineral.mineral_contents)
actions.append(worker.gather(target_mineral))
# more workers to distribute than free mining spots
# else:
# pass
# send to closest if worker is doing nothing
elif worker.is_idle and all_minerals_near_base:
target_mineral = min(all_minerals_near_base, key=lambda mineral: mineral.distance_to(worker))
actions.append(worker.gather(target_mineral))
else:
# there are no deficit mining places and worker is not idle
# so dont move him
pass

await self.do_actions(actions)

Expand All @@ -366,7 +397,7 @@ def owned_expansions(self) -> Dict[Point2, Unit]:
for el in self.expansion_locations:

def is_near_to_expansion(t):
return t.position._distance_squared(el) < self.EXPANSION_GAP_THRESHOLD ** 2
return t.distance_to(el) < self.EXPANSION_GAP_THRESHOLD

th = next((x for x in self.townhalls if is_near_to_expansion(x)), None)
if th:
Expand Down Expand Up @@ -425,21 +456,21 @@ async def can_cast(
ability_target == 1
or ability_target == Target.PointOrNone.value
and isinstance(target, (Point2, Point3))
and unit.position._distance_squared(target.position) <= cast_range ** 2
and unit.distance_to(target) <= cast_range
): # cant replace 1 with "Target.None.value" because ".None" doesnt seem to be a valid enum name
return True
# Check if able to use ability on a unit
elif (
ability_target in {Target.Unit.value, Target.PointOrUnit.value}
and isinstance(target, Unit)
and unit.position._distance_squared(target.position) <= cast_range ** 2
and unit.distance_to(target) <= cast_range
):
return True
# Check if able to use ability on a position
elif (
ability_target in {Target.Point.value, Target.PointOrUnit.value}
and isinstance(target, (Point2, Point3))
and unit.position._distance_squared(target) <= cast_range ** 2
and unit.distance_to(target) <= cast_range
):
return True
return False
Expand Down Expand Up @@ -514,7 +545,7 @@ async def find_placement(
if random_alternative:
return random.choice(possible)
else:
return min(possible, key=lambda p: p._distance_squared(near))
return min(possible, key=lambda p: p.distance_to_point2(near))
return None

def already_pending_upgrade(self, upgrade_type: UpgradeId) -> Union[int, float]:
Expand Down
2 changes: 1 addition & 1 deletion sc2/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def property_cache_once_per_frame(f):
then clears it if it is accessed in a different game loop.
Only works on properties of the bot object, because it requires
access to self.state.game_loop """

@wraps(f)
def inner(self):
property_cache = "_cache_" + f.__name__
Expand Down
10 changes: 5 additions & 5 deletions sc2/game_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def upper2_for_ramp_wall(self) -> Set[Point2]:
return set() # HACK: makes this work for now
# FIXME: please do

upper2 = sorted(list(self.upper), key=lambda x: x._distance_squared(self.bottom_center), reverse=True)
upper2 = sorted(list(self.upper), key=lambda x: x.distance_to_point2(self.bottom_center), reverse=True)
while len(upper2) > 2:
upper2.pop()
return set(upper2)
Expand Down Expand Up @@ -99,7 +99,7 @@ def barracks_in_middle(self) -> Point2:
# Offset from top point to barracks center is (2, 1)
intersects = p1.circle_intersection(p2, 5 ** 0.5)
anyLowerPoint = next(iter(self.lower))
return max(intersects, key=lambda p: p._distance_squared(anyLowerPoint))
return max(intersects, key=lambda p: p.distance_to_point2(anyLowerPoint))
raise Exception("Not implemented. Trying to access a ramp that has a wrong amount of upper points.")

@property_immutable_cache
Expand All @@ -112,7 +112,7 @@ def depot_in_middle(self) -> Point2:
# Offset from top point to depot center is (1.5, 0.5)
intersects = p1.circle_intersection(p2, 2.5 ** 0.5)
anyLowerPoint = next(iter(self.lower))
return max(intersects, key=lambda p: p._distance_squared(anyLowerPoint))
return max(intersects, key=lambda p: p.distance_to_point2(anyLowerPoint))
raise Exception("Not implemented. Trying to access a ramp that has a wrong amount of upper points.")

@property_mutable_cache
Expand All @@ -122,7 +122,7 @@ def corner_depots(self) -> Set[Point2]:
points = self.upper2_for_ramp_wall
p1 = points.pop().offset((self.x_offset, self.y_offset)) # still an error with pixelmap?
p2 = points.pop().offset((self.x_offset, self.y_offset))
center = p1.towards(p2, p1.distance_to(p2) / 2)
center = p1.towards(p2, p1.distance_to_point2(p2) / 2)
depotPosition = self.depot_in_middle
# Offset from middle depot to corner depots is (2, 1)
intersects = center.circle_intersection(depotPosition, 5 ** 0.5)
Expand Down Expand Up @@ -224,7 +224,7 @@ def paint(pt: Point2) -> None:
if picture[py][px] != NOT_COLORED_YET:
continue
point: Point2 = Point2((px, py))
remaining.remove(point)
remaining.discard(point)
paint(point)
queue.append(point)
currentGroup.add(point)
Expand Down
51 changes: 26 additions & 25 deletions sc2/game_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .position import Point2, Point3
from .power_source import PsionicMatrix
from .score import ScoreDetails
from .unit import Unit
from .units import Units


Expand Down Expand Up @@ -124,53 +125,53 @@ def __init__(self, response_observation):
# https://github.com/Blizzard/s2client-proto/blob/33f0ecf615aa06ca845ffe4739ef3133f37265a9/s2clientprotocol/score.proto#L31
self.score: ScoreDetails = ScoreDetails(self.observation.score)
self.abilities = self.observation.abilities # abilities of selected units
# Fix for enemy units detected by my sensor tower, as blips have less unit information than normal visible units
blipUnits, minerals, geysers, destructables, enemy, own, watchtowers = ([] for _ in range(7))

self._blipUnits = []
self.own_units: Units = Units([])
self.enemy_units: Units = Units([])
self.mineral_field: Units = Units([])
self.vespene_geyser: Units = Units([])
self.resources: Units = Units([])
self.destructables: Units = Units([])
self.watchtowers: Units = Units([])
self.units: Units = Units([])

for unit in self.observation_raw.units:
if unit.is_blip:
blipUnits.append(unit)
self._blipUnits.append(unit)
else:
unit_obj = Unit(unit)
self.units.append(unit_obj)
alliance = unit.alliance
# Alliance.Neutral.value = 3
if alliance == 3:
unit_type = unit.unit_type
# XELNAGATOWER = 149
if unit_type == 149:
watchtowers.append(unit)
# all destructable rocks except the one below the main base ramps
elif unit.radius > 1.5:
destructables.append(unit)
self.watchtowers.append(unit_obj)
# mineral field enums
elif unit_type in mineral_ids:
minerals.append(unit)
self.mineral_field.append(unit_obj)
self.resources.append(unit_obj)
# geyser enums
elif unit_type in geyser_ids:
geysers.append(unit)
self.vespene_geyser.append(unit_obj)
self.resources.append(unit_obj)
# all destructable rocks
else:
self.destructables.append(unit_obj)
# Alliance.Self.value = 1
elif alliance == 1:
own.append(unit)
self.own_units.append(unit_obj)
# Alliance.Enemy.value = 4
elif alliance == 4:
enemy.append(unit)

resources = minerals + geysers
visible_units = resources + destructables + enemy + own + watchtowers

self.own_units: Units = Units.from_proto(own)
self.enemy_units: Units = Units.from_proto(enemy)
self.mineral_field: Units = Units.from_proto(minerals)
self.vespene_geyser: Units = Units.from_proto(geysers)
self.resources: Units = Units.from_proto(resources)
self.destructables: Units = Units.from_proto(destructables)
self.watchtowers: Units = Units.from_proto(watchtowers)
self.units: Units = Units.from_proto(visible_units)
self.enemy_units.append(unit_obj)
self.upgrades: Set[UpgradeId] = {UpgradeId(upgrade) for upgrade in self.observation_raw.player.upgrade_ids}

# Set of unit tags that died this step
self.dead_units: Set[int] = {dead_unit_tag for dead_unit_tag in self.observation_raw.event.dead_units}

self.blips: Set[Blip] = {Blip(unit) for unit in blipUnits}
# Set of enemy units detected by own sensor tower, as blips have less unit information than normal visible units
self.blips: Set[Blip] = {Blip(unit) for unit in self._blipUnits}
# self.visibility[point]: 0=Hidden, 1=Fogged, 2=Visible
self.visibility: PixelMap = PixelMap(self.observation_raw.map_state.visibility, mirrored=True)
# self.creep[point]: 0=No creep, 1=creep
Expand Down
Loading