Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
16f7661
add lazy register for actors
pitr-ch Jul 4, 2014
006149e
Moving lazy register under Concurrent adding docs
pitr-ch Jul 6, 2014
99ef541
Fix Context logging
pitr-ch Jul 6, 2014
99163df
Adding dead letter routing
pitr-ch Jul 6, 2014
8c6eac4
Allow to set custom Reference class
pitr-ch Jul 7, 2014
f5ec2bd
Fix broken Delay#value!
pitr-ch Jul 7, 2014
b9136c5
Add missing dependency of Future on IVar
pitr-ch Jul 7, 2014
7e07ab4
Break up actor core into series of behaviors
pitr-ch Jul 7, 2014
e36f7bb
Add ability to link Actors
pitr-ch Jul 8, 2014
33a19b9
Rename Actress to Actor
pitr-ch Jul 8, 2014
d1a223a
Add Context#default_reference_class
pitr-ch Jul 9, 2014
d3f55da
Update doc
pitr-ch Jul 11, 2014
2b27090
Pass the envelope explicitly next in Context if not recognized
pitr-ch Jul 11, 2014
44884cb
Remove Context#spawn
pitr-ch Jul 11, 2014
2adb509
Do not silently ignore :class option in AContext.spawn
pitr-ch Jul 12, 2014
d88f51d
Add await behavior
pitr-ch Jul 12, 2014
1ba0da9
Doc update
pitr-ch Jul 12, 2014
c3efc2f
Buffer optimization
pitr-ch Jul 12, 2014
838c4dc
Add atomic linking
pitr-ch Jul 12, 2014
3a32d3e
Adding supervision and restarts
pitr-ch Jul 13, 2014
7d3a0fb
Broadcast to linked actors error the cause of pausing
pitr-ch Jul 13, 2014
a62529d
Fix brittle test
pitr-ch Jul 13, 2014
827402b
Tweak logging levels
pitr-ch Jul 13, 2014
85e4fab
Split behaviors into files
pitr-ch Jul 13, 2014
0c903e4
Add blank Context#on_event method
pitr-ch Jul 13, 2014
30a2a8e
Doc update
pitr-ch Jul 13, 2014
2a8e408
Use always behaviour_definition instead just behaviour
pitr-ch Jul 14, 2014
8339106
Rename Supervising to Supervised, Add Supervising
pitr-ch Jul 14, 2014
3cf3be7
Add :restart! directive
pitr-ch Jul 14, 2014
bbd050d
Removing old #terminated? and #terminated methods
pitr-ch Jul 15, 2014
6f57449
Update strategy names of SetResults to correspond with Pausing commands
pitr-ch Jul 16, 2014
7dbc8d8
Add utility actor for broadcasting messages
pitr-ch Jul 19, 2014
ece7f24
doc update
pitr-ch Jul 19, 2014
2693941
Broadcast to filtered_receivers only not all receivers
pitr-ch Jul 19, 2014
b17b70a
Ensure proper synchronization of Actor instances
pitr-ch Jul 19, 2014
4d6a6a6
update benchmark
pitr-ch Jul 29, 2014
8fb209b
Ensure error is risen on failed root initialization
pitr-ch Jul 29, 2014
67a0730
Log actor events
pitr-ch Jul 29, 2014
7d3d154
remove dependency of Pausing on Supervised
pitr-ch Jul 29, 2014
ae0bf3b
Add #tell method to Context for convenience
pitr-ch Jul 29, 2014
96cea54
allow to retrieve default dead_letter_routing instance from Root
pitr-ch Jul 29, 2014
dc87b3d
Broadcast returns true/false
pitr-ch Jul 29, 2014
495e5e6
Allow benchmarking of ThreadLocalVar
pitr-ch Jul 29, 2014
345bd4b
Doc/TODO update
pitr-ch Jul 29, 2014
799cfe6
Merge remote-tracking branch 'upstream/master' into actress
pitr-ch Jul 29, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
require 'benchmark'
require 'concurrent/actress'
Concurrent::Actress.i_know_it_is_experimental!
require 'concurrent/actor'
Concurrent::Actor.i_know_it_is_experimental!

require 'celluloid'
require 'celluloid/autostart'

# require 'stackprof'
# require 'profiler'

logger = Logger.new($stderr)
logger.level = Logger::INFO
Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block|
Expand All @@ -30,26 +34,26 @@ def counting(count, ivar)
ivar.set count
end
end
end
end if defined? Celluloid

threads = []

# Profiler__.start_profile
# StackProf.run(mode: :cpu,
# interval: 10,
# out: File.join(File.dirname(__FILE__), 'stackprof-cpu-myapp.dump')) do
Benchmark.bmbm(10) do |b|
[2, adders_size, adders_size*2, adders_size*3].each do |adders_size|

