Pub-Sub design pattern is usually known as Observer pattern. I personally like to refer this pattern as Pub-Sub
which I think better captures the essence of this pattern.
In GoF’s book Design Patterns
, it is defined as follows:
Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically
Let’s look at the classical implementation of this design pattern.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public interface Subject {
void registerObserver(Observer observer);
void removeObserver(Observer observer);
void notifyObservers(Message message);
}
public interface Observer {
void update(Message message);
}
public class ConcreteSubject implements Subject {
private List observers = new ArrayList();
@Override
public void registerObserver(Observer observer) {
observers.add(observer);
}
@Override
public void removeObserver(Observer observer) {
observers.remove(observer);
}
@Override
public void notifyObservers(Message message) {
for (Observer observer : observers) {
observer.update(message);
}
}
}
public class ConcreteObserverOne implements Observer {
@Override
public void update(Message message) {
//TODO obtain notifications and process
System.out.println("ConcreteObserverOne is notified.");
}
}
public class ConcreteObserverTwo implements Observer {
@Override
public void update(Message message) {
//TODO obtain notifications and process
System.out.println("ConcreteObserverTwo is notified.");
}
Let’s take a look at a concrete example and see how this pattern can help. Suppose we are developing a workflow management system. Whenever a workflow is created, we need to register the configuration of the workflow into elastic storage and push workflow into execution queue. We can implement this like the following:
1
2
3
4
5
6
7
8
9
10
11
12
public class WorkflowManager {
private StorageService storageService; // dependency injection
private ExecutionQueue executionQueue; // dependency injection
public void register(Workflow workflow) {
// ...
WorkflowConfig config = workflow.getConfiguration();
storageService.save(config);
executionQueue.push(workflow);
// ...
}
}
Now suppose, say after a workflow is registered, we put it on hold and notify the creator that the workflow has been validated and created and is ready to run instead of putting it into execution. In this case, we need to frequently modify the register() function code, which violates the open-close principle. Moreover, suppose a successful registration will require more and more follow-up operations. In that case, the logic of the register() function will become more and more complex, which will affect the readability and maintainability of the code. This is where the observer pattern comes in handy. Using the observer pattern, we can refactor the above code like the following:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public interface RegObserver {
void onRegSuccess(Workflow workflow);
}
public class RegStorageObserver implements RegObserver {
private StorageService storageService;
@Override
public void onRegSuccess(Workflow workflow) {
WorkflowConfig config = workflow.getConfiguration();
storageService.save(config);
}
}
public class RegNotificationObserver implements RegObserver {
private NotificationService notificationService;
@Override
public void onRegSuccess(Workflow workflow) {
Contact owner = workflow.getOwner();
notificationService.sendMail(owner, "Workflow is ready...");
}
}
public class WorkflowManager {
private WorkflowValidator validator;
private List<RegObserver> regObservers = new ArrayList<>();
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public void register(Workflow workflow) {
if (!validator.validate(workflow)) {
// ...
}
for (RegObserver observer : regObservers) {
observer.onRegSuccess(workflow);
}
}
}
Appendix
Following is a Golang implementation of Pub-Sub pattern
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// subject.go
type subject interface {
registerObserver(Observer observer)
removeObserver(Observer observer)
notifyObservers(string)
}
// event.go implements subject
type event struct {
observers []observer
name string
}
func newEvent(name string) *event {
return &event{
name: name,
}
}
func (e *event) registerObserver(o observer) {
i.observers = append(e.observers, o)
}
func (e *event) removeObserver(o observer) {
i.observerList = removeFromslice(e.observers, o)
}
func (e *event) notifyObservers() {
for _, observer := range e.observers {
observer.update(e.name)
}
}
// observer.go
type observer interface {
update(string)
}
// listener.go implements observer
type listener struct {
id string
}
func (l *listener) update(eventMsg string) {
fmt.Printf("Listener %s received event %s\n", l.id, eventMsg)
}
Following is a C++ example of the Pub-Sub pattern
:
1