From fd91251b6ce47d4bf213c89a4b77d895008ab461 Mon Sep 17 00:00:00 2001 From: ritesh-avesha Date: Sat, 8 Nov 2025 18:44:58 +0530 Subject: [PATCH 1/4] feat(): Added compression support in pushgateway Signed-off-by: ritesh-avesha --- docs/content/exporting/pushgateway.md | 14 +++++++ prometheus_client/exposition.py | 56 ++++++++++++++++++++++----- tests/test_exposition.py | 25 ++++++++++++ 3 files changed, 85 insertions(+), 10 deletions(-) diff --git a/docs/content/exporting/pushgateway.md b/docs/content/exporting/pushgateway.md index bf5eb112..d9f9a945 100644 --- a/docs/content/exporting/pushgateway.md +++ b/docs/content/exporting/pushgateway.md @@ -54,6 +54,20 @@ g.set_to_current_time() push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler) ``` +# Compressing data before sending to pushgateway +Pushgateway (version >= 1.5.0) supports gzip and snappy compression (v > 1.6.0). This can help in network constrained environments. +To compress a push request, set the `compression` argument to `'gzip'` or `'snappy'`: +```python +push_to_gateway( + 'localhost:9091', + job='batchA', + registry=registry, + handler=my_auth_handler, + compression='gzip', +) +``` +Snappy compression requires the optional [`python-snappy`](https://github.com/andrix/python-snappy) package. + TLS Auth is also supported when using the push gateway with a special handler. ```python diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 0d471707..61ba6376 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -9,7 +9,7 @@ import ssl import sys import threading -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union, Literal from urllib.error import HTTPError from urllib.parse import parse_qs, quote_plus, urlparse from urllib.request import ( @@ -46,6 +46,7 @@ """Content type of the latest format""" CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0 +CompressionType = Optional[Literal['gzip', 'snappy']] class _PrometheusRedirectHandler(HTTPRedirectHandler): @@ -596,6 +597,7 @@ def push_to_gateway( grouping_key: Optional[Dict[str, Any]] = None, timeout: Optional[float] = 30, handler: Callable = default_handler, + compression: CompressionType = None, ) -> None: """Push metrics to the given pushgateway. @@ -632,10 +634,12 @@ def push_to_gateway( failure. 'content' is the data which should be used to form the HTTP Message Body. + `compression` selects the payload compression. Supported values are 'gzip' + and 'snappy'. Defaults to None (no compression). This overwrites all metrics with the same job and grouping_key. This uses the PUT HTTP method.""" - _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler) + _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression) def pushadd_to_gateway( @@ -645,6 +649,7 @@ def pushadd_to_gateway( grouping_key: Optional[Dict[str, Any]] = None, timeout: Optional[float] = 30, handler: Callable = default_handler, + compression: CompressionType = None, ) -> None: """PushAdd metrics to the given pushgateway. @@ -663,10 +668,12 @@ def pushadd_to_gateway( will be carried out by a default handler. See the 'prometheus_client.push_to_gateway' documentation for implementation requirements. + `compression` selects the payload compression. Supported values are 'gzip' + and 'snappy'. Defaults to None (no compression). This replaces metrics with the same name, job and grouping_key. This uses the POST HTTP method.""" - _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler) + _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression) def delete_from_gateway( @@ -706,6 +713,7 @@ def _use_gateway( grouping_key: Optional[Dict[str, Any]], timeout: Optional[float], handler: Callable, + compression: CompressionType = None, ) -> None: gateway_url = urlparse(gateway) # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6. @@ -715,24 +723,52 @@ def _use_gateway( gateway = gateway.rstrip('/') url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job)) - data = b'' - if method != 'DELETE': - if registry is None: - registry = REGISTRY - data = generate_latest(registry) - if grouping_key is None: grouping_key = {} url += ''.join( '/{}/{}'.format(*_escape_grouping_key(str(k), str(v))) for k, v in sorted(grouping_key.items())) + data = b'' + headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)] + if method != 'DELETE': + if registry is None: + registry = REGISTRY + data = generate_latest(registry) + data, headers = _compress_payload(data, compression) + elif compression is not None: + raise ValueError('Compression is not supported for DELETE requests.') + handler( url=url, method=method, timeout=timeout, - headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)], data=data, + headers=headers, data=data, )() +def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]: + headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)] + if compression is None: + return data, headers + + encoding = compression.lower() + if encoding == 'gzip': + headers.append(('Content-Encoding', 'gzip')) + return gzip.compress(data), headers + if encoding == 'snappy': + try: + import snappy + except ImportError as exc: + raise RuntimeError('Snappy compression requires the python-snappy package to be installed.') from exc + headers.append(('Content-Encoding', 'snappy')) + compressor = snappy.StreamCompressor() + compressed = compressor.compress(data) + flush = getattr(compressor, 'flush', None) + if callable(flush): + compressed += flush() + return compressed, headers + raise ValueError(f"Unsupported compression type: {compression}") + + def _escape_grouping_key(k, v): if v == "": # Per https://github.com/prometheus/pushgateway/pull/346. diff --git a/tests/test_exposition.py b/tests/test_exposition.py index 3dd5e378..aceff738 100644 --- a/tests/test_exposition.py +++ b/tests/test_exposition.py @@ -1,3 +1,4 @@ +import gzip from http.server import BaseHTTPRequestHandler, HTTPServer import os import threading @@ -404,6 +405,30 @@ def test_push_with_trailing_slash(self): self.assertNotIn('//', self.requests[0][0].path) + def test_push_with_gzip_compression(self): + push_to_gateway(self.address, "my_job", self.registry, compression='gzip') + request, body = self.requests[0] + self.assertEqual(request.headers.get('content-encoding'), 'gzip') + decompressed = gzip.decompress(body) + self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_snappy_compression(self): + snappy = pytest.importorskip('snappy') + push_to_gateway(self.address, "my_job", self.registry, compression='snappy') + request, body = self.requests[0] + self.assertEqual(request.headers.get('content-encoding'), 'snappy') + decompressor = snappy.StreamDecompressor() + decompressed = decompressor.decompress(body) + flush = getattr(decompressor, 'flush', None) + if callable(flush): + decompressed += flush() + self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_invalid_compression(self): + with self.assertRaisesRegex(ValueError, 'Unsupported compression type'): + push_to_gateway(self.address, "my_job", self.registry, compression='brotli') + self.assertEqual(self.requests, []) + def test_instance_ip_grouping_key(self): self.assertTrue('' != instance_ip_grouping_key()['instance']) From e250838803d65611342a06ecc88336b266e9f29a Mon Sep 17 00:00:00 2001 From: ritesh-avesha Date: Tue, 18 Nov 2025 22:19:26 +0530 Subject: [PATCH 2/4] fix(): Incorporated changes for PR review comments Signed-off-by: ritesh-avesha --- prometheus_client/exposition.py | 17 +++++++++++------ tox.ini | 1 + 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 61ba6376..8e8bcf22 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -9,7 +9,7 @@ import ssl import sys import threading -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union, Literal +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union from urllib.error import HTTPError from urllib.parse import parse_qs, quote_plus, urlparse from urllib.request import ( @@ -22,6 +22,13 @@ from .registry import CollectorRegistry, REGISTRY from .utils import floatToGoString, parse_version +try: + import snappy + SNAPPY_AVAILABLE = True +except ImportError: + snappy = None # type: ignore + SNAPPY_AVAILABLE = False + __all__ = ( 'CONTENT_TYPE_LATEST', 'CONTENT_TYPE_PLAIN_0_0_4', @@ -730,7 +737,7 @@ def _use_gateway( for k, v in sorted(grouping_key.items())) data = b'' - headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)] + headers = [] if method != 'DELETE': if registry is None: registry = REGISTRY @@ -755,10 +762,8 @@ def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, headers.append(('Content-Encoding', 'gzip')) return gzip.compress(data), headers if encoding == 'snappy': - try: - import snappy - except ImportError as exc: - raise RuntimeError('Snappy compression requires the python-snappy package to be installed.') from exc + if not SNAPPY_AVAILABLE: + raise RuntimeError('Snappy compression requires the python-snappy package to be installed.') headers.append(('Content-Encoding', 'snappy')) compressor = snappy.StreamCompressor() compressed = compressor.compress(data) diff --git a/tox.ini b/tox.ini index ccf95cc2..45a6baf3 100644 --- a/tox.ini +++ b/tox.ini @@ -10,6 +10,7 @@ deps = attrs {py3.9,pypy3.9}: twisted {py3.9,pypy3.9}: aiohttp + {py3.9}: python-snappy commands = coverage run --parallel -m pytest {posargs} [testenv:py3.9-nooptionals] From 9f9b0891baf04d372b95f08b459fd8776bfd175c Mon Sep 17 00:00:00 2001 From: ritesh-avesha Date: Tue, 18 Nov 2025 22:35:22 +0530 Subject: [PATCH 3/4] fix(): Incorporated changes for PR review comments, lint issues Signed-off-by: ritesh-avesha --- prometheus_client/exposition.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 8e8bcf22..2c58b452 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -23,7 +23,7 @@ from .utils import floatToGoString, parse_version try: - import snappy + import snappy # type: ignore SNAPPY_AVAILABLE = True except ImportError: snappy = None # type: ignore @@ -737,14 +737,17 @@ def _use_gateway( for k, v in sorted(grouping_key.items())) data = b'' - headers = [] + headers: List[Tuple[str, str]] = [] if method != 'DELETE': if registry is None: registry = REGISTRY data = generate_latest(registry) data, headers = _compress_payload(data, compression) - elif compression is not None: - raise ValueError('Compression is not supported for DELETE requests.') + else: + # DELETE requests still need Content-Type header per test expectations + headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)] + if compression is not None: + raise ValueError('Compression is not supported for DELETE requests.') handler( url=url, method=method, timeout=timeout, From 832072bd470e707bc1b278f74e485568b01a1a33 Mon Sep 17 00:00:00 2001 From: ritesh-avesha Date: Tue, 18 Nov 2025 22:54:52 +0530 Subject: [PATCH 4/4] fix(): lint issues Signed-off-by: ritesh-avesha --- prometheus_client/exposition.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 2c58b452..404b1282 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -9,7 +9,9 @@ import ssl import sys import threading -from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union +from typing import ( + Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union, +) from urllib.error import HTTPError from urllib.parse import parse_qs, quote_plus, urlparse from urllib.request import ( @@ -23,7 +25,7 @@ from .utils import floatToGoString, parse_version try: - import snappy # type: ignore + import snappy # type: ignore SNAPPY_AVAILABLE = True except ImportError: snappy = None # type: ignore