b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'actress')) do
b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'concurrent')) do
counts = Array.new(counts_size) { [0, Concurrent::IVar.new] }
adders = Array.new(adders_size) do |i|
Concurrent::Actress::AdHoc.spawn("adder#{i}") do
Concurrent::Actor::AdHoc.spawn("adder#{i}") do
lambda do |(count, ivar)|
if count.nil?
terminate!
if count < ADD_TO
adders[(i+1) % adders_size].tell [count+1, ivar]
else
if count < ADD_TO
adders[(i+1) % adders_size].tell [count+1, ivar]
else
ivar.set count
end
ivar.set count
end
end
end
Expand All @@ -65,32 +69,38 @@ def counting(count, ivar)

threads << Thread.list.size

adders.each { |a| a << [nil, nil] }
adders.each { |a| a << :terminate! }
end

b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'celluloid')) do
counts = []
counts_size.times { counts << [0, Concurrent::IVar.new] }
if defined? Celluloid
b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'celluloid')) do
counts = []
counts_size.times { counts << [0, Concurrent::IVar.new] }

adders = []
adders_size.times do |i|
adders << Counter.new(adders, i)
end
adders = []
adders_size.times do |i|
adders << Counter.new(adders, i)
end

counts.each_with_index do |count, i|
adders[i % adders_size].counting *count
end
counts.each_with_index do |count, i|
adders[i % adders_size].counting *count
end

counts.each do |count, ivar|
raise unless ivar.value >= ADD_TO
end
counts.each do |count, ivar|
raise unless ivar.value >= ADD_TO
end

threads << Thread.list.size
threads << Thread.list.size

adders.each(&:terminate)
adders.each(&:terminate)
end
end
end
end
# end
# Profiler__.stop_profile

p threads

# Profiler__.print_profile $stdout

File renamed without changes.
3 changes: 3 additions & 0 deletions doc/actor/examples.out.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@



File renamed without changes.
2 changes: 2 additions & 0 deletions doc/actor/init.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
require 'concurrent/actor'
Concurrent::Actor.i_know_it_is_experimental!
133 changes: 133 additions & 0 deletions doc/actor/main.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Actor model

- Light-weighted.
- Inspired by Akka and Erlang.
- Modular.

Actors are sharing a thread-pool by default which makes them very cheap to create and discard.
Thousands of actors can be created, allowing you to break the program into small maintainable pieces,
without violating the single responsibility principle.

## What is an actor model?

