Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/content/exporting/pushgateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ g.set_to_current_time()
push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler)
```

# Compressing data before sending to pushgateway
Pushgateway (version >= 1.5.0) supports gzip and snappy compression (v > 1.6.0). This can help in network constrained environments.
To compress a push request, set the `compression` argument to `'gzip'` or `'snappy'`:
```python
push_to_gateway(
'localhost:9091',
job='batchA',
registry=registry,
handler=my_auth_handler,
compression='gzip',
)
```
Snappy compression requires the optional [`python-snappy`](https://github.com/andrix/python-snappy) package.

TLS Auth is also supported when using the push gateway with a special handler.

```python
Expand Down
66 changes: 56 additions & 10 deletions prometheus_client/exposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import ssl
import sys
import threading
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from typing import (
Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union,
)
from urllib.error import HTTPError
from urllib.parse import parse_qs, quote_plus, urlparse
from urllib.request import (
Expand All @@ -22,6 +24,13 @@
from .registry import CollectorRegistry, REGISTRY
from .utils import floatToGoString, parse_version

try:
import snappy # type: ignore
SNAPPY_AVAILABLE = True
except ImportError:
snappy = None # type: ignore
SNAPPY_AVAILABLE = False

__all__ = (
'CONTENT_TYPE_LATEST',
'CONTENT_TYPE_PLAIN_0_0_4',
Expand All @@ -46,6 +55,7 @@
"""Content type of the latest format"""

CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0
CompressionType = Optional[Literal['gzip', 'snappy']]


class _PrometheusRedirectHandler(HTTPRedirectHandler):
Expand Down Expand Up @@ -596,6 +606,7 @@ def push_to_gateway(
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
compression: CompressionType = None,
) -> None:
"""Push metrics to the given pushgateway.

Expand Down Expand Up @@ -632,10 +643,12 @@ def push_to_gateway(
failure.
'content' is the data which should be used to form the HTTP
Message Body.
`compression` selects the payload compression. Supported values are 'gzip'
and 'snappy'. Defaults to None (no compression).

This overwrites all metrics with the same job and grouping_key.
This uses the PUT HTTP method."""
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler)
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression)


def pushadd_to_gateway(
Expand All @@ -645,6 +658,7 @@ def pushadd_to_gateway(
grouping_key: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = 30,
handler: Callable = default_handler,
compression: CompressionType = None,
) -> None:
"""PushAdd metrics to the given pushgateway.

Expand All @@ -663,10 +677,12 @@ def pushadd_to_gateway(
will be carried out by a default handler.
See the 'prometheus_client.push_to_gateway' documentation
for implementation requirements.
`compression` selects the payload compression. Supported values are 'gzip'
and 'snappy'. Defaults to None (no compression).

This replaces metrics with the same name, job and grouping_key.
This uses the POST HTTP method."""
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler)
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression)


def delete_from_gateway(
Expand Down Expand Up @@ -706,6 +722,7 @@ def _use_gateway(
grouping_key: Optional[Dict[str, Any]],
timeout: Optional[float],
handler: Callable,
compression: CompressionType = None,
) -> None:
gateway_url = urlparse(gateway)
# See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
Expand All @@ -715,24 +732,53 @@ def _use_gateway(
gateway = gateway.rstrip('/')
url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job))

data = b''
if method != 'DELETE':
if registry is None:
registry = REGISTRY
data = generate_latest(registry)

if grouping_key is None:
grouping_key = {}
url += ''.join(
'/{}/{}'.format(*_escape_grouping_key(str(k), str(v)))
for k, v in sorted(grouping_key.items()))

data = b''
headers: List[Tuple[str, str]] = []
if method != 'DELETE':
if registry is None:
registry = REGISTRY
data = generate_latest(registry)
data, headers = _compress_payload(data, compression)
else:
# DELETE requests still need Content-Type header per test expectations
headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
if compression is not None:
raise ValueError('Compression is not supported for DELETE requests.')

handler(
url=url, method=method, timeout=timeout,
headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)], data=data,
headers=headers, data=data,
)()


def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]:
headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
if compression is None:
return data, headers

encoding = compression.lower()
if encoding == 'gzip':
headers.append(('Content-Encoding', 'gzip'))
return gzip.compress(data), headers
if encoding == 'snappy':
if not SNAPPY_AVAILABLE:
raise RuntimeError('Snappy compression requires the python-snappy package to be installed.')
headers.append(('Content-Encoding', 'snappy'))
compressor = snappy.StreamCompressor()
compressed = compressor.compress(data)
flush = getattr(compressor, 'flush', None)
if callable(flush):
compressed += flush()
return compressed, headers
raise ValueError(f"Unsupported compression type: {compression}")


def _escape_grouping_key(k, v):
if v == "":
# Per https://github.com/prometheus/pushgateway/pull/346.
Expand Down
25 changes: 25 additions & 0 deletions tests/test_exposition.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import gzip
from http.server import BaseHTTPRequestHandler, HTTPServer
import os
import threading
Expand Down Expand Up @@ -404,6 +405,30 @@ def test_push_with_trailing_slash(self):

self.assertNotIn('//', self.requests[0][0].path)

def test_push_with_gzip_compression(self):
push_to_gateway(self.address, "my_job", self.registry, compression='gzip')
request, body = self.requests[0]
self.assertEqual(request.headers.get('content-encoding'), 'gzip')
decompressed = gzip.decompress(body)
self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n')

def test_push_with_snappy_compression(self):
snappy = pytest.importorskip('snappy')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add snappy to one of the test envs such as

{py3.9,pypy3.9}: twisted
so that CI runs this test in at least one environment.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, have added.

push_to_gateway(self.address, "my_job", self.registry, compression='snappy')
request, body = self.requests[0]
self.assertEqual(request.headers.get('content-encoding'), 'snappy')
decompressor = snappy.StreamDecompressor()
decompressed = decompressor.decompress(body)
flush = getattr(decompressor, 'flush', None)
if callable(flush):
decompressed += flush()
self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n')

def test_push_with_invalid_compression(self):
with self.assertRaisesRegex(ValueError, 'Unsupported compression type'):
push_to_gateway(self.address, "my_job", self.registry, compression='brotli')
self.assertEqual(self.requests, [])

def test_instance_ip_grouping_key(self):
self.assertTrue('' != instance_ip_grouping_key()['instance'])

Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ deps =
attrs
{py3.9,pypy3.9}: twisted
{py3.9,pypy3.9}: aiohttp
{py3.9}: python-snappy
commands = coverage run --parallel -m pytest {posargs}

[testenv:py3.9-nooptionals]
Expand Down