Skip to content

Commit

Permalink
Merge pull request #19 from reiseburo/clippy
Browse files Browse the repository at this point in the history
Address all the concerns made by 📎
  • Loading branch information
rtyler authored May 11, 2020
2 parents d6430ae + 4f89067 commit 37ff647
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 119 deletions.
6 changes: 3 additions & 3 deletions src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Kafka {

warn!("Failed to connect to a Kafka broker");

return false;
false
}

/**
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Kafka {
timer.stop(handle);
m.counter("kafka.submitted").count(1);
}
Err((err, msg)) => {
Err((err, _)) => {
match err {
/*
* err_type will be one of RdKafkaError types defined:
Expand Down Expand Up @@ -219,7 +219,7 @@ fn metric_name_for(err: RDKafkaError) -> String {
if let Some(name) = err.to_string().to_lowercase().split(' ').next() {
return name.to_string();
}
return String::from("unknown");
String::from("unknown")
}

#[cfg(test)]
Expand Down
93 changes: 27 additions & 66 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_regex;
#[cfg(test)]
#[macro_use]
extern crate serde_json;
extern crate syslog_rfc5424;
Expand Down Expand Up @@ -91,7 +92,7 @@ fn main() -> Result<()> {
let settings = Arc::new(settings::load(settings_file));

if let Some(test_file) = matches.value_of("test") {
return task::block_on(rules::test_rules(&test_file, settings.clone()));
return task::block_on(rules::test_rules(&test_file, settings));
}

let metrics = Arc::new(
Expand All @@ -108,17 +109,21 @@ fn main() -> Result<()> {
info!("Listening on: {}", addr);

match &settings.global.listen.tls {
TlsType::CertAndKey { cert: _, key: _, ca: _ } => {
TlsType::CertAndKey {
cert: _,
key: _,
ca: _,
} => {
info!("Serving in TLS mode");
task::block_on(crate::serve_tls::accept_loop(
addr,
settings.clone(),
metrics.clone(),
metrics,
))
}
_ => {
info!("Serving in plaintext mode");
task::block_on(accept_loop(addr, settings.clone(), metrics.clone()))
task::block_on(accept_loop(addr, settings.clone(), metrics))
}
}
}
Expand Down Expand Up @@ -163,7 +168,9 @@ async fn accept_loop(
};

task::spawn(async move {
read_logs(reader, state).await;
if let Err(e) = read_logs(reader, state).await {
error!("Failed to read logs: {:?}", e);
}
debug!("Connection dropped");
});
}
Expand All @@ -190,11 +197,8 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(

let parsed = parse_message(line);

match &parsed {
Err(e) => {
error!("failed to parse message: {}", e);
}
_ => {}
if let Err(e) = &parsed {
error!("failed to parse message: {}", e);
}

/*
Expand Down Expand Up @@ -226,7 +230,7 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
/*
* Check to see if we have a jmespath first
*/
if rule.jmespath.len() > 0 {
if !rule.jmespath.is_empty() {
let expr = jmespath::compile(&rule.jmespath).unwrap();
if let Ok(data) = jmespath::Variable::from_json(&msg.msg) {
// Search the data with the compiled expression
Expand All @@ -242,20 +246,22 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
}
}
}
} else if let Some(captures) = rule.regex.captures(&msg.msg) {
rule_matches = true;

for name in rule.regex.capture_names() {
if let Some(name) = name {
if let Some(value) = captures.name(name) {
hash.insert(name.to_string(), String::from(value.as_str()));
} else if let Some(regex) = &rule.regex {
if let Some(captures) = regex.captures(&msg.msg) {
rule_matches = true;

for name in regex.capture_names() {
if let Some(name) = name {
if let Some(value) = captures.name(name) {
hash.insert(name.to_string(), String::from(value.as_str()));
}
}
}
}
}
}
_ => {
debug!("unhandled `field` for this rule: {}", rule.regex);
warn!("unhandled `field` for rule");
}
}

Expand Down Expand Up @@ -291,7 +297,7 @@ pub async fn read_logs<R: async_std::io::Read + std::marker::Unpin>(
* If a custom output was never defined, just take the
* raw message and pass that along.
*/
if output.len() == 0 {
if output.is_empty() {
output = String::from(&msg.msg);
}

Expand Down Expand Up @@ -363,7 +369,7 @@ fn perform_merge(
if let Ok(rendered) = state.hb.render_template(&output, &state.variables) {
return Ok(rendered);
}
return Ok(output);
Ok(output)
} else {
Err("Failed to render".to_string())
}
Expand All @@ -374,33 +380,6 @@ fn perform_merge(
}
}

/**
 * merge_and_render will take care of merging the two values and manage the
 * rendering of variable substitutions.
 *
 * `left` is merged in place with `right`, serialized to JSON, and then the
 * serialized form is run through the Handlebars template engine so that any
 * `{{variable}}` placeholders inside the merged object are substituted from
 * `state.variables`. If template rendering fails, the unrendered JSON string
 * is returned as-is.
 */
fn merge_and_render<'a>(
    left: &mut serde_json::Value,
    right: &serde_json::Value,
    state: &RuleState<'a>,
) -> String {
    // `left` is already `&mut Value`; no extra `&mut` re-borrow is needed.
    merge::merge(left, right);

    let output = serde_json::to_string(&left).unwrap();

    /*
     * This is a bit inefficient, but until I can figure out a better way
     * to render the variables that are being substituted in a merged JSON
     * object, hotdog will just render the JSON object and then render it
     * as a template.
     *
     * what could possibly go wrong
     */
    // Fall back to the raw serialized JSON if template rendering errors out.
    state
        .hb
        .render_template(&output, &state.variables)
        .unwrap_or(output)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -490,22 +469,4 @@ mod tests {
let output = perform_merge("{}", &to_merge, &state);
assert_eq!(output, Ok("{\"hello\":\"world\"}".to_string()));
}

#[test]
fn test_merge_and_render() {
    // One substitution variable available to the template renderer.
    let mut variables = HashMap::<String, String>::new();
    variables.insert("value".to_string(), "hi".to_string());

    let handlebars = Handlebars::new();
    let state = RuleState {
        hb: &handlebars,
        variables: &variables,
    };

    let mut target = json!({"rust" : true});
    let overlay = json!({"test" : "{{value}}"});

    // Merging combines both objects and then renders "{{value}}" -> "hi".
    assert_eq!(
        merge_and_render(&mut target, &overlay, &state),
        r#"{"rust":true,"test":"hi"}"#
    );
}
}
13 changes: 8 additions & 5 deletions src/merge.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/*
* Disabling this lint because I don't want to fix it in this imported code
*/
#![allow(clippy::clone_double_ref)]
/**
* This module is from the crate json_value_merge
* <https://github.com/jmfiaschi/json_value_merge/>
*
* It is licensed under the MIT license
*/
extern crate serde_json;

use serde_json::{Map, Value};

/// Trait used to merge Json Values
Expand Down Expand Up @@ -85,8 +87,9 @@ pub fn merge(a: &mut Value, b: &Value) {
}
}

fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) -> () {
let mut fields: Vec<&str> = json_pointer.split("/").skip(1).collect();
fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) {
let mut fields: Vec<&str> = json_pointer.split('/').skip(1).collect();
//let first_field = fields[0].clone();
let first_field = fields[0].clone();
fields.remove(0);
let next_fields = fields;
Expand All @@ -100,7 +103,7 @@ fn merge_in(json_value: &mut Value, json_pointer: &str, new_json_value: Value) -
match json_value.pointer_mut(format!("/{}", first_field).as_str()) {
// Find the field and the json_value_targeted.
Some(json_targeted) => {
if 0 < next_fields.len() {
if !next_fields.is_empty() {
merge_in(
json_targeted,
format!("/{}", next_fields.join("/")).as_ref(),
Expand Down
29 changes: 14 additions & 15 deletions src/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,30 @@ pub async fn test_rules(file_name: &str, settings: Arc<Settings>) -> Result<()>
while let Some(line) = lines.next().await {
let line = line?;
debug!("Testing the line: {}", line);
number = number + 1;
number += 1;
let mut matches: Vec<&str> = vec![];

for rule in settings.rules.iter() {
match rule.field {
Field::Msg => {
if rule.jmespath.len() > 0 {
let expr = jmespath::compile(&rule.jmespath).unwrap();
if let Ok(data) = jmespath::Variable::from_json(&line) {
// Search the data with the compiled expression
if let Ok(result) = expr.search(data) {
if !result.is_null() {
matches.push(&rule.jmespath);
}
if let Field::Msg = rule.field {
if !rule.jmespath.is_empty() {
let expr = jmespath::compile(&rule.jmespath).unwrap();
if let Ok(data) = jmespath::Variable::from_json(&line) {
// Search the data with the compiled expression
if let Ok(result) = expr.search(data) {
if !result.is_null() {
matches.push(&rule.jmespath);
}
}
} else if let Some(_captures) = rule.regex.captures(&line) {
matches.push(&rule.regex.as_str());
}
} else if let Some(regex) = &rule.regex {
if let Some(_captures) = regex.captures(&line) {
matches.push(&regex.as_str());
}
}
_ => {}
}
}

if matches.len() > 0 {
if !matches.is_empty() {
println!("Line {} matches on:", number);
for m in matches.iter() {
println!("\t - {}", m);
Expand Down
40 changes: 22 additions & 18 deletions src/serve_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@ use dipstick::*;
use log::*;
use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
use rustls::{
Certificate,
AllowAnyAnonymousOrAuthenticatedClient,
NoClientAuth,
PrivateKey,
RootCertStore,
ServerConfig
AllowAnyAnonymousOrAuthenticatedClient, Certificate, NoClientAuth, PrivateKey, RootCertStore,
ServerConfig,
};
use std::path::Path;

Expand All @@ -44,14 +40,14 @@ fn load_keys(path: &Path) -> io::Result<Vec<PrivateKey>> {
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));

if let Ok(keys) = result {
if keys.len() == 0 {
if keys.is_empty() {
debug!("Failed to load key as RSA, trying PKCS8");
return pkcs8_private_keys(&mut std::io::BufReader::new(std::fs::File::open(path)?))
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"));
}
return Ok(keys);
}
return result;
result
}

/// Configure the server using rusttls
Expand All @@ -64,18 +60,22 @@ fn load_tls_config(settings: &Settings) -> io::Result<ServerConfig> {
let certs = load_certs(cert.as_path())?;
let mut keys = load_keys(key.as_path())?;

if keys.len() <= 0 {
if keys.is_empty() {
panic!("TLS key could not be properly loaded! This is fatal!");
}

let mut verifier = NoClientAuth::new();

if ca.is_some() {
let verifier = if ca.is_some() {
let ca_path = ca.as_ref().unwrap();
let mut store = RootCertStore::empty();
store.add_pem_file(&mut std::io::BufReader::new(std::fs::File::open(ca_path.as_path())?));
verifier = AllowAnyAnonymousOrAuthenticatedClient::new(store);
}
if let Err(e) = store.add_pem_file(&mut std::io::BufReader::new(
std::fs::File::open(ca_path.as_path())?,
)) {
error!("Failed to add the CA properly, certificate verification may not work as expected: {:?}", e);
}
AllowAnyAnonymousOrAuthenticatedClient::new(store)
} else {
NoClientAuth::new()
};

// we don't use client authentication
let mut config = ServerConfig::new(verifier);
Expand Down Expand Up @@ -138,7 +138,7 @@ pub async fn accept_loop(

loop {
if let Ok(count) = conn_rx.recv() {
connections = connections + count;
connections += count;
debug!("Connection count now {}", connections);
counter.value(connections);
}
Expand All @@ -163,7 +163,9 @@ pub async fn accept_loop(
let ctx = conn_tx.clone();

task::spawn(async move {
handle_connection(&acceptor, &mut stream, state).await;
if let Err(e) = handle_connection(&acceptor, &mut stream, state).await {
error!("Failed to handle the connection properly: {:?}", e);
}
ctx.send(-1).unwrap();
});
}
Expand All @@ -186,7 +188,9 @@ async fn handle_connection(
let tls_stream = handshake.await?;
let reader = BufReader::new(tls_stream);

read_logs(reader, state).await;
if let Err(e) = read_logs(reader, state).await {
error!("Failed to read logs properly: {:?}", e);
}
Ok(())
}

Expand Down
Loading

0 comments on commit 37ff647

Please sign in to comment.