Skip to content

Commit 0e6b2a5

Browse files
authored
Sp/clients (#87)
* start required refactoring for springboot 2.0 * finish refactoring Repository methods * finish refactoring Repository methods in tests * Fix models annotations * finish refactoring * refactoring configuration files * migrate remaining configuration files * Added stream consumer management UI * update changelog * fix CHANGELOG.md * Add test coverage for WebSocketConsumerManager * Add test coverage * fix code style violations
1 parent bc93f95 commit 0e6b2a5

File tree

23 files changed

+1555
-56
lines changed

23 files changed

+1555
-56
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
44

55
## 2.0.0 (UNRELEASED)
66

7-
### Breaking Changes
7+
- Added new Stream consumer management page at /configuration/stream
88
- Updated SpringBoot framework from 1.5.x to 2.0.4
9+
10+
### Breaking Changes
11+
912
TODO Write migration guide
1013

1114
## 1.0.5 (06/22/2018)

kafka-webview-ui/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,14 @@
160160
</exclusion>
161161
</exclusions>
162162
</dependency>
163+
164+
<!-- For parameterized tests -->
165+
<dependency>
166+
<groupId>pl.pragmatists</groupId>
167+
<artifactId>JUnitParams</artifactId>
168+
<version>1.1.1</version>
169+
<scope>test</scope>
170+
</dependency>
163171
</dependencies>
164172

165173
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2017, 2018 SourceLab.org (https://github.com/Crim/kafka-webview/)
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package org.sourcelab.kafka.webview.ui.controller.configuration.stream;
26+
27+
import org.sourcelab.kafka.webview.ui.controller.BaseController;
28+
import org.sourcelab.kafka.webview.ui.manager.socket.StreamConsumerDetails;
29+
import org.sourcelab.kafka.webview.ui.manager.socket.WebSocketConsumersManager;
30+
import org.sourcelab.kafka.webview.ui.manager.ui.BreadCrumbManager;
31+
import org.sourcelab.kafka.webview.ui.manager.ui.FlashMessage;
32+
import org.sourcelab.kafka.webview.ui.model.Cluster;
33+
import org.sourcelab.kafka.webview.ui.model.User;
34+
import org.sourcelab.kafka.webview.ui.model.View;
35+
import org.sourcelab.kafka.webview.ui.repository.ClusterRepository;
36+
import org.sourcelab.kafka.webview.ui.repository.UserRepository;
37+
import org.sourcelab.kafka.webview.ui.repository.ViewRepository;
38+
import org.springframework.beans.factory.annotation.Autowired;
39+
import org.springframework.stereotype.Controller;
40+
import org.springframework.ui.Model;
41+
import org.springframework.web.bind.annotation.PathVariable;
42+
import org.springframework.web.bind.annotation.RequestMapping;
43+
import org.springframework.web.bind.annotation.RequestMethod;
44+
import org.springframework.web.servlet.mvc.support.RedirectAttributes;
45+
46+
import java.util.Collection;
47+
import java.util.HashMap;
48+
import java.util.HashSet;
49+
import java.util.Map;
50+
import java.util.Set;
51+
import java.util.stream.Collectors;
52+
53+
/**
54+
* Controller for Stream configuration.
55+
*/
56+
@Controller
57+
@RequestMapping("/configuration/stream")
58+
public class StreamConfigController extends BaseController {
59+
60+
@Autowired
61+
private ClusterRepository clusterRepository;
62+
63+
@Autowired
64+
private ViewRepository viewRepository;
65+
66+
@Autowired
67+
private UserRepository userRepository;
68+
69+
@Autowired
70+
private WebSocketConsumersManager webSocketConsumersManager;
71+
72+
/**
73+
* GET Displays currently active socket consumers.
74+
*/
75+
@RequestMapping(path = "", method = RequestMethod.GET)
76+
public String index(final Model model) {
77+
// Setup breadcrumbs
78+
setupBreadCrumbs(model, null, null);
79+
80+
// Retrieve all consumers
81+
final Collection<StreamConsumerDetails> consumers = webSocketConsumersManager.getConsumers();
82+
83+
// Loop thru and collect views and users
84+
final Set<Long> userIds = new HashSet<>();
85+
final Set<Long> viewIds = new HashSet<>();
86+
87+
consumers.forEach((consumer) -> {
88+
userIds.add(consumer.getUserId());
89+
viewIds.add(consumer.getViewId());
90+
91+
});
92+
93+
// Map by Id
94+
final Map<Long, User> userMap = new HashMap<>();
95+
final Map<Long, View> viewMap = new HashMap<>();
96+
final Map<Long, Cluster> clusterMap = new HashMap<>();
97+
98+
// Retrieve users and views
99+
userRepository.findAllById(userIds).forEach((user) -> userMap.put(user.getId(), user));
100+
viewRepository.findAllById(viewIds).forEach((view) -> viewMap.put(view.getId(), view));
101+
102+
// Build Cluster Map
103+
final Set<Long> clusterIds = viewMap.values()
104+
.stream()
105+
.map((view) -> view.getCluster().getId())
106+
.collect(Collectors.toSet());
107+
clusterRepository.findAllById(clusterIds).forEach((cluster) -> clusterMap.put(cluster.getId(), cluster));
108+
109+
// Add to UI model
110+
model.addAttribute("viewMap", viewMap);
111+
model.addAttribute("userMap", userMap);
112+
model.addAttribute("clusterMap", clusterMap);
113+
model.addAttribute("consumers", consumers);
114+
115+
return "configuration/stream/index";
116+
}
117+
118+
/**
119+
* POST deletes the selected cluster.
120+
*/
121+
@RequestMapping(path = "/close/{hash}", method = RequestMethod.POST)
122+
public String closeConsumer(@PathVariable final String hash, final RedirectAttributes redirectAttributes) {
123+
// Close by hash
124+
if (webSocketConsumersManager.removeConsumersForSessionHash(hash)) {
125+
// Notify requesting user.
126+
redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newSuccess("Closed consumer!"));
127+
} else {
128+
redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find consumer!"));
129+
}
130+
131+
// redirect to index
132+
return "redirect:/configuration/stream";
133+
}
134+
135+
private void setupBreadCrumbs(final Model model, final String name, final String url) {
136+
// Setup breadcrumbs
137+
final BreadCrumbManager manager = new BreadCrumbManager(model)
138+
.addCrumb("Configuration", "/configuration");
139+
140+
if (name != null) {
141+
manager.addCrumb("Streams", "/configuration/stream");
142+
manager.addCrumb(name, url);
143+
} else {
144+
manager.addCrumb("Streams", null);
145+
}
146+
}
147+
}

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/encryption/SecretManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public String encrypt(final String str) {
8585
final byte[] iv = params.getParameterSpec(IvParameterSpec.class).getIV();
8686
final byte[] encryptedText = cipher.doFinal(str.getBytes(StandardCharsets.UTF_8));
8787

88-
// concatenate salt + iv + ciphertext
88+
// concatenate salt + iv + cipher text
8989
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
9090
outputStream.write(salt);
9191
outputStream.write(iv);
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2017, 2018 SourceLab.org (https://github.com/Crim/kafka-webview/)
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package org.sourcelab.kafka.webview.ui.manager.encryption;
26+
27+
import java.io.UnsupportedEncodingException;
28+
import java.security.MessageDigest;
29+
import java.security.NoSuchAlgorithmException;
30+
import java.util.Formatter;
31+
32+
/**
33+
* Collection of utility methods around calculating SHA1 hashes.
34+
*/
35+
public class Sha1Tools {
36+
/**
37+
* Given an input, calculate the SHA1 hash of it and return the hash encoded in hex.
38+
* @param input input string to hash.
39+
* @return HEX'd result of SHA1 calculation.
40+
*/
41+
public static String sha1(final String input) {
42+
try {
43+
final MessageDigest crypt = MessageDigest.getInstance("SHA-1");
44+
crypt.reset();
45+
crypt.update(input.getBytes("UTF-8"));
46+
return byteToHex(crypt.digest());
47+
}
48+
catch (final NoSuchAlgorithmException | UnsupportedEncodingException exception) {
49+
throw new RuntimeException(exception.getMessage(), exception);
50+
}
51+
}
52+
53+
private static String byteToHex(final byte[] hash) {
54+
try (final Formatter formatter = new Formatter();) {
55+
for (final byte bit : hash) {
56+
formatter.format("%02x", bit);
57+
}
58+
return formatter.toString();
59+
}
60+
}
61+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2017, 2018 SourceLab.org (https://github.com/Crim/kafka-webview/)
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package org.sourcelab.kafka.webview.ui.manager.socket;
26+
27+
import java.time.Instant;
28+
import java.util.Date;
29+
30+
/**
31+
* StreamConsumerDetails.
32+
* Immutable value class holding information about a consumer.
33+
*/
34+
public final class StreamConsumerDetails {
35+
private final long viewId;
36+
private final long userId;
37+
private final String sessionHash;
38+
private final long startedAtTimestamp;
39+
private final long recordCount;
40+
private final boolean isPaused;
41+
42+
/**
43+
* Constructor.
44+
* @param viewId id of the view the consumer is consuming.
45+
* @param userId id of the userx consuming.
46+
* @param sessionHash public session hash.
47+
* @param startedAtTimestamp unix timestamp of when consumer was created.
48+
* @param recordCount How many records the consumer has consumed.
49+
* @param isPaused if the consumer is currently paused.
50+
*/
51+
public StreamConsumerDetails(
52+
final long userId,
53+
final long viewId,
54+
final String sessionHash,
55+
final long startedAtTimestamp,
56+
final long recordCount,
57+
final boolean isPaused) {
58+
this.viewId = viewId;
59+
this.userId = userId;
60+
this.sessionHash = sessionHash;
61+
this.startedAtTimestamp = startedAtTimestamp;
62+
this.recordCount = recordCount;
63+
this.isPaused = isPaused;
64+
}
65+
66+
public long getUserId() {
67+
return userId;
68+
}
69+
70+
public long getViewId() {
71+
return viewId;
72+
}
73+
74+
public String getSessionHash() {
75+
return sessionHash;
76+
}
77+
78+
public long getStartedAtTimestamp() {
79+
return startedAtTimestamp;
80+
}
81+
82+
public Date getStartedAtDate() {
83+
return Date.from( Instant.ofEpochSecond( getStartedAtTimestamp() ) );
84+
}
85+
86+
public long getRecordCount() {
87+
return recordCount;
88+
}
89+
90+
public boolean isPaused() {
91+
return isPaused;
92+
}
93+
94+
@Override
95+
public String toString() {
96+
return "StreamConsumerDetails{"
97+
+ "userId=" + getUserId()
98+
+ ", viewId=" + getViewId()
99+
+ ", startedAtTimestamp=" + startedAtTimestamp
100+
+ ", recordCount=" + recordCount
101+
+ ", isPaused=" + isPaused
102+
+ '}';
103+
}
104+
}

0 commit comments

Comments
 (0)