Merge Streaming Client changes from Adam Hicks#76
Merge Streaming Client changes from Adam Hicks#76YinYin-blip wants to merge 11 commits intoluno:mainfrom
Conversation
As recommended by reviewer comment
Streamingapi
Streamingapi
📝 WalkthroughWalkthroughThe PR updates Python 2-style Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (3)
examples/stream.py (1)
11-18: Useasyncio.run()instead of deprecatedget_event_loop().run_until_complete().
asyncio.get_event_loop()is deprecated since Python 3.10 and emits aDeprecationWarningwhen called from the main thread without a running event loop. Useasyncio.run()for cleaner, forward-compatible code.Proposed fix
-asyncio.get_event_loop().run_until_complete( +asyncio.run( stream_market( pair="XBTZAR", api_key_id="", # API Key goes here api_key_secret="", # and API Secret goes here update_callback=handle_update, ) )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/stream.py` around lines 11 - 18, Replace the deprecated asyncio.get_event_loop().run_until_complete(...) call with asyncio.run(...) by invoking asyncio.run(stream_market(...)) to run the top-level coroutine; specifically update the call that currently wraps stream_market(pair="XBTZAR", api_key_id="", api_key_secret="", update_callback=handle_update) so it uses asyncio.run and otherwise passes the same arguments to stream_market and keeps handle_update as the callback.luno_python/stream_client.py (2)
39-41: Use a specific exception type for initialisation errors.As flagged by static analysis, replace the generic
Exceptionwith a more descriptive custom exception.Proposed fix
+class StreamInitException(Exception): + pass + class _MarketStreamState: def __init__(self, first: dict): if first is None: - raise Exception("Unable to use empty message to initialise market state") + raise StreamInitException("Unable to use empty message to initialise market state")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@luno_python/stream_client.py` around lines 39 - 41, The constructor StreamClient.__init__ currently raises a generic Exception when given a None initial message; define and use a specific exception type (e.g., InvalidInitialMessageError or InitializationError) in the module and replace the raise Exception(...) with raise InvalidInitialMessageError("Unable to use empty message to initialise market state") (or similar message) so callers can catch this specific error; ensure the new exception class inherits from Exception and is declared at module level near other exceptions or top of stream_client.py.
179-180: Use a specific exception type for validation errors.As flagged by static analysis, replace the generic
Exception. Also note that the 6-character assumption may not hold for all currency pairs if Luno introduces currencies with different code lengths.Proposed fix
+class InvalidPairException(Exception): + pass + async def stream_market( pair: str, ... ): if len(pair) != 6: - raise Exception("Invalid pair") + raise InvalidPairException(f"Invalid pair: {pair}. Expected 6-character pair code.")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@luno_python/stream_client.py` around lines 179 - 180, Replace the generic Exception raised for an invalid pair with a specific validation exception and relax the hard-coded 6-char assumption: validate the pair variable using a pattern (e.g. import re and use re.match(r'^[A-Z]{3,4}[A-Z]{3,4}$', pair)) or a custom rule that allows variable-length currency codes, and raise a ValueError or a new InvalidPairError instead of Exception; update the check around pair in stream_client.py (the block that currently does if len(pair) != 6: raise Exception("Invalid pair")) to use the new validation and exception.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@luno_python/stream_client.py`:
- Around line 198-201: The current asyncio.gather call launching
_read_from_websocket(websocket, p, update_callback) and
_write_keep_alive(websocket) can leave the keep-alive task running if the reader
raises; change this to create managed tasks and cancel the other when one fails:
either use asyncio.TaskGroup (Python 3.11+) to run both _read_from_websocket and
_write_keep_alive so the group cancels remaining tasks on exception, or for
older Python create two tasks via asyncio.create_task, await asyncio.wait(...,
return_when=FIRST_EXCEPTION), detect the first finished task raising, cancel the
pending task(s) (e.g., the keep-alive), and await their cancellation; apply
these changes around the code that currently calls asyncio.gather so
_write_keep_alive is reliably stopped when _read_from_websocket fails.
- Around line 137-140: The current try/except around json.loads(message) should
preserve the original traceback and raise a more specific error: change the
except ValueError handler to catch json.JSONDecodeError (or "except ValueError
as e" if JSONDecodeError isn't imported) and re-raise a specific exception while
chaining the original (e.g. raise ValueError(f"Failed to parse JSON message:
{message}") from e or raise a custom ParseError from e); update the handler that
surrounds the json.loads call (the body = json.loads(message) block) to use "as
e" and include "from e" when re-raising so the original exception is preserved.
---
Nitpick comments:
In `@examples/stream.py`:
- Around line 11-18: Replace the deprecated
asyncio.get_event_loop().run_until_complete(...) call with asyncio.run(...) by
invoking asyncio.run(stream_market(...)) to run the top-level coroutine;
specifically update the call that currently wraps stream_market(pair="XBTZAR",
api_key_id="", api_key_secret="", update_callback=handle_update) so it uses
asyncio.run and otherwise passes the same arguments to stream_market and keeps
handle_update as the callback.
In `@luno_python/stream_client.py`:
- Around line 39-41: The constructor StreamClient.__init__ currently raises a
generic Exception when given a None initial message; define and use a specific
exception type (e.g., InvalidInitialMessageError or InitializationError) in the
module and replace the raise Exception(...) with raise
InvalidInitialMessageError("Unable to use empty message to initialise market
state") (or similar message) so callers can catch this specific error; ensure
the new exception class inherits from Exception and is declared at module level
near other exceptions or top of stream_client.py.
- Around line 179-180: Replace the generic Exception raised for an invalid pair
with a specific validation exception and relax the hard-coded 6-char assumption:
validate the pair variable using a pattern (e.g. import re and use
re.match(r'^[A-Z]{3,4}[A-Z]{3,4}$', pair)) or a custom rule that allows
variable-length currency codes, and raise a ValueError or a new InvalidPairError
instead of Exception; update the check around pair in stream_client.py (the
block that currently does if len(pair) != 6: raise Exception("Invalid pair")) to
use the new validation and exception.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a089ae67-fb3d-4396-a002-6fa13a8342e6
📒 Files selected for processing (5)
README.mdexamples/stream.pyluno_python/api_types.pyluno_python/client.pyluno_python/stream_client.py
| await asyncio.gather( | ||
| _read_from_websocket(websocket, p, update_callback), | ||
| _write_keep_alive(websocket), | ||
| ) |
There was a problem hiding this comment.
Handle task cancellation when one coroutine fails.
If _read_from_websocket raises an exception (e.g., OutOfOrderMessageException), the _write_keep_alive task will continue running indefinitely. Consider using asyncio.TaskGroup (Python 3.11+) or wrapping with explicit cancellation logic.
Proposed fix for Python 3.11+
- await asyncio.gather(
- _read_from_websocket(websocket, p, update_callback),
- _write_keep_alive(websocket),
- )
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(_read_from_websocket(websocket, p, update_callback))
+ tg.create_task(_write_keep_alive(websocket))Alternative fix for older Python versions
+ reader_task = asyncio.create_task(_read_from_websocket(websocket, p, update_callback))
+ keepalive_task = asyncio.create_task(_write_keep_alive(websocket))
+ try:
+ await reader_task
+ finally:
+ keepalive_task.cancel()
+ try:
+ await keepalive_task
+ except asyncio.CancelledError:
+ pass📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| await asyncio.gather( | |
| _read_from_websocket(websocket, p, update_callback), | |
| _write_keep_alive(websocket), | |
| ) | |
| async with asyncio.TaskGroup() as tg: | |
| tg.create_task(_read_from_websocket(websocket, p, update_callback)) | |
| tg.create_task(_write_keep_alive(websocket)) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@luno_python/stream_client.py` around lines 198 - 201, The current
asyncio.gather call launching _read_from_websocket(websocket, p,
update_callback) and _write_keep_alive(websocket) can leave the keep-alive task
running if the reader raises; change this to create managed tasks and cancel the
other when one fails: either use asyncio.TaskGroup (Python 3.11+) to run both
_read_from_websocket and _write_keep_alive so the group cancels remaining tasks
on exception, or for older Python create two tasks via asyncio.create_task,
await asyncio.wait(..., return_when=FIRST_EXCEPTION), detect the first finished
task raising, cancel the pending task(s) (e.g., the keep-alive), and await their
cancellation; apply these changes around the code that currently calls
asyncio.gather so _write_keep_alive is reliably stopped when
_read_from_websocket fails.
improved general exceptions to be more specific
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@luno_python/stream_client.py`:
- Line 177: The docstring for the parameter named "pair" currently describes an
amount instead of the market pair; update the docstring in
luno_python/stream_client.py for the function/method that accepts the parameter
"pair" so it correctly describes it as the market trading pair (e.g., "market
pair in the format 'BASE/QUOTE' or 'XBTZAR'") and remove the incorrect "Amount
to buy or sell..." wording; locate the docstring containing ":param pair:" and
replace the description with a concise, accurate explanation referencing the
expected format and purpose of the pair parameter.
- Around line 145-152: The first-frame keepalive is being treated as an empty
body and causing initialization to fail; change the handling so that if body ==
"" and is_first is True you skip the frame (continue) instead of setting body =
None and initializing. Concretely, in the loop containing body, is_first,
_MarketStreamState and update_f, move or add a guard: if body == "" and
is_first: continue; then keep the existing behavior for non-first keepalives
(body = None) and the existing is_first initialization path that constructs
_MarketStreamState and calls update_f(pair, state.get_snapshot(), None).
- Line 18: Update the StateUpdate type alias to accept None for the third
parameter by changing its annotation from dict to Optional[dict] and ensure
Optional is imported from typing; i.e., modify StateUpdate = Callable[[Pair,
MarketState, dict], None] to StateUpdate = Callable[[Pair, MarketState,
Optional[dict]], None] and add the necessary typing import so callbacks that
receive None (where None is passed) type-check correctly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0b0c56e0-b626-4159-8ece-551d392ef2cf
📒 Files selected for processing (1)
luno_python/stream_client.py
|
|
||
| DEFAULT_URL = "wss://ws.luno.com" | ||
|
|
||
| StateUpdate = Callable[[Pair, MarketState, dict], None] |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's examine the stream_client.py file structure
wc -l luno_python/stream_client.pyRepository: luno/luno-python
Length of output: 92
🏁 Script executed:
# Read the area around line 18 to see the StateUpdate definition
sed -n '1,30p' luno_python/stream_client.pyRepository: luno/luno-python
Length of output: 764
🏁 Script executed:
# Check lines 151 and 157 as mentioned
sed -n '145,165p' luno_python/stream_client.pyRepository: luno/luno-python
Length of output: 561
🏁 Script executed:
# Search for where StateUpdate callbacks are invoked with None
rg -n "StateUpdate|state_update" luno_python/stream_client.py -A 3Repository: luno/luno-python
Length of output: 659
🏁 Script executed:
# Search for callback invocations with None as third argument
rg -n "callback\(" luno_python/stream_client.py -B 2 -A 2Repository: luno/luno-python
Length of output: 42
🏁 Script executed:
# Search for all update_f invocations to see all the places where callbacks are called
rg -n "update_f\(" luno_python/stream_client.py -B 1 -A 1Repository: luno/luno-python
Length of output: 260
🏁 Script executed:
# Also check for update_callback invocations
rg -n "update_callback\(" luno_python/stream_client.py -B 1 -A 1Repository: luno/luno-python
Length of output: 42
🏁 Script executed:
# Let me see the complete context around the callback invocations
sed -n '135,165p' luno_python/stream_client.pyRepository: luno/luno-python
Length of output: 857
🏁 Script executed:
# Check what imports are at the top of the file
head -20 luno_python/stream_client.pyRepository: luno/luno-python
Length of output: 580
Correct StateUpdate typing to accept None as the third parameter.
The callback's third argument is typed as dict, but None is explicitly passed at line 151 and can be None at line 157 when body is empty. Change the type annotation to Optional[dict]:
Required change
-from typing import Callable, Dict, List
+from typing import Callable, Dict, List, Optional
...
-StateUpdate = Callable[[Pair, MarketState, dict], None]
+StateUpdate = Callable[[Pair, MarketState, Optional[dict]], None]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@luno_python/stream_client.py` at line 18, Update the StateUpdate type alias
to accept None for the third parameter by changing its annotation from dict to
Optional[dict] and ensure Optional is imported from typing; i.e., modify
StateUpdate = Callable[[Pair, MarketState, dict], None] to StateUpdate =
Callable[[Pair, MarketState, Optional[dict]], None] and add the necessary typing
import so callbacks that receive None (where None is passed) type-check
correctly.
| if body == "": # Empty update, used as keepalive | ||
| body = None | ||
|
|
||
| if is_first: | ||
| is_first = False | ||
| state = _MarketStreamState(body) | ||
| update_f(pair, state.get_snapshot(), None) | ||
| continue |
There was a problem hiding this comment.
Ignore keepalive frames until initial snapshot is received.
If the first frame is a keepalive (""), body becomes None and initialisation fails immediately. Skipping keepalives before initial state avoids false start-up failures.
Proposed guard
if body == "": # Empty update, used as keepalive
body = None
+
+ if body is None and is_first:
+ continue
if is_first:
is_first = False
state = _MarketStreamState(body)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@luno_python/stream_client.py` around lines 145 - 152, The first-frame
keepalive is being treated as an empty body and causing initialization to fail;
change the handling so that if body == "" and is_first is True you skip the
frame (continue) instead of setting body = None and initializing. Concretely, in
the loop containing body, is_first, _MarketStreamState and update_f, move or add
a guard: if body == "" and is_first: continue; then keep the existing behavior
for non-first keepalives (body = None) and the existing is_first initialization
path that constructs _MarketStreamState and calls update_f(pair,
state.get_snapshot(), None).
|
|
||
| Stream orderbook information and maintain an orderbook state. | ||
|
|
||
| :param pair: str Amount to buy or sell in the pair base currency. |
There was a problem hiding this comment.
Correct the pair docstring description.
The current text describes an amount, not a market pair, which is misleading for users of this API.
Docstring fix
- :param pair: str Amount to buy or sell in the pair base currency.
+ :param pair: str Currency pair code (for example, "XBTZAR").📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| :param pair: str Amount to buy or sell in the pair base currency. | |
| :param pair: str Currency pair code (for example, "XBTZAR"). |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@luno_python/stream_client.py` at line 177, The docstring for the parameter
named "pair" currently describes an amount instead of the market pair; update
the docstring in luno_python/stream_client.py for the function/method that
accepts the parameter "pair" so it correctly describes it as the market trading
pair (e.g., "market pair in the format 'BASE/QUOTE' or 'XBTZAR'") and remove the
incorrect "Amount to buy or sell..." wording; locate the docstring containing
":param pair:" and replace the description with a concise, accurate explanation
referencing the expected format and purpose of the pair parameter.



Issue: #75
This code create an example streaming client that successfully creates and manages the web-socket connection
Summary by CodeRabbit
New Features
Documentation