Skip to content
This repository was archived by the owner on Apr 3, 2025. It is now read-only.

Commit 4a7f3ff

Browse files
committed
feat: apply eBPF acceleration for TCP transmission
1 parent c7a6407 commit 4a7f3ff

File tree

9 files changed

+251
-21
lines changed

9 files changed

+251
-21
lines changed

README.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,81 @@ just deploy-agent
4141
- [ ] Build an XDP-based BGP Peering Router
4242
- [ ] Implement Service Load Balancing
4343
- [ ] Collect Network Telemetry with eBPF
44+
45+
### TCP Acceleration
46+
47+
An eBPF program has been applied to accelerate TCP transmission between pods communicating on the same host machine. This avoids unnecessary traversing through the Linux network stack, enabling efficient communication between local socket pairs.
48+
49+
#### Without eBPF Acceleration
50+
51+
```sh
52+
Get "http://10.244.1.2" with for 10s using 50 connections
53+
Statistics Avg Stdev Max
54+
Reqs/sec 77688.89 1898.47 80410.00
55+
Latency 592.08µs 411.46µs 8.69ms
56+
Latency Distribution
57+
50% 309.77µs
58+
75% 409.98µs
59+
90% 490.44µs
60+
99% 571.86µs
61+
HTTP codes:
62+
1XX - 0, 2XX - 777807, 3XX - 0, 4XX - 0, 5XX - 0
63+
others - 0
64+
Throughput: 84447.90/s
65+
66+
Get "http://10.244.1.2" with for 30s using 500 connections
67+
Statistics Avg Stdev Max
68+
Reqs/sec 73050.03 1374.04 75450.00
69+
Latency 791.10µs 822.26µs 54.64ms
70+
Latency Distribution
71+
50% 361.55µs
72+
75% 503.02µs
73+
90% 622.58µs
74+
99% 749.10µs
75+
HTTP codes:
76+
1XX - 0, 2XX - 2192021, 3XX - 0, 4XX - 0, 5XX - 0
77+
others - 0
78+
Throughput: 632031.35/s
79+
```
80+
81+
### With eBPF Acceleration
82+
83+
```sh
84+
Get "http://10.244.1.2" with for 10s using 50 connections
85+
Statistics Avg Stdev Max
86+
Reqs/sec 81633.44 1638.01 84030.00
87+
Latency 539.51µs 366.21µs 11.88ms
88+
Latency Distribution
89+
50% 285.54µs
90+
75% 377.27µs
91+
90% 449.91µs
92+
99% 521.56µs
93+
HTTP codes:
94+
1XX - 0, 2XX - 812374, 3XX - 0, 4XX - 0, 5XX - 0
95+
others - 0
96+
Throughput: 92676.69/s
97+
98+
Get "http://10.244.1.2" with for 30s using 500 connections
99+
Statistics Avg Stdev Max
100+
Reqs/sec 76810.21 1745.58 79262.00
101+
Latency 650.09µs 714.47µs 61.40ms
102+
Latency Distribution
103+
50% 305.78µs
104+
75% 422.25µs
105+
90% 518.34µs
106+
99% 616.42µs
107+
HTTP codes:
108+
1XX - 0, 2XX - 2305881, 3XX - 0, 4XX - 0, 5XX - 0
109+
others - 0
110+
Throughput: 769121.91/s
111+
```
112+
113+
Tests were conducted with 50 connections for 10 seconds and 500 connections for 30 seconds.
114+
The results indicate an increase in the average request rate and throughput, and a decrease in latency.
115+
116+
| Test Case | Requests/sec | Throughput | Latency |
117+
| --- | --- | --- | --- |
118+
| Without Acceleration (10s) | 77688.89 | 84447.90/s | 592.08µs |
119+
| With Acceleration (10s) | 81633.44 | 92676.69/s | 539.51µs |
120+
| Without Acceleration (30s) | 73050.03 | 632031.35/s | 791.10µs |
121+
| With Acceleration (30s) | 76810.21 | 769121.91/s | 650.09µs |

agent/Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ ws = ["kube/ws"]
1414
sinabro-config = { path = "../config" }
1515

