Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions hbase-shell/src/main/ruby/hbase/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -864,24 +864,26 @@ def status(format, type)
puts(format('%<servers>d live servers', servers: status.getServersSize))
status.getServers.each do |server_status|
sl = status.getLoad(server_status)
r_sink_string = ' SINK:'
r_sink_string = ' SINK:'
r_source_string = ' SOURCE:'
r_load_sink = sl.getReplicationLoadSink
next if r_load_sink.nil?

if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
# If we have applied no operations since we've started replication,
# assume that we're not acting as a sink and don't print the normal information
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ", Waiting for OPs... "
r_sink_string << "\n TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ",\n Waiting for OPs... "
else
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
r_sink_string << ", TimeStampsOfLastAppliedOp=" +
(java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString()
r_sink_string << "\n TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ",\n AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
r_sink_string << ",\n TimeStampsOfLastAppliedOp=" +
r_load_sink.getTimestampsOfLastAppliedOp().to_s
end

r_load_source_map = sl.getReplicationLoadSourceMap
build_source_string(r_load_source_map, r_source_string)

puts(format(' %<host>s:', host: server_status.getHostname))
if type.casecmp('SOURCE').zero?
puts(format('%<source>s', source: r_source_string))
Expand Down Expand Up @@ -920,19 +922,20 @@ def status(format, type)

def build_source_string(r_load_source_map, r_source_string)
r_load_source_map.each do |peer, sources|
r_source_string << ' PeerID=' + peer
r_source_string << "\n PeerID=" + peer
sources.each do |source_load|
build_queue_title(source_load, r_source_string)
build_running_source_stats(source_load, r_source_string)
r_source_string << "\n"
end
end
end

def build_queue_title(source_load, r_source_string)
r_source_string << if source_load.isRecovered
"\n Recovered Queue: "
",\n Queue(Recovered)="
else
"\n Normal Queue: "
",\n Queue(Normal)="
end
r_source_string << source_load.getQueueId
end
Expand All @@ -941,45 +944,43 @@ def build_running_source_stats(source_load, r_source_string)
if source_load.isRunning
build_shipped_stats(source_load, r_source_string)
build_load_general_stats(source_load, r_source_string)
r_source_string << ', Replication Lag=' +
r_source_string << ",\n ReplicationLag=" +
source_load.getReplicationLag.to_s
else
r_source_string << "\n "
r_source_string << ",\n IsRunning=false, "
r_source_string << 'No Reader/Shipper threads runnning yet.'
end
end

def build_shipped_stats(source_load, r_source_string)
r_source_string << if source_load.getTimestampOfLastShippedOp.zero?
"\n " \
",\n TimeStampOfLastShippedOp=0, " \
'No Ops shipped since last restart'
else
"\n AgeOfLastShippedOp=" +
",\n AgeOfLastShippedOp=" +
source_load.getAgeOfLastShippedOp.to_s +
', TimeStampOfLastShippedOp=' +
java.util.Date.new(source_load
.getTimestampOfLastShippedOp).toString
",\n TimeStampOfLastShippedOp=" +
source_load.getTimestampOfLastShippedOp.to_s
end
end

def build_load_general_stats(source_load, r_source_string)
r_source_string << ', SizeOfLogQueue=' +
r_source_string << ",\n SizeOfLogQueue=" +
source_load.getSizeOfLogQueue.to_s
r_source_string << ', EditsReadFromLogQueue=' +
r_source_string << ",\n EditsReadFromLogQueue=" +
source_load.getEditsRead.to_s
r_source_string << ', OpsShippedToTarget=' +
r_source_string << ",\n OpsShippedToTarget=" +
source_load.getOPsShipped.to_s
build_edits_for_source(source_load, r_source_string)
end

def build_edits_for_source(source_load, r_source_string)
if source_load.hasEditsSinceRestart
r_source_string << ', TimeStampOfNextToReplicate=' +
java.util.Date.new(source_load
.getTimeStampOfNextToReplicate).toString
r_source_string << ",\n TimeStampOfNextToReplicate=" +
source_load.getTimeStampOfNextToReplicate.to_s
else
r_source_string << ', No edits for this source'
r_source_string << ' since it started'
r_source_string << ",\n HasEditsSinceRestart=false, "
r_source_string << 'No edits for this source since it started'
end
end

Expand Down
11 changes: 11 additions & 0 deletions hbase-shell/src/test/ruby/hbase/admin2_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,25 @@ class StatusTest < Test::Unit::TestCase
include HBaseConstants

def setup
@peer_id1 = '1'
@peer_id2 = '2'
@dummy_endpoint = 'org.apache.hadoop.hbase.replication.DummyReplicationEndpoint'

setup_hbase
# Create test table if it does not exist
@test_name = 'hbase_shell_admin2_test_table'
drop_test_table(@test_name)
create_test_table(@test_name)

cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id1, args)
command(:add_peer, @peer_id2, args)
end

def teardown
command(:remove_peer, @peer_id1)
command(:remove_peer, @peer_id2)
shutdown
end

Expand Down