
Using Stream2segment in your Python code


This notebook assumes you have already run the download subroutine (s2s download -c <config_file.yaml> on the terminal) and that you want to run custom processing on the downloaded data


Introduction

Stream2segment has two main functions for working with downloaded data, process and imap. Both functions run custom code, a so-called processing function (pyfunc), on a selection of downloaded waveform segments: imap yields the output of pyfunc for each selected segment, whereas process writes the output of pyfunc for each selected segment as a row of a chosen tabular file (HDF or CSV).

from stream2segment.process import process, imap
# you can type help(process) or help(imap) for documentation
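For instance, a rough sketch of typical invocations (my_processing_function, dburl and segments_selection are defined in the next sections; the outfile argument name is an assumption here, check help(process) for the exact signature):

# yield the output of the processing function, segment by segment:
# for result in imap(my_processing_function, dburl, segments_selection):
#     print(result)

# or write each output as a row of a tabular file:
# process(my_processing_function, dburl, segments_selection, outfile='output.hdf')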

By running the command s2s init on the terminal, a fully documented Python module (paramtable.py) with a processing function called main is available. The function can be edited and used in two ways:

  • Produce the desired tabular output with the s2s process command (s2s process -p paramtable.py ...)
  • Run customized code by invoking the module as a script (python paramtable.py), usually calling imap or process inside the usual if __name__ == "__main__": boilerplate code, as sketched below
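For illustration, a minimal sketch of the module-as-script usage (the processing function and values below are placeholders; the real paramtable.py generated by s2s init is fully documented):

from stream2segment.process import imap

def main(segment, config):
    """Example processing function: return the segment id and its event magnitude"""
    return segment.id, segment.event.magnitude

if __name__ == "__main__":
    # placeholder values, to be adapted (see the sections below for details):
    dburl = 'sqlite:///path/to/example.db.sqlite'
    segments_selection = {'has_data': 'true'}
    for result in imap(main, dburl, segments_selection):
        print(result)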

In general, and especially for big data processing, this is the recommended way to process segments, as it avoids the unnecessary overhead of a Notebook, which is designed for other purposes. For instance, imap and process can be run with parallel sub-processes for faster execution, whereas it is unclear how efficiently a Notebook handles multiprocessing. Moreover, they can show on the terminal a progress bar with the percentage of the task done and the estimated remaining time, quite useful for hours-long tasks, whereas the progress bar can not be displayed in a Notebook

Writing a custom processing routine

Regardless of your environment choice (terminal vs. notebook), a processing routine always involves the same operations:

  • Connect to a database
  • Select which segments to fetch
  • Run a processing function on the selected segments

Database URL

DISCLAIMER: Pay attention when typing database URLs with passwords: avoid committing code with hard-coded sensitive information, and avoid typing passwords on the terminal if you do not want them stored in the command history
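For instance, a possible way to avoid hard-coded credentials is to read the database URL from an environment variable (the variable name S2S_DBURL below is just an example):

import os

# read the full database URL from an environment variable, so that no
# password is hard-coded in the source or typed on the terminal:
dburl = os.environ.get('S2S_DBURL', 'sqlite:///example.db.sqlite')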

In most cases, accessing the database where the data has been downloaded requires a simple string URL. For simplicity, the database URL is usually extracted from the download configuration file:

from stream2segment.process import yaml_load
# uncomment the line below using an existing file on your OS:
# dburl = yaml_load('download_file.yaml')['dburl']

In this notebook, we will use an example database (2 segments) available in the same directory as this notebook if you ran the s2s init command. If you have problems accessing the database later, check the current working directory and make sure the database is in it. A database URL can be written according to the RFC-1738 syntax:

import os
dburl = 'sqlite:///' + os.path.join(os.getcwd(), 'example.db.sqlite')
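# a remote database would use a different URL form, e.g. (placeholder values):
# dburl = 'postgresql://user:password@host:5432/dbname'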

Selection of segments

The selection of suitable segments is performed by creating a dict mapping one or more Segment attributes to a selection expression for that attribute (for details on the segment object and its attributes, see the segment object)

from stream2segment.process import Segment, get_segments

# create the selection dict. This dict selects a single segment (id=2) for illustrative purposes:
segments_selection = {
  'has_data': 'true',
  'maxgap_numsamples': '[-0.5, 0.5]',
  'event_distance_deg': '[70, 80]'
  # other optional attributes (see cheatsheet below for details):
  # missing_data_sec: '<120'
  # missing_data_ratio: '<0.5'
  # id: '<300'
  # event.time: "(2014-01-01T00:00:00, 2014-12-31T23:59:59)"
  # event.latitude: "[24, 70]"
  # event.longitude: "[-11, 24]"
}
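As a quick check, you can count the segments matching the selection via get_segments (described in detail later in this notebook):

num_segments = sum(1 for _ in get_segments(dburl, segments_selection))
print('%d segment(s) selected' % num_segments)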

Processing function

A processing function has the signature (segment, config), where the first argument is a segment object and the second is a custom dictionary of arguments that can be passed to imap and process (in this example, we will not provide any config, thus the second argument will not be used).

The function has no limitation on what can be implemented; you should only avoid returning the whole segment object, as it might be detached (see the next section for details), i.e. its properties might not be accessible outside the processing function. However, you can safely return any of the segment attributes separately

def my_processing_function(segment, config):
    """simple processing function. Take the segment stream and remove its instrumental response"""
    # Get ObsPy Trace object. If the waveform has no gaps/overlaps, the trace is the only element
    # of the segment stream object (otherwise the stream will have several traces):
    trace = segment.stream()[0]
    # remove the instrumental response of the Trace:
    # get ObsPy Inventory object:
    inventory = segment.inventory()
    # remove the response:
    trace_remresp = trace.remove_response(inventory)  # see caveat below
    # return the segment.id, the event magnitude, the original trace and the trace with response removed
    return segment.id, segment.event.magnitude, segment.stream()[0], trace_remresp

Complete processing routine with imap

With the database URL, the selection of segments, and our processing function, we can now illustrate a simple usage of, e.g., imap. For this purpose, we simply print the segment event magnitude, and the segment trace with and without the response removed

from stream2segment.process import imap

for (segment_id, mag, trace, trace_remresp) in imap(my_processing_function, dburl, segments_selection):
    print()
    print('Segment Id: %d (event magnitude: %.1f)' % (segment_id, mag))
    print('Segment trace (first three points):')
    print('  - Counts units (no response removed):    %s' % trace.data[:3])
    print('  - Physical units (response removed):     %s' % trace_remresp.data[:3])
Segment Id: 2 (event magnitude: 8.1)
Segment trace (first three points):
  - Counts units (no response removed):    [ -1.42699395e-06  -1.43603990e-06  -1.42210201e-06]
  - Physical units (response removed):     [ -1.42699395e-06  -1.43603990e-06  -1.42210201e-06]

Caveat: as you can see, the returned trace, expected in counts units, is actually the same as the trace with response removed. This happens because many Stream and Trace methods (check the ObsPy documentation in case of doubt) modify their objects in-place, i.e. they permanently modify the returned value of segment.stream() (which is cached in the segment object for performance reasons).

A solution is to call segment.stream(reload=True), which forces the complete reload of the stream from the database, or simply to work on copies of those objects (which is generally faster), e.g.:

def my_processing_function(segment, config):
    """simple processing function. Take the segment stream and remove its instrumental response"""
    # Get ObsPy Trace object and make a COPY of IT:
    trace = segment.stream()[0].copy()

    # now, segment.stream()[0] and trace are two different objects.
    # By running the same code as in the previous processing function,
    # we will remove the response from `trace` and preserve segment.stream()
    # (but you can also do it the other way around, depending on your needs)
    inventory = segment.inventory()
    trace_remresp = trace.remove_response(inventory)  # modifies the copy, not segment.stream()
    return segment.id, segment.event.magnitude, segment.stream()[0], trace_remresp

# And now we get the expected result:
for (segment_id, mag, trace, trace_remresp) in imap(my_processing_function, dburl, segments_selection):
    print()
    print('Segment Id: %d (event magnitude: %.1f)' % (segment_id, mag))
    print('Segment trace (first three points):')
    print('  - Counts units (no response removed):    %s' % trace.data[:3])
    print('  - Physical units (response removed):     %s' % trace_remresp.data[:3])
Segment Id: 2 (event magnitude: 8.1)
Segment trace (first three points):
  - Counts units (no response removed):    [-314 -280 -251]
  - Physical units (response removed):     [ -1.42699395e-06  -1.43603990e-06  -1.42210201e-06]
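Alternatively, a minimal sketch of the segment.stream(reload=True) approach mentioned above, which re-fetches the waveform from the database instead of working on a copy:

def my_processing_function(segment, config):
    """Same as the first processing function, but restoring the stream via reload"""
    trace = segment.stream()[0]
    inventory = segment.inventory()
    # this modifies segment.stream() in-place (see caveat above):
    trace_remresp = trace.remove_response(inventory)
    # force a complete reload of the (unmodified) stream from the database:
    trace = segment.stream(reload=True)[0]
    return segment.id, segment.event.magnitude, trace, trace_remresp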

Iterating over selected segments

Although the builtin functions imap and process should fit most users' needs, if you need even more customization, a more low-level approach consists of simply working iteratively on the selected segments via get_segments. This is basically what the two builtin functions do under the hood with a given segments selection:

from stream2segment.process import get_segments

for seg in get_segments(dburl, segments_selection):
    # do your work here... In this case, with the first segment `seg` just loaded, simply exit the loop:
    break

get_segments opens a database session, yields selected segments and closes the session afterwards. A database session is an object that establishes all conversations with the database and represents a "holding zone" for all the objects which you’ve loaded or associated with it during its lifespan.

Closing a session is recommended after you have finished your work, as it releases memory on the computer and (if the database is remote) on the server, avoiding potential problems. Note that after a session is closed, all segment objects are detached from the database, which means we can no longer access all of their properties, but only those previously loaded. E.g., accessing a segment's related objects (e.g. the event object) outside the for loop raises an error:

try:
    seg.event
except Exception as exc:
    print('ERROR: ' + str(exc))
ERROR: Parent instance <Segment at 0x156b5b7c0> is not bound to a Session; lazy load operation of attribute 'event' cannot proceed (Background on this error at: http://sqlalche.me/e/13/bhk3)

In very specific cases, where you want to keep the segments and all related objects accessible (i.e. attached to a session) also outside a get_segments for-loop, you can call get_segments with a session object instead of a db URL. Just remember to close the session manually at the end of your processing routine (see the end of this notebook):

from stream2segment.process import get_session, get_segments
session = get_session(dburl)

for seg in get_segments(session, segments_selection):
    break

Backreferences (on related objects)

With the session still open, we can access some of the segment's related objects (this will issue a query to the database):

evt = seg.event
print(str(evt))

sta = seg.station
print(str(sta))
Event
 attributes (16 of 16 loaded):
  event_id: 20170908_0 (str, 16 characters, showing first 10 only)
  time: 2017-09-08 04:49:21.200000 (datetime)
  latitude: 15.02 (float)
  longitude: -93.81 (float)
  depth_km: 72.0 (float)
  author: EMSC (str)
  catalog: EMSC-RTS (str)
  contributor: EMSC (str)
  contributor_id: 616600 (str)
  mag_type: mw (str)
  magnitude: 8.1 (float)
  mag_author: EMSC (str)
  event_location_name: OFFSHORE C (str, 24 characters, showing first 10 only)
  event_type: None (NoneType)
  webservice_id: 1 (int)
  id: 1 (int)
 related_objects (0 of 1 loaded):
  segments
Station
 attributes (11 of 11 loaded):
  network: GE (str)
  station: MTE (str)
  latitude: 40.3997 (float)
  longitude: -7.5442 (float)
  elevation: 815.0 (float)
  site_name: None (NoneType)
  start_time: 2014-10-09 00:00:00 (datetime)
  end_time: None (NoneType)
  inventory_xml: b'\x1f\x8b\x08\x00\xa4\x99\x1b\\\x02\xff' (bytes, 45120 elements, showing first 10 only)
  datacenter_id: 1 (int)
  id: 1 (int)
 related_objects (0 of 3 loaded):
  datacenter
  channels
  segments

Note that both objects have, among their related objects, a so-called back-reference segments. This is a list-like object of Segments (including the original seg object) and not a single entity, because of a so-called "many-to-one" relationship: one segment is always related to one event/station, whereas one event generates many segments at different stations, and one station generates many segments for different events.
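For instance, with the session still open, a minimal sketch iterating over the event's back-reference (which, as the load_only example below also shows, behaves like a query object):

# iterate over all segments related to the event of `seg`
# (this issues a database query):
for related_seg in seg.event.segments:
    print('Segment id: %d' % related_seg.id)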

This extremely powerful feature, connecting several entities effortlessly, is a natural consequence of being backed by a relational database and would be nearly impossible to achieve with classical file system storage. Nevertheless, it should be used with care with massive downloads, as one might load huge amounts of data into the session, causing memory overflows. A solution might be to call session.expunge_all() from time to time, which removes all object instances from the session (possibly freeing memory), or to load only each object id, deferring the load of all other attributes from the database upon access, e.g.:

from sqlalchemy.orm import load_only

evt = seg.event
# load event related segments (*risk of memory overflow: low):
segments = evt.segments.options(load_only('id')).all()

cha = seg.channel
# load channel related segments (*risk of memory overflow: medium):
segments = cha.segments.options(load_only('id')).all()

sta = seg.station
# load station related segments (*risk of memory overflow: high):
segments = sta.segments.options(load_only('id')).all()

dct = seg.datacenter
# load data center (e.g. IRIS) related segments (*risk of memory overflow: very high):
segments = dct.segments.options(load_only('id')).all()

* The risk levels reported are heuristic estimates and should be considered reliable only relative to each other (an event has almost certainly fewer related segments than a channel, which has almost certainly fewer related segments than a station, and so on)

In any case, for really memory-consuming or long tasks, as already recommended, consider moving the Notebook code into a custom Python module and executing the module as a script or via the s2s process command

# finally close session
from stream2segment.process import close_session
close_session(session)
# If True is returned, the session and the database connection are now closed.
# If you want to close the session only to reuse it later and free memory, call:
# close_session(session, False)
True