1616
axum = "0.7.2"
17-
aya = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f", features = [
18-
"async_tokio",
19-
] }
20-
aya-log = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
17+
aya = { version = "0.12", features = ["async_tokio"] }
18+
aya-log = "0.2"
2119
clap = { version = "4.1", features = ["derive"] }
2220
common = { path = "../common", features = ["user"] }
2321
anyhow = "1"

agent/src/bpf_loader.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
use std::net::Ipv4Addr;
22

33
use anyhow::Result;
4-
use aya::maps::HashMap;
5-
use aya::programs::{tc, SchedClassifier, TcAttachType};
4+
use aya::maps::{HashMap, SockHash};
5+
use aya::programs::{tc, SchedClassifier, SkMsg, SockOps, TcAttachType};
66
use aya::{include_bytes_aligned, Bpf};
7-
use common::{NetworkInfo, CLUSTER_CIDR_KEY, HOST_IP_KEY};
7+
use common::{NetworkInfo, SockKey, CLUSTER_CIDR_KEY, HOST_IP_KEY};
88

99
pub struct BpfLoader {
1010
pub bpf: Bpf,
1111
iface: String,
12+
cgroup_path: String,
1213
}
1314

1415
impl BpfLoader {
15-
pub fn load(iface: &str) -> Result<Self> {
16+
pub fn load(iface: &str, cgroup_path: &str) -> Result<Self> {
1617
#[cfg(debug_assertions)]
1718
let bpf = Bpf::load(include_bytes_aligned!(
1819
"../../target/bpfel-unknown-none/debug/ebpf"
@@ -25,6 +26,7 @@ impl BpfLoader {
2526
Ok(Self {
2627
bpf,
2728
iface: iface.to_string(),
29+
cgroup_path: cgroup_path.to_string(),
2830
})
2931
}
3032

@@ -75,6 +77,20 @@ impl BpfLoader {
7577
.expect("failed to insert node ip");
7678
});
7779

80+
let tcp_accelerate: &mut SockOps =
81+
self.bpf.program_mut("tcp_accelerate").unwrap().try_into()?;
82+
let cgroup = std::fs::File::open(&self.cgroup_path)?;
83+
tcp_accelerate.load()?;
84+
tcp_accelerate.attach(cgroup)?;
85+
86+
let sock_ops_map: SockHash<_, SockKey> =
87+
self.bpf.map("SOCK_OPS_MAP").unwrap().try_into()?;
88+
let map_fd = sock_ops_map.fd().try_clone()?;
89+
90+
let tcp_bypass: &mut SkMsg = self.bpf.program_mut("tcp_bypass").unwrap().try_into()?;
91+
tcp_bypass.load()?;
92+
tcp_bypass.attach(&map_fd)?;
93+
7894
Ok(())
7995
}
8096
}

agent/src/main.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ use crate::netlink::Netlink;
2424
struct Opt {
2525
#[clap(short, long, default_value = "eth0")]
2626
iface: String,
27+
28+
#[clap(short, long, default_value = "/sys/fs/cgroup")]
29+
cgroup_path: String,
2730
}
2831

2932
#[tokio::main]
@@ -42,7 +45,7 @@ async fn main() -> Result<()> {
4245
setup_cni_config(&cluster_cidr, &host_route.pod_cidr)?;
4346
setup_network(&host_ip, host_route, &node_routes)?;
4447

45-
let mut bpf_loader = BpfLoader::load(&opt.iface)?;
48+
let mut bpf_loader = BpfLoader::load(&opt.iface, &opt.cgroup_path)?;
4649
BpfLogger::init(&mut bpf_loader.bpf)?;
4750

4851
bpf_loader

common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ default = []
88
user = ["aya"]
99

1010
[dependencies]
11-
aya = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f", optional = true }
11+
aya = { version = "0.12", optional = true }
1212

1313
[lib]
1414
path = "src/lib.rs"

common/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,16 @@ pub struct NetworkInfo {
3535

3636
#[cfg(feature = "user")]
3737
unsafe impl aya::Pod for NetworkInfo {}
38+
39+
#[repr(C)]
40+
#[derive(Clone, Copy)]
41+
pub struct SockKey {
42+
pub src_ip: u32,
43+
pub dst_ip: u32,
44+
pub src_port: u32,
45+
pub dst_port: u32,
46+
pub family: u32,
47+
}
48+
49+
#[cfg(feature = "user")]
50+
unsafe impl aya::Pod for SockKey {}

ebpf/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ version = "0.1.0"
44
edition = "2021"
55

66
[dependencies]
7-
aya-bpf = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
8-
aya-log-ebpf = { git = "https://github.com/aya-rs/aya", rev = "1979da92a722bacd9c984865a4c7108e22fb618f" }
7+
aya-ebpf = "0.1.0"
8+
aya-log-ebpf = "0.1.0"
99
common = { path = "../common" }
1010
network-types = "0.0.5"
1111
memoffset = "0.9"

ebpf/src/main.rs

Lines changed: 125 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,31 @@
33

44
use core::mem;
55

6-
use aya_bpf::{
7-
bindings::{BPF_F_PSEUDO_HDR, TC_ACT_PIPE, TC_ACT_SHOT},
6+
use aya_ebpf::bindings::sk_action::SK_PASS;
7+
use aya_ebpf::bindings::{
8+
sk_msg_md, BPF_ANY, BPF_F_INGRESS, BPF_F_PSEUDO_HDR, BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB,
9+
BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB, BPF_SOCK_OPS_STATE_CB_FLAG, TC_ACT_PIPE, TC_ACT_SHOT,
10+
};
11+
use aya_ebpf::maps::SockHash;
12+
use aya_ebpf::{
813
cty::c_long,
914
helpers::{bpf_csum_diff, bpf_get_prandom_u32},
10-
macros::{classifier, map},
15+
macros::{classifier, map, sk_msg, sock_ops},
1116
maps::HashMap,
12-
programs::TcContext,
17+
programs::{SkMsgContext, SockOpsContext, TcContext},
1318
};
14-
use aya_log_ebpf::info;
15-
use common::{NatKey, NetworkInfo, OriginValue, CLUSTER_CIDR_KEY, HOST_IP_KEY};
19+
use aya_log_ebpf::{error, info};
20+
use common::{NatKey, NetworkInfo, OriginValue, SockKey, CLUSTER_CIDR_KEY, HOST_IP_KEY};
1621
use memoffset::offset_of;
1722
use network_types::{
1823
eth::{EthHdr, EtherType},
1924
ip::{IpProto, Ipv4Hdr},
2025
tcp::TcpHdr,
2126
};
2227

28+
#[map]
29+
pub static mut SOCK_OPS_MAP: SockHash<SockKey> = SockHash::with_max_entries(65535, 0);
30+
2331
#[map]
2432
static mut NET_CONFIG_MAP: HashMap<u8, NetworkInfo> = HashMap::with_max_entries(2, 0);
2533

@@ -275,6 +283,117 @@ fn is_node_ip(ip: u32) -> bool {
275283
unsafe { NODE_MAP.get(&ip).is_some() }
276284
}
277285

286+
#[sock_ops]
287+
pub fn tcp_accelerate(ctx: SockOpsContext) -> u32 {
288+
try_tcp_accelerate(ctx).unwrap_or(0)
289+
}
290+
291+
fn try_tcp_accelerate(ctx: SockOpsContext) -> Result<u32, ()> {
292+
let family = ctx.family();
293+
294+
// currently only support IPv4
295+
if family != 2 {
296+
return Ok(0);
297+
}
298+
299+
match ctx.op() {
300+
BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB | BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB => {
301+
// info!(
302+
// &ctx,
303+
// "<<< ipv4 op = {}, src {:i}:{} => dst {:i}:{}",
304+
// ctx.op(),
305+
// u32::from_be(ctx.local_ip4()),
306+
// ctx.local_port(),
307+
// u32::from_be(ctx.remote_ip4()),
308+
// u32::from_be(ctx.remote_port())
309+
// );
310+
311+
let mut sock_key = extract_sock_key_from(&ctx);
312+
313+
unsafe {
314+
SOCK_OPS_MAP
315+
.update(&mut sock_key, &mut *ctx.ops, BPF_ANY.into())
316+
.map_err(|e| {
317+
error!(&ctx, "failed to update SOCK_OPS_MAP: {}", e);
318+
})?;
319+
}
320+
321+
ctx.set_cb_flags(BPF_SOCK_OPS_STATE_CB_FLAG as i32)
322+
.map_err(|e| {
323+
error!(&ctx, "failed to set BPF_SOCK_OPS_STATE_CB_FLAG: {}", e);
324+
})?;
325+
}
326+
// BPF_SOCK_OPS_STATE_CB => match ctx.arg(1) {
327+
// BPF_TCP_CLOSE | BPF_TCP_CLOSE_WAIT | BPF_TCP_LAST_ACK => {
328+
// // info!(
329+
// // &ctx,
330+
// // ">>> ipv4 op = {}, src {:i}:{} => dst {:i}:{}, state: {}",
331+
// // ctx.op(),
332+
// // u32::from_be(ctx.local_ip4()),
333+
// // ctx.local_port(),
334+
// // u32::from_be(ctx.remote_ip4()),
335+
// // u32::from_be(ctx.remote_port()),
336+
// // ctx.arg(1)
337+
// // );
338+
// }
339+
// _ => {}
340+
// },
341+
_ => {}
342+
}
343+
344+
Ok(0)
345+
}
346+
347+
fn extract_sock_key_from(ctx: &SockOpsContext) -> SockKey {
348+
SockKey {
349+
src_ip: u32::from_be(ctx.local_ip4()),
350+
dst_ip: u32::from_be(ctx.remote_ip4()),
351+
src_port: ctx.local_port(),
352+
dst_port: u32::from_be(ctx.remote_port()),
353+
family: ctx.family(),
354+
}
355+
}
356+
357+
#[sk_msg]
358+
pub fn tcp_bypass(ctx: SkMsgContext) -> u32 {
359+
try_tcp_bypass(ctx).unwrap_or(SK_PASS)
360+
}
361+
362+
fn try_tcp_bypass(ctx: SkMsgContext) -> Result<u32, ()> {
363+
// info!(&ctx, "received a message on the socket");
364+
365+
let msg = unsafe { &*ctx.msg };
366+
367+
if msg.family != 2 {
368+
return Ok(SK_PASS);
369+
}
370+
371+
let mut sock_key = sk_msg_extract_key(msg);
372+
373+
unsafe { SOCK_OPS_MAP.redirect_msg(&ctx, &mut sock_key, BPF_F_INGRESS as u64) };
374+
// info!(
375+
// &ctx,
376+
// "tcp_bypass: {:i}:{} <-> {:i}:{} / ret: {}",
377+
// sock_key.src_ip,
378+
// sock_key.src_port,
379+
// sock_key.dst_ip,
380+
// sock_key.dst_port,
381+
// ret
382+
// );
383+
384+
Ok(SK_PASS)
385+
}
386+
387+
fn sk_msg_extract_key(msg: &sk_msg_md) -> SockKey {
388+
SockKey {
389+
src_ip: u32::from_be(msg.remote_ip4),
390+
dst_ip: u32::from_be(msg.local_ip4),
391+
src_port: u32::from_be(msg.remote_port),
392+
dst_port: msg.local_port,
393+
family: msg.family,
394+
}
395+
}
396+
278397
#[panic_handler]
279398
fn panic(_info: &core::panic::PanicInfo) -> ! {
280399
unsafe { core::hint::unreachable_unchecked() }

justfile

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ setup-kind-cluster: build-image
77
kind create cluster --config tests/e2e/kind-config.yaml
88
kind load docker-image sinabro:test
99

10-
clean-kind-cluster:
10+
delete-kind-cluster:
1111
kind delete cluster
1212

13-
deploy-agent: setup-kind-cluster
13+
create-kind-cluster-with-sinabro: setup-kind-cluster
1414
kubectl apply -f tests/e2e/deploy-test/agent.yaml
1515

1616
deploy-test-pods:
@@ -26,4 +26,7 @@ e2e-test: build-image
2626
kubectl kuttl test --config ./tests/kuttl-test.yaml
2727

2828
launch-rust-env:
29-
docker run --rm --privileged -it -v $(pwd):/source rust sh
29+
docker run --rm --privileged -it -v $(pwd):/source rust sh
30+
31+
run-rsb:
32+
kubectl run rsb --image gamelife1314/rsb -- -d 30 -c 500 -l http://$(kubectl get pod nginx-worker -o jsonpath='{.status.podIP}')

0 commit comments

Comments
 (0)