From 0ca9492f0a8cc7aa3da314a3e9906c570307e484 Mon Sep 17 00:00:00 2001 From: ThePsyjo <131891+ThePsyjo@users.noreply.github.com> Date: Thu, 25 Sep 2025 18:26:45 +0200 Subject: [PATCH] feat(DataDownloader): add method `IterReportRows` for record streaming --- googleads/ad_manager.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/googleads/ad_manager.py b/googleads/ad_manager.py index c2393b8c..520c8a8e 100644 --- a/googleads/ad_manager.py +++ b/googleads/ad_manager.py @@ -17,11 +17,14 @@ import csv import datetime +import gzip +import io import logging import numbers import os import sys import time +import threading from urllib.request import build_opener import pytz @@ -1099,6 +1102,39 @@ def _ConvertDateTimeToOffset(self, date_time_value): return date_time_str[:-6] + 'Z' else: return date_time_str + + def _run_downloader_thread(self, outfile, **kwargs): + """ + Run `DownloadReportToFile` and afterward flush and close `outfile`. + + :param outfile: Write pipe. + :param kwargs: Remaining keyword arguments. + """ + kwargs['outfile'] = outfile + self.DownloadReportToFile(**kwargs) + outfile.flush() + outfile.close() + + def IterReportRows(self, **kwargs): + """ + Stream data from `DownloadReportToFile` via a pipe and generate records from it. All kwargs but `outfile`, + `export_format` and `use_gzip_compression` are passed to `_run_downloader_thread` which passes them to + `DownloadReportToFile`. + + :param kwargs: Passed into `DownloadReportToFile`. + """ + r_fd, w_fd = os.pipe() + r_pipe = gzip.GzipFile(fileobj=os.fdopen(r_fd, 'rb', buffering=io.DEFAULT_BUFFER_SIZE), mode='rb') + w_pipe = open(w_fd, 'wb', buffering=io.DEFAULT_BUFFER_SIZE) + + kwargs['outfile'] = w_pipe + kwargs['export_format'] = 'TSV' + kwargs['use_gzip_compression'] = True + t = threading.Thread(target=self._run_downloader_thread, kwargs=kwargs) + t.start() + reader = csv.DictReader((line.decode() for line in r_pipe), delimiter='\t') + for row in reader: + yield row def AdManagerClassType(value):