Initial commit

This commit is contained in:
Senad Uka
2022-03-23 05:44:42 +01:00
parent 1405281a5c
commit eea10dd03b
113 changed files with 3617 additions and 81 deletions

View File

@@ -0,0 +1,13 @@
# frozen_string_literal: true
require_relative '../log/loggable'
module SchedulePipeline
class Errors
include Log::Loggable
def push(error)
logger.error error
end
end
end

View File

@@ -0,0 +1,56 @@
# frozen_string_literal: true
require_relative '../log/loggable'
require_relative '../vendors/broad_sign/broad_sign_tokens'
require_relative '../vendors/broad_sign/broad_sign_fetch_schedule'
require_relative '../vendors/vistar/vistar_tokens'
require_relative '../vendors/vistar/vistar_fetch_schedule'
module SchedulePipeline
class FetchSchedule
include Log::Loggable
def initialize(in_queue, out_queue, errors)
@in_queue = in_queue
@out_queue = out_queue
@errors = errors
end
def start
@in_queue.subscribe { |msg| process_msg(msg) }
end
def process_msg(msg)
logger.debug("fetch schedule: #{msg}")
vendor = msg[:vendor]
vendor_fetch = for_vendor(vendor)
unless vendor_fetch
@errors.push("FetchSchedule: Unknown vendor #{vendor}")
return
end
params = msg[:params]
fetched = vendor_fetch.call(params)
unless fetched
@errors.push("FetchSchedule: No ads returned from vendor #{vendor}")
return
end
Models::ScheduleProcessMsg.new(vendor, params[:player], fetched).push(@out_queue) if fetched
true
rescue StandardError => e
@errors.push(e.message)
end
private
def for_vendor(vendor)
vendor_map[vendor&.to_sym]
end
def vendor_map
@vendor_map ||= {
broad_sign: Vendors::BroadSign::BroadSignFetchSchedule.new(Vendors::BroadSign::BroadSignTokens.instance),
vistar: Vendors::Vistar::VistarFetchSchedule.new(Vendors::Vistar::VistarTokens.instance)
}
end
end
end

View File

@@ -0,0 +1,37 @@
# frozen_string_literal: true
module SchedulePipeline
module Models
class Schedule
attr_reader :name, :vendor, :player, :start_time, :items
def initialize(name, vendor, player, start_time, items)
@name = name
@vendor = vendor
@player = player
@start_time = start_time
@items = items
end
def to_hash
{
name: name,
vendor: vendor,
player: player,
start_time: start_time,
items: items.collect(&:to_hash)
}.with_indifferent_access
end
def self.from_hash(input)
self.new(
input[:name],
input[:vendor],
input[:player],
input[:start_time],
input[:items].collect { |i| ScheduleItem.from_hash(i) }
)
end
end
end
end

View File

@@ -0,0 +1,26 @@
# frozen_string_literal: true
module SchedulePipeline
module Models
class ScheduleFetchMsg
attr_reader :vendor, :params
def initialize(vendor, params)
@vendor = vendor
@params = params
end
def send(queue)
queue.push(serialized_msg)
end
private
def serialized_msg
{
vendor: @vendor,
params: @params
}
end
end
end
end

View File

@@ -0,0 +1,31 @@
# frozen_string_literal: true
module SchedulePipeline
module Models
class ScheduleItem
attr_reader :duration, :content_key, :pop_data
def initialize(duration, content_key, pop_data)
@duration = duration
@content_key = content_key
@pop_data = pop_data
end
def to_hash
{
duration: @duration,
content_key: @content_key,
pop_data: @pop_data
}.with_indifferent_access
end
def self.from_hash(input)
self.new(
input[:duration],
input[:content_key],
input[:pop_data]
)
end
end
end
end

View File

@@ -0,0 +1,35 @@
# frozen_string_literal: true
module SchedulePipeline
module Models
class ScheduleProcessMsg
attr_reader :vendor, :player, :vendor_schedule
def initialize(vendor, player, schedule)
@vendor = vendor
@player = player
@vendor_schedule = schedule.with_indifferent_access
end
def push(queue)
queue.push(to_hash)
end
def to_hash
{
vendor: @vendor,
player: @player,
vendor_schedule: @vendor_schedule
}
end
def self.from_hash(input)
self.new(
input[:vendor],
input[:player],
input[:vendor_schedule],
)
end
end
end
end

View File

@@ -0,0 +1,85 @@
# frozen_string_literal: true
require_relative '../log/loggable'
require_relative '../vle/vle_settings'
require_relative '../vle/vle_create_asset'
require_relative '../vle/vle_ingest_asset'
require_relative '../vendors/broad_sign/broad_sign_transform_schedule'
require_relative '../vendors/vistar/vistar_transform_schedule'
require_relative '../auth/auth_token_service'
require 'auth/client/request/auth_aware_request'
module SchedulePipeline
class ProcessSchedule
include Log::Loggable
def initialize(in_queue, out_queue, errors, tokens = Auth::AuthTokenService.instance)
@in_queue = in_queue
@out_queue = out_queue
@errors = errors
@tokens = tokens
end
def start
@in_queue.subscribe do |msg|
msg = Models::ScheduleProcessMsg.from_hash(msg) if msg.is_a? Hash
process_msg(msg)
end
end
def process_msg(msg)
logger.info("Process Schedule")
vendor = msg.vendor
schedule_transform = for_vendor(vendor)
unless schedule_transform
@errors.push("ProcessSchedule: Unknown vendor #{vendor}")
return
end
content_map = []
msg.vendor_schedule[:contents].each do |content|
# TODO: optimization: first check whether the content has already been ingested into VLE
vle_asset = create_asset(content)[:asset]
content_map << vle_asset
ingest_content(content, vle_asset[:meta][:presigned_url])
end
schedule = schedule_transform.call(vendor, msg.player, msg.vendor_schedule, content_map)
@out_queue.push(schedule)
true
rescue StandardError => e
@errors.push(e.message)
end
private
def create_asset(content)
sanitized_name = content[:name].gsub(/\s+/, '_')
Auth::Client::Request::AuthAwareRequest.new(
Vle::VleCreateAsset.new(
{
name: sanitized_name,
file: sanitized_name,
project_id: Vle::VleSettings.instance.target_project
}
),
@tokens
).call
end
def ingest_content(content, destination)
Vle::VleIngestAsset.new.call(content, destination)
end
def for_vendor(vendor)
vendor_map[vendor&.to_sym]
end
def vendor_map
@vendor_map ||= {
broad_sign: Vendors::BroadSign::BroadSignTransformSchedule.new,
vistar: Vendors::Vistar::VistarTransformSchedule.new
}
end
end
end

View File

@@ -0,0 +1,41 @@
# frozen_string_literal: true
require_relative '../log/loggable'
require_relative '../vle/vle_vendor_schedule'
require_relative 'models/schedule'
require_relative '../auth/auth_token_service'
require 'auth/client/request/auth_aware_request'
module SchedulePipeline
class PublishSchedule
include Log::Loggable
def initialize(queue, errors, tokens = Auth::AuthTokenService.instance)
@queue = queue
@errors = errors
@tokens = tokens
end
def start
@queue.subscribe do |msg|
msg = Models::Schedule.from_hash(msg) if msg.is_a? Hash
process_msg(msg)
end
end
def process_msg(msg)
logger.info("Publishing schedule: #{msg.name}...")
Auth::Client::Request::AuthAwareRequest.new(
Vle::VleVendorSchedule.new(
msg.vendor,
msg.player,
{ name: msg.name, start_time: msg.start_time, items: msg.to_hash[:items] }
),
@tokens
).call
logger.info("Published schedule: #{msg.name}")
rescue StandardError => e
@errors.push(e.message)
end
end
end

View File

@@ -0,0 +1,32 @@
# frozen_string_literal: true
require_relative '../../amqp/amqp_service'
module SchedulePipeline
module Queue
class BunnyQueue
def initialize(destination)
@destination = destination
end
def push(msg)
amqp.default_exchange.publish(msg.to_json, routing_key: @destination)
end
def subscribe(&block)
amqp_queue.subscribe do |_delivery_info, _metadata, payload|
block.call(JSON.parse(payload).with_indifferent_access)
end
end
private
def amqp
Amqp::AmqpService.instance
end
def amqp_queue
@queue ||= amqp.queue(@destination.to_s, auto_delete: false, durable: true)
end
end
end
end

View File

@@ -0,0 +1,15 @@
# frozen_string_literal: true
require_relative 'bunny_queue'
module SchedulePipeline
module Queue
class BunnyQueueFactory
include Singleton
def for_name(name)
BunnyQueue.new(name)
end
end
end
end

View File

@@ -0,0 +1,44 @@
# frozen_string_literal: true
require_relative '../log/loggable'
require_relative 'errors'
require_relative 'publish_schedule'
require_relative 'process_schedule'
require_relative 'fetch_schedule'
module SchedulePipeline
# Schedule Processing Pipeline
class SchedulePipeline
include Log::Loggable
def initialize(queue_factory)
errors = Errors.new
processed_schedules_queue = queue_factory.for_name(:processed_schedules)
publish_schedule = PublishSchedule.new(processed_schedules_queue, errors)
unprocessed_schedules_queue = queue_factory.for_name(:unprocessed_schedules)
process_schedule = ProcessSchedule.new(unprocessed_schedules_queue, processed_schedules_queue, errors)
fetch_schedule_queue = queue_factory.for_name(:fetch_vendor_schedules)
fetch_schedule = FetchSchedule.new(fetch_schedule_queue, unprocessed_schedules_queue, errors)
@queues = [processed_schedules_queue, unprocessed_schedules_queue, fetch_schedule_queue]
@stages = [publish_schedule, process_schedule, fetch_schedule]
end
def start
stages.each(&:start)
logger.info "Started Schedule Pipeline"
end
def stop
stages.reverse_each(&:stop)
end
private
def stages
@stages
end
def queues
@queues
end
end
end