|  | 
| 1 |  | -/** | 
|  | 1 | +/* | 
| 2 | 2 |  * Licensed to the Apache Software Foundation (ASF) under one | 
| 3 | 3 |  * or more contributor license agreements.  See the NOTICE file | 
| 4 | 4 |  * distributed with this work for additional information | 
|  | 
| 15 | 15 |  * See the License for the specific language governing permissions and | 
| 16 | 16 |  * limitations under the License. | 
| 17 | 17 |  */ | 
| 18 |  | - | 
| 19 | 18 | package org.apache.hadoop.hbase.replication.regionserver; | 
| 20 | 19 | 
 | 
| 21 |  | -import org.apache.hadoop.metrics2.lib.MutableFastCounter; | 
| 22 |  | -import org.apache.hadoop.metrics2.lib.MutableGaugeLong; | 
| 23 |  | -import org.apache.hadoop.metrics2.lib.MutableHistogram; | 
| 24 | 20 | import org.apache.yetus.audience.InterfaceAudience; | 
| 25 | 21 | 
 | 
| 26 | 22 | @InterfaceAudience.Private | 
| 27 |  | -public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{ | 
| 28 |  | -  private static final String KEY_PREFIX = "source."; | 
| 29 |  | - | 
| 30 |  | -  private final MetricsReplicationSourceImpl rms; | 
| 31 |  | - | 
| 32 |  | -  private final MutableHistogram ageOfLastShippedOpHist; | 
| 33 |  | -  private final MutableGaugeLong sizeOfLogQueueGauge; | 
| 34 |  | -  private final MutableFastCounter logReadInEditsCounter; | 
| 35 |  | -  private final MutableFastCounter walEditsFilteredCounter; | 
| 36 |  | -  private final MutableFastCounter shippedBatchesCounter; | 
| 37 |  | -  private final MutableFastCounter shippedOpsCounter; | 
| 38 |  | -  private final MutableFastCounter shippedBytesCounter; | 
| 39 |  | -  private final MutableFastCounter logReadInBytesCounter; | 
| 40 |  | -  private final MutableFastCounter shippedHFilesCounter; | 
| 41 |  | -  private final MutableGaugeLong sizeOfHFileRefsQueueGauge; | 
| 42 |  | -  private final MutableFastCounter unknownFileLengthForClosedWAL; | 
| 43 |  | -  private final MutableFastCounter uncleanlyClosedWAL; | 
| 44 |  | -  private final MutableFastCounter uncleanlyClosedSkippedBytes; | 
| 45 |  | -  private final MutableFastCounter restartWALReading; | 
| 46 |  | -  private final MutableFastCounter repeatedFileBytes; | 
| 47 |  | -  private final MutableFastCounter completedWAL; | 
| 48 |  | -  private final MutableFastCounter completedRecoveryQueue; | 
| 49 |  | -  private final MutableFastCounter failedRecoveryQueue; | 
| 50 |  | - | 
| 51 |  | -  public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { | 
| 52 |  | -    this.rms = rms; | 
| 53 |  | - | 
| 54 |  | -    ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP); | 
| 55 |  | - | 
| 56 |  | -    sizeOfLogQueueGauge = rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); | 
| 57 |  | - | 
| 58 |  | -    shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); | 
| 59 |  | - | 
| 60 |  | -    shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); | 
| 61 |  | - | 
| 62 |  | -    shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); | 
| 63 |  | - | 
| 64 |  | -    logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); | 
| 65 |  | - | 
| 66 |  | -    logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); | 
| 67 |  | - | 
| 68 |  | -    walEditsFilteredCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_EDITS_FILTERED, 0L); | 
| 69 |  | - | 
| 70 |  | -    shippedHFilesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_HFILES, 0L); | 
| 71 |  | - | 
| 72 |  | -    sizeOfHFileRefsQueueGauge = | 
| 73 |  | -        rms.getMetricsRegistry().getGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L); | 
| 74 |  | - | 
| 75 |  | -    unknownFileLengthForClosedWAL = rms.getMetricsRegistry() | 
| 76 |  | -            .getCounter(SOURCE_CLOSED_LOGS_WITH_UNKNOWN_LENGTH, 0L); | 
| 77 |  | -    uncleanlyClosedWAL = rms.getMetricsRegistry().getCounter(SOURCE_UNCLEANLY_CLOSED_LOGS, 0L); | 
| 78 |  | -    uncleanlyClosedSkippedBytes = rms.getMetricsRegistry() | 
| 79 |  | -            .getCounter(SOURCE_UNCLEANLY_CLOSED_IGNORED_IN_BYTES, 0L); | 
| 80 |  | -    restartWALReading = rms.getMetricsRegistry().getCounter(SOURCE_RESTARTED_LOG_READING, 0L); | 
| 81 |  | -    repeatedFileBytes = rms.getMetricsRegistry().getCounter(SOURCE_REPEATED_LOG_FILE_BYTES, 0L); | 
| 82 |  | -    completedWAL = rms.getMetricsRegistry().getCounter(SOURCE_COMPLETED_LOGS, 0L); | 
| 83 |  | -    completedRecoveryQueue = rms.getMetricsRegistry() | 
| 84 |  | -            .getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L); | 
| 85 |  | -    failedRecoveryQueue = rms.getMetricsRegistry() | 
| 86 |  | -            .getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L); | 
| 87 |  | -  } | 
| 88 |  | - | 
| 89 |  | -  @Override public void setLastShippedAge(long age) { | 
| 90 |  | -    ageOfLastShippedOpHist.add(age); | 
| 91 |  | -  } | 
| 92 |  | - | 
| 93 |  | -  @Override public void incrSizeOfLogQueue(int size) { | 
| 94 |  | -    sizeOfLogQueueGauge.incr(size); | 
| 95 |  | -  } | 
| 96 |  | - | 
| 97 |  | -  @Override public void decrSizeOfLogQueue(int size) { | 
| 98 |  | -    sizeOfLogQueueGauge.decr(size); | 
| 99 |  | -  } | 
| 100 |  | - | 
| 101 |  | -  @Override public void incrLogReadInEdits(long size) { | 
| 102 |  | -    logReadInEditsCounter.incr(size); | 
| 103 |  | -  } | 
| 104 |  | - | 
| 105 |  | -  @Override public void incrLogEditsFiltered(long size) { | 
| 106 |  | -    walEditsFilteredCounter.incr(size); | 
| 107 |  | -  } | 
| 108 |  | - | 
| 109 |  | -  @Override public void incrBatchesShipped(int batches) { | 
| 110 |  | -    shippedBatchesCounter.incr(batches); | 
| 111 |  | -  } | 
| 112 |  | - | 
| 113 |  | -  @Override public void incrOpsShipped(long ops) { | 
| 114 |  | -    shippedOpsCounter.incr(ops); | 
| 115 |  | -  } | 
| 116 |  | - | 
| 117 |  | -  @Override public void incrShippedBytes(long size) { | 
| 118 |  | -    shippedBytesCounter.incr(size); | 
| 119 |  | -  } | 
| 120 |  | - | 
| 121 |  | -  @Override public void incrLogReadInBytes(long size) { | 
| 122 |  | -    logReadInBytesCounter.incr(size); | 
| 123 |  | -  } | 
| 124 |  | - | 
| 125 |  | -  @Override public void clear() { | 
| 126 |  | -  } | 
| 127 |  | - | 
| 128 |  | -  @Override | 
| 129 |  | -  public long getLastShippedAge() { | 
| 130 |  | -    return ageOfLastShippedOpHist.getMax(); | 
| 131 |  | -  } | 
| 132 |  | - | 
| 133 |  | -  @Override public void incrHFilesShipped(long hfiles) { | 
| 134 |  | -    shippedHFilesCounter.incr(hfiles); | 
| 135 |  | -  } | 
| 136 |  | - | 
| 137 |  | -  @Override | 
| 138 |  | -  public void incrSizeOfHFileRefsQueue(long size) { | 
| 139 |  | -    sizeOfHFileRefsQueueGauge.incr(size); | 
| 140 |  | -  } | 
| 141 |  | - | 
| 142 |  | -  @Override | 
| 143 |  | -  public void decrSizeOfHFileRefsQueue(long size) { | 
| 144 |  | -    sizeOfHFileRefsQueueGauge.decr(size); | 
| 145 |  | -  } | 
| 146 |  | - | 
| 147 |  | -  @Override | 
| 148 |  | -  public int getSizeOfLogQueue() { | 
| 149 |  | -    return (int)sizeOfLogQueueGauge.value(); | 
| 150 |  | -  } | 
| 151 |  | - | 
| 152 |  | -  @Override | 
| 153 |  | -  public void incrUnknownFileLengthForClosedWAL() { | 
| 154 |  | -    unknownFileLengthForClosedWAL.incr(1L); | 
| 155 |  | -  } | 
| 156 |  | -  @Override | 
| 157 |  | -  public void incrUncleanlyClosedWALs() { | 
| 158 |  | -    uncleanlyClosedWAL.incr(1L); | 
| 159 |  | -  } | 
| 160 |  | -  @Override | 
| 161 |  | -  public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { | 
| 162 |  | -    uncleanlyClosedSkippedBytes.incr(bytes); | 
| 163 |  | -  } | 
| 164 |  | -  @Override | 
| 165 |  | -  public void incrRestartedWALReading() { | 
| 166 |  | -    restartWALReading.incr(1L); | 
| 167 |  | -  } | 
| 168 |  | -  @Override | 
| 169 |  | -  public void incrRepeatedFileBytes(final long bytes) { | 
| 170 |  | -    repeatedFileBytes.incr(bytes); | 
| 171 |  | -  } | 
| 172 |  | -  @Override | 
| 173 |  | -  public void incrCompletedWAL() { | 
| 174 |  | -    completedWAL.incr(1L); | 
| 175 |  | -  } | 
| 176 |  | -  @Override | 
| 177 |  | -  public void incrCompletedRecoveryQueue() { | 
| 178 |  | -    completedRecoveryQueue.incr(1L); | 
| 179 |  | -  } | 
| 180 |  | -  @Override | 
| 181 |  | -  public void incrFailedRecoveryQueue() { | 
| 182 |  | -    failedRecoveryQueue.incr(1L); | 
| 183 |  | -  } | 
| 184 |  | -  @Override | 
| 185 |  | -  public void init() { | 
| 186 |  | -    rms.init(); | 
| 187 |  | -  } | 
| 188 |  | - | 
| 189 |  | -  @Override | 
| 190 |  | -  public void setGauge(String gaugeName, long value) { | 
| 191 |  | -    rms.setGauge(KEY_PREFIX + gaugeName, value); | 
| 192 |  | -  } | 
| 193 |  | - | 
| 194 |  | -  @Override | 
| 195 |  | -  public void incGauge(String gaugeName, long delta) { | 
| 196 |  | -    rms.incGauge(KEY_PREFIX + gaugeName, delta); | 
| 197 |  | -  } | 
| 198 |  | - | 
| 199 |  | -  @Override | 
| 200 |  | -  public void decGauge(String gaugeName, long delta) { | 
| 201 |  | -    rms.decGauge(KEY_PREFIX + gaugeName, delta); | 
| 202 |  | -  } | 
| 203 |  | - | 
| 204 |  | -  @Override | 
| 205 |  | -  public void removeMetric(String key) { | 
| 206 |  | -    rms.removeMetric(KEY_PREFIX + key); | 
| 207 |  | -  } | 
| 208 |  | - | 
| 209 |  | -  @Override | 
| 210 |  | -  public void incCounters(String counterName, long delta) { | 
| 211 |  | -    rms.incCounters(KEY_PREFIX + counterName, delta); | 
| 212 |  | -  } | 
| 213 |  | - | 
| 214 |  | -  @Override | 
| 215 |  | -  public void updateHistogram(String name, long value) { | 
| 216 |  | -    rms.updateHistogram(KEY_PREFIX + name, value); | 
| 217 |  | -  } | 
| 218 |  | - | 
| 219 |  | -  @Override | 
| 220 |  | -  public String getMetricsContext() { | 
| 221 |  | -    return rms.getMetricsContext(); | 
| 222 |  | -  } | 
| 223 |  | - | 
| 224 |  | -  @Override | 
| 225 |  | -  public String getMetricsDescription() { | 
| 226 |  | -    return rms.getMetricsDescription(); | 
| 227 |  | -  } | 
| 228 |  | - | 
| 229 |  | -  @Override | 
| 230 |  | -  public String getMetricsJmxContext() { | 
| 231 |  | -    return rms.getMetricsJmxContext(); | 
| 232 |  | -  } | 
| 233 |  | - | 
| 234 |  | -  @Override | 
| 235 |  | -  public String getMetricsName() { | 
| 236 |  | -    return rms.getMetricsName(); | 
| 237 |  | -  } | 
| 238 |  | - | 
| 239 |  | -  @Override | 
| 240 |  | -  public long getWALEditsRead() { | 
| 241 |  | -    return this.logReadInEditsCounter.value(); | 
| 242 |  | -  } | 
| 243 |  | - | 
| 244 |  | -  @Override | 
| 245 |  | -  public long getShippedOps() { | 
| 246 |  | -    return this.shippedOpsCounter.value(); | 
| 247 |  | -  } | 
| 248 |  | - | 
| 249 |  | -  @Override | 
| 250 |  | -  public long getEditsFiltered() { | 
| 251 |  | -    return this.walEditsFilteredCounter.value(); | 
| 252 |  | -  } | 
|  | 23 | +public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource { | 
|  | 24 | + | 
|  | 25 | +  public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage"; | 
|  | 26 | + | 
|  | 27 | +  /** | 
|  | 28 | +   * Sets the total usage of memory used by edits in memory read from WALs. The memory represented | 
|  | 29 | +   * by this usage measure is across peers/sources. For example, we may batch the same WAL edits | 
|  | 30 | +   * multiple times for the sake of replicating them to multiple peers.. | 
|  | 31 | +   * @param usage The memory used by edits in bytes | 
|  | 32 | +   */ | 
|  | 33 | +  void setWALReaderEditsBufferBytes(long usage); | 
|  | 34 | + | 
|  | 35 | +  /** | 
|  | 36 | +   * Returns the size, in bytes, of edits held in memory to be replicated across all peers. | 
|  | 37 | +   */ | 
|  | 38 | +  long getWALReaderEditsBufferBytes(); | 
| 253 | 39 | } | 
0 commit comments