class ActiveSupport::Notifications::Fanout

This is the default queue implementation that ships with Notifications. It simply pushes each event to all registered subscribers.

This class is thread-safe. All methods are reentrant.

Public Class Methods

new() click to toggle source
# File lib/active_support/notifications/fanout.rb, line 24
# Builds a fanout with no subscribers registered and an empty listener cache.
def initialize
  # Subscribers registered under an exact String event name, keyed by that
  # name. Looking up an unknown name stores and returns a fresh empty Array.
  @string_subscribers = Hash.new { |by_name, event_name| by_name[event_name] = [] }
  # Subscribers registered with a Regexp pattern or with no pattern at all.
  @other_subscribers = []
  # Cache of the resolved listener list per event name (filled by #listeners_for).
  @listeners_for = Concurrent::Map.new
  # Held while the subscriber collections are changed or the cache is rebuilt.
  @mutex = Mutex.new
end

Public Instance Methods

finish(name, id, payload, listeners = listeners_for(name)) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 72
# Calls +finish(name, id, payload)+ on every listener for the event.
#
# +listeners+ defaults to the current result of <tt>listeners_for(name)</tt>;
# a caller may pass an explicit list instead. +id+ and +payload+ are
# forwarded to each listener unchanged.
#
# Returns +listeners+. Errors raised by listeners are collected and raised
# by #iterate_guarding_exceptions after every listener has been called.
def finish(name, id, payload, listeners = listeners_for(name))
  iterate_guarding_exceptions(listeners) do |subscriber|
    subscriber.finish(name, id, payload)
  end
end
iterate_guarding_exceptions(listeners) { |s| ... } click to toggle source
# File lib/active_support/notifications/fanout.rb, line 84
# Yields each element of +listeners+ to the block, making sure that an error
# raised for one listener does not prevent the remaining ones from being
# yielded.
#
# Returns +listeners+ when no error occurred. If exactly one error was
# captured, that same exception object is raised again. If several were
# captured, an InstrumentationSubscriberError built from all of them is
# raised, with the first captured exception as its +cause+.
def iterate_guarding_exceptions(listeners)
  # Allocated lazily so the error-free path creates no extra Array.
  failures = nil

  listeners.each do |listener|
    yield listener
  rescue Exception => error
    # Intentionally broad: nothing is swallowed here, every captured error is
    # raised again below once all listeners have been visited.
    (failures ||= []) << error
  end

  return listeners unless failures
  raise failures.first if failures.size == 1

  raise InstrumentationSubscriberError.new(failures), cause: failures.first
end
listeners_for(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 105
# Returns the cached list of subscribers interested in the event +name+,
# building and caching it on first use.
#
# The list is the subscribers registered under exactly +name+ followed by
# every pattern/catch-all subscriber whose +subscribed_to?(name)+ is truthy.
def listeners_for(name)
  # Lock-free fast path. This is correctly done double-checked locking
  # (Concurrent::Map's lookups have volatile semantics).
  cached = @listeners_for[name]
  return cached if cached

  # Slow path: @string_subscribers and @other_subscribers are only read
  # while holding the mutex, and the cache is re-checked under the lock.
  @mutex.synchronize do
    @listeners_for[name] ||= begin
      exact_matches = @string_subscribers[name]
      pattern_matches = @other_subscribers.select { |subscriber| subscriber.subscribed_to?(name) }
      exact_matches + pattern_matches
    end
  end
end
listening?(name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 114
# Reports whether at least one subscriber is currently resolved for the
# event +name+ (as determined by #listeners_for).
def listening?(name)
  current_listeners = listeners_for(name)
  current_listeners.any?
end
publish(name, *args) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 76
# Calls +publish(name, *args)+ on every listener resolved for +name+.
#
# Extra positional arguments are forwarded to each listener unchanged.
# Returns the listener list; errors raised by listeners are collected and
# raised by #iterate_guarding_exceptions once all listeners were called.
def publish(name, *args)
  recipients = listeners_for(name)
  iterate_guarding_exceptions(recipients) do |subscriber|
    subscriber.publish(name, *args)
  end
end
publish_event(event) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 80
# Calls +publish_event(event)+ on every listener resolved for
# <tt>event.name</tt>.
#
# Returns the listener list; errors raised by listeners are collected and
# raised by #iterate_guarding_exceptions once all listeners were called.
def publish_event(event)
  recipients = listeners_for(event.name)
  iterate_guarding_exceptions(recipients) do |subscriber|
    subscriber.publish_event(event)
  end
end
start(name, id, payload) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 68
# Calls +start(name, id, payload)+ on every listener resolved for +name+.
#
# +id+ and +payload+ are forwarded to each listener unchanged. Returns the
# listener list; errors raised by listeners are collected and raised by
# #iterate_guarding_exceptions once all listeners were called.
def start(name, id, payload)
  recipients = listeners_for(name)
  iterate_guarding_exceptions(recipients) do |subscriber|
    subscriber.start(name, id, payload)
  end
end
subscribe(pattern = nil, callable = nil, monotonic: false, &block) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 31
# Registers a new subscriber and returns it.
#
# +pattern+ selects which events the subscriber receives: a String is
# registered under that exact name; a Regexp or +nil+ is registered in the
# list that is checked against every event name. The listener is +callable+
# when given, otherwise the block. +monotonic+ is passed through to
# Subscribers.new as-is.
#
# Raises ArgumentError when +pattern+ is neither a String, a Regexp nor nil.
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  handler = callable || block
  new_subscriber = Subscribers.new(pattern, handler, monotonic)

  @mutex.synchronize do
    case pattern
    when Regexp, NilClass
      @other_subscribers << new_subscriber
      # Such a subscriber may match any event name, so drop the whole cache.
      @listeners_for.clear
    when String
      @string_subscribers[pattern] << new_subscriber
      # Only the cached listener list for this exact name is now stale.
      @listeners_for.delete(pattern)
    else
      raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
    end
  end

  new_subscriber
end
unsubscribe(subscriber_or_name) click to toggle source
# File lib/active_support/notifications/fanout.rb, line 48
# Removes subscriptions, given either an event name or a subscriber object.
#
# * Given a String, every subscriber registered under exactly that name is
#   dropped, and each pattern/catch-all subscriber is sent
#   +unsubscribe!(name)+.
# * Given anything else, it is treated as a subscriber: it is removed from
#   the exact-name registry when its +pattern+ is a String, otherwise from
#   the pattern/catch-all list.
#
# The affected part of the listener cache is invalidated in both cases.
def unsubscribe(subscriber_or_name)
  @mutex.synchronize do
    if String === subscriber_or_name
      event_name = subscriber_or_name
      @string_subscribers[event_name].clear
      @listeners_for.delete(event_name)
      @other_subscribers.each { |other| other.unsubscribe!(event_name) }
    else
      subscriber = subscriber_or_name
      # +try+ yields nil for objects that do not respond to +pattern+.
      registered_pattern = subscriber.try(:pattern)

      case registered_pattern
      when String
        @string_subscribers[registered_pattern].delete(subscriber)
        @listeners_for.delete(registered_pattern)
      else
        @other_subscribers.delete(subscriber)
        # The removed subscriber may have matched any name: drop the whole cache.
        @listeners_for.clear
      end
    end
  end
end
wait() click to toggle source

This is a synchronous queue, so there is nothing to wait for.

# File lib/active_support/notifications/fanout.rb, line 119
# Intentionally does nothing and returns +nil+: this queue is synchronous,
# so there is never anything to wait for.
def wait
  nil
end