[Wiki](http://en.wikipedia.org/wiki/Actor_model) says:
The actor model in computer science is a mathematical model of concurrent computation
that treats _actors_ as the universal primitives of concurrent digital computation:
in response to a message that it receives, an actor can make local decisions,
create more actors, send more messages, and determine how to respond to the next
message received.

## Why?

Concurrency is hard this is one of many ways how to simplify the problem.
It is simpler to reason about actors than about locks (and all their possible states).

## How to use it

{include:file:doc/actor/quick.out.rb}

## Messaging

Messages are processed in same order as they are sent by a sender. It may interleaved with
messages form other senders though. There is also a contract in actor model that
messages sent between actors should be immutable. Gems like

- [Algebrick](https://github.com/pitr-ch/algebrick) - Typed struct on steroids based on
algebraic types and pattern matching
- [Hamster](https://github.com/hamstergem/hamster) - Efficient, Immutable, Thread-Safe
Collection classes for Ruby

are very useful.

### Dead letter routing

see {AbstractContext#dead_letter_routing} description:

> {include:Actor::AbstractContext#dead_letter_routing}

## Architecture

Actors are running on shared thread poll which allows user to create many actors cheaply.
Downside is that these actors cannot be directly used to do IO or other blocking operations.
Blocking operations could starve the `default_task_pool`. However there are two options:

- Create an regular actor which will schedule blocking operations in `global_operation_pool`
(which is intended for blocking operations) sending results back to self in messages.
- Create an actor using `global_operation_pool` instead of `global_task_pool`, e.g.
`AnIOActor.spawn name: :blocking, executor: Concurrent.configuration.global_operation_pool`.

Each actor is composed from 4 parts:

### {Reference}
{include:Actor::Reference}

### {Core}
{include:Actor::Core}

### {AbstractContext}
{include:Actor::AbstractContext}

### {Behaviour}
{include:Actor::Behaviour}

## Speed

Simple benchmark Actor vs Celluloid, the numbers are looking good
but you know how it is with benchmarks. Source code is in
`examples/actor/celluloid_benchmark.rb`. It sends numbers between x actors
and adding 1 until certain limit is reached.

Benchmark legend:

- mes. - number of messages send between the actors
- act. - number of actors exchanging the messages
- impl. - which gem is used

### JRUBY

Rehearsal --------------------------------------------------------
50000 2 concurrent 24.110000 0.800000 24.910000 ( 7.728000)
50000 2 celluloid 28.510000 4.780000 33.290000 ( 14.782000)
50000 500 concurrent 13.700000 0.280000 13.980000 ( 4.307000)
50000 500 celluloid 14.520000 11.740000 26.260000 ( 12.258000)
50000 1000 concurrent 10.890000 0.220000 11.110000 ( 3.760000)
50000 1000 celluloid 15.600000 21.690000 37.290000 ( 18.512000)
50000 1500 concurrent 10.580000 0.270000 10.850000 ( 3.646000)
50000 1500 celluloid 14.490000 29.790000 44.280000 ( 26.043000)
--------------------------------------------- total: 201.970000sec

mes. act. impl. user system total real
50000 2 concurrent 9.820000 0.510000 10.330000 ( 5.735000)
50000 2 celluloid 10.390000 4.030000 14.420000 ( 7.494000)
50000 500 concurrent 9.880000 0.200000 10.080000 ( 3.310000)
50000 500 celluloid 12.430000 11.310000 23.740000 ( 11.727000)
50000 1000 concurrent 10.590000 0.190000 10.780000 ( 4.029000)
50000 1000 celluloid 14.950000 23.260000 38.210000 ( 20.841000)
50000 1500 concurrent 10.710000 0.250000 10.960000 ( 3.892000)
50000 1500 celluloid 13.280000 30.030000 43.310000 ( 24.620000) (1)

### MRI 2.1.0

Rehearsal --------------------------------------------------------
50000 2 concurrent 4.640000 0.080000 4.720000 ( 4.852390)
50000 2 celluloid 6.110000 2.300000 8.410000 ( 7.898069)
50000 500 concurrent 6.260000 2.210000 8.470000 ( 7.400573)
50000 500 celluloid 10.250000 4.930000 15.180000 ( 14.174329)
50000 1000 concurrent 6.300000 1.860000 8.160000 ( 7.303162)
50000 1000 celluloid 12.300000 7.090000 19.390000 ( 17.962621)
50000 1500 concurrent 7.410000 2.610000 10.020000 ( 8.887396)
50000 1500 celluloid 14.850000 10.690000 25.540000 ( 24.489796)
---------------------------------------------- total: 99.890000sec

mes. act. impl. user system total real
50000 2 concurrent 4.190000 0.070000 4.260000 ( 4.306386)
50000 2 celluloid 6.490000 2.210000 8.700000 ( 8.280051)
50000 500 concurrent 7.060000 2.520000 9.580000 ( 8.518707)
50000 500 celluloid 10.550000 4.980000 15.530000 ( 14.699962)
50000 1000 concurrent 6.440000 1.870000 8.310000 ( 7.571059)
50000 1000 celluloid 12.340000 7.510000 19.850000 ( 18.793591)
50000 1500 concurrent 6.720000 2.160000 8.880000 ( 7.929630)
50000 1500 celluloid 14.140000 10.130000 24.270000 ( 22.775288) (1)

*Note (1):* Celluloid is using thread per actor so this bench is creating about 1500
native threads. Actor is using constant number of threads.
26 changes: 8 additions & 18 deletions doc/actress/quick.in.rb → doc/actor/quick.in.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
class Counter
class Counter < Concurrent::Actor::Context
# Include context of an actor which gives this class access to reference and other information
# about the actor, see CoreDelegations.
include Concurrent::Actress::Context
# about the actor, see PublicDelegations.

# use initialize as you wish
def initialize(initial_value)
Expand All @@ -10,16 +9,11 @@ def initialize(initial_value)

# override on_message to define actor's behaviour
def on_message(message)
case message
when Integer
if Integer === message
@count += message
when :terminate
terminate!
else
raise 'unknown'
end
end
end
end #

# Create new actor naming the instance 'first'.
# Return value is a reference to the actor, the actual actor is never returned.
Expand All @@ -35,7 +29,7 @@ def on_message(message)
counter.ask(0).value

# Terminate the actor.
counter.tell(:terminate)
counter.tell(:terminate!)
# Not terminated yet, it takes a while until the message is processed.
counter.terminated?
# Waiting for the termination.
Expand All @@ -52,22 +46,18 @@ def on_message(message)


# Lets define an actor creating children actors.
class Node
include Concurrent::Actress::Context

class Node < Concurrent::Actor::Context
def on_message(message)
case message
when :new_child
spawn self.class, :child
Node.spawn :child
when :how_many_children
children.size
when :terminate
terminate!
else
raise 'unknown'
end
end
end
end #

# Actors are tracking parent-child relationships
parent = Node.spawn :parent
Expand Down
Loading