feat: add async 'for' loop support to LogScanner (#424)#438

Open
qzyu999 wants to merge 6 commits into apache:main from qzyu999:feat/424-python-async-iterator

Conversation


@qzyu999 qzyu999 commented Mar 9, 2026

Purpose

Linked issue: close #424

This pull request closes Issue #424 by enabling Python's native async for syntax on the PyO3-wrapped LogScanner stream.

Brief change log

Previously, PyFluss users had to write manual while True polling loops around scanner.poll(timeout). This PR adds native async iteration to the Python LogScanner by implementing __anext__ as a Rust polling binding and exposing __aiter__ through a Python async-generator adapter:

  • State Independence: Refactored the ScannerKind internals into a buffered Arc<tokio::sync::Mutex<ScannerState>>, so scanner state can be shared into the tokio closure while satisfying Rust's thread-safety and lifetime requirements.
  • Asynchronous Execution: __anext__ polls without blocking; Arrow records are yielded through awaited futures, so neither the Python event loop nor OS threads are blocked.
  • Iterable Compliance: To satisfy runtime inspect.isasyncgen() checks in Python 3.12+ environments (such as modern IPython/Jupyter servers), __aiter__ builds a properly wrapped async-generator adapter via py.run() and returns it, masking the type limitations of a plain C-level async iterator.
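The protocol the binding implements can be illustrated with a minimal, self-contained sketch (ToyScanner and its record list are invented for illustration; the real iterator is the Rust-backed LogScanner):

```python
import asyncio

class ToyScanner:
    """Toy stand-in for the PyO3 LogScanner; the real class lives in Rust."""
    def __init__(self, records):
        self._records = list(records)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._records:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stands in for an awaited network poll
        return self._records.pop(0)

async def consume(scanner):
    out = []
    async for record in scanner:  # the syntax this PR enables
        out.append(record)
    return out

result = asyncio.run(consume(ToyScanner([1, 2, 3])))
print(result)  # [1, 2, 3]
```

Any object whose __anext__ returns an awaitable and eventually raises StopAsyncIteration satisfies the async-iterator protocol, which is what the Rust binding provides.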

Tests

  • [NEW] test_log_table.py::test_async_iterator: an integration test built on testcontainers that appends thousands of records and verifies that async for record in scanner yields them sequentially, without manual polling and consistent with the existing poll-based path.

API and Format

Yes, this extends the public API with async for support on LogScanner. Existing user code that calls .poll_arrow() or other existing methods is unchanged.

Documentation

The new integration test doubles as a usage example; no separate documentation was added.

Contributor

fresh-borzoni commented Mar 10, 2026

@qzyu999 Thanks for the PR, but I checked out this branch and the Python integration tests still hang when I run them locally. PTAL

Author

qzyu999 commented Mar 10, 2026

> @qzyu999 Thanks for the PR, but I checked out this branch and the Python integration tests still hang when I run them locally. PTAL

Hi @fresh-borzoni, applied the fix. Ran cargo fmt --all locally and it passed. Please run the CI again.


Copilot AI left a comment


Pull request overview

This PR adds Python async for support for the PyO3 LogScanner binding to address Issue #424, aiming to let PyFluss users iterate scanner results via the native async-iterator protocol instead of manual polling loops.

Changes:

  • Added a new Python integration test covering async for record in scanner on a record-based LogScanner.
  • Refactored the Rust LogScanner binding to store scanner state behind Arc<tokio::sync::Mutex<_>> with a pending-records buffer for per-record yielding.
  • Implemented __aiter__ / __anext__ in the Rust binding (via future_into_py) to produce awaitable next-items for async iteration.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 11 comments.

File Description
bindings/python/test/test_log_table.py Adds an async-iterator integration test for LogScanner.
bindings/python/src/table.rs Introduces async iterator support and refactors scanner state management for Python bindings.


Comment on lines +2229 to +2246
fn __aiter__<'py>(slf: PyRef<'py, Self>) -> PyResult<Bound<'py, PyAny>> {
    let py = slf.py();
    let code = pyo3::ffi::c_str!(
        r#"
async def _adapter(obj):
    while True:
        try:
            yield await obj.__anext__()
        except StopAsyncIteration:
            break
"#
    );
    let globals = pyo3::types::PyDict::new(py);
    py.run(code, Some(&globals), None)?;
    let adapter = globals.get_item("_adapter")?.unwrap();
    // Return _adapter(self)
    adapter.call1((slf.into_bound_py_any(py)?,))
}

Copilot AI Mar 11, 2026


__aiter__ recompiles and executes Python source via py.run() on every iteration start. Consider caching the adapter function (e.g., in a PyOnceLock) or returning self directly as the async iterator if possible; this avoids repeated code compilation and reduces overhead per async for loop.
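The caching idea can be shown in pure Python: compile the adapter source once at import time and reuse the resulting function object on every iteration (the names here are hypothetical; the Rust-side fix would use a PyOnceLock):

```python
import asyncio

# Compile the adapter source once, at import time, instead of on every
# __aiter__ call -- mirroring the PyOnceLock suggestion on the Rust side.
_ADAPTER_SRC = """
async def _adapter(obj):
    while True:
        try:
            yield await obj.__anext__()
        except StopAsyncIteration:
            break
"""
_ns = {}
exec(_ADAPTER_SRC, _ns)
_adapter = _ns["_adapter"]  # cached function object, reused by every loop

class Once:
    """Toy async-iterator core exposing only __anext__."""
    def __init__(self, items):
        self._items = list(items)
    async def __anext__(self):
        if not self._items:
            raise StopAsyncIteration
        return self._items.pop(0)

async def main():
    # The cached adapter wraps any object with __anext__ into a true
    # async generator, so inspect.isasyncgen() checks pass.
    return [x async for x in _adapter(Once("ab"))]

out = asyncio.run(main())
print(out)  # ['a', 'b']
```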

Comment on lines +2254 to +2287
let mut state = state_arc.lock().await;

// 1. If we already have buffered records, pop and return immediately
if let Some(record) = state.pending_records.pop_front() {
    return Ok(record.into_any());
}

// 2. Buffer is empty; poll the network for the next batch.
//    The underlying kind must be a record-based scanner.
let scanner = match state.kind.as_record() {
    Ok(s) => s,
    Err(_) => {
        return Err(pyo3::exceptions::PyStopAsyncIteration::new_err(
            "Stream Ended",
        ));
    }
};

// Poll with a reasonable internal timeout before unblocking the event loop
let timeout = core::time::Duration::from_millis(5000);

let mut current_records = scanner
    .poll(timeout)
    .await
    .map_err(|e| FlussError::from_core_error(&e))?;

// On a timeout with zero records, keep polling: this is a streaming log,
// so we wait (via an async loop) until records arrive.
while current_records.is_empty() {
    current_records = scanner
        .poll(timeout)
        .await
        .map_err(|e| FlussError::from_core_error(&e))?;
}

Copilot AI Mar 11, 2026


__anext__ holds state_arc.lock() across scanner.poll(timeout).await (and the retry loop). This blocks all other methods needing the same mutex (e.g., subscribe/unsubscribe/poll/to_arrow) for the full network wait time and can lead to poor responsiveness or deadlock-like behavior under concurrent use. Consider narrowing the critical section (e.g., split locks for kind vs pending_records, or temporarily take/move the scanner out of the state while polling).
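The suggested narrowing can be sketched in asyncio terms: hold the lock only while touching the buffer, and release it for the duration of the network wait (Scanner, _poll_network, and next_record are illustrative names, not the PR's actual API):

```python
import asyncio

class Scanner:
    """Sketch of the narrowed critical section: the lock guards only the
    pending buffer, never the (potentially long) network poll."""
    def __init__(self):
        self._lock = asyncio.Lock()
        self._pending = []

    async def _poll_network(self):
        await asyncio.sleep(0)          # stands in for scanner.poll(timeout)
        return ["r1", "r2"]

    async def next_record(self):
        async with self._lock:          # short hold: just check the buffer
            if self._pending:
                return self._pending.pop(0)
        batch = await self._poll_network()  # lock released during the wait
        async with self._lock:          # short hold: refill and pop
            self._pending.extend(batch[1:])
            return batch[0]

async def main():
    s = Scanner()
    return [await s.next_record() for _ in range(3)]

out = asyncio.run(main())
print(out)  # ['r1', 'r2', 'r1']
```

Because other methods only contend for the lock during the two short buffer touches, they stay responsive even while a poll is in flight.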

Comment on lines +2044 to +2046
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });

Copilot AI Mar 11, 2026


Avoid the unsafe pointer cast when accessing self.state. You can lock the mutex directly via self.state.lock() (or clone the Arc first) without unsafe; the current cast is unnecessary and introduces unsoundness risk if the field type ever changes.

Suggested change
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });
let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });

Comment on lines +2381 to +2383
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });

Copilot AI Mar 11, 2026


query_latest_offsets() uses the same unsafe cast pattern to lock self.state. Please replace this with a safe lock on self.state (or a cloned Arc) to avoid unnecessary unsafe in the Python bindings.

Suggested change
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });
let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });

Comment on lines +2266 to +2267
return Err(pyo3::exceptions::PyStopAsyncIteration::new_err(
"Stream Ended",

Copilot AI Mar 11, 2026


__anext__ treats the batch-based scanner variant as end-of-stream (StopAsyncIteration). That will silently terminate async for on scanners created via create_record_batch_log_scanner(), and it also masks the helpful error message from as_record(). Either implement async iteration for the batch variant (yielding RecordBatch/Arrow), or raise a TypeError explaining that async iteration is only supported for record scanners.
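The behavioral difference is easy to demonstrate: StopAsyncIteration ends an async for silently, while a TypeError surfaces to the caller (the two classes below are toys, not the real binding):

```python
import asyncio

class EndsSilently:
    """Models the current behavior: batch scanners look like an empty stream."""
    def __aiter__(self):
        return self
    async def __anext__(self):
        raise StopAsyncIteration("Stream Ended")

class FailsLoudly:
    """Models the suggested behavior: an explicit, explanatory error."""
    def __aiter__(self):
        return self
    async def __anext__(self):
        raise TypeError("Async iteration is only supported for record scanners")

async def main():
    seen, err = [], None
    async for r in EndsSilently():   # exits immediately; no exception visible
        seen.append(r)
    try:
        async for r in FailsLoudly():
            seen.append(r)
    except TypeError as e:
        err = str(e)                 # the misuse is reported to the caller
    return seen, err

seen, err = asyncio.run(main())
print(seen, err)
```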

Suggested change
return Err(pyo3::exceptions::PyStopAsyncIteration::new_err(
"Stream Ended",
return Err(PyTypeError::new_err(
"Async iteration is only supported for record scanners; \
use create_record_log_scanner() instead.",

Comment on lines +755 to +757
scanner = await table.new_scan().create_log_scanner()
num_buckets = (await admin.get_table_info(table_path)).num_buckets
scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)})

Copilot AI Mar 11, 2026


This test only covers async for on a record-based scanner (create_log_scanner()). Since LogScanner can also wrap the batch variant (create_record_batch_log_scanner()), consider adding a companion test for async iteration on the batch scanner (or explicitly asserting that async iteration is unsupported there) so the intended behavior is locked in by tests.

Comment on lines 2096 to 2100
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });
let scanner = lock.kind.as_batch()?;


Copilot AI Mar 11, 2026


Same as in poll(): please remove the unsafe cast used to get scanner_ref. Lock self.state directly; keeping this unsafe here makes the method harder to reason about and can hide real lifetime/aliasing issues.

Suggested change
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });
let scanner = lock.kind.as_batch()?;
let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });
let scanner = lock.kind.as_batch()?;

Comment on lines +2134 to +2136
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });

Copilot AI Mar 11, 2026


Remove the unsafe pointer cast when locking self.state in poll_arrow(). This can be expressed safely with self.state.lock().await (via TOKIO_RUNTIME.block_on) and avoids introducing UB hazards.

Suggested change
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let lock = TOKIO_RUNTIME.block_on(async { scanner_ref.lock().await });
let lock = TOKIO_RUNTIME.block_on(async { self.state.lock().await });

Comment on lines +2191 to +2193
let scanner_ref = unsafe {
&*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>)
};

Copilot AI Mar 11, 2026


to_arrow() also uses an unsafe cast to access self.state. This should be rewritten to safely clone/borrow self.state and lock it without unsafe to keep the bindings memory-safe.

Suggested change
let scanner_ref = unsafe {
&*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>)
};
let scanner_ref = &self.state;

Comment on lines +2487 to +2488
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };

Copilot AI Mar 11, 2026


poll_until_offsets() also relies on the unsafe cast to access self.state. This should be refactored to lock self.state safely; keeping unsafe here is especially risky because this method can run for a long time and is on a hot path for to_arrow().

Suggested change
let scanner_ref =
unsafe { &*(&self.state as *const std::sync::Arc<tokio::sync::Mutex<ScannerState>>) };
let scanner_ref = &self.state;

Contributor

fresh-borzoni commented Mar 11, 2026

@qzyu999 Ty, took a look at the approach, have some ideas. PTAL

The scanner is already thread-safe internally (&self on all methods), so the Mutex isn't needed; it just adds locking to every call and forces 5 unsafe pointer casts to work around borrow issues it created. The __anext__ loop is also problematic: it runs inside tokio::spawn, so breaking out of async for leaves it polling forever in the background.

Simpler idea: store the scanner in an Arc and keep the existing methods as-is. Add _async_poll(timeout_ms) that does one bounded poll and returns a list. __aiter__ returns a small Python async generator that calls _async_poll and yields records. Break stops the generator naturally, so no leaks, no unsafe, no mutex.
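A toy model of this proposal (the batch contents and the 100 ms timeout are made up; the real _async_poll would live in the Rust binding):

```python
import asyncio

class Scanner:
    """Toy model of the reviewer's proposal: one bounded poll per call,
    wrapped by a plain Python async generator in __aiter__."""
    def __init__(self, batches):
        self._batches = list(batches)

    async def _async_poll(self, timeout_ms):
        await asyncio.sleep(0)               # stands in for the Rust-side poll
        return self._batches.pop(0) if self._batches else []

    def __aiter__(self):
        async def gen():
            while True:
                for record in await self._async_poll(100):
                    yield record
        return gen()

async def main():
    out = []
    async for r in Scanner([[1, 2], [3]]):
        out.append(r)
        if r == 3:
            break    # closing the generator ends polling; no background task
    return out

out = asyncio.run(main())
print(out)  # [1, 2, 3]
```

Because the poll happens inside the consumer's own generator rather than a spawned task, a break (or an exception) simply closes the generator and nothing keeps polling in the background.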

Author

qzyu999 commented Mar 11, 2026

Hi @fresh-borzoni, thanks for the recommendations. I've taken a look and came up with the following changes, PTAL when available:

  1. Replace Arc<tokio::sync::Mutex<ScannerState>> with Arc<ScannerKind>
  2. Remove ScannerState struct and VecDeque buffer
  3. Remove all 6 unsafe pointer casts
  4. Replace __anext__ with _async_poll(timeout_ms) (single bounded poll)
  5. Replace __aiter__ with PyOnceLock-cached Python async generator
  6. Change batch scanner error from StopAsyncIteration to TypeError
  7. Update with_scanner! macro or inline to use &self.kind directly
  8. Add break-safety and batch-scanner-error tests

Development

Successfully merging this pull request may close these issues.

Python: add async 'for' loop support to LogScanner

3 participants