Skip to content

API Reference

rlstatsapi.client

Core async TCP client for Rocket League's live Stats exporter.

This module handles socket lifecycle, JSON stream framing, dispatching to user handlers, reconnect strategy, and queue backpressure behavior.

ConnectionState

Bases: Enum

Granular connection lifecycle state for a StatsClient.

Source code in src/rlstatsapi/client.py
53
54
55
56
57
58
59
class ConnectionState(Enum):
    """Granular connection lifecycle state for a StatsClient."""

    # Initial state of a new client; also restored when a live session ends.
    DISCONNECTED = "disconnected"
    # Set by the reader loop at the start of every connection attempt.
    CONNECTING = "connecting"
    # Set once the TCP connection has been opened successfully.
    CONNECTED = "connected"
    # Set when the configured maximum number of reconnect attempts is exceeded.
    FAILED = "failed"

StatsClient

Event client for Rocket League Stats API.

Source code in src/rlstatsapi/client.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
class StatsClient:
    """Event client for Rocket League Stats API."""

    def __init__(
        self,
        host: str = "127.0.0.1",
        port: int = 49123,
        reconnect: bool = True,
        reconnect_delay: float = 0.5,
        max_reconnect_delay: float = 30.0,
        max_reconnect_attempts: int | None = None,
        include_raw: bool = False,
        queue_size: int = 2048,
        overflow: Literal["block", "drop", "raise"] = "block",
        connect_timeout: float = 5.0,
        drain_on_disconnect: bool = False,
        handler_timeout: float | None = None,
    ) -> None:
        """Configure the client.

        Does not connect until ``connect()`` or async context manager entry.

        Args:
            host: Stats API host address.
            port: Stats API TCP port.
            reconnect: Whether to reconnect automatically on connection loss.
            reconnect_delay: Initial delay in seconds before the first reconnect attempt.
            max_reconnect_delay: Upper bound for exponential backoff delay.
            max_reconnect_attempts: Stop reconnecting after this many failures.
                None means unlimited.
            include_raw: Attach the original JSON string to each ``EventMessage.raw``.
            queue_size: Max events buffered in the internal queue.
            overflow: Queue-full behavior ``"block"`` waits, ``"drop"`` discards,
                ``"raise"`` kills the connection.
            connect_timeout: Seconds to wait for TCP handshake before raising.
            drain_on_disconnect: Clear the queue when the session ends.
            handler_timeout: Max seconds an async handler may run before being cancelled.
                Sync handlers are not affected. None disables the timeout.
        """
        if reconnect_delay <= 0:
            raise ValueError("reconnect_delay must be positive")
        self.host = host
        self.port = port
        self.reconnect = reconnect
        self.reconnect_delay = reconnect_delay
        self.max_reconnect_delay = max_reconnect_delay
        self.max_reconnect_attempts = max_reconnect_attempts
        self.include_raw = include_raw
        self.connect_timeout = connect_timeout
        self.overflow = overflow
        self.drain_on_disconnect = drain_on_disconnect
        self.handler_timeout = handler_timeout

        self._queue: asyncio.Queue[EventMessage] = asyncio.Queue(maxsize=queue_size)
        self._handlers_by_event: dict[str, list[_AnyCallable]] = defaultdict(list)
        self._handlers_any: list[_AnyCallable] = []
        self._on_connect_handlers: list[_SimpleHandler] = []
        self._on_disconnect_handlers: list[_SimpleHandler] = []
        self._error_handlers: list[_ErrorHandler] = []
        self._logger = logging.getLogger("rlstatsapi")

        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._reader_task: asyncio.Task[None] | None = None
        self._stopping = False
        self._connection_state = ConnectionState.DISCONNECTED
        self._permanently_failed = False
        self._last_error: Exception | None = None
        self._metrics = ClientMetrics()
        self._state_tracker = MatchStateTracker()

    @property
    def connection_state(self) -> ConnectionState:
        """Return the current connection lifecycle state."""
        return self._connection_state

    @property
    def is_connected(self) -> bool:
        """Whether the lifecycle state is currently ``CONNECTED``."""
        current = self._connection_state
        return current == ConnectionState.CONNECTED

    @property
    def metrics(self) -> ClientMetrics:
        """Return read-only-style counters for throughput and reconnect behavior."""
        return self._metrics

    @property
    def state(self) -> MatchStateSnapshot:
        """Return the latest convenience match snapshot derived from recent events."""
        return self._state_tracker.snapshot

    @property
    def last_error(self) -> Exception | None:
        """Return the latest connection or reader error seen by the background task."""
        return self._last_error

    @property
    def permanently_failed(self) -> bool:
        """Return True when reconnect attempts were exhausted."""
        return self._permanently_failed

    async def __aenter__(self) -> StatsClient:
        """Connect and return self for use as an async context manager."""
        await self.connect()
        return self

    async def __aexit__(self, *_: object) -> None:
        """Disconnect when leaving the async context manager."""
        await self.disconnect()

    async def connect(self) -> None:
        """Start the background reader task and begin connecting. No-op if already running."""
        if self._reader_task and not self._reader_task.done():
            self._logger.debug("connect() ignored: reader already running")
            return

        self._stopping = False
        self._permanently_failed = False
        self._last_error = None
        self._reader_task = asyncio.create_task(self._run(), name="rlstatsapi-reader")
        self._logger.debug("reader task started")

    async def disconnect(self, timeout: float = 5.0) -> None:
        """Disconnect from the Stats API and stop the reader task.

        Args:
            timeout: Seconds to wait for the reader task to finish before giving up.
        """
        self._stopping = True
        self._logger.debug("disconnect requested")

        if self._writer is not None:
            with contextlib.suppress(OSError, asyncio.TimeoutError):
                self._writer.close()
                await self._writer.wait_closed()
            self._writer = None
            self._reader = None

        if self._reader_task is not None:
            self._reader_task.cancel()
            with contextlib.suppress(asyncio.TimeoutError, asyncio.CancelledError):
                await asyncio.wait_for(self._reader_task, timeout=timeout)
            self._reader_task = None
        if self.drain_on_disconnect:
            self.clear_queue()
        self._logger.debug("disconnected")

    def on_connect(self, handler: _SimpleHandler) -> None:
        """Register a callback fired after TCP connection is established."""
        self._on_connect_handlers.append(handler)

    def on_disconnect(self, handler: _SimpleHandler) -> None:
        """Register a callback fired when the active TCP session is closed."""
        self._on_disconnect_handlers.append(handler)

    def on_handler_error(self, handler: _ErrorHandler) -> None:
        """Register a callback for exceptions raised inside event handlers."""
        self._error_handlers.append(handler)

    # Typing-only overloads for ``on``.  Every known event name gets a pair:
    # the direct form ``on(name, handler) -> None`` and the decorator form
    # ``on(name) -> decorator``, both pinned to that event's payload type.
    # The final ``str`` pair is the fallback for any other event name.
    @overload
    def on(
        self,
        event_name: Literal["UpdateState"],
        handler: Callable[
            [TypedEventMessage[UpdateStatePayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["UpdateState"]) -> Callable[
        [Callable[[TypedEventMessage[UpdateStatePayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[UpdateStatePayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["BallHit"],
        handler: Callable[[TypedEventMessage[BallHitPayload]], Awaitable[None] | None],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["BallHit"]) -> Callable[
        [Callable[[TypedEventMessage[BallHitPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[BallHitPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["ClockUpdatedSeconds"],
        handler: Callable[
            [TypedEventMessage[ClockUpdatedSecondsPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["ClockUpdatedSeconds"]) -> Callable[
        [
            Callable[
                [TypedEventMessage[ClockUpdatedSecondsPayload]], Awaitable[None] | None
            ]
        ],
        Callable[
            [TypedEventMessage[ClockUpdatedSecondsPayload]], Awaitable[None] | None
        ],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["CountdownBegin"],
        handler: Callable[
            [TypedEventMessage[CountdownBeginPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["CountdownBegin"]) -> Callable[
        [Callable[[TypedEventMessage[CountdownBeginPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[CountdownBeginPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["CrossbarHit"],
        handler: Callable[
            [TypedEventMessage[CrossbarHitPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["CrossbarHit"]) -> Callable[
        [Callable[[TypedEventMessage[CrossbarHitPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[CrossbarHitPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["GoalReplayEnd"],
        handler: Callable[
            [TypedEventMessage[GoalReplayEndPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["GoalReplayEnd"]) -> Callable[
        [Callable[[TypedEventMessage[GoalReplayEndPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[GoalReplayEndPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["GoalReplayStart"],
        handler: Callable[
            [TypedEventMessage[GoalReplayStartPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["GoalReplayStart"]) -> Callable[
        [Callable[[TypedEventMessage[GoalReplayStartPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[GoalReplayStartPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["GoalReplayWillEnd"],
        handler: Callable[
            [TypedEventMessage[GoalReplayWillEndPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["GoalReplayWillEnd"]) -> Callable[
        [
            Callable[
                [TypedEventMessage[GoalReplayWillEndPayload]], Awaitable[None] | None
            ]
        ],
        Callable[[TypedEventMessage[GoalReplayWillEndPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["GoalScored"],
        handler: Callable[
            [TypedEventMessage[GoalScoredPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["GoalScored"]) -> Callable[
        [Callable[[TypedEventMessage[GoalScoredPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[GoalScoredPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["MatchCreated"],
        handler: Callable[
            [TypedEventMessage[MatchCreatedPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["MatchCreated"]) -> Callable[
        [Callable[[TypedEventMessage[MatchCreatedPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[MatchCreatedPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["MatchInitialized"],
        handler: Callable[
            [TypedEventMessage[MatchInitializedPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["MatchInitialized"]) -> Callable[
        [
            Callable[
                [TypedEventMessage[MatchInitializedPayload]], Awaitable[None] | None
            ]
        ],
        Callable[[TypedEventMessage[MatchInitializedPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["MatchDestroyed"],
        handler: Callable[
            [TypedEventMessage[MatchDestroyedPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["MatchDestroyed"]) -> Callable[
        [Callable[[TypedEventMessage[MatchDestroyedPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[MatchDestroyedPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["MatchEnded"],
        handler: Callable[
            [TypedEventMessage[MatchEndedPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["MatchEnded"]) -> Callable[
        [Callable[[TypedEventMessage[MatchEndedPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[MatchEndedPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["MatchPaused"],
        handler: Callable[
            [TypedEventMessage[MatchPausedPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["MatchPaused"]) -> Callable[
        [Callable[[TypedEventMessage[MatchPausedPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[MatchPausedPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["MatchUnpaused"],
        handler: Callable[
            [TypedEventMessage[MatchUnpausedPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["MatchUnpaused"]) -> Callable[
        [Callable[[TypedEventMessage[MatchUnpausedPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[MatchUnpausedPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["PodiumStart"],
        handler: Callable[
            [TypedEventMessage[PodiumStartPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["PodiumStart"]) -> Callable[
        [Callable[[TypedEventMessage[PodiumStartPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[PodiumStartPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["ReplayCreated"],
        handler: Callable[
            [TypedEventMessage[ReplayCreatedPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["ReplayCreated"]) -> Callable[
        [Callable[[TypedEventMessage[ReplayCreatedPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[ReplayCreatedPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["RoundStarted"],
        handler: Callable[
            [TypedEventMessage[RoundStartedPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["RoundStarted"]) -> Callable[
        [Callable[[TypedEventMessage[RoundStartedPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[RoundStartedPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(
        self,
        event_name: Literal["StatfeedEvent"],
        handler: Callable[
            [TypedEventMessage[StatfeedEventPayload]], Awaitable[None] | None
        ],
    ) -> None: ...
    @overload
    def on(self, event_name: Literal["StatfeedEvent"]) -> Callable[
        [Callable[[TypedEventMessage[StatfeedEventPayload]], Awaitable[None] | None]],
        Callable[[TypedEventMessage[StatfeedEventPayload]], Awaitable[None] | None],
    ]: ...

    @overload
    def on(self, event_name: str, handler: Handler) -> None: ...
    @overload
    def on(self, event_name: str) -> Callable[[Handler], Handler]: ...

    def on(self, event_name: str, handler: _AnyCallable | None = None) -> Any:
        """Register a handler for an event or return a decorator form of registration."""
        if handler is None:

            def decorator(h: _AnyCallable) -> _AnyCallable:
                @functools.wraps(h)
                def wrapper(*args: Any, **kwargs: Any) -> Any:
                    return h(*args, **kwargs)

                self._handlers_by_event[event_name].append(wrapper)
                return wrapper

            return decorator
        self._handlers_by_event[event_name].append(handler)
        return None

    def on_any(self, handler: Handler) -> None:
        """Register a handler that runs for every incoming event."""
        self._handlers_any.append(handler)

    def on_many(self, event_names: Iterable[str], handler: Handler) -> None:
        """Register the same handler for several event names in one call."""
        for event_name in event_names:
            self._handlers_by_event[event_name].append(handler)

    def off(self, event_name: str, handler: _AnyCallable) -> None:
        """Unregister one handler for a specific event if present."""
        handlers = self._handlers_by_event.get(event_name)
        if handlers:
            with contextlib.suppress(ValueError):
                handlers.remove(handler)

    def off_any(self, handler: _AnyCallable) -> None:
        """Unregister a global handler registered with on_any."""
        with contextlib.suppress(ValueError):
            self._handlers_any.remove(handler)

    def clear_queue(self) -> None:
        """Drop any queued events that have not been consumed yet."""
        while True:
            try:
                self._queue.get_nowait()
            except asyncio.QueueEmpty:
                break

    def once(self, event_name: str, handler: _AnyCallable) -> None:
        """Register a handler that runs once and removes itself automatically."""

        async def wrapper(msg: EventMessage) -> None:
            self.off(event_name, wrapper)
            result = handler(msg)
            if inspect.isawaitable(result):
                await result

        self._handlers_by_event[event_name].append(wrapper)

    def on_update_state(
        self,
        handler: Callable[
            [TypedEventMessage[UpdateStatePayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering UpdateState handlers."""
        self._handlers_by_event["UpdateState"].append(handler)

    def on_ball_hit(
        self,
        handler: Callable[[TypedEventMessage[BallHitPayload]], Awaitable[None] | None],
    ) -> None:
        """Typed helper for registering BallHit handlers."""
        self._handlers_by_event["BallHit"].append(handler)

    def on_clock_updated_seconds(
        self,
        handler: Callable[
            [TypedEventMessage[ClockUpdatedSecondsPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering ClockUpdatedSeconds handlers."""
        self._handlers_by_event["ClockUpdatedSeconds"].append(handler)

    def on_countdown_begin(
        self,
        handler: Callable[
            [TypedEventMessage[CountdownBeginPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering CountdownBegin handlers."""
        self._handlers_by_event["CountdownBegin"].append(handler)

    def on_crossbar_hit(
        self,
        handler: Callable[
            [TypedEventMessage[CrossbarHitPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering CrossbarHit handlers."""
        self._handlers_by_event["CrossbarHit"].append(handler)

    def on_goal_replay_end(
        self,
        handler: Callable[
            [TypedEventMessage[GoalReplayEndPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering GoalReplayEnd handlers."""
        self._handlers_by_event["GoalReplayEnd"].append(handler)

    def on_goal_replay_start(
        self,
        handler: Callable[
            [TypedEventMessage[GoalReplayStartPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering GoalReplayStart handlers."""
        self._handlers_by_event["GoalReplayStart"].append(handler)

    def on_goal_replay_will_end(
        self,
        handler: Callable[
            [TypedEventMessage[GoalReplayWillEndPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering GoalReplayWillEnd handlers."""
        self._handlers_by_event["GoalReplayWillEnd"].append(handler)

    def on_goal_scored(
        self,
        handler: Callable[
            [TypedEventMessage[GoalScoredPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering GoalScored handlers."""
        self._handlers_by_event["GoalScored"].append(handler)

    def on_match_created(
        self,
        handler: Callable[
            [TypedEventMessage[MatchCreatedPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering MatchCreated handlers."""
        self._handlers_by_event["MatchCreated"].append(handler)

    def on_match_initialized(
        self,
        handler: Callable[
            [TypedEventMessage[MatchInitializedPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering MatchInitialized handlers."""
        self._handlers_by_event["MatchInitialized"].append(handler)

    def on_match_destroyed(
        self,
        handler: Callable[
            [TypedEventMessage[MatchDestroyedPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering MatchDestroyed handlers."""
        self._handlers_by_event["MatchDestroyed"].append(handler)

    def on_match_ended(
        self,
        handler: Callable[
            [TypedEventMessage[MatchEndedPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering MatchEnded handlers."""
        self._handlers_by_event["MatchEnded"].append(handler)

    def on_match_paused(
        self,
        handler: Callable[
            [TypedEventMessage[MatchPausedPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering MatchPaused handlers."""
        self._handlers_by_event["MatchPaused"].append(handler)

    def on_match_unpaused(
        self,
        handler: Callable[
            [TypedEventMessage[MatchUnpausedPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering MatchUnpaused handlers."""
        self._handlers_by_event["MatchUnpaused"].append(handler)

    def on_podium_start(
        self,
        handler: Callable[
            [TypedEventMessage[PodiumStartPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering PodiumStart handlers."""
        self._handlers_by_event["PodiumStart"].append(handler)

    def on_replay_created(
        self,
        handler: Callable[
            [TypedEventMessage[ReplayCreatedPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering ReplayCreated handlers."""
        self._handlers_by_event["ReplayCreated"].append(handler)

    def on_round_started(
        self,
        handler: Callable[
            [TypedEventMessage[RoundStartedPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering RoundStarted handlers."""
        self._handlers_by_event["RoundStarted"].append(handler)

    def on_statfeed_event(
        self,
        handler: Callable[
            [TypedEventMessage[StatfeedEventPayload]], Awaitable[None] | None
        ],
    ) -> None:
        """Typed helper for registering StatfeedEvent handlers."""
        self._handlers_by_event["StatfeedEvent"].append(handler)

    async def events(self, *event_names: str) -> AsyncIterator[EventMessage]:
        """Async iterator that yields incoming events, optionally filtered by name."""
        filters = set(event_names)
        while True:
            message = await self._queue.get()
            if not filters or message.event in filters:
                yield message

    async def wait_for(
        self,
        event_name: str,
        *,
        timeout: float | None = None,
    ) -> EventMessage:
        """Wait until the next occurrence of ``event_name`` and return the message.

        Args:
            event_name: The event to wait for (e.g. ``"GoalScored"``).
            timeout: Seconds before raising ``asyncio.TimeoutError``. None waits forever.

        Returns:
            The first matching ``EventMessage`` received after this call.

        Raises:
            asyncio.TimeoutError: If ``timeout`` elapses before the event arrives.
        """
        loop = asyncio.get_event_loop()
        future: asyncio.Future[EventMessage] = loop.create_future()

        def _handler(msg: EventMessage) -> None:
            if not future.done():
                self.off(event_name, _handler)
                future.set_result(msg)

        self._handlers_by_event[event_name].append(_handler)
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            self.off(event_name, _handler)
            raise

    async def _run(self) -> None:
        """Drive the connection: connect, read until the session ends, retry.

        Each pass of the loop opens the TCP connection, fires the on-connect
        callbacks and hands the stream to ``_read_loop``.  When a pass ends,
        the session is torn down *before* the reconnect policy is applied, so
        ``connection_state``, the on-disconnect callbacks and the socket all
        reflect a drop immediately rather than after the backoff sleep.

        The loop ends when ``_stopping`` is set, when reconnecting is disabled
        and an error occurs, or when ``max_reconnect_attempts`` is exceeded
        (the state then becomes ``FAILED``).  Cancellation is re-raised.
        """
        attempt = 0
        try:
            while not self._stopping:
                failure: Exception | None = None
                try:
                    self._logger.debug(
                        "connecting to tcp://%s:%d", self.host, self.port
                    )
                    self._connection_state = ConnectionState.CONNECTING
                    self._reader, self._writer = await asyncio.wait_for(
                        asyncio.open_connection(host=self.host, port=self.port),
                        timeout=self.connect_timeout,
                    )
                    self._connection_state = ConnectionState.CONNECTED
                    # A successful connect restarts the backoff sequence.
                    attempt = 0
                    self._logger.debug("tcp socket connected")
                    await self._fire_simple(self._on_connect_handlers)
                    await self._read_loop(self._reader)
                except asyncio.CancelledError:
                    self._logger.debug("reader task cancelled")
                    raise
                except Exception as exc:
                    # Only record the error here; the reconnect decision (and
                    # its sleep) must wait until the session is torn down.
                    failure = exc
                finally:
                    if self._connection_state == ConnectionState.CONNECTED:
                        self._connection_state = ConnectionState.DISCONNECTED
                        await self._fire_simple(self._on_disconnect_handlers)
                    if self._writer is not None:
                        with contextlib.suppress(OSError, asyncio.TimeoutError):
                            self._writer.close()
                            await self._writer.wait_closed()
                        self._writer = None
                        self._reader = None
                        self._logger.debug("tcp socket closed")

                if failure is None:
                    # The read loop returned without error; re-check _stopping.
                    continue

                self._last_error = failure
                self._metrics.connection_failures += 1
                self._logger.debug("connection/read error: %s", failure)
                if not self.reconnect or self._stopping:
                    self._logger.debug(
                        "stopping reader (reconnect disabled or stopping)"
                    )
                    return
                attempt += 1
                self._metrics.reconnect_count += 1
                if (
                    self.max_reconnect_attempts is not None
                    and attempt > self.max_reconnect_attempts
                ):
                    self._permanently_failed = True
                    self._connection_state = ConnectionState.FAILED
                    self._logger.debug(
                        "max reconnect attempts (%d) reached",
                        self.max_reconnect_attempts,
                    )
                    return
                # Exponential backoff capped at max_reconnect_delay, with
                # +/-25% jitter applied to the capped value.
                delay = min(
                    self.reconnect_delay * (2 ** (attempt - 1)),
                    self.max_reconnect_delay,
                ) * random.uniform(0.75, 1.25)
                self._logger.debug(
                    "reconnecting in %.2fs (attempt %d)", delay, attempt
                )
                await asyncio.sleep(delay)
        finally:
            # However the task ends (return, cancellation, error), it must not
            # leave the client reporting CONNECTING with nothing running.
            if self._connection_state == ConnectionState.CONNECTING:
                self._connection_state = ConnectionState.DISCONNECTED

    async def _read_loop(self, reader: asyncio.StreamReader) -> None:
        """Read the TCP stream, frame JSON messages, and hand them to consumers.

        Bytes are read in chunks of up to 4096, decoded as UTF-8, and appended
        to a text buffer. Every complete JSON value at the front of the buffer
        is parsed, converted with ``_parse_message_obj``, counted, fed to the
        state tracker, enqueued according to ``self.overflow``, and finally
        dispatched to the registered handlers.

        Args:
            reader: Stream to read from; the loop runs until ``self._stopping``
                becomes true or the stream ends.

        Raises:
            ConnectionAbortedError: If ``reader.read`` returns no data, i.e. the
                peer closed the socket.
            asyncio.QueueFull: If the queue is full and ``self.overflow`` is
                neither ``"block"`` nor ``"drop"``.
        """
        import codecs  # local import: only this method needs it

        decoder = json.JSONDecoder()
        # An incremental decoder holds back the leading bytes of a multi-byte
        # UTF-8 character that is split across two reads. Decoding every chunk
        # independently would turn such a character into U+FFFD.
        utf8_decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        buffer = ""
        while not self._stopping:
            chunk = await reader.read(4096)
            if not chunk:
                raise ConnectionAbortedError("Socket closed by peer")
            decoded_chunk = utf8_decoder.decode(chunk)
            if "\ufffd" in decoded_chunk:
                self._logger.warning(
                    "received invalid UTF-8 bytes; replaced with U+FFFD"
                )
            buffer += decoded_chunk

            while True:
                # raw_decode does not skip leading whitespace, so strip it first.
                buffer = buffer.lstrip()
                if not buffer:
                    break

                try:
                    decoded, end_idx = decoder.raw_decode(buffer)
                except json.JSONDecodeError:
                    # Most likely an incomplete message: keep the buffer and
                    # wait for more data.
                    # NOTE(review): a permanently malformed prefix is never
                    # discarded here, so the buffer would keep growing.
                    break

                raw = buffer[:end_idx]
                buffer = buffer[end_idx:]

                message = _parse_message_obj(
                    decoded, raw=raw, include_raw=self.include_raw
                )
                if message is None:
                    self._logger.debug("ignored invalid message (json/envelope)")
                    continue

                self._metrics.received_events += 1
                self._state_tracker.update(message)

                if self.overflow == "block":
                    await self._queue.put(message)
                    self._metrics.queued_events += 1
                elif self.overflow == "drop":
                    try:
                        self._queue.put_nowait(message)
                        self._metrics.queued_events += 1
                    except asyncio.QueueFull:
                        self._metrics.dropped_events += 1
                        self._logger.warning(
                            "queue full, dropping %s event", message.event
                        )
                else:  # "raise": QueueFull propagates and ends this read loop
                    self._queue.put_nowait(message)
                    self._metrics.queued_events += 1

                await self._dispatch(message)

    async def _dispatch(self, message: EventMessage) -> None:
        """Dispatches to handlers for `message.event` and global handlers."""
        handlers = list(self._handlers_any) + list(
            self._handlers_by_event.get(message.event, [])
        )
        for handler in handlers:
            try:
                result = handler(message)
                if inspect.isawaitable(result):
                    if (
                        self.handler_timeout is not None
                        and inspect.iscoroutinefunction(handler)
                    ):
                        await asyncio.wait_for(result, timeout=self.handler_timeout)
                    else:
                        await result
            except Exception as exc:
                self._metrics.handler_errors += 1
                if self._error_handlers:
                    for error_handler in list(self._error_handlers):
                        try:
                            r = error_handler(message, exc, handler)
                            if inspect.isawaitable(r):
                                await r
                        except Exception as err_exc:
                            self._logger.error(
                                "error handler itself raised: %s", err_exc
                            )
                else:
                    self._logger.error("handler error for %s: %s", message.event, exc)

    async def _fire_simple(self, handlers: list[_SimpleHandler]) -> None:
        """Run connection lifecycle callbacks and await async ones when needed."""
        for handler in list(handlers):
            try:
                result = handler()
                if inspect.isawaitable(result):
                    await result
            except Exception as exc:
                self._logger.error("lifecycle handler error: %s", exc)

connection_state property

Return the current connection lifecycle state.

is_connected property

Return True if the client is currently connected to the Stats API.

last_error property

Return the latest connection or reader error seen by the background task.

metrics property

Return read-only-style counters for throughput and reconnect behavior.

permanently_failed property

Return True when reconnect attempts were exhausted.

state property

Return the latest convenience match snapshot derived from recent events.

__aenter__() async

Connect and return self for use as an async context manager.

Source code in src/rlstatsapi/client.py
163
164
165
166
async def __aenter__(self) -> StatsClient:
    """Connect and return self for use as an async context manager."""
    await self.connect()
    return self

__aexit__(*_) async

Disconnect when leaving the async context manager.

Source code in src/rlstatsapi/client.py
168
169
170
async def __aexit__(self, *_: object) -> None:
    """Disconnect when leaving the async context manager."""
    await self.disconnect()

__init__(host='127.0.0.1', port=49123, reconnect=True, reconnect_delay=0.5, max_reconnect_delay=30.0, max_reconnect_attempts=None, include_raw=False, queue_size=2048, overflow='block', connect_timeout=5.0, drain_on_disconnect=False, handler_timeout=None)

Configure the client.

Does not connect until connect() or async context manager entry.

Parameters:

Name Type Description Default
host str

Stats API host address.

'127.0.0.1'
port int

Stats API TCP port.

49123
reconnect bool

Whether to reconnect automatically on connection loss.

True
reconnect_delay float

Initial delay in seconds before the first reconnect attempt.

0.5
max_reconnect_delay float

Upper bound for exponential backoff delay.

30.0
max_reconnect_attempts int | None

Stop reconnecting after this many failures. None means unlimited.

None
include_raw bool

Attach the original JSON string to each EventMessage.raw.

False
queue_size int

Max events buffered in the internal queue.

2048
overflow Literal['block', 'drop', 'raise']

Queue-full behavior: "block" waits, "drop" discards, "raise" kills the connection.

'block'
connect_timeout float

Seconds to wait for TCP handshake before raising.

5.0
drain_on_disconnect bool

Clear the queue when the session ends.

False
handler_timeout float | None

Max seconds an async handler may run before being cancelled. Sync handlers are not affected. None disables the timeout.

None
Source code in src/rlstatsapi/client.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def __init__(
    self,
    host: str = "127.0.0.1",
    port: int = 49123,
    reconnect: bool = True,
    reconnect_delay: float = 0.5,
    max_reconnect_delay: float = 30.0,
    max_reconnect_attempts: int | None = None,
    include_raw: bool = False,
    queue_size: int = 2048,
    overflow: Literal["block", "drop", "raise"] = "block",
    connect_timeout: float = 5.0,
    drain_on_disconnect: bool = False,
    handler_timeout: float | None = None,
) -> None:
    """Configure the client.

    Does not connect until ``connect()`` or async context manager entry.

    Args:
        host: Stats API host address.
        port: Stats API TCP port.
        reconnect: Whether to reconnect automatically on connection loss.
        reconnect_delay: Initial delay in seconds before the first reconnect attempt.
        max_reconnect_delay: Upper bound for exponential backoff delay.
        max_reconnect_attempts: Stop reconnecting after this many failures.
            None means unlimited.
        include_raw: Attach the original JSON string to each ``EventMessage.raw``.
        queue_size: Max events buffered in the internal queue.
        overflow: Queue-full behavior ``"block"`` waits, ``"drop"`` discards,
            ``"raise"`` kills the connection.
        connect_timeout: Seconds to wait for TCP handshake before raising.
        drain_on_disconnect: Clear the queue when the session ends.
        handler_timeout: Max seconds an async handler may run before being cancelled.
            Sync handlers are not affected. None disables the timeout.
    """
    if reconnect_delay <= 0:
        raise ValueError("reconnect_delay must be positive")
    self.host = host
    self.port = port
    self.reconnect = reconnect
    self.reconnect_delay = reconnect_delay
    self.max_reconnect_delay = max_reconnect_delay
    self.max_reconnect_attempts = max_reconnect_attempts
    self.include_raw = include_raw
    self.connect_timeout = connect_timeout
    self.overflow = overflow
    self.drain_on_disconnect = drain_on_disconnect
    self.handler_timeout = handler_timeout

    self._queue: asyncio.Queue[EventMessage] = asyncio.Queue(maxsize=queue_size)
    self._handlers_by_event: dict[str, list[_AnyCallable]] = defaultdict(list)
    self._handlers_any: list[_AnyCallable] = []
    self._on_connect_handlers: list[_SimpleHandler] = []
    self._on_disconnect_handlers: list[_SimpleHandler] = []
    self._error_handlers: list[_ErrorHandler] = []
    self._logger = logging.getLogger("rlstatsapi")

    self._reader: asyncio.StreamReader | None = None
    self._writer: asyncio.StreamWriter | None = None
    self._reader_task: asyncio.Task[None] | None = None
    self._stopping = False
    self._connection_state = ConnectionState.DISCONNECTED
    self._permanently_failed = False
    self._last_error: Exception | None = None
    self._metrics = ClientMetrics()
    self._state_tracker = MatchStateTracker()

clear_queue()

Drop any queued events that have not been consumed yet.

Source code in src/rlstatsapi/client.py
541
542
543
544
545
546
547
def clear_queue(self) -> None:
    """Discard every event still waiting in the internal queue."""
    pending = self._queue
    # get_nowait() raises QueueEmpty once the queue is drained, ending the loop.
    with contextlib.suppress(asyncio.QueueEmpty):
        while True:
            pending.get_nowait()

connect() async

Start the background reader task and begin connecting. No-op if already running.

Source code in src/rlstatsapi/client.py
172
173
174
175
176
177
178
179
180
181
182
async def connect(self) -> None:
    """Spawn the background reader task unless one is still running.

    Resets the stop flag, the permanent-failure flag, and the last recorded
    error before the task is created.
    """
    active = self._reader_task
    if active and not active.done():
        self._logger.debug("connect() ignored: reader already running")
        return

    self._stopping = self._permanently_failed = False
    self._last_error = None
    reader_coro = self._run()
    self._reader_task = asyncio.create_task(reader_coro, name="rlstatsapi-reader")
    self._logger.debug("reader task started")

disconnect(timeout=5.0) async

Disconnect from the Stats API and stop the reader task.

Parameters:

Name Type Description Default
timeout float

Seconds to wait for the reader task to finish before giving up.

5.0
Source code in src/rlstatsapi/client.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def disconnect(self, timeout: float = 5.0) -> None:
    """Stop the reader task and close the current connection.

    Args:
        timeout: Seconds to wait for the cancelled reader task to finish
            before giving up on it.
    """
    self._stopping = True
    self._logger.debug("disconnect requested")

    writer = self._writer
    if writer is not None:
        # Closing is best effort: socket errors and timeouts are ignored.
        with contextlib.suppress(OSError, asyncio.TimeoutError):
            writer.close()
            await writer.wait_closed()
        self._writer = self._reader = None

    task = self._reader_task
    if task is not None:
        task.cancel()
        with contextlib.suppress(asyncio.TimeoutError, asyncio.CancelledError):
            await asyncio.wait_for(task, timeout=timeout)
        self._reader_task = None

    if self.drain_on_disconnect:
        self.clear_queue()
    self._logger.debug("disconnected")

events(*event_names) async

Async iterator that yields incoming events, optionally filtered by name.

Source code in src/rlstatsapi/client.py
729
730
731
732
733
734
735
async def events(self, *event_names: str) -> AsyncIterator[EventMessage]:
    """Yield queued events forever, keeping only the named ones if names are given.

    Events that do not match the filter are taken off the queue and discarded.
    """
    wanted = frozenset(event_names)
    while True:
        message = await self._queue.get()
        if wanted and message.event not in wanted:
            continue
        yield message

off(event_name, handler)

Unregister one handler for a specific event if present.

Source code in src/rlstatsapi/client.py
529
530
531
532
533
534
def off(self, event_name: str, handler: _AnyCallable) -> None:
    """Remove one registration of ``handler`` for ``event_name``, if any exists."""
    registered = self._handlers_by_event.get(event_name)
    if not registered:
        return
    try:
        registered.remove(handler)
    except ValueError:
        # The handler is not registered for this event; nothing to remove.
        pass

off_any(handler)

Unregister a global handler registered with on_any.

Source code in src/rlstatsapi/client.py
536
537
538
539
def off_any(self, handler: _AnyCallable) -> None:
    """Remove one registration of a global handler; do nothing if it is absent."""
    try:
        self._handlers_any.remove(handler)
    except ValueError:
        # The handler was not registered globally; nothing to remove.
        pass

on(event_name, handler=None)

on(event_name: Literal['UpdateState'], handler: Callable[[TypedEventMessage[UpdateStatePayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['UpdateState']) -> Callable[[Callable[[TypedEventMessage[UpdateStatePayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[UpdateStatePayload]], Awaitable[None] | None]]
on(event_name: Literal['BallHit'], handler: Callable[[TypedEventMessage[BallHitPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['BallHit']) -> Callable[[Callable[[TypedEventMessage[BallHitPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[BallHitPayload]], Awaitable[None] | None]]
on(event_name: Literal['ClockUpdatedSeconds'], handler: Callable[[TypedEventMessage[ClockUpdatedSecondsPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['ClockUpdatedSeconds']) -> Callable[[Callable[[TypedEventMessage[ClockUpdatedSecondsPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[ClockUpdatedSecondsPayload]], Awaitable[None] | None]]
on(event_name: Literal['CountdownBegin'], handler: Callable[[TypedEventMessage[CountdownBeginPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['CountdownBegin']) -> Callable[[Callable[[TypedEventMessage[CountdownBeginPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[CountdownBeginPayload]], Awaitable[None] | None]]
on(event_name: Literal['CrossbarHit'], handler: Callable[[TypedEventMessage[CrossbarHitPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['CrossbarHit']) -> Callable[[Callable[[TypedEventMessage[CrossbarHitPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[CrossbarHitPayload]], Awaitable[None] | None]]
on(event_name: Literal['GoalReplayEnd'], handler: Callable[[TypedEventMessage[GoalReplayEndPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['GoalReplayEnd']) -> Callable[[Callable[[TypedEventMessage[GoalReplayEndPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[GoalReplayEndPayload]], Awaitable[None] | None]]
on(event_name: Literal['GoalReplayStart'], handler: Callable[[TypedEventMessage[GoalReplayStartPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['GoalReplayStart']) -> Callable[[Callable[[TypedEventMessage[GoalReplayStartPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[GoalReplayStartPayload]], Awaitable[None] | None]]
on(event_name: Literal['GoalReplayWillEnd'], handler: Callable[[TypedEventMessage[GoalReplayWillEndPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['GoalReplayWillEnd']) -> Callable[[Callable[[TypedEventMessage[GoalReplayWillEndPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[GoalReplayWillEndPayload]], Awaitable[None] | None]]
on(event_name: Literal['GoalScored'], handler: Callable[[TypedEventMessage[GoalScoredPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['GoalScored']) -> Callable[[Callable[[TypedEventMessage[GoalScoredPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[GoalScoredPayload]], Awaitable[None] | None]]
on(event_name: Literal['MatchCreated'], handler: Callable[[TypedEventMessage[MatchCreatedPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['MatchCreated']) -> Callable[[Callable[[TypedEventMessage[MatchCreatedPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[MatchCreatedPayload]], Awaitable[None] | None]]
on(event_name: Literal['MatchInitialized'], handler: Callable[[TypedEventMessage[MatchInitializedPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['MatchInitialized']) -> Callable[[Callable[[TypedEventMessage[MatchInitializedPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[MatchInitializedPayload]], Awaitable[None] | None]]
on(event_name: Literal['MatchDestroyed'], handler: Callable[[TypedEventMessage[MatchDestroyedPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['MatchDestroyed']) -> Callable[[Callable[[TypedEventMessage[MatchDestroyedPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[MatchDestroyedPayload]], Awaitable[None] | None]]
on(event_name: Literal['MatchEnded'], handler: Callable[[TypedEventMessage[MatchEndedPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['MatchEnded']) -> Callable[[Callable[[TypedEventMessage[MatchEndedPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[MatchEndedPayload]], Awaitable[None] | None]]
on(event_name: Literal['MatchPaused'], handler: Callable[[TypedEventMessage[MatchPausedPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['MatchPaused']) -> Callable[[Callable[[TypedEventMessage[MatchPausedPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[MatchPausedPayload]], Awaitable[None] | None]]
on(event_name: Literal['MatchUnpaused'], handler: Callable[[TypedEventMessage[MatchUnpausedPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['MatchUnpaused']) -> Callable[[Callable[[TypedEventMessage[MatchUnpausedPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[MatchUnpausedPayload]], Awaitable[None] | None]]
on(event_name: Literal['PodiumStart'], handler: Callable[[TypedEventMessage[PodiumStartPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['PodiumStart']) -> Callable[[Callable[[TypedEventMessage[PodiumStartPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[PodiumStartPayload]], Awaitable[None] | None]]
on(event_name: Literal['ReplayCreated'], handler: Callable[[TypedEventMessage[ReplayCreatedPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['ReplayCreated']) -> Callable[[Callable[[TypedEventMessage[ReplayCreatedPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[ReplayCreatedPayload]], Awaitable[None] | None]]
on(event_name: Literal['RoundStarted'], handler: Callable[[TypedEventMessage[RoundStartedPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['RoundStarted']) -> Callable[[Callable[[TypedEventMessage[RoundStartedPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[RoundStartedPayload]], Awaitable[None] | None]]
on(event_name: Literal['StatfeedEvent'], handler: Callable[[TypedEventMessage[StatfeedEventPayload]], Awaitable[None] | None]) -> None
on(event_name: Literal['StatfeedEvent']) -> Callable[[Callable[[TypedEventMessage[StatfeedEventPayload]], Awaitable[None] | None]], Callable[[TypedEventMessage[StatfeedEventPayload]], Awaitable[None] | None]]
on(event_name: str, handler: Handler) -> None
on(event_name: str) -> Callable[[Handler], Handler]

Register a handler for an event or return a decorator form of registration.

Source code in src/rlstatsapi/client.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
def on(self, event_name: str, handler: _AnyCallable | None = None) -> Any:
    """Register a handler for ``event_name``, directly or as a decorator.

    Args:
        event_name: Name of the event the handler subscribes to.
        handler: Callable to register. When omitted, a decorator is returned
            instead.

    Returns:
        None when ``handler`` is given. Otherwise a decorator that registers
        the decorated callable and returns that same callable unchanged.
    """
    if handler is None:

        def decorator(h: _AnyCallable) -> _AnyCallable:
            # Register the callable itself rather than a sync wrapper. A
            # wrapper hides that ``h`` is a coroutine function from
            # ``inspect.iscoroutinefunction`` and adds a needless call layer.
            self._handlers_by_event[event_name].append(h)
            return h

        return decorator
    self._handlers_by_event[event_name].append(handler)
    return None

on_any(handler)

Register a handler that runs for every incoming event.

Source code in src/rlstatsapi/client.py
520
521
522
def on_any(self, handler: Handler) -> None:
    """Add ``handler`` to the list of global handlers."""
    global_handlers = self._handlers_any
    global_handlers.append(handler)

on_ball_hit(handler)

Typed helper for registering BallHit handlers.

Source code in src/rlstatsapi/client.py
569
570
571
572
573
574
def on_ball_hit(
    self,
    handler: Callable[[TypedEventMessage[BallHitPayload]], Awaitable[None] | None],
) -> None:
    """Subscribe ``handler`` to ``BallHit`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["BallHit"]
    subscribers.append(handler)

on_clock_updated_seconds(handler)

Typed helper for registering ClockUpdatedSeconds handlers.

Source code in src/rlstatsapi/client.py
576
577
578
579
580
581
582
583
def on_clock_updated_seconds(
    self,
    handler: Callable[
        [TypedEventMessage[ClockUpdatedSecondsPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``ClockUpdatedSeconds`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["ClockUpdatedSeconds"]
    subscribers.append(handler)

on_connect(handler)

Register a callback fired after TCP connection is established.

Source code in src/rlstatsapi/client.py
209
210
211
def on_connect(self, handler: _SimpleHandler) -> None:
    """Add ``handler`` to the callbacks run once the TCP connection is established."""
    connect_callbacks = self._on_connect_handlers
    connect_callbacks.append(handler)

on_countdown_begin(handler)

Typed helper for registering CountdownBegin handlers.

Source code in src/rlstatsapi/client.py
585
586
587
588
589
590
591
592
def on_countdown_begin(
    self,
    handler: Callable[
        [TypedEventMessage[CountdownBeginPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``CountdownBegin`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["CountdownBegin"]
    subscribers.append(handler)

on_crossbar_hit(handler)

Typed helper for registering CrossbarHit handlers.

Source code in src/rlstatsapi/client.py
594
595
596
597
598
599
600
601
def on_crossbar_hit(
    self,
    handler: Callable[
        [TypedEventMessage[CrossbarHitPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``CrossbarHit`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["CrossbarHit"]
    subscribers.append(handler)

on_disconnect(handler)

Register a callback fired when the active TCP session is closed.

Source code in src/rlstatsapi/client.py
213
214
215
def on_disconnect(self, handler: _SimpleHandler) -> None:
    """Add ``handler`` to the callbacks run when the active TCP session closes."""
    disconnect_callbacks = self._on_disconnect_handlers
    disconnect_callbacks.append(handler)

on_goal_replay_end(handler)

Typed helper for registering GoalReplayEnd handlers.

Source code in src/rlstatsapi/client.py
603
604
605
606
607
608
609
610
def on_goal_replay_end(
    self,
    handler: Callable[
        [TypedEventMessage[GoalReplayEndPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``GoalReplayEnd`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["GoalReplayEnd"]
    subscribers.append(handler)

on_goal_replay_start(handler)

Typed helper for registering GoalReplayStart handlers.

Source code in src/rlstatsapi/client.py
612
613
614
615
616
617
618
619
def on_goal_replay_start(
    self,
    handler: Callable[
        [TypedEventMessage[GoalReplayStartPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``GoalReplayStart`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["GoalReplayStart"]
    subscribers.append(handler)

on_goal_replay_will_end(handler)

Typed helper for registering GoalReplayWillEnd handlers.

Source code in src/rlstatsapi/client.py
621
622
623
624
625
626
627
628
def on_goal_replay_will_end(
    self,
    handler: Callable[
        [TypedEventMessage[GoalReplayWillEndPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``GoalReplayWillEnd`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["GoalReplayWillEnd"]
    subscribers.append(handler)

on_goal_scored(handler)

Typed helper for registering GoalScored handlers.

Source code in src/rlstatsapi/client.py
630
631
632
633
634
635
636
637
def on_goal_scored(
    self,
    handler: Callable[
        [TypedEventMessage[GoalScoredPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``GoalScored`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["GoalScored"]
    subscribers.append(handler)

on_handler_error(handler)

Register a callback for exceptions raised inside event handlers.

Source code in src/rlstatsapi/client.py
217
218
219
def on_handler_error(self, handler: _ErrorHandler) -> None:
    """Add ``handler`` to the callbacks notified when an event handler raises."""
    error_callbacks = self._error_handlers
    error_callbacks.append(handler)

on_many(event_names, handler)

Register the same handler for several event names in one call.

Source code in src/rlstatsapi/client.py
524
525
526
527
def on_many(self, event_names: Iterable[str], handler: Handler) -> None:
    """Subscribe ``handler`` to each event name in ``event_names``.

    A name that occurs more than once results in one registration per
    occurrence.
    """
    registry = self._handlers_by_event
    for name in event_names:
        registry[name].append(handler)

on_match_created(handler)

Typed helper for registering MatchCreated handlers.

Source code in src/rlstatsapi/client.py
639
640
641
642
643
644
645
646
def on_match_created(
    self,
    handler: Callable[
        [TypedEventMessage[MatchCreatedPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``MatchCreated`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["MatchCreated"]
    subscribers.append(handler)

on_match_destroyed(handler)

Typed helper for registering MatchDestroyed handlers.

Source code in src/rlstatsapi/client.py
657
658
659
660
661
662
663
664
def on_match_destroyed(
    self,
    handler: Callable[
        [TypedEventMessage[MatchDestroyedPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``MatchDestroyed`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["MatchDestroyed"]
    subscribers.append(handler)

on_match_ended(handler)

Typed helper for registering MatchEnded handlers.

Source code in src/rlstatsapi/client.py
666
667
668
669
670
671
672
673
def on_match_ended(
    self,
    handler: Callable[
        [TypedEventMessage[MatchEndedPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``MatchEnded`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["MatchEnded"]
    subscribers.append(handler)

on_match_initialized(handler)

Typed helper for registering MatchInitialized handlers.

Source code in src/rlstatsapi/client.py
648
649
650
651
652
653
654
655
def on_match_initialized(
    self,
    handler: Callable[
        [TypedEventMessage[MatchInitializedPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``MatchInitialized`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["MatchInitialized"]
    subscribers.append(handler)

on_match_paused(handler)

Typed helper for registering MatchPaused handlers.

Source code in src/rlstatsapi/client.py
675
676
677
678
679
680
681
682
def on_match_paused(
    self,
    handler: Callable[
        [TypedEventMessage[MatchPausedPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``MatchPaused`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["MatchPaused"]
    subscribers.append(handler)

on_match_unpaused(handler)

Typed helper for registering MatchUnpaused handlers.

Source code in src/rlstatsapi/client.py
684
685
686
687
688
689
690
691
def on_match_unpaused(
    self,
    handler: Callable[
        [TypedEventMessage[MatchUnpausedPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``MatchUnpaused`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["MatchUnpaused"]
    subscribers.append(handler)

on_podium_start(handler)

Typed helper for registering PodiumStart handlers.

Source code in src/rlstatsapi/client.py
693
694
695
696
697
698
699
700
def on_podium_start(
    self,
    handler: Callable[
        [TypedEventMessage[PodiumStartPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``PodiumStart`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["PodiumStart"]
    subscribers.append(handler)

on_replay_created(handler)

Typed helper for registering ReplayCreated handlers.

Source code in src/rlstatsapi/client.py
702
703
704
705
706
707
708
709
def on_replay_created(
    self,
    handler: Callable[
        [TypedEventMessage[ReplayCreatedPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``ReplayCreated`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["ReplayCreated"]
    subscribers.append(handler)

on_round_started(handler)

Typed helper for registering RoundStarted handlers.

Source code in src/rlstatsapi/client.py
711
712
713
714
715
716
717
718
def on_round_started(
    self,
    handler: Callable[
        [TypedEventMessage[RoundStartedPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``RoundStarted`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["RoundStarted"]
    subscribers.append(handler)

on_statfeed_event(handler)

Typed helper for registering StatfeedEvent handlers.

Source code in src/rlstatsapi/client.py
720
721
722
723
724
725
726
727
def on_statfeed_event(
    self,
    handler: Callable[
        [TypedEventMessage[StatfeedEventPayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``StatfeedEvent`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["StatfeedEvent"]
    subscribers.append(handler)

on_update_state(handler)

Typed helper for registering UpdateState handlers.

Source code in src/rlstatsapi/client.py
560
561
562
563
564
565
566
567
def on_update_state(
    self,
    handler: Callable[
        [TypedEventMessage[UpdateStatePayload]], Awaitable[None] | None
    ],
) -> None:
    """Subscribe ``handler`` to ``UpdateState`` events (typed convenience method)."""
    subscribers = self._handlers_by_event["UpdateState"]
    subscribers.append(handler)

once(event_name, handler)

Register a handler that runs once and removes itself automatically.

Source code in src/rlstatsapi/client.py
549
550
551
552
553
554
555
556
557
558
def once(self, event_name: str, handler: _AnyCallable) -> None:
    """Subscribe ``handler`` for a single delivery of ``event_name``.

    The handler is wrapped in a coroutine that unsubscribes itself via
    ``self.off`` and then calls ``handler``, awaiting the result when it is
    awaitable. The wrapper, not ``handler`` itself, is what gets stored in
    ``self._handlers_by_event``.
    """

    async def wrapper(msg: EventMessage) -> None:
        # Deregister first, before the user handler gets a chance to run.
        self.off(event_name, wrapper)
        outcome = handler(msg)
        if not inspect.isawaitable(outcome):
            return
        await outcome

    one_shot_handlers = self._handlers_by_event[event_name]
    one_shot_handlers.append(wrapper)

wait_for(event_name, *, timeout=None) async

Wait until the next occurrence of event_name and return the message.

Parameters:

Name Type Description Default
event_name str

The event to wait for (e.g. "GoalScored").

required
timeout float | None

Seconds before raising asyncio.TimeoutError. None waits forever.

None

Returns:

Type Description
EventMessage

The first matching EventMessage received after this call.

Raises:

Type Description
TimeoutError

If timeout elapses before the event arrives.

Source code in src/rlstatsapi/client.py
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
async def wait_for(
    self,
    event_name: str,
    *,
    timeout: float | None = None,
) -> EventMessage:
    """Wait until the next occurrence of ``event_name`` and return the message.

    A temporary handler is registered for ``event_name`` and is always removed
    again before this coroutine finishes, whether it returns, times out, or is
    cancelled.

    Args:
        event_name: The event to wait for (e.g. ``"GoalScored"``).
        timeout: Seconds before raising ``asyncio.TimeoutError``. None waits forever.

    Returns:
        The first matching ``EventMessage`` received after this call.

    Raises:
        asyncio.TimeoutError: If ``timeout`` elapses before the event arrives.
    """
    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    future: asyncio.Future[EventMessage] = loop.create_future()
    registered = True

    def _unregister() -> None:
        # Guarded so ``self.off`` is called exactly once per registration.
        nonlocal registered
        if registered:
            registered = False
            self.off(event_name, _handler)

    def _handler(msg: EventMessage) -> None:
        if not future.done():
            _unregister()
            future.set_result(msg)

    self._handlers_by_event[event_name].append(_handler)
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    finally:
        # Covers timeout *and* task cancellation; without this a cancelled
        # waiter would leave a dead handler registered forever, because the
        # handler only removes itself while the future is still pending.
        _unregister()

rlstatsapi.config

Helpers for reading and updating Rocket League's TAStatsAPI.ini.

This module keeps file management separate from the network client so users can prepare the game config before starting Rocket League and then connect with StatsClient using the same port.

StatsAPIConfigStatus dataclass

Snapshot of the current Stats API config file state.

Source code in src/rlstatsapi/config.py
27
28
29
30
31
32
33
34
35
36
37
@dataclass(slots=True)
class StatsAPIConfigStatus:
    """Snapshot of the current Stats API config file state.

    Returned by ``get_stats_api_status`` and by the ``set_*`` / ``configure``
    helpers in this module after they write the file.
    """

    # False when no config file could be located.
    found: bool
    # get_stats_api_status reports False when the file is missing;
    # NOTE(review): presumably derived from PacketSendRate otherwise - confirm.
    enabled: bool
    # Path of the config file as a string; None when no explicit path was
    # given and auto-discovery found nothing.
    path: str | None
    # None when the file was not found.
    packet_send_rate: int | None
    # None when the file was not found.
    port: int | None
    # Populated by get_stats_api_status when the file is missing.
    warning: str | None = None
    discovery_scope: str = "windows-user-config"

candidate_stats_api_paths()

Return likely user config locations for TAStatsAPI.ini on Windows.

Source code in src/rlstatsapi/config.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def candidate_stats_api_paths() -> list[Path]:
    """List the per-user locations where ``TAStatsAPI.ini`` may live on Windows.

    Home directories come from the ``USERPROFILE`` and ``HOME`` environment
    variables (only those that exist on disk). For each one, the plain and
    OneDrive-redirected ``Documents`` / ``Documentos`` folders are probed, in
    that order. Duplicates are removed case-insensitively while keeping the
    first occurrence.
    """
    home_dirs: list[Path] = []
    for variable in ("USERPROFILE", "HOME"):
        raw_home = os.getenv(variable)
        if not raw_home:
            continue
        home_dir = Path(raw_home).expanduser().resolve()
        if home_dir.exists():
            home_dirs.append(home_dir)

    # Folder prefixes under each home directory, in lookup priority order.
    document_layouts = (
        ("Documents",),
        ("Documentos",),
        ("OneDrive", "Documents"),
        ("OneDrive", "Documentos"),
    )

    # dict preserves insertion order, and setdefault keeps the first path seen
    # for each case-folded key.
    ordered: dict[str, Path] = {}
    for home_dir in home_dirs:
        for layout in document_layouts:
            location = home_dir.joinpath(
                *layout, _USER_CONFIG_RELATIVE_DIR, DEFAULT_STATS_API_FILENAME
            )
            ordered.setdefault(str(location).casefold(), location)
    return list(ordered.values())

configure_stats_api(enabled, port=DEFAULT_STATS_API_PORT, packet_send_rate=DEFAULT_PACKET_SEND_RATE, path=None)

Set PacketSendRate and Port together in one write operation.

Source code in src/rlstatsapi/config.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def configure_stats_api(
    enabled: bool,
    port: int = DEFAULT_STATS_API_PORT,
    packet_send_rate: int = DEFAULT_PACKET_SEND_RATE,
    path: str | Path | None = None,
) -> StatsAPIConfigStatus:
    """Update ``PacketSendRate`` and ``Port`` with a single write to the config.

    The port is always validated; the send rate is validated only when
    ``enabled`` is true, because a disabled config writes ``PacketSendRate``
    as ``"0"`` regardless of the argument. Returns the status computed for
    the config path after writing.
    """
    _validate_port(port)
    if enabled:
        _validate_packet_send_rate(packet_send_rate)

    target = _require_config_path(path)
    original_text = target.read_text(encoding="utf-8")
    rate_value = str(packet_send_rate) if enabled else "0"

    updated = _set_or_append_key(
        original_text.splitlines(), "PacketSendRate", rate_value
    )
    updated = _set_or_append_key(updated, "Port", str(port))

    # The untouched text is handed over alongside the new lines.
    _write_lines(target, updated, original_text)
    return _status_for_path(target)

find_stats_api_config(path=None)

Resolve an explicit config path or the first matching auto-discovered path.

Source code in src/rlstatsapi/config.py
83
84
85
86
87
88
89
90
91
92
def find_stats_api_config(path: str | Path | None = None) -> Path | None:
    """Resolve an explicit config path or the first matching auto-discovered path."""
    if path is not None:
        explicit = Path(path).expanduser().resolve()
        return explicit if explicit.exists() else None

    for candidate in candidate_stats_api_paths():
        if candidate.exists():
            return candidate
    return None

get_stats_api_status(path=None)

Read the current enabled state, send rate, and port from the config file.

Source code in src/rlstatsapi/config.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def get_stats_api_status(path: str | Path | None = None) -> StatsAPIConfigStatus:
    """Report the Stats API state for the config at ``path`` (or the discovered one).

    When no config file can be located, a "not found" status is returned
    instead of raising: ``found`` and ``enabled`` are false, the numeric fields
    are ``None``, ``warning`` is set, and ``path`` echoes the resolved explicit
    path if one was supplied.
    """
    located = find_stats_api_config(path)
    if located is not None:
        return _status_for_path(located)

    requested = None if path is None else str(Path(path).expanduser().resolve())
    return StatsAPIConfigStatus(
        found=False,
        enabled=False,
        path=requested,
        packet_send_rate=None,
        port=None,
        warning=_WARNING,
    )

set_stats_api_enabled(enabled, packet_send_rate=DEFAULT_PACKET_SEND_RATE, path=None)

Write PacketSendRate and preserve the current port.

Source code in src/rlstatsapi/config.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def set_stats_api_enabled(
    enabled: bool,
    packet_send_rate: int = DEFAULT_PACKET_SEND_RATE,
    path: str | Path | None = None,
) -> StatsAPIConfigStatus:
    """Rewrite ``PacketSendRate`` in the config, leaving the ``Port`` key untouched.

    When ``enabled`` is false the value written is ``"0"`` and
    ``packet_send_rate`` is neither validated nor used. Returns the status
    computed for the config path after writing.
    """
    if enabled:
        _validate_packet_send_rate(packet_send_rate)

    target = _require_config_path(path)
    original_text = target.read_text(encoding="utf-8")
    rate_value = str(packet_send_rate) if enabled else "0"

    updated = _set_or_append_key(
        original_text.splitlines(), "PacketSendRate", rate_value
    )
    # The untouched text is handed over alongside the new lines.
    _write_lines(target, updated, original_text)
    return _status_for_path(target)

set_stats_api_port(port, path=None)

Write the local TCP port used by Rocket League's stats exporter.

Source code in src/rlstatsapi/config.py
131
132
133
134
135
136
137
138
139
140
141
142
143
def set_stats_api_port(
    port: int,
    path: str | Path | None = None,
) -> StatsAPIConfigStatus:
    """Rewrite the ``Port`` key in the Stats API config file.

    The port is validated before the file is located or read. Returns the
    status computed for the config path after writing.
    """
    _validate_port(port)

    target = _require_config_path(path)
    original_text = target.read_text(encoding="utf-8")
    updated = _set_or_append_key(original_text.splitlines(), "Port", str(port))
    # The untouched text is handed over alongside the new lines.
    _write_lines(target, updated, original_text)
    return _status_for_path(target)

rlstatsapi.models

Runtime envelopes used by the streaming client.

These are lightweight transport models (not validators) used while events move through internal queues and user callbacks.

ClientMetrics dataclass

Mutable counters that summarize client throughput and reconnect behavior.

Source code in src/rlstatsapi/models.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@dataclass(slots=True)
class ClientMetrics:
    """Mutable counters that summarize client throughput and reconnect behavior."""

    received_events: int = 0
    queued_events: int = 0
    dropped_events: int = 0
    reconnect_count: int = 0
    handler_errors: int = 0
    connection_failures: int = 0
    started_at: datetime = field(default_factory=datetime.now)

    def reset(self) -> None:
        """Reset all counters and update started_at to now."""
        self.received_events = 0
        self.queued_events = 0
        self.dropped_events = 0
        self.reconnect_count = 0
        self.handler_errors = 0
        self.connection_failures = 0
        self.started_at = datetime.now()

reset()

Reset all counters and update started_at to now.

Source code in src/rlstatsapi/models.py
46
47
48
49
50
51
52
53
54
def reset(self) -> None:
    """Zero every counter and restart the ``started_at`` clock."""
    # Chained assignment binds targets left to right, matching field order.
    self.received_events = self.queued_events = self.dropped_events = 0
    self.reconnect_count = self.handler_errors = self.connection_failures = 0
    self.started_at = datetime.now()

EventMessage dataclass

Normalized Rocket League Stats API message envelope.

Source code in src/rlstatsapi/models.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@dataclass(slots=True)
class EventMessage:
    """Normalized Rocket League Stats API message envelope."""

    # Event name, e.g. "GoalScored".
    event: str
    # Event payload as a plain dict.
    data: dict[str, Any]
    # Optional; defaults to None.
    # NOTE(review): presumably the undecoded wire text - confirm with the client.
    raw: str | None = None

    def as_type(self, event_name: EventName) -> TypedEventMessage[Any]:
        """Return this message with event-specific payload typing.

        This is a typing aid for editors/type-checkers: the payload is passed
        through ``cast_event_data`` and wrapped in a ``TypedEventMessage``.

        Args:
            event_name: Event name that selects the payload type; it also
                becomes the ``event`` of the returned message.

        Returns:
            A ``TypedEventMessage`` carrying the same ``data`` and ``raw``.
        """
        return TypedEventMessage(
            event=event_name,
            data=cast_event_data(event_name, self.data),
            # Forward ``raw`` so the typed view does not silently lose it.
            raw=self.raw,
        )

as_type(event_name)

Return this message with event-specific payload typing for editors/type-checkers.

Source code in src/rlstatsapi/models.py
24
25
26
27
28
29
30
31
def as_type(self, event_name: EventName) -> TypedEventMessage[Any]:
    """Return this message with event-specific payload typing.

    This is a typing aid for editors/type-checkers: the payload is passed
    through ``cast_event_data`` and wrapped in a ``TypedEventMessage``.

    Args:
        event_name: Event name that selects the payload type; it also
            becomes the ``event`` of the returned message.

    Returns:
        A ``TypedEventMessage`` carrying the same ``data`` and ``raw``.
    """
    return TypedEventMessage(
        event=event_name,
        data=cast_event_data(event_name, self.data),
        # Forward ``raw`` so the typed view does not silently lose it.
        raw=self.raw,
    )

MatchStateSnapshot dataclass

Summary of the latest match state seen from incoming events.

Source code in src/rlstatsapi/models.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@dataclass(slots=True)
class MatchStateSnapshot:
    """Summary of the latest match state seen from incoming events.

    Populated by ``MatchStateTracker``; every field starts empty and is only
    overwritten when an event carries a usable value for it.
    """

    # Last non-empty "MatchGuid" string seen in any event payload.
    match_guid: str | None = None
    # Score of the team with TeamNum 0 in UpdateState's Game.Teams.
    blue_score: int | None = None
    # Score of the team with TeamNum 1 in UpdateState's Game.Teams.
    orange_score: int | None = None
    # Game.TimeSeconds from UpdateState.
    time_seconds: int | None = None
    # Game.bOvertime from UpdateState.
    overtime: bool = False
    # Game.Arena from UpdateState.
    arena: str | None = None
    # Game.bReplay from UpdateState.
    replay_active: bool = False
    # Game.bHasWinner from UpdateState; also set to True on MatchEnded.
    has_winner: bool = False
    # Game.Winner from UpdateState.
    winner: str | None = None
    # Name of the most recent event applied.
    last_event: str | None = None
    # Scorer.Name from the most recent GoalScored.
    last_scorer: str | None = None
    # GoalSpeed from the most recent GoalScored.
    last_goal_speed: float | None = None
    # Rebuilt from the Players list on every UpdateState.
    players: list[PlayerSnapshot] = field(default_factory=list)
    # Player whose shortcut matches Game.Target.Shortcut when Game.bHasTarget is set.
    target_player: PlayerSnapshot | None = None
    # Number of times each event name has been applied.
    event_counts: dict[str, int] = field(default_factory=dict)

PlayerSnapshot dataclass

Per-player state extracted from the latest UpdateState event.

Source code in src/rlstatsapi/models.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@dataclass(slots=True)
class PlayerSnapshot:
    """Per-player state extracted from the latest UpdateState event.

    Built by ``MatchStateTracker`` from each entry of the payload's
    ``Players`` list; the comments below name the payload key each field
    is read from.
    """

    # "Name" - entries without a string name are skipped by the tracker.
    name: str
    # "Shortcut" - entries without an int shortcut are skipped by the tracker.
    shortcut: int
    # "TeamNum" - the tracker substitutes 0 when the value is not an int.
    team_num: int
    # "Score"
    score: int = 0
    # "Goals"
    goals: int = 0
    # "Assists"
    assists: int = 0
    # "Saves"
    saves: int = 0
    # "Shots"
    shots: int = 0
    # "Boost"
    boost: int = 0
    # "Speed"
    speed: float = 0.0
    # "bDemolished"
    is_demolished: bool = False

rlstatsapi.state

Lightweight helpers for tracking a match summary from the live event stream.

The goal here is convenience, not full replay-state reconstruction. The tracker keeps the handful of fields that most HUDs, overlays, bots, and dashboards want without forcing users to rebuild the same state cache in every script.

MatchStateTracker

Maintain a rolling summary of the latest match state seen by the client.

Source code in src/rlstatsapi/state.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class MatchStateTracker:
    """Maintain a rolling summary of the latest match state seen by the client.

    Feed each incoming ``EventMessage`` to :meth:`update`. Malformed payload
    fields are ignored or replaced by defaults rather than raising, so one bad
    event cannot break the tracker.
    """

    def __init__(self) -> None:
        self._snapshot = MatchStateSnapshot()

    @property
    def snapshot(self) -> MatchStateSnapshot:
        """Return the mutable snapshot object updated by incoming events."""
        return self._snapshot

    def reset(self) -> None:
        """Clear the current snapshot when a match ends or the caller wants a reset."""
        self._snapshot = MatchStateSnapshot()

    def update(self, message: EventMessage) -> MatchStateSnapshot:
        """Apply one incoming event and return the updated snapshot.

        ``MatchDestroyed`` replaces the snapshot with a fresh one, so the
        returned object is the new, empty snapshot in that case.
        """
        snapshot = self._snapshot
        snapshot.last_event = message.event
        snapshot.event_counts[message.event] = (
            snapshot.event_counts.get(message.event, 0) + 1
        )

        match_guid = message.data.get("MatchGuid")
        if isinstance(match_guid, str) and match_guid:
            snapshot.match_guid = match_guid

        if message.event == "UpdateState":
            self._apply_update_state(snapshot, message)

        elif message.event == "GoalScored":
            self._apply_goal_scored(snapshot, message)

        elif message.event == "MatchEnded":
            snapshot.has_winner = True

        elif message.event == "MatchDestroyed":
            self.reset()
            snapshot = self._snapshot

        return snapshot

    def _apply_update_state(
        self,
        snapshot: MatchStateSnapshot,
        message: EventMessage,
    ) -> None:
        """Extract game-level fields from an UpdateState payload into the snapshot."""
        game = message.data.get("Game", {})
        if not isinstance(game, dict):
            return

        self._apply_teams(snapshot, game.get("Teams", []))
        self._apply_players(snapshot, message.data.get("Players", []), game)

        time_seconds = game.get("TimeSeconds")
        if isinstance(time_seconds, int):
            snapshot.time_seconds = time_seconds

        snapshot.overtime = bool(game.get("bOvertime", False))
        snapshot.replay_active = bool(game.get("bReplay", False))
        snapshot.has_winner = bool(game.get("bHasWinner", False))

        arena = game.get("Arena")
        if isinstance(arena, str) and arena:
            snapshot.arena = arena

        winner = game.get("Winner")
        if isinstance(winner, str) and winner:
            snapshot.winner = winner

    def _apply_teams(self, snapshot: MatchStateSnapshot, teams: object) -> None:
        """Update blue/orange score from the Teams list in an UpdateState payload."""
        if not isinstance(teams, list):
            return

        for team in teams:
            if not isinstance(team, dict):
                continue
            team_num = team.get("TeamNum")
            score = team.get("Score")
            if team_num == 0 and isinstance(score, int):
                snapshot.blue_score = score
            if team_num == 1 and isinstance(score, int):
                snapshot.orange_score = score

    @staticmethod
    def _as_int(value: object) -> int:
        """Coerce a payload value to ``int``, falling back to 0 when it is not numeric."""
        if isinstance(value, (int, float)):
            try:
                return int(value)
            except (ValueError, OverflowError):
                # NaN and +/-inf cannot be represented as integers.
                return 0
        return 0

    @staticmethod
    def _as_float(value: object) -> float:
        """Coerce a payload value to ``float``, falling back to 0.0 instead of raising."""
        try:
            return float(value)  # type: ignore[arg-type]
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def _apply_players(
        self,
        snapshot: MatchStateSnapshot,
        players: object,
        game: dict,
    ) -> None:
        """Rebuild the players list and resolve target_player from UpdateState.

        Entries without a string ``Name`` or an int ``Shortcut`` are skipped.
        Numeric stats go through ``_as_int`` / ``_as_float`` so a malformed
        value becomes a default instead of raising out of ``update()``.
        """
        if not isinstance(players, list):
            snapshot.players = []
            snapshot.target_player = None
            return

        result: list[PlayerSnapshot] = []
        for p in players:
            if not isinstance(p, dict):
                continue
            name = p.get("Name")
            shortcut = p.get("Shortcut")
            team_num = p.get("TeamNum")
            if not isinstance(name, str) or not isinstance(shortcut, int):
                continue
            result.append(
                PlayerSnapshot(
                    name=name,
                    shortcut=shortcut,
                    team_num=team_num if isinstance(team_num, int) else 0,
                    score=self._as_int(p.get("Score")),
                    goals=self._as_int(p.get("Goals")),
                    assists=self._as_int(p.get("Assists")),
                    saves=self._as_int(p.get("Saves")),
                    shots=self._as_int(p.get("Shots")),
                    boost=self._as_int(p.get("Boost")),
                    speed=self._as_float(p.get("Speed")),
                    is_demolished=bool(p.get("bDemolished", False)),
                )
            )
        snapshot.players = result

        # The target is looked up among the freshly built players so the
        # snapshot never points at a stale PlayerSnapshot instance.
        target_player: PlayerSnapshot | None = None
        if game.get("bHasTarget"):
            target = game.get("Target")
            if isinstance(target, dict):
                target_shortcut = target.get("Shortcut")
                if isinstance(target_shortcut, int):
                    for player in result:
                        if player.shortcut == target_shortcut:
                            target_player = player
                            break
        snapshot.target_player = target_player

    def _apply_goal_scored(
        self,
        snapshot: MatchStateSnapshot,
        message: EventMessage,
    ) -> None:
        """Update last scorer name and goal speed from a GoalScored payload."""
        scorer = message.data.get("Scorer", {})
        if isinstance(scorer, dict):
            name = scorer.get("Name")
            if isinstance(name, str) and name:
                snapshot.last_scorer = name

        goal_speed = message.data.get("GoalSpeed")
        if isinstance(goal_speed, (int, float)):
            snapshot.last_goal_speed = float(goal_speed)

snapshot property

Return the mutable snapshot object updated by incoming events.

reset()

Clear the current snapshot when a match ends or the caller wants a reset.

Source code in src/rlstatsapi/state.py
25
26
27
def reset(self) -> None:
    """Replace the tracked snapshot with a brand-new, empty one.

    Intended for when a match ends or the caller explicitly wants a clean
    slate. The previous snapshot object is not mutated.
    """
    fresh_snapshot = MatchStateSnapshot()
    self._snapshot = fresh_snapshot

update(message)

Apply one incoming event and return the updated snapshot.

Source code in src/rlstatsapi/state.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def update(self, message: EventMessage) -> MatchStateSnapshot:
    """Fold a single event into the tracked state and return the snapshot.

    Bookkeeping (last event, per-event count, match GUID) is recorded on the
    current snapshot first. ``MatchDestroyed`` then swaps in a fresh snapshot,
    which is what gets returned in that case.
    """
    current = self._snapshot
    kind = message.event

    current.last_event = kind
    current.event_counts[kind] = current.event_counts.get(kind, 0) + 1

    guid = message.data.get("MatchGuid")
    if isinstance(guid, str) and guid:
        current.match_guid = guid

    if kind == "MatchDestroyed":
        # The counters above landed on the old snapshot, which is discarded.
        self.reset()
        return self._snapshot

    if kind == "UpdateState":
        self._apply_update_state(current, message)
    elif kind == "GoalScored":
        self._apply_goal_scored(current, message)
    elif kind == "MatchEnded":
        current.has_winner = True

    return current

rlstatsapi.types

Static typing layer for documented Rocket League event payloads.

TypedDicts here mirror the official event schema and are designed for IDE autocomplete and type-checking, without adding runtime parsing overhead.

TypedEventMessage dataclass

Bases: Generic[TData]

Typed event envelope used by helpers and typed convenience callbacks.

Source code in src/rlstatsapi/types.py
258
259
260
261
262
263
264
@dataclass(slots=True)
class TypedEventMessage(Generic[TData]):
    """Typed event envelope used by helpers and typed convenience callbacks.

    Generic over ``TData``, the payload type carried in ``data``. The field
    layout mirrors ``EventMessage`` (``event``, ``data``, ``raw``).
    """

    # Event name, e.g. "GoalScored".
    event: str
    # Event payload, typed as TData for editors and type-checkers.
    data: TData
    # Optional; defaults to None.
    raw: str | None = None

cast_event_data(event_name, data)

cast_event_data(event_name: Literal['UpdateState'], data: Mapping[str, Any]) -> UpdateStatePayload
cast_event_data(event_name: Literal['BallHit'], data: Mapping[str, Any]) -> BallHitPayload
cast_event_data(event_name: Literal['ClockUpdatedSeconds'], data: Mapping[str, Any]) -> ClockUpdatedSecondsPayload
cast_event_data(event_name: Literal['CountdownBegin'], data: Mapping[str, Any]) -> CountdownBeginPayload
cast_event_data(event_name: Literal['CrossbarHit'], data: Mapping[str, Any]) -> CrossbarHitPayload
cast_event_data(event_name: Literal['GoalReplayEnd'], data: Mapping[str, Any]) -> GoalReplayEndPayload
cast_event_data(event_name: Literal['GoalReplayStart'], data: Mapping[str, Any]) -> GoalReplayStartPayload
cast_event_data(event_name: Literal['GoalReplayWillEnd'], data: Mapping[str, Any]) -> GoalReplayWillEndPayload
cast_event_data(event_name: Literal['GoalScored'], data: Mapping[str, Any]) -> GoalScoredPayload
cast_event_data(event_name: Literal['MatchCreated'], data: Mapping[str, Any]) -> MatchCreatedPayload
cast_event_data(event_name: Literal['MatchInitialized'], data: Mapping[str, Any]) -> MatchInitializedPayload
cast_event_data(event_name: Literal['MatchDestroyed'], data: Mapping[str, Any]) -> MatchDestroyedPayload
cast_event_data(event_name: Literal['MatchEnded'], data: Mapping[str, Any]) -> MatchEndedPayload
cast_event_data(event_name: Literal['MatchPaused'], data: Mapping[str, Any]) -> MatchPausedPayload
cast_event_data(event_name: Literal['MatchUnpaused'], data: Mapping[str, Any]) -> MatchUnpausedPayload
cast_event_data(event_name: Literal['PodiumStart'], data: Mapping[str, Any]) -> PodiumStartPayload
cast_event_data(event_name: Literal['ReplayCreated'], data: Mapping[str, Any]) -> ReplayCreatedPayload
cast_event_data(event_name: Literal['RoundStarted'], data: Mapping[str, Any]) -> RoundStartedPayload
cast_event_data(event_name: Literal['StatfeedEvent'], data: Mapping[str, Any]) -> StatfeedEventPayload
cast_event_data(event_name: str, data: Mapping[str, Any]) -> Mapping[str, Any]

Type-only helper to narrow data based on event name.

Source code in src/rlstatsapi/types.py
388
389
390
391
392
def cast_event_data(event_name: str, data: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return ``data`` unchanged, re-typed for static analysis only.

    ``event_name`` exists so the overloads can pick the payload type; it has no
    effect at runtime and no validation or copying is performed.
    """
    del event_name  # only meaningful to the typed overloads
    narrowed = cast(Mapping[str, Any], data)
    return narrowed

rlstatsapi.events

Known Rocket League Stats API event names. Sources: https://www.rocketleague.com/en/developer/stats-api