Class Actor

Direct Known Subclasses:
DislocationClassificationActor, HeartrateClassificationActor, LimitClassificationActor, RespiratoryRateEstimationActor, RRMismatchClassificationActor, SFComputationActor, TrendClassificationActor

public abstract class Actor extends Service
An abstract reactive Actor that listens to a set of input channels on an EventBus, buffers their latest messages, and evaluates configurable firing rules to decide when to invoke a user-defined fireFunction(Map).

Key capabilities

  • Maintains a per-channel latest value and a per-channel monotonic sequence number (the number of received messages for that channel).
  • Supports multiple firing rules. Each rule is a Map<channel, token> where the token describes the condition for that channel within the rule. A rule fires when all its channel conditions are satisfied.
  • When a rule fires, the actor calls fireFunction(Map) with an immutable snapshot of the latest values of all configured input channels.
  • Evaluation is performed under a lock to ensure atomic update-and-evaluate semantics, avoiding races that could otherwise cause missed or duplicate firings for a given rule.

Firing Rule Configuration

The constructor accepts List<Map<String,String>> firingRules. Each map defines one rule; keys are input channel names and values are tokens with the following semantics:
  • "*" — The channel must have received at least one new message since the last time this rule fired (i.e., delta >= 1).
  • "N" — The channel must have received at least N new messages since the last time this rule fired (N is a positive integer).
  • "r:v" — The channel must have received at least one new message since the last time this rule fired, and the latest value on the channel must match v. Matching is implemented by valueMatchesExpected(Object, String) which, by default, compares string representations via Objects.toString(actual, null).equals(expected).
A rule is considered satisfied only if all its channel conditions are satisfied simultaneously. Multiple rules may be satisfied by the same incoming message and can fire within a single evaluation pass.

Thread Safety

  • Incoming messages update channel state (latestByChannel and channelSeq) and then trigger rule evaluation.
  • Rule evaluation runs under a ReentrantLock, ensuring atomicity between observing the updated state, testing conditions, taking the snapshot, and invoking fireFunction(Map).
  • Channel state maps are ConcurrentHashMaps suitable for concurrent access.

Snapshot Semantics

When a rule fires, the actor passes an unmodifiable snapshot containing the latest value for every input channel (not only those participating in the satisfied rule). This provides the callee with a full view of the actor's current inputs.

Extensibility

  • Override valueMatchesExpected(Object, String) to implement custom equality (e.g., typed comparison, JSON field matching, domain-specific predicates) for "r:v" conditions.
  • Implement fireFunction(Map) to define the behavior executed when any rule fires.
  • Implement Service.stop() to provide shutdown/cleanup behavior in concrete actors.

Assumptions

  • The provided EventBus supports registering a handler per channel via eventBus.register(channel, handler) where the handler receives the message object.
  • Messages are typed as Object; this class stores and forwards them opaquely.
  • Input channel names in rules must exist in inputChannels; otherwise construction fails.

Example


 List<String> inputs = List.of("A", "B", "C");

 Map<String, String> rule0 = Map.of(
     "A", "*",   // at least 1 new on A
     "B", "2"    // at least 2 new on B
 );

 Map<String, String> rule1 = Map.of(
     "C", "r:OK" // at least 1 new on C and latest equals "OK"
 );

 List<Map<String, String>> rules = List.of(rule0, rule1);

 Actor actor = new Actor(eventBus, rules, inputs, List.of("OUT")) {
   @Override public void stop() { /* cleanup *\/ }
   @Override public void fireFunction(Map<String, Object> latest) {
     // react to satisfied rules (latest contains A,B,C latest values)
   }
 };
 
  • Field Details

    • id

      protected final String id
      Identifier of the LimitClassifier instance
    • inputChannels

      protected final List<String> inputChannels
      Names of input channels that this actor subscribes to and maintains state for. All channels referenced in firingRules must appear here.
    • outputChannels

      protected final List<String> outputChannels
      Names of output channels this actor may use (not used internally in this base class).
  • Constructor Details

    • Actor

      protected Actor(EventBus eventBus, String id, List<Map<String,String>> firingRules, List<String> inputChannels, List<String> outputChannels)
      Constructs an Actor that subscribes to the given inputChannels, evaluates the provided firingRules, and optionally exposes outputChannels.

      For each input channel, this actor:

      1. Initializes its sequence counter and latest value holder;
      2. Registers a message handler on the EventBus that updates state and evaluates rules.

      During construction, rules are compiled to internal FiringRules and validated for channel existence.

      Parameters:
      eventBus - the event bus used to subscribe to input channels and receive messages; must not be null
      id - the identifier of the specified Actor. Commonly set in the config.
      firingRules - list of rule maps (channel -> token), where token is one of "*", a positive integer string, or "r:v"; must not be null
      inputChannels - list of input channels to subscribe to; must contain all channels referenced in firingRules; must not be null
      outputChannels - list of output channels (not used internally by this class but exposed via getOutputChannels()); must not be null
      Throws:
      NullPointerException - if any argument is null
      IllegalArgumentException - if a rule is empty or references a channel not present in inputChannels, or contains an invalid token
  • Method Details

    • fireFunction

      public abstract void fireFunction(Map<String,Object> latestSnapshot)
      Parameters:
      latestSnapshot - Unmodifiable snapshot of the latest value of ALL input channels (channel -> latest object).
    • getInputChannels

      public List<String> getInputChannels()
      Returns:
      the list of input channels this actor subscribes to; never null
    • getOutputChannels

      public List<String> getOutputChannels()
      Returns:
      the list of output channels associated with this actor (not used internally by this class); never null
    • valueMatchesExpected

      protected boolean valueMatchesExpected(Object actual, String expected)
      Hook for customizing equality checks used by "r:v" conditions.

      The default implementation compares string representations:

      
       Objects.toString(actual, null).equals(expected)
       
      Override this to perform typed or domain-specific comparisons.
      Parameters:
      actual - the latest channel value (may be null)
      expected - the expected value string provided in the rule token (never null)
      Returns:
      true if the actual value should be considered equal to the expected one