Skip to content

Merge Streaming Client changes from Adam Hicks#76

Open
YinYin-blip wants to merge 11 commits intoluno:mainfrom
YinYin-blip:master
Open

Merge Streaming Client changes from Adam Hicks#76
YinYin-blip wants to merge 11 commits intoluno:mainfrom
YinYin-blip:master

Conversation

@YinYin-blip
Copy link

@YinYin-blip YinYin-blip commented Mar 23, 2026

Issue: #75

This code create an example streaming client that successfully creates and manages the web-socket connection

Summary by CodeRabbit

  • New Features

    • Added asynchronous streaming for real-time order book and market state updates.
    • Included a runnable example demonstrating the streaming client.
    • Added utilities to format and present market/orderbook state.
  • Documentation

    • Updated examples and docs to use Python 3 print syntax.

@coderabbitai
Copy link

coderabbitai bot commented Mar 23, 2026

📝 Walkthrough

Walkthrough

The PR updates Python 2-style print usage to Python 3 syntax in docs/README and client examples, adds a runnable example script (examples/stream.py), introduces luno_python/api_types.py with namedtuple types (Order, MarketState, Pair), formatting helpers (format_orderbook, format_state), and adds luno_python/stream_client.py, an async websocket streaming client that authenticates, maintains orderbook state with sequence validation, emits snapshots/updates via a callback, and sends periodic keepalive messages.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 I hop on websockets, ears held high,
I munch on orders as they fly,
Keepalives thump — a steady beat,
Snapshots tumble, tidy and neat,
Python three carrots, fresh and spry. 🥕

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 9.52% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'Merge Streaming Client changes from Adam Hicks' is vague and generic, referencing an author merge rather than describing the actual technical changes being introduced. Revise the title to describe the technical changes, e.g., 'Add streaming client implementation for orderbook updates' or 'Implement WebSocket streaming client with order updates'.
✅ Passed checks (1 passed)
Check name Status Explanation
Description check ✅ Passed The description is related to the changeset as it mentions creating an example streaming client that manages WebSocket connections, which aligns with the actual changes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (3)
examples/stream.py (1)

11-18: Use asyncio.run() instead of deprecated get_event_loop().run_until_complete().

asyncio.get_event_loop() is deprecated since Python 3.10 and emits a DeprecationWarning when called from the main thread without a running event loop. Use asyncio.run() for cleaner, forward-compatible code.

Proposed fix
-asyncio.get_event_loop().run_until_complete(
+asyncio.run(
     stream_market(
         pair="XBTZAR",
         api_key_id="", # API Key goes here
         api_key_secret="", # and API Secret goes here
         update_callback=handle_update,
     )
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/stream.py` around lines 11 - 18, Replace the deprecated
asyncio.get_event_loop().run_until_complete(...) call with asyncio.run(...) by
invoking asyncio.run(stream_market(...)) to run the top-level coroutine;
specifically update the call that currently wraps stream_market(pair="XBTZAR",
api_key_id="", api_key_secret="", update_callback=handle_update) so it uses
asyncio.run and otherwise passes the same arguments to stream_market and keeps
handle_update as the callback.
luno_python/stream_client.py (2)

39-41: Use a specific exception type for initialisation errors.

As flagged by static analysis, replace the generic Exception with a more descriptive custom exception.

Proposed fix
+class StreamInitException(Exception):
+    pass
+
 class _MarketStreamState:
     def __init__(self, first: dict):
         if first is None:
-            raise Exception("Unable to use empty message to initialise market state")
+            raise StreamInitException("Unable to use empty message to initialise market state")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 39 - 41, The constructor
StreamClient.__init__ currently raises a generic Exception when given a None
initial message; define and use a specific exception type (e.g.,
InvalidInitialMessageError or InitializationError) in the module and replace the
raise Exception(...) with raise InvalidInitialMessageError("Unable to use empty
message to initialise market state") (or similar message) so callers can catch
this specific error; ensure the new exception class inherits from Exception and
is declared at module level near other exceptions or top of stream_client.py.

179-180: Use a specific exception type for validation errors.

As flagged by static analysis, replace the generic Exception. Also note that the 6-character assumption may not hold for all currency pairs if Luno introduces currencies with different code lengths.

Proposed fix
+class InvalidPairException(Exception):
+    pass
+
 async def stream_market(
         pair: str,
         ...
 ):
     if len(pair) != 6:
-        raise Exception("Invalid pair")
+        raise InvalidPairException(f"Invalid pair: {pair}. Expected 6-character pair code.")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 179 - 180, Replace the generic
Exception raised for an invalid pair with a specific validation exception and
relax the hard-coded 6-char assumption: validate the pair variable using a
pattern (e.g. import re and use re.match(r'^[A-Z]{3,4}[A-Z]{3,4}$', pair)) or a
custom rule that allows variable-length currency codes, and raise a ValueError
or a new InvalidPairError instead of Exception; update the check around pair in
stream_client.py (the block that currently does if len(pair) != 6: raise
Exception("Invalid pair")) to use the new validation and exception.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@luno_python/stream_client.py`:
- Around line 198-201: The current asyncio.gather call launching
_read_from_websocket(websocket, p, update_callback) and
_write_keep_alive(websocket) can leave the keep-alive task running if the reader
raises; change this to create managed tasks and cancel the other when one fails:
either use asyncio.TaskGroup (Python 3.11+) to run both _read_from_websocket and
_write_keep_alive so the group cancels remaining tasks on exception, or for
older Python create two tasks via asyncio.create_task, await asyncio.wait(...,
return_when=FIRST_EXCEPTION), detect the first finished task raising, cancel the
pending task(s) (e.g., the keep-alive), and await their cancellation; apply
these changes around the code that currently calls asyncio.gather so
_write_keep_alive is reliably stopped when _read_from_websocket fails.
- Around line 137-140: The current try/except around json.loads(message) should
preserve the original traceback and raise a more specific error: change the
except ValueError handler to catch json.JSONDecodeError (or "except ValueError
as e" if JSONDecodeError isn't imported) and re-raise a specific exception while
chaining the original (e.g. raise ValueError(f"Failed to parse JSON message:
{message}") from e or raise a custom ParseError from e); update the handler that
surrounds the json.loads call (the body = json.loads(message) block) to use "as
e" and include "from e" when re-raising so the original exception is preserved.

---

Nitpick comments:
In `@examples/stream.py`:
- Around line 11-18: Replace the deprecated
asyncio.get_event_loop().run_until_complete(...) call with asyncio.run(...) by
invoking asyncio.run(stream_market(...)) to run the top-level coroutine;
specifically update the call that currently wraps stream_market(pair="XBTZAR",
api_key_id="", api_key_secret="", update_callback=handle_update) so it uses
asyncio.run and otherwise passes the same arguments to stream_market and keeps
handle_update as the callback.

In `@luno_python/stream_client.py`:
- Around line 39-41: The constructor StreamClient.__init__ currently raises a
generic Exception when given a None initial message; define and use a specific
exception type (e.g., InvalidInitialMessageError or InitializationError) in the
module and replace the raise Exception(...) with raise
InvalidInitialMessageError("Unable to use empty message to initialise market
state") (or similar message) so callers can catch this specific error; ensure
the new exception class inherits from Exception and is declared at module level
near other exceptions or top of stream_client.py.
- Around line 179-180: Replace the generic Exception raised for an invalid pair
with a specific validation exception and relax the hard-coded 6-char assumption:
validate the pair variable using a pattern (e.g. import re and use
re.match(r'^[A-Z]{3,4}[A-Z]{3,4}$', pair)) or a custom rule that allows
variable-length currency codes, and raise a ValueError or a new InvalidPairError
instead of Exception; update the check around pair in stream_client.py (the
block that currently does if len(pair) != 6: raise Exception("Invalid pair")) to
use the new validation and exception.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a089ae67-fb3d-4396-a002-6fa13a8342e6

📥 Commits

Reviewing files that changed from the base of the PR and between 838df1f and c2393a5.

📒 Files selected for processing (5)
  • README.md
  • examples/stream.py
  • luno_python/api_types.py
  • luno_python/client.py
  • luno_python/stream_client.py

Comment on lines +198 to +201
await asyncio.gather(
_read_from_websocket(websocket, p, update_callback),
_write_keep_alive(websocket),
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Handle task cancellation when one coroutine fails.

If _read_from_websocket raises an exception (e.g., OutOfOrderMessageException), the _write_keep_alive task will continue running indefinitely. Consider using asyncio.TaskGroup (Python 3.11+) or wrapping with explicit cancellation logic.

Proposed fix for Python 3.11+
-        await asyncio.gather(
-            _read_from_websocket(websocket, p, update_callback),
-            _write_keep_alive(websocket),
-        )
+        async with asyncio.TaskGroup() as tg:
+            tg.create_task(_read_from_websocket(websocket, p, update_callback))
+            tg.create_task(_write_keep_alive(websocket))
Alternative fix for older Python versions
+        reader_task = asyncio.create_task(_read_from_websocket(websocket, p, update_callback))
+        keepalive_task = asyncio.create_task(_write_keep_alive(websocket))
+        try:
+            await reader_task
+        finally:
+            keepalive_task.cancel()
+            try:
+                await keepalive_task
+            except asyncio.CancelledError:
+                pass
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await asyncio.gather(
_read_from_websocket(websocket, p, update_callback),
_write_keep_alive(websocket),
)
async with asyncio.TaskGroup() as tg:
tg.create_task(_read_from_websocket(websocket, p, update_callback))
tg.create_task(_write_keep_alive(websocket))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 198 - 201, The current
asyncio.gather call launching _read_from_websocket(websocket, p,
update_callback) and _write_keep_alive(websocket) can leave the keep-alive task
running if the reader raises; change this to create managed tasks and cancel the
other when one fails: either use asyncio.TaskGroup (Python 3.11+) to run both
_read_from_websocket and _write_keep_alive so the group cancels remaining tasks
on exception, or for older Python create two tasks via asyncio.create_task,
await asyncio.wait(..., return_when=FIRST_EXCEPTION), detect the first finished
task raising, cancel the pending task(s) (e.g., the keep-alive), and await their
cancellation; apply these changes around the code that currently calls
asyncio.gather so _write_keep_alive is reliably stopped when
_read_from_websocket fails.

John Groenewald and others added 2 commits March 25, 2026 14:28
@sonarqubecloud
Copy link

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@luno_python/stream_client.py`:
- Line 177: The docstring for the parameter named "pair" currently describes an
amount instead of the market pair; update the docstring in
luno_python/stream_client.py for the function/method that accepts the parameter
"pair" so it correctly describes it as the market trading pair (e.g., "market
pair in the format 'BASE/QUOTE' or 'XBTZAR'") and remove the incorrect "Amount
to buy or sell..." wording; locate the docstring containing ":param pair:" and
replace the description with a concise, accurate explanation referencing the
expected format and purpose of the pair parameter.
- Around line 145-152: The first-frame keepalive is being treated as an empty
body and causing initialization to fail; change the handling so that if body ==
"" and is_first is True you skip the frame (continue) instead of setting body =
None and initializing. Concretely, in the loop containing body, is_first,
_MarketStreamState and update_f, move or add a guard: if body == "" and
is_first: continue; then keep the existing behavior for non-first keepalives
(body = None) and the existing is_first initialization path that constructs
_MarketStreamState and calls update_f(pair, state.get_snapshot(), None).
- Line 18: Update the StateUpdate type alias to accept None for the third
parameter by changing its annotation from dict to Optional[dict] and ensure
Optional is imported from typing; i.e., modify StateUpdate = Callable[[Pair,
MarketState, dict], None] to StateUpdate = Callable[[Pair, MarketState,
Optional[dict]], None] and add the necessary typing import so callbacks that
receive None (where None is passed) type-check correctly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0b0c56e0-b626-4159-8ece-551d392ef2cf

📥 Commits

Reviewing files that changed from the base of the PR and between c2393a5 and dd6ef14.

📒 Files selected for processing (1)
  • luno_python/stream_client.py


DEFAULT_URL = "wss://ws.luno.com"

StateUpdate = Callable[[Pair, MarketState, dict], None]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the stream_client.py file structure
wc -l luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 92


🏁 Script executed:

# Read the area around line 18 to see the StateUpdate definition
sed -n '1,30p' luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 764


🏁 Script executed:

# Check lines 151 and 157 as mentioned
sed -n '145,165p' luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 561


🏁 Script executed:

# Search for where StateUpdate callbacks are invoked with None
rg -n "StateUpdate|state_update" luno_python/stream_client.py -A 3

Repository: luno/luno-python

Length of output: 659


🏁 Script executed:

# Search for callback invocations with None as third argument
rg -n "callback\(" luno_python/stream_client.py -B 2 -A 2

Repository: luno/luno-python

Length of output: 42


🏁 Script executed:

# Search for all update_f invocations to see all the places where callbacks are called
rg -n "update_f\(" luno_python/stream_client.py -B 1 -A 1

Repository: luno/luno-python

Length of output: 260


🏁 Script executed:

# Also check for update_callback invocations
rg -n "update_callback\(" luno_python/stream_client.py -B 1 -A 1

Repository: luno/luno-python

Length of output: 42


🏁 Script executed:

# Let me see the complete context around the callback invocations
sed -n '135,165p' luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 857


🏁 Script executed:

# Check what imports are at the top of the file
head -20 luno_python/stream_client.py

Repository: luno/luno-python

Length of output: 580


Correct StateUpdate typing to accept None as the third parameter.

The callback's third argument is typed as dict, but None is explicitly passed at line 151 and can be None at line 157 when body is empty. Change the type annotation to Optional[dict]:

Required change
-from typing import Callable, Dict, List
+from typing import Callable, Dict, List, Optional
 
 ...
-StateUpdate = Callable[[Pair, MarketState, dict], None]
+StateUpdate = Callable[[Pair, MarketState, Optional[dict]], None]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` at line 18, Update the StateUpdate type alias
to accept None for the third parameter by changing its annotation from dict to
Optional[dict] and ensure Optional is imported from typing; i.e., modify
StateUpdate = Callable[[Pair, MarketState, dict], None] to StateUpdate =
Callable[[Pair, MarketState, Optional[dict]], None] and add the necessary typing
import so callbacks that receive None (where None is passed) type-check
correctly.

Comment on lines +145 to +152
if body == "": # Empty update, used as keepalive
body = None

if is_first:
is_first = False
state = _MarketStreamState(body)
update_f(pair, state.get_snapshot(), None)
continue
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Ignore keepalive frames until initial snapshot is received.

If the first frame is a keepalive (""), body becomes None and initialisation fails immediately. Skipping keepalives before initial state avoids false start-up failures.

Proposed guard
         if body == "": # Empty update, used as keepalive
             body = None
+
+        if body is None and is_first:
+            continue
 
         if is_first:
             is_first = False
             state = _MarketStreamState(body)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` around lines 145 - 152, The first-frame
keepalive is being treated as an empty body and causing initialization to fail;
change the handling so that if body == "" and is_first is True you skip the
frame (continue) instead of setting body = None and initializing. Concretely, in
the loop containing body, is_first, _MarketStreamState and update_f, move or add
a guard: if body == "" and is_first: continue; then keep the existing behavior
for non-first keepalives (body = None) and the existing is_first initialization
path that constructs _MarketStreamState and calls update_f(pair,
state.get_snapshot(), None).


Stream orderbook information and maintain an orderbook state.

:param pair: str Amount to buy or sell in the pair base currency.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Correct the pair docstring description.

The current text describes an amount, not a market pair, which is misleading for users of this API.

Docstring fix
-        :param pair: str Amount to buy or sell in the pair base currency.
+        :param pair: str Currency pair code (for example, "XBTZAR").
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
:param pair: str Amount to buy or sell in the pair base currency.
:param pair: str Currency pair code (for example, "XBTZAR").
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@luno_python/stream_client.py` at line 177, The docstring for the parameter
named "pair" currently describes an amount instead of the market pair; update
the docstring in luno_python/stream_client.py for the function/method that
accepts the parameter "pair" so it correctly describes it as the market trading
pair (e.g., "market pair in the format 'BASE/QUOTE' or 'XBTZAR'") and remove the
incorrect "Amount to buy or sell..." wording; locate the docstring containing
":param pair:" and replace the description with a concise, accurate explanation
referencing the expected format and purpose of the pair parameter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants