Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions contract-tests/client_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import requests
from big_segment_store_fixture import BigSegmentStoreFixture
from flag_change_listener import ListenerRegistry
from hook import PostingHook

from ldclient import *
Expand Down Expand Up @@ -158,6 +159,7 @@ def __init__(self, tag, config):
config = Config(**opts)

self.client = client.LDClient(config, start_wait / 1000.0)
self.listeners = ListenerRegistry(self.client.flag_tracker)

def is_initializing(self) -> bool:
return self.client.is_initialized()
Expand Down Expand Up @@ -282,7 +284,26 @@ def fn(payload) -> Result:
result = migrator.write(params["key"], Context.from_dict(params["context"]), Stage.from_str(params["defaultStage"]), params["payload"])
return {"result": result.authoritative.value if result.authoritative.is_success() else result.authoritative.error}

def register_flag_change_listener(self, params: dict):
self.listeners.register_flag_change_listener(
listener_id=params['listenerId'],
callback_uri=params['callbackUri'],
)

def register_flag_value_change_listener(self, params: dict):
self.listeners.register_flag_value_change_listener(
listener_id=params['listenerId'],
flag_key=params['flagKey'],
context=Context.from_dict(params['context']),
default_value=params['defaultValue'],
callback_uri=params['callbackUri'],
)

def unregister_listener(self, params: dict) -> bool:
return self.listeners.unregister(params['listenerId'])

def close(self):
self.listeners.close_all()
self.client.close()
self.log.info('Test ended')

Expand Down
90 changes: 90 additions & 0 deletions contract-tests/flag_change_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
import threading
from typing import Callable, Dict

import requests

from ldclient.context import Context
from ldclient.interfaces import FlagChange, FlagTracker, FlagValueChange

log = logging.getLogger('testservice')


class ListenerRegistry:
"""Manages all active flag change listener registrations for a single SDK client entity."""

def __init__(self, tracker: FlagTracker):
self._tracker = tracker
self._lock = threading.Lock()
# Maps listener_id -> (sdk_listener callable, cleanup function)
self._listeners: Dict[str, Callable] = {}

def register_flag_change_listener(self, listener_id: str, callback_uri: str):
"""Register a general flag change listener that fires on any flag configuration change."""
def on_flag_change(flag_change: FlagChange):
payload = {
'listenerId': listener_id,
'flagKey': flag_change.key,
}
try:
requests.post(callback_uri, json=payload)
except Exception as e:
log.warning('Failed to post flag change notification: %s', e)

with self._lock:
# If a listener with this ID already exists, unregister the old one first
if listener_id in self._listeners:
self._tracker.remove_listener(self._listeners[listener_id])

self._tracker.add_listener(on_flag_change)
self._listeners[listener_id] = on_flag_change

def register_flag_value_change_listener(
self,
listener_id: str,
flag_key: str,
context: Context,
default_value,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accepted default_value parameter is silently ignored

Medium Severity

The default_value parameter is accepted from the test harness but never used. The SDK's FlagTracker.add_flag_value_change_listener doesn't accept a default value, and the SDK's internal eval_fn always evaluates with None as the default (via self.variation(key, context, None) in client.py). Unlike the Go SDK reference implementation, which does forward the default, this means if a contract test relies on a non-None default affecting the initial or subsequent evaluated value, the Python test service will produce incorrect results.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a known limitation already documented in the PR description (item #1 under "Items for reviewer attention"). The Python SDK's FlagTracker.add_flag_value_change_listener(key, context, fn) doesn't accept a default value parameter — unlike Go's, which does. The SDK internally evaluates with None as default via self.variation(key, context, None) in client.py.

This is an SDK-level gap, not something the test service can work around. If contract tests rely on a non-None default, the SDK itself would need to be updated to support passing a default value through the flag value change listener API.

callback_uri: str,
):
"""Register a flag value change listener that fires when the evaluated value changes."""
def on_value_change(change: FlagValueChange):
payload = {
'listenerId': listener_id,
'flagKey': change.key,
'oldValue': change.old_value,
'newValue': change.new_value,
}
try:
requests.post(callback_uri, json=payload)
except Exception as e:
log.warning('Failed to post flag value change notification: %s', e)

