Skip to content

streaming ingestion support for PUT operation #643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

sreekanth-db
Copy link

@sreekanth-db sreekanth-db commented Jul 21, 2025

What type of PR is this?

  • Refactor
  • Feature
  • Bug Fix
  • Other

Description

Currently if users want to ingest an in-memory object or input stream (which they might have generated by some processing) using the PUT command, they first need to save it into a file on the disk and need to pass the the file path in the PUT command. This involves unnecessary disk I/O for writing and reading back from the disk. In this PR we are providing an interface to directly pass the in-memory object to the command to remove disk I/O

Related issue: #435

Example usage: https://github.com/sreekanth-db/databricks-sql-python/blob/4cd2cee3d793760ebab964191136bbd19b33d399/examples/streaming_put.py (used for manual testing as well)

Unit tests and e2e tests are also included in the changed files

Integration tests successful run: https://github.com/databricks/databricks-sql-python/actions/runs/16444053628

How is this tested?

  • Unit tests
  • E2E Tests
  • Manually
  • N/A

Related Tickets & Documents

Signed-off-by: Sreekanth Vadigi <[email protected]>
Signed-off-by: Sreekanth Vadigi <[email protected]>
import io
import os
from databricks import sql

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We don't define an executable file in example, ref the other examples just a connect and execution part is enough. The main, etc are not needed

"""

# Prepare headers
http_headers = dict(headers) if headers else {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is redundant, we already are checking and passing a dict, otherwise you can define in the func argument as headers: dict = {} because expectation is to either not pass header or give a dict, never None

try:
# Stream directly to presigned URL
response = requests.put(
url=presigned_url,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plz try to integrated the pre existing http client

class DatabricksHttpClient:

Comment on lines 748 to 759
# Check response codes
OK = requests.codes.ok # 200
CREATED = requests.codes.created # 201
ACCEPTED = requests.codes.accepted # 202
NO_CONTENT = requests.codes.no_content # 204

if response.status_code not in [OK, CREATED, NO_CONTENT, ACCEPTED]:
raise OperationalError(
f"Staging operation over HTTP was unsuccessful: {response.status_code}-{response.text}",
session_id_hex=self.connection.get_session_id_hex(),
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is being repeated in both streaming and the existing flow, can we separated out this in a volume error handing util

Comment on lines 898 to 905
# Store stream data if provided
self._input_stream_data = None
if input_stream is not None:
# Validate stream has required methods
if not hasattr(input_stream, "read"):
raise TypeError(
"input_stream must be a binary stream with read() method"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel we need to be try catching this, any Runtime error such as user providing incorrect data should fail naturally

"""Unit tests for streaming PUT functionality."""

def setUp(self):
"""Set up test fixtures."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use pytest fixtures

mock_handler.assert_called_once()

# Verify the finally block cleanup
self.assertIsNone(self.cursor._input_stream_data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thoughts as the try catch cleanup in execute

Comment on lines 644 to 648
if not self._input_stream_data:
raise ProgrammingError(
"No input stream provided for streaming operation",
session_id_hex=self.connection.get_session_id_hex(),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If needed this should be within the handle_staging_put_stream function as this error handling is the concern of that function


if response.status_code == ACCEPTED:
logger.debug(
f"Response code {ACCEPTED} from server indicates upload was accepted "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you integrate lazy logging in all the logger logs, plz refer other logging examples

@@ -783,6 +856,7 @@ def execute(
self,
operation: str,
parameters: Optional[TParameterCollection] = None,
input_stream: Optional[BinaryIO] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing a new argument should be at the last (after enforce_embedded_schema), otherwise this will break the existing users functions

Signed-off-by: Sreekanth Vadigi <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants