Using Stream2segment in your Python code


This notebook assumes you have already run the download subroutine (s2s download -c <config_file.yaml> on the terminal) and want to run custom processing on the downloaded data

Introduction

Stream2segment has two main functions for working with downloaded data, process and imap. Both functions run custom code (a so-called processing function, pyfunc) on a selection of downloaded waveform segments: imap yields the output of pyfunc for each selected segment, whereas process writes the output of pyfunc for each selected segment as a row of a chosen tabular file (HDF or CSV)

from stream2segment.process import process, imap
# you can type help(process) or help(imap) for documentation
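
For instance, a minimal sketch of a process call writing one row per segment to a tabular file. The output file argument name below is an assumption, not taken from this notebook (type help(process) for the actual signature); my_processing_function, dburl and segments_selection are defined later in this notebook:

# hypothetical sketch: uncomment and adapt after checking help(process):
# process(my_processing_function, dburl, segments_selection, outfile='output.hdf')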

By running the command s2s init on the terminal, a fully documented Python module (paramtable.py) with a processing function called main is made available. The user can edit it and use it in two ways:

  • Produce the desired tabular output with the s2s process command (s2s process -p paramtable.py ...)
  • Run customized code by invoking the module as a script (python paramtable.py), usually calling imap or process inside the usual if __name__ == "__main__": boilerplate code (a minimal sketch is shown below)
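
A minimal sketch of the second option (the database URL and the selection dict below are illustrative placeholders):

# paramtable.py (sketch of the part invoking the module as a script)
from stream2segment.process import imap

def main(segment, config):
    ...  # processing function code here (see working examples below)

if __name__ == "__main__":
    dburl = 'sqlite:///path/to/database.sqlite'  # illustrative URL
    for output in imap(main, dburl, {'has_data': 'true'}):
        print(output)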

In general, and especially for big data processing, this is the recommended way to process segments, as it avoids the unnecessary overhead of a notebook: imap and process, if run within a notebook, are not guaranteed to fully exploit multiple processors on a given machine, if this is required (it depends on how the notebook handles multiprocessing, which we did not investigate), and will not be able to show a progress bar with the percentage of the task done and the estimated remaining time

Writing a simple processing function

Database URL

DISCLAIMER: Database URLs with passwords should never be distributed (e.g. committed or pushed with git) or stored in easily trackable places (e.g., typed on the terminal)

In most cases, accessing the database where the data has been downloaded requires a simple string URL. For simplicity, the database URL is usually extracted from the download configuration file:

from stream2segment.process import yaml_load
# uncomment the line below using an existing file on your OS:
# dburl = yaml_load('download_file.yaml')['dburl']

or written here following the RFC-1738 syntax, as in this notebook, where we will use an example database (2 segments) available in the same directory as this notebook if you ran the s2s init command. If you have problems loading the database, check the current working directory and be sure the database is in it:

# The database URL should be the parameter 'dburl' of the download configuration file.
# Here we use an example database (2 segments) provided in the same directory of this notebook:
import os
dburl = 'sqlite:///' + os.path.join(os.getcwd(), 'jupyter.example.db')

Selection of segments

The selection of suitable segments is performed by creating a dict mapping one or more Segment attributes to a selection expression for that attribute (for details on the segment object and its attributes, see the segment object)

from stream2segment.process import Segment, get_segments

# create the selection dict. This dict selects a single segment (id=2) for illustrative purposes:
segments_selection = {
  'has_data': 'true',
  'maxgap_numsamples': '[-0.5, 0.5]',
  'event_distance_deg': '[70, 80]'
  # other optional attributes (see cheatsheet below for details):
  # missing_data_sec: '<120'
  # missing_data_ratio: '<0.5'
  # id: '<300'
  # event.time: "(2014-01-01T00:00:00, 2014-12-31T23:59:59)"
  # event.latitude: "[24, 70]"
  # event.longitude: "[-11, 24]"
}
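
Before running any processing, it might be useful to check how many segments match the selection. A minimal sketch using get_segments (imported above and described in detail later in this notebook):

# count the selected segments:
num_segments = sum(1 for _ in get_segments(dburl, segments_selection))
print('%d segment(s) selected' % num_segments)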

Processing function

A processing function has signature (arguments) (segment, config), where the first argument is a segment object and the second is a custom dictionary of arguments that can be passed to imap and process (in this example we will not provide any config, thus the second argument will not be used).

The function has no limitations on what can be implemented; you should only avoid returning the whole segment object, as it might be detached (see the next section for details), i.e. its properties would not be accessible outside the processing function. However, you can safely return any of the segment attributes separately

def my_processing_function(segment, config):
    """simple processing function. Take the segment stream and remove its instrumental response"""
    # Get ObsPy Trace object. If the waveform has no gapos/overlaps, the trace is the only element
    # of the segment stream object (otherwise the stream will have several traces):
    trace = segment.stream()[0]
    # Now let's remove the instrumental response of the segment strem
    # Get ObsPy Inventory object:
    inventory = segment.inventory()
    # remove the response:
    trace_remresp = trace.remove_response(inventory)  # see caveat below
    # return the segment.id, the event magnitude, the original trace and the trace with response removed
    return segment.id, segment.event.magnitude, segment.stream()[0], trace_remresp

Complete processing routine with imap

With the database URL, the selection of segments, and our processing function, we can now illustrate a simple usage of, e.g., imap. For this purpose, we simply print the segment event magnitude, and the segment trace with and without the response removed, in order to show that segment.stream() is now returning the stream with the response removed

# Now run `imap`. For each selected segment, the function yields the output of our processing function,
# i.e. the tuple (segment_id, magnitude, trace, trace with response removed):

for (segment_id, mag, trace, trace_remresp) in imap(my_processing_function, dburl, segments_selection):
    print()
    print('Segment Id: %d (event magnitude: %.1f)' % (segment_id, mag))
    print('Segment trace (first three points):')
    print('  - Counts units (no response removed):    %s' % trace.data[:3])
    print('  - Physical units (response removed):     %s' % trace_remresp.data[:3])
Segment Id: 2 (event magnitude: 8.1)
Segment trace (first three points):
  - Counts units (no response removed):    [ -1.42699395e-06  -1.43603990e-06  -1.42210201e-06]
  - Physical units (response removed):     [ -1.42699395e-06  -1.43603990e-06  -1.42210201e-06]

Caveat: As you can see, the returned trace, expected in counts units, is actually the same as the trace with the response removed. This happens because many Stream and Trace methods (check the ObsPy documentation in case of doubt) modify the objects in place, i.e. they permanently modify the return value of segment.stream().

A solution is to call segment.stream(reload=True), which forces a complete reload of the stream from the database (see the sketch after the example below), or simply to work on copies of those objects (which is generally faster), e.g.:

def my_processing_function(segment, config):
    """simple processing function. Take the segment stream and remove its instrumental response"""
    # COPY a trace, so that segment.stream() is not affected:
    trace = segment.stream()[0].copy()
    # Apply the same code as above, but on a copy:
    inventory = segment.inventory()
    trace_remresp = trace.remove_response(inventory)  # see caveat below
    return segment.id, segment.event.magnitude, segment.stream()[0], trace_remresp

# And now we get the expected result:
for (segment_id, mag, trace, trace_remresp) in imap(my_processing_function, dburl, segments_selection):
    print()
    print('Segment Id: %d (event magnitude: %.1f)' % (segment_id, mag))
    print('Segment trace (first three points):')
    print('  - Counts units (no response removed):    %s' % trace.data[:3])
    print('  - Physical units (response removed):     %s' % trace_remresp.data[:3])
Segment Id: 2 (event magnitude: 8.1)
Segment trace (first three points):
  - Counts units (no response removed):    [-314 -280 -251]
  - Physical units (response removed):     [ -1.42699395e-06  -1.43603990e-06  -1.42210201e-06]
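
For reference, here is a minimal sketch of the alternative mentioned above, which forces a fresh reload of the waveform from the database instead of working on a copy (generally slower):

def my_processing_function(segment, config):
    """Variant using segment.stream(reload=True) instead of a copy"""
    trace = segment.stream()[0]
    inventory = segment.inventory()
    trace_remresp = trace.remove_response(inventory)  # modifies segment.stream() in place
    # force a complete reload of the stream from the database:
    trace = segment.stream(reload=True)[0]
    return segment.id, segment.event.magnitude, trace, trace_remresp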

Iterating over selected segments

If you need even more customization, a lower-level approach consists of simply working iteratively on the selected segments via get_segments. This is basically what imap and process do under the hood with a given segments selection:

from stream2segment.process import get_segments
segment = None
for seg in get_segments(dburl, segments_selection):
    # do your work here... Let's just store the first segment and exit the loop:
    segment = seg
    break

get_segments opens a database session, yields selected segments and closes the session afterwards. A database session is an object that establishes all conversations with the database and represents a "holding zone" for all the objects which you’ve loaded or associated with it during its lifespan.

Closing a session is recommended when you have finished your work, as it releases memory on the computer and (if the database is remote) on the server, avoiding potential problems. Note that after a session is closed, all segment objects are detached from the database, which means we can no longer access all of their properties, but only those previously loaded. E.g., accessing the segment's related objects (e.g. the event object) outside the for loop raises an error:

try:
    seg.event
except Exception as exc:
    print('ERROR: ' + str(exc))
ERROR: Parent instance <Segment at 0x160754670> is not bound to a Session; lazy load operation of attribute 'event' cannot proceed (Background on this error at: http://sqlalche.me/e/13/bhk3)

In very specific cases where you want to keep the segments and all related objects accessible (i.e. attached to a session) also outside a get_segments for-loop, you can call get_segments with a session object instead of a database URL. Just remember to close the session manually at the end of your processing routine (see the end of this notebook):

from stream2segment.process import get_session, get_segments
session = get_session(dburl)

for seg in get_segments(session, segments_selection):
    # Let's just store again the first segment and exit the loop:
    segment = seg
    break

Backreferences (on related objects)

With the session still open, we can print some of the segment's related objects:

evt = seg.event
print(str(evt))

sta = seg.station
print(str(sta))
Event
 attributes (16 of 16 loaded):
  event_id: 20170908_0 (str, 16 characters, showing first 10 only)
  time: 2017-09-08 04:49:21.200000 (datetime)
  latitude: 15.02 (float)
  longitude: -93.81 (float)
  depth_km: 72.0 (float)
  author: EMSC (str)
  catalog: EMSC-RTS (str)
  contributor: EMSC (str)
  contributor_id: 616600 (str)
  mag_type: mw (str)
  magnitude: 8.1 (float)
  mag_author: EMSC (str)
  event_location_name: OFFSHORE C (str, 24 characters, showing first 10 only)
  event_type: None (NoneType)
  webservice_id: 1 (int)
  id: 1 (int)
 related_objects (0 of 1 loaded):
  segments
Station
 attributes (11 of 11 loaded):
  network: GE (str)
  station: MTE (str)
  latitude: 40.3997 (float)
  longitude: -7.5442 (float)
  elevation: 815.0 (float)
  site_name: None (NoneType)
  start_time: 2014-10-09 00:00:00 (datetime)
  end_time: None (NoneType)
  inventory_xml: b'\x1f\x8b\x08\x00\xa4\x99\x1b\\\x02\xff' (bytes, 45120 elements, showing first 10 only)
  datacenter_id: 1 (int)
  id: 1 (int)
 related_objects (0 of 3 loaded):
  datacenter
  channels
  segments

Note that both objects have, among their related objects, a so-called back-reference segments. This is a list-like object of Segments (among which the original seg object) and not a single entity because, by design, one segment is always related to one event/station, whereas one event generates many segments at different stations, and one station records many segments from different events
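
For example, with the session still open, the back-reference can be iterated like any collection (a minimal sketch; beware of the memory caveat discussed below):

# iterate over all segments generated by the event of our segment:
for related_seg in seg.event.segments:
    print(related_seg.id)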

This extremely powerful feature, connecting several entities effortlessly, is a natural consequence of the database storage employed and would not be easily achievable with a classical file-system storage. Nevertheless, it should be used with care with massive downloads (huge databases), as one might load into the session a huge amount of data, causing memory overflows. A solution might be to call session.expunge_all(), which removes all object instances from the session (possibly freeing memory), or to load only each object's id, deferring the load of all other attributes from the database upon access, e.g.:

from sqlalchemy.orm import load_only

evt = seg.event
# load event related segments (*risk of memory overflow: low):
segments = evt.segments.options(load_only('id')).all()

cha = seg.channel
# load channel related segments (*risk of memory overflow: medium):
segments = cha.segments.options(load_only('id')).all()

sta = seg.station
# load station related segments (*risk of memory overflow: high):
segments = sta.segments.options(load_only('id')).all()

dct = seg.datacenter
# load data center (e.g. IRIS) related segments (*risk of memory overflow: very high):
segments = dct.segments.options(load_only('id')).all()

* The risk levels reported are just heuristic estimates and should be considered reliable only relative to each other (an event has almost certainly fewer related segments than a channel, which has almost certainly fewer related segments than a station, and so on)

In any case, for really memory-consuming or long tasks, as already recommended, consider moving the notebook code into a custom Python module and executing the module as a script or via the command s2s process

# finally close session
from stream2segment.process import close_session
close_session(session)
True