Package com.framed.cdss
Class Actor
java.lang.Object
com.framed.core.Service
com.framed.cdss.Actor
- Direct Known Subclasses:
DislocationClassificationActor,HeartrateClassificationActor,LimitClassificationActor,RespiratoryRateEstimationActor,RRMismatchClassificationActor,SFComputationActor,TrendClassificationActor
An abstract reactive
Actor that listens to a set of input channels on an EventBus,
buffers their latest messages, and evaluates configurable firing rules to decide when to invoke
a user-defined fireFunction(Map).
Key capabilities
- Maintains a per-channel latest value and a per-channel monotonic sequence number (the number of received messages for that channel).
- Supports multiple firing rules. Each rule is a
Map<channel, token>where the token describes the condition for that channel within the rule. A rule fires when all its channel conditions are satisfied. - When a rule fires, the actor calls
fireFunction(Map)with an immutable snapshot of the latest values of all configured input channels. - Evaluation is performed under a lock to ensure atomic update-and-evaluate semantics, avoiding races that could otherwise cause missed or duplicate firings for a given rule.
Firing Rule Configuration
The constructor acceptsList<Map<String,String>> firingRules. Each map defines one rule; keys are
input channel names and values are tokens with the following semantics:
- "*" — The channel must have received at least one new message since the last time
this rule fired (i.e.,
delta >= 1). - "N" — The channel must have received at least N new messages since the last time this rule fired (N is a positive integer).
- "r:v" — The channel must have received at least one new message since the last time
this rule fired, and the latest value on the channel must match
v. Matching is implemented byvalueMatchesExpected(Object, String)which, by default, compares string representations viaObjects.toString(actual, null).equals(expected).
Thread Safety
- Incoming messages update channel state (
latestByChannelandchannelSeq) and then trigger rule evaluation. - Rule evaluation runs under a
ReentrantLock, ensuring atomicity between observing the updated state, testing conditions, taking the snapshot, and invokingfireFunction(Map). - Channel state maps are
ConcurrentHashMaps suitable for concurrent access.
Snapshot Semantics
When a rule fires, the actor passes an unmodifiable snapshot containing the latest value for every input channel (not only those participating in the satisfied rule). This provides the callee with a full view of the actor's current inputs.Extensibility
- Override
valueMatchesExpected(Object, String)to implement custom equality (e.g., typed comparison, JSON field matching, domain-specific predicates) for"r:v"conditions. - Implement
fireFunction(Map)to define the behavior executed when any rule fires. - Implement
Service.stop()to provide shutdown/cleanup behavior in concrete actors.
Assumptions
- The provided
EventBussupports registering a handler per channel viaeventBus.register(channel, handler)where the handler receives the message object. - Messages are typed as
Object; this class stores and forwards them opaquely. - Input channel names in rules must exist in
inputChannels; otherwise construction fails.
Example
List<String> inputs = List.of("A", "B", "C");
Map<String, String> rule0 = Map.of(
"A", "*", // at least 1 new on A
"B", "2" // at least 2 new on B
);
Map<String, String> rule1 = Map.of(
"C", "r:OK" // at least 1 new on C and latest equals "OK"
);
List<Map<String, String>> rules = List.of(rule0, rule1);
Actor actor = new Actor(eventBus, rules, inputs, List.of("OUT")) {
@Override public void stop() { /* cleanup *\/ }
@Override public void fireFunction(Map<String, Object> latest) {
// react to satisfied rules (latest contains A,B,C latest values)
}
};
-
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final StringIdentifier of the LimitClassifier instanceNames of input channels that this actor subscribes to and maintains state for.Names of output channels this actor may use (not used internally in this base class). -
Constructor Summary
ConstructorsModifierConstructorDescriptionprotectedActor(EventBus eventBus, String id, List<Map<String, String>> firingRules, List<String> inputChannels, List<String> outputChannels) Constructs anActorthat subscribes to the giveninputChannels, evaluates the providedfiringRules, and optionally exposesoutputChannels. -
Method Summary
Modifier and TypeMethodDescriptionabstract voidfireFunction(Map<String, Object> latestSnapshot) protected booleanvalueMatchesExpected(Object actual, String expected) Hook for customizing equality checks used by"r:v"conditions.
-
Field Details
-
id
Identifier of the LimitClassifier instance -
inputChannels
Names of input channels that this actor subscribes to and maintains state for. All channels referenced infiringRulesmust appear here. -
outputChannels
Names of output channels this actor may use (not used internally in this base class).
-
-
Constructor Details
-
Actor
protected Actor(EventBus eventBus, String id, List<Map<String, String>> firingRules, List<String> inputChannels, List<String> outputChannels) Constructs anActorthat subscribes to the giveninputChannels, evaluates the providedfiringRules, and optionally exposesoutputChannels.For each input channel, this actor:
- Initializes its sequence counter and latest value holder;
- Registers a message handler on the
EventBusthat updates state and evaluates rules.
During construction, rules are compiled to internal
FiringRules and validated for channel existence.- Parameters:
eventBus- the event bus used to subscribe to input channels and receive messages; must not benullid- the identifier of the specified Actor. Commonly set in the config.firingRules- list of rule maps (channel -> token), where token is one of"*", a positive integer string, or"r:v"; must not benullinputChannels- list of input channels to subscribe to; must contain all channels referenced infiringRules; must not benulloutputChannels- list of output channels (not used internally by this class but exposed viagetOutputChannels()); must not benull- Throws:
NullPointerException- if any argument isnullIllegalArgumentException- if a rule is empty or references a channel not present ininputChannels, or contains an invalid token
-
-
Method Details
-
fireFunction
- Parameters:
latestSnapshot- Unmodifiable snapshot of the latest value of ALL input channels (channel -> latest object).
-
getInputChannels
- Returns:
- the list of input channels this actor subscribes to; never
null
-
getOutputChannels
- Returns:
- the list of output channels associated with this actor (not used internally by this class); never
null
-
valueMatchesExpected
Hook for customizing equality checks used by"r:v"conditions.The default implementation compares string representations:
Override this to perform typed or domain-specific comparisons.Objects.toString(actual, null).equals(expected)- Parameters:
actual- the latest channel value (may benull)expected- the expected value string provided in the rule token (nevernull)- Returns:
trueif the actual value should be considered equal to the expected one
-