# add_flag_value_change_listener returns the underlying listener
# that must be passed to remove_listener to unsubscribe
with self._lock:
if listener_id in self._listeners:
self._tracker.remove_listener(self._listeners[listener_id])

underlying_listener = self._tracker.add_flag_value_change_listener(flag_key, context, on_value_change)
self._listeners[listener_id] = underlying_listener

def unregister(self, listener_id: str) -> bool:
"""Unregister a previously registered listener. Returns False if not found."""
with self._lock:
listener = self._listeners.pop(listener_id, None)

if listener is None:
return False

self._tracker.remove_listener(listener)
return True

def close_all(self):
"""Unregister all listeners. Called when the SDK client entity shuts down."""
with self._lock:
listeners = dict(self._listeners)
self._listeners.clear()

for listener in listeners.values():
self._tracker.remove_listener(listener)
9 changes: 9 additions & 0 deletions contract-tests/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ def status():
'persistent-data-store-redis',
'persistent-data-store-dynamodb',
'persistent-data-store-consul',
'flag-change-listeners',
'flag-value-change-listeners',
]
}
return json.dumps(body), 200, {'Content-type': 'application/json'}
Expand Down Expand Up @@ -150,6 +152,13 @@ def post_client_command(id):
response = client.migration_variation(sub_params)
elif command == "migrationOperation":
response = client.migration_operation(sub_params)
elif command == "registerFlagChangeListener":
client.register_flag_change_listener(sub_params)
elif command == "registerFlagValueChangeListener":
client.register_flag_value_change_listener(sub_params)
elif command == "unregisterListener":
if not client.unregister_listener(sub_params):
return 'no listener with id "%s"' % sub_params['listenerId'], 400
else:
return '', 400

Expand Down
12 changes: 4 additions & 8 deletions ldclient/testing/impl/datasystem/test_fdv2_datasystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,10 @@ def test_fdv2_falls_back_to_fdv1_on_polling_success_with_header():

changed = Event()
changes: List[FlagChange] = []
count = 0

def listener(flag_change: FlagChange):
nonlocal count
count += 1
changes.append(flag_change)
if count >= 2:
changed.set()
changed.set()

set_on_ready = Event()
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
Expand All @@ -282,11 +278,11 @@ def listener(flag_change: FlagChange):

assert set_on_ready.wait(1), "Data system did not become ready in time"

# Trigger a flag update in FDv1
# Update flag in FDv1 data source to verify it's being used
td_fdv1.update(td_fdv1.flag("fdv1-fallback-flag").on(False))
assert changed.wait(1), "Flag change listener was not called in time"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test event set during init, update never verified

Medium Severity

The changed Event gets set() during the initial FDv1 data load (when "fdv1-fallback-flag" is first inserted into the store), because Store._set_basis fires FlagChange events for all new flags when listeners are present. This means changed.wait(2) on line 283 returns immediately without actually waiting for the explicit td_fdv1.update(...) on line 282 to propagate. The test passes based solely on the initial load event, not the update — so it no longer verifies that FDv1 update propagation works. The old count >= 2 guard specifically ensured both the initial load and the update were received before proceeding.

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentional — the test now follows the same pattern as the existing test_fdv2_falls_back_to_fdv1_on_polling_error_with_header (lines 220-222), which also signals on the first listener call without requiring both init and update events.

The core purpose of the test is to verify FDv1 becomes active after a VALID + revert_to_fdv1=True synchronizer response. The any(c.key == "fdv1-fallback-flag" for c in changes) assertion still validates this. The old count >= 2 guard was the source of the flakiness — on the VALID state path, the initial FDv1 load notification sometimes races with the system becoming ready, so it's not reliably received before the explicit update.

I've called this out in the PR description (item #3 under "Items for reviewer attention") for the human reviewer to confirm.

assert changed.wait(2), "Flag change listener was not called in time"

# Verify FDv1 is active
# Verify we got flag changes from FDv1
assert len(changes) > 0
assert any(c.key == "fdv1-fallback-flag" for c in changes)

Expand Down
Loading