Skip to content
This repository was archived by the owner on Sep 25, 2023. It is now read-only.

Commit 093a3ee

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents 43c6e8d + 233817d commit 093a3ee

39 files changed

+862
-451
lines changed

src/main/java/org/atlasapi/equiv/EquivModule.java

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import static org.atlasapi.media.entity.Publisher.TALK_TALK;
3535
import static org.atlasapi.media.entity.Publisher.YOUTUBE;
3636
import static org.atlasapi.media.entity.Publisher.YOUVIEW;
37+
import static org.atlasapi.media.entity.Publisher.YOUVIEW_STAGE;
3738

3839
import java.io.File;
3940
import java.util.Set;
@@ -71,7 +72,6 @@
7172
import org.atlasapi.equiv.results.persistence.FileEquivalenceResultStore;
7273
import org.atlasapi.equiv.results.persistence.RecentEquivalenceResultStore;
7374
import org.atlasapi.equiv.results.scores.Score;
74-
import org.atlasapi.equiv.scorers.BroadcastItemTitleScorer;
7575
import org.atlasapi.equiv.scorers.ContainerHierarchyMatchingScorer;
7676
import org.atlasapi.equiv.scorers.CrewMemberScorer;
7777
import org.atlasapi.equiv.scorers.EquivalenceScorer;
@@ -88,32 +88,38 @@
8888
import org.atlasapi.equiv.update.NullEquivalenceUpdater;
8989
import org.atlasapi.equiv.update.SourceSpecificEquivalenceUpdater;
9090
import org.atlasapi.media.channel.ChannelResolver;
91+
import org.atlasapi.media.entity.Broadcast;
9192
import org.atlasapi.media.entity.Container;
9293
import org.atlasapi.media.entity.Content;
9394
import org.atlasapi.media.entity.Item;
9495
import org.atlasapi.media.entity.Publisher;
9596
import org.atlasapi.media.entity.Song;
96-
import org.atlasapi.messaging.v3.AtlasMessagingModule;
97+
import org.atlasapi.messaging.v3.ContentEquivalenceAssertionMessage;
98+
import org.atlasapi.messaging.v3.JacksonMessageSerializer;
99+
import org.atlasapi.messaging.v3.KafkaMessagingModule;
97100
import org.atlasapi.persistence.content.ContentResolver;
98101
import org.atlasapi.persistence.content.ScheduleResolver;
99102
import org.atlasapi.persistence.content.SearchResolver;
100103
import org.atlasapi.persistence.lookup.LookupWriter;
104+
import org.joda.time.DateTime;
101105
import org.joda.time.Duration;
102106
import org.springframework.beans.factory.annotation.Autowired;
103107
import org.springframework.beans.factory.annotation.Value;
104108
import org.springframework.context.annotation.Bean;
105109
import org.springframework.context.annotation.Configuration;
106110
import org.springframework.context.annotation.Import;
107-
import org.springframework.jms.core.JmsTemplate;
108111

109112
import com.google.common.base.Predicate;
113+
import com.google.common.base.Predicates;
110114
import com.google.common.collect.ImmutableList;
111115
import com.google.common.collect.ImmutableSet;
112116
import com.google.common.collect.Iterables;
113117
import com.google.common.collect.Sets;
118+
import com.metabroadcast.common.queue.MessageSender;
119+
import com.metabroadcast.common.time.DateTimeZones;
114120

115121
@Configuration
116-
@Import({AtlasMessagingModule.class})
122+
@Import({KafkaMessagingModule.class})
117123
public class EquivModule {
118124

119125
private @Value("${equiv.results.directory}") String equivResultsDirectory;
@@ -126,7 +132,7 @@ public class EquivModule {
126132
private @Autowired EquivalenceSummaryStore equivSummaryStore;
127133
private @Autowired LookupWriter lookupWriter;
128134

129-
private @Autowired AtlasMessagingModule messaging;
135+
private @Autowired KafkaMessagingModule messaging;
130136

131137
public @Bean RecentEquivalenceResultStore equivalenceResultStore() {
132138
return new RecentEquivalenceResultStore(new FileEquivalenceResultStore(new File(equivResultsDirectory)));
@@ -143,8 +149,10 @@ private EquivalenceResultHandler<Container> containerResultHandlers(Iterable<Pub
143149
}
144150

145151
@Bean
146-
protected JmsTemplate equivAssertDestination() {
147-
return messaging.queueHelper().makeVirtualTopicProducer(equivAssertDest);
152+
protected MessageSender<ContentEquivalenceAssertionMessage> equivAssertDestination() {
153+
return messaging.messageSenderFactory()
154+
.makeMessageSender(equivAssertDest,
155+
JacksonMessageSerializer.forType(ContentEquivalenceAssertionMessage.class));
148156
}
149157

150158
private <T extends Content> EquivalenceFilter<T> standardFilter() {
@@ -161,10 +169,14 @@ private <T extends Content> EquivalenceFilter<T> standardFilter(Iterable<Equival
161169
}
162170

163171
private ContentEquivalenceUpdater.Builder<Item> standardItemUpdater(Set<Publisher> acceptablePublishers, Set<? extends EquivalenceScorer<Item>> scorers) {
172+
return standardItemUpdater(acceptablePublishers, scorers, Predicates.alwaysTrue());
173+
}
174+
175+
private ContentEquivalenceUpdater.Builder<Item> standardItemUpdater(Set<Publisher> acceptablePublishers, Set<? extends EquivalenceScorer<Item>> scorers, Predicate<? super Broadcast> filter) {
164176
return ContentEquivalenceUpdater.<Item> builder()
165177
.withGenerators(ImmutableSet.<EquivalenceGenerator<Item>> of(
166178
new BroadcastMatchingItemEquivalenceGenerator(scheduleResolver,
167-
channelResolver, acceptablePublishers, Duration.standardMinutes(10))
179+
channelResolver, acceptablePublishers, Duration.standardMinutes(10), filter)
168180
))
169181
.withScorers(scorers)
170182
.withCombiner(new NullScoreAwareAveragingCombiner<Item>())
@@ -220,7 +232,7 @@ public boolean apply(Publisher input) {
220232
Set<Publisher> acceptablePublishers = ImmutableSet.copyOf(Sets.difference(
221233
Publisher.all(),
222234
Sets.union(
223-
ImmutableSet.of(PREVIEW_NETWORKS, BBC_REDUX, RADIO_TIMES, LOVEFILM, NETFLIX, YOUVIEW),
235+
ImmutableSet.of(PREVIEW_NETWORKS, BBC_REDUX, RADIO_TIMES, LOVEFILM, NETFLIX, YOUVIEW, YOUVIEW_STAGE),
224236
Sets.union(musicPublishers, roviPublishers)
225237
)
226238
));
@@ -230,7 +242,7 @@ public boolean apply(Publisher input) {
230242
EquivalenceUpdater<Container> topLevelContainerUpdater = topLevelContainerUpdater(acceptablePublishers);
231243

232244
Set<Publisher> nonStandardPublishers = ImmutableSet.copyOf(Sets.union(
233-
ImmutableSet.of(ITUNES, BBC_REDUX, RADIO_TIMES, FACEBOOK, LOVEFILM, NETFLIX, YOUVIEW, TALK_TALK, PA),
245+
ImmutableSet.of(ITUNES, BBC_REDUX, RADIO_TIMES, FACEBOOK, LOVEFILM, NETFLIX, YOUVIEW, YOUVIEW_STAGE, TALK_TALK, PA),
234246
Sets.union(musicPublishers, roviPublishers)
235247
));
236248
final EquivalenceUpdaters updaters = new EquivalenceUpdaters();
@@ -242,31 +254,37 @@ public boolean apply(Publisher input) {
242254
.build());
243255
}
244256

245-
Set<Publisher> paPublishers = Sets.union(acceptablePublishers, ImmutableSet.of(YOUVIEW));
246-
247-
updaters.register(PA, SourceSpecificEquivalenceUpdater.builder(PA)
248-
.withItemUpdater(standardItemUpdater(paPublishers, ImmutableSet.<EquivalenceScorer<Item>>of()).build())
249-
.withTopLevelContainerUpdater(topLevelContainerUpdater(paPublishers))
250-
.withNonTopLevelContainerUpdater(NullEquivalenceUpdater.<Container>get())
251-
.build());
252-
253257
updaters.register(RADIO_TIMES, SourceSpecificEquivalenceUpdater.builder(RADIO_TIMES)
254258
.withItemUpdater(rtItemEquivalenceUpdater())
255259
.withTopLevelContainerUpdater(NullEquivalenceUpdater.<Container>get())
256260
.withNonTopLevelContainerUpdater(NullEquivalenceUpdater.<Container>get())
257261
.build());
258262

259-
Set<Publisher> youViewPublishers = Sets.union(acceptablePublishers, ImmutableSet.of(YOUVIEW));
263+
Set<Publisher> youViewPublishers = Sets.union(Sets.difference(acceptablePublishers, ImmutableSet.of(YOUVIEW_STAGE)), ImmutableSet.of(YOUVIEW));
264+
Predicate<Broadcast> youviewBroadcastFilter = new Predicate<Broadcast>(){
265+
@Override
266+
public boolean apply(Broadcast input) {
267+
DateTime twoWeeksAgo = new DateTime(DateTimeZones.UTC).minusDays(15);
268+
return input.getTransmissionTime().isAfter(twoWeeksAgo);
269+
}
270+
};
260271
updaters.register(YOUVIEW, SourceSpecificEquivalenceUpdater.builder(YOUVIEW)
261-
.withItemUpdater(broadcastItemEquivalenceUpdater(youViewPublishers, Score.negativeOne()))
272+
.withItemUpdater(broadcastItemEquivalenceUpdater(youViewPublishers, Score.negativeOne(),youviewBroadcastFilter))
262273
.withTopLevelContainerUpdater(broadcastItemContainerEquivalenceUpdater(youViewPublishers))
263274
.withNonTopLevelContainerUpdater(NullEquivalenceUpdater.<Container>get())
264275
.build());
276+
277+
Set<Publisher> youViewStagePublishers = Sets.union(Sets.difference(acceptablePublishers, ImmutableSet.of(YOUVIEW)), ImmutableSet.of(YOUVIEW_STAGE));
278+
updaters.register(YOUVIEW_STAGE, SourceSpecificEquivalenceUpdater.builder(YOUVIEW_STAGE)
279+
.withItemUpdater(broadcastItemEquivalenceUpdater(youViewStagePublishers, Score.negativeOne(),youviewBroadcastFilter))
280+
.withTopLevelContainerUpdater(broadcastItemContainerEquivalenceUpdater(youViewStagePublishers))
281+
.withNonTopLevelContainerUpdater(NullEquivalenceUpdater.<Container>get())
282+
.build());
265283

266284
Set<Publisher> reduxPublishers = Sets.union(acceptablePublishers, ImmutableSet.of(BBC_REDUX));
267285

268286
updaters.register(BBC_REDUX, SourceSpecificEquivalenceUpdater.builder(BBC_REDUX)
269-
.withItemUpdater(broadcastItemEquivalenceUpdater(reduxPublishers, Score.nullScore()))
287+
.withItemUpdater(broadcastItemEquivalenceUpdater(reduxPublishers, Score.nullScore(), Predicates.alwaysTrue()))
270288
.withTopLevelContainerUpdater(broadcastItemContainerEquivalenceUpdater(reduxPublishers))
271289
.withNonTopLevelContainerUpdater(NullEquivalenceUpdater.<Container>get())
272290
.build());
@@ -466,12 +484,13 @@ private EquivalenceUpdater<Container> broadcastItemContainerEquivalenceUpdater(S
466484
.build();
467485
}
468486

469-
private EquivalenceUpdater<Item> broadcastItemEquivalenceUpdater(Set<Publisher> sources, Score titleMismatch) {
487+
private EquivalenceUpdater<Item> broadcastItemEquivalenceUpdater(Set<Publisher> sources, Score titleMismatch,
488+
Predicate<? super Broadcast> filter) {
470489
return standardItemUpdater(sources, ImmutableSet.of(
471490
new TitleMatchingItemScorer(),
472491
new SequenceItemScorer(),
473492
new TitleSubsetBroadcastItemScorer(contentResolver, titleMismatch, 80/*percent*/)
474-
)).build();
493+
), filter).build();
475494
}
476495

477496
private EquivalenceUpdater<Item> rtItemEquivalenceUpdater() {

src/main/java/org/atlasapi/equiv/EquivTaskModule.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.atlasapi.media.entity.Publisher.ROVI_EN_US;
1616
import static org.atlasapi.media.entity.Publisher.TALK_TALK;
1717
import static org.atlasapi.media.entity.Publisher.YOUVIEW;
18+
import static org.atlasapi.media.entity.Publisher.YOUVIEW_STAGE;
1819

1920
import java.util.Set;
2021

@@ -38,7 +39,9 @@
3839
import org.atlasapi.media.entity.Container;
3940
import org.atlasapi.media.entity.Content;
4041
import org.atlasapi.media.entity.Publisher;
41-
import org.atlasapi.messaging.v3.AtlasMessagingModule;
42+
import org.atlasapi.messaging.v3.EntityUpdatedMessage;
43+
import org.atlasapi.messaging.v3.JacksonMessageSerializer;
44+
import org.atlasapi.messaging.v3.KafkaMessagingModule;
4245
import org.atlasapi.persistence.content.ContentResolver;
4346
import org.atlasapi.persistence.content.ScheduleResolver;
4447
import org.atlasapi.persistence.content.listing.ContentLister;
@@ -52,36 +55,46 @@
5255
import org.atlasapi.remotesite.youview.YouViewChannelResolver;
5356
import org.joda.time.Duration;
5457
import org.joda.time.LocalTime;
58+
import org.slf4j.Logger;
59+
import org.slf4j.LoggerFactory;
5560
import org.springframework.beans.factory.annotation.Autowired;
5661
import org.springframework.beans.factory.annotation.Qualifier;
5762
import org.springframework.beans.factory.annotation.Value;
5863
import org.springframework.context.annotation.Bean;
5964
import org.springframework.context.annotation.Configuration;
6065
import org.springframework.context.annotation.Import;
6166
import org.springframework.context.annotation.Lazy;
62-
import org.springframework.jms.listener.DefaultMessageListenerContainer;
6367

6468
import com.google.common.base.Function;
69+
import com.google.common.base.Optional;
6570
import com.google.common.base.Predicate;
6671
import com.google.common.base.Predicates;
6772
import com.google.common.collect.ImmutableList;
6873
import com.google.common.collect.ImmutableSet;
6974
import com.google.common.collect.Iterables;
75+
import com.google.common.util.concurrent.MoreExecutors;
76+
import com.google.common.util.concurrent.Service.Listener;
77+
import com.google.common.util.concurrent.Service.State;
7078
import com.metabroadcast.common.persistence.mongo.DatabasedMongo;
79+
import com.metabroadcast.common.queue.kafka.KafkaConsumer;
7180
import com.metabroadcast.common.scheduling.RepetitionRule;
7281
import com.metabroadcast.common.scheduling.RepetitionRules;
7382
import com.metabroadcast.common.scheduling.SimpleScheduler;
7483

7584
@Configuration
76-
@Import({EquivModule.class, AtlasMessagingModule.class})
85+
@Import({EquivModule.class, KafkaMessagingModule.class})
7786
public class EquivTaskModule {
87+
88+
private final Logger log = LoggerFactory.getLogger(getClass());
7889

7990
private static final Set<String> ignored = ImmutableSet.of("http://www.bbc.co.uk/programmes/b006mgyl");
8091
// private static final RepetitionRule EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(9, 00));
8192
private static final RepetitionRule RT_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(7, 00));
8293
private static final RepetitionRule TALKTALK_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(11, 15));
8394
private static final RepetitionRule YOUVIEW_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(15, 00));
95+
private static final RepetitionRule YOUVIEW_STAGE_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(8, 00));
8496
private static final RepetitionRule YOUVIEW_SCHEDULE_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(13, 00));
97+
private static final RepetitionRule YOUVIEW_STAGE_SCHEDULE_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(9, 00));
8598
private static final RepetitionRule BBC_SCHEDULE_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(9, 00));
8699
private static final RepetitionRule ITV_SCHEDULE_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(11, 00));
87100
private static final RepetitionRule ITV_EQUIVALENCE_REPETITION = RepetitionRules.daily(new LocalTime(12, 00));
@@ -108,7 +121,7 @@ public class EquivTaskModule {
108121
private @Autowired @Qualifier("contentUpdater") EquivalenceUpdater<Content> equivUpdater;
109122
private @Autowired RecentEquivalenceResultStore equivalenceResultStore;
110123

111-
private @Autowired AtlasMessagingModule messaging;
124+
private @Autowired KafkaMessagingModule messaging;
112125

113126
@PostConstruct
114127
public void scheduleUpdater() {
@@ -125,6 +138,7 @@ public void scheduleUpdater() {
125138
taskScheduler.schedule(publisherUpdateTask(LOVEFILM).withName("Lovefilm Equivalence Updater"), RepetitionRules.every(Duration.standardHours(12)).withOffset(Duration.standardHours(10)));
126139
taskScheduler.schedule(publisherUpdateTask(NETFLIX).withName("Netflix Equivalence Updater"), RepetitionRules.NEVER);
127140
taskScheduler.schedule(publisherUpdateTask(YOUVIEW).withName("YouView Equivalence Updater"), YOUVIEW_EQUIVALENCE_REPETITION);
141+
taskScheduler.schedule(publisherUpdateTask(YOUVIEW_STAGE).withName("YouView Stage Equivalence Updater"), YOUVIEW_STAGE_EQUIVALENCE_REPETITION);
128142
taskScheduler.schedule(publisherUpdateTask(TALK_TALK).withName("TalkTalk Equivalence Updater"), TALKTALK_EQUIVALENCE_REPETITION);
129143
taskScheduler.schedule(publisherUpdateTask(ROVI_EN_GB).withName("Rovi EN-GB Equivalence Updater"), ROVI_EN_GB_EQUIVALENCE_REPETITION);
130144
taskScheduler.schedule(publisherUpdateTask(ROVI_EN_US).withName("Rovi EN-US Equivalence Updater"), ROVI_EN_GB_EQUIVALENCE_REPETITION);
@@ -136,6 +150,11 @@ public void scheduleUpdater() {
136150
.withChannels(youViewChannelResolver().getAllChannels())
137151
.build().withName("YouView Schedule Equivalence (8 day) Updater"),
138152
YOUVIEW_SCHEDULE_EQUIVALENCE_REPETITION);
153+
taskScheduler.schedule(taskBuilder(0, 7)
154+
.withPublishers(YOUVIEW_STAGE)
155+
.withChannels(youViewChannelResolver().getAllChannels())
156+
.build().withName("YouView Stage Schedule Equivalence (8 day) Updater"),
157+
YOUVIEW_STAGE_SCHEDULE_EQUIVALENCE_REPETITION);
139158
taskScheduler.schedule(taskBuilder(0, 7)
140159
.withPublishers(BBC)
141160
.withChannels(bbcChannels())
@@ -245,7 +264,7 @@ private Iterable<Channel> bbcReduxChannels() {
245264
private EquivalenceUpdatingWorker equivUpdatingWorker() {
246265
return new EquivalenceUpdatingWorker(contentResolver, lookupStore, equivalenceResultStore, equivUpdater,
247266
Predicates.or(ImmutableList.<Predicate<? super Content>>of(
248-
sourceIsIn(BBC_REDUX, YOUVIEW),
267+
sourceIsIn(BBC_REDUX, YOUVIEW, YOUVIEW_STAGE),
249268
Predicates.and(Predicates.instanceOf(Container.class),
250269
sourceIsIn(BBC, C4, C4_PMLSD, ITV, FIVE, BBC_REDUX, ITUNES,
251270
RADIO_TIMES, LOVEFILM, TALK_TALK, YOUVIEW,NETFLIX))
@@ -265,13 +284,35 @@ public boolean apply(Content input) {
265284

266285
@Bean
267286
@Lazy(true)
268-
public DefaultMessageListenerContainer equivalenceUpdatingMessageListener() {
287+
public Optional<KafkaConsumer> equivalenceUpdatingMessageListener() {
269288
if (streamedChangesUpdateEquiv) {
270-
return messaging.queueHelper().makeVirtualTopicConsumer(
271-
equivUpdatingWorker(), "EquivUpdater", contentChanges,
272-
defaultStreamedEquivUpdateConsumers, maxStreamedEquivUpdateConsumers);
289+
return Optional.of(messaging.messageConsumerFactory().createConsumer(
290+
equivUpdatingWorker(), JacksonMessageSerializer.forType(EntityUpdatedMessage.class),
291+
contentChanges, "EquivUpdater")
292+
.withDefaultConsumers(defaultStreamedEquivUpdateConsumers)
293+
.withMaxConsumers(maxStreamedEquivUpdateConsumers)
294+
.build());
273295
} else {
274-
return messaging.queueHelper().noopContainer();
296+
return Optional.absent();
297+
}
298+
}
299+
300+
@PostConstruct
301+
public void startConsumer() {
302+
Optional<KafkaConsumer> consumer = equivalenceUpdatingMessageListener();
303+
if (consumer.isPresent()) {
304+
consumer.get().addListener(new Listener() {
305+
@Override
306+
public void failed(State from, Throwable failure) {
307+
log.warn("equiv update listener failed to transition from " + from, failure);
308+
}
309+
@Override
310+
public void running() {
311+
log.info("equiv update listener running");
312+
}
313+
314+
}, MoreExecutors.sameThreadExecutor());
315+
consumer.get().startAsync();
275316
}
276317
}
277318

0 commit comments

Comments
 (0)