Skip to content

Commit c06ba00

Browse files
authored
Merge pull request #4 from ZLMediaKit/master
合并最新代码
2 parents 4cdd46b + a1f39d5 commit c06ba00

21 files changed

+861
-307
lines changed

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ else()
4545
endif()
4646

4747

48-
set(ENABLE_OPENSSL true)
49-
set(ENABLE_MYSQL true)
48+
set(ENABLE_OPENSSL ON CACHE BOOL "enable openssl")
49+
set(ENABLE_MYSQL ON CACHE BOOL "enable mysql")
5050

5151

5252
#查找openssl是否安装

src/Network/Buffer.cpp

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -71,35 +71,38 @@ ssize_t BufferList::send_l(int fd, int flags,bool udp) {
7171
ssize_t n;
7272
do {
7373
struct msghdr msg;
74-
if(!udp){
75-
msg.msg_name = NULL;
74+
if (!udp) {
75+
msg.msg_name = NULL;
7676
msg.msg_namelen = 0;
77-
}else{
78-
BufferSock *buffer = static_cast<BufferSock *>(_pkt_list.front().get());
77+
} else {
78+
auto &buffer = _pkt_list.front();
7979
msg.msg_name = buffer->_addr;
8080
msg.msg_namelen = buffer->_addr_len;
8181
}
8282

8383
msg.msg_iov = &(_iovec[_iovec_off]);
84-
msg.msg_iovlen = (decltype(msg.msg_iovlen))(_iovec.size() - _iovec_off);
84+
msg.msg_iovlen = (decltype(msg.msg_iovlen)) (_iovec.size() - _iovec_off);
8585
size_t max = udp ? 1 : IOV_MAX;
86-
if(msg.msg_iovlen > max){
86+
if (msg.msg_iovlen > max) {
8787
msg.msg_iovlen = max;
8888
}
8989
msg.msg_control = NULL;
9090
msg.msg_controllen = 0;
9191
msg.msg_flags = flags;
92-
n = sendmsg(fd,&msg,flags);
92+
n = sendmsg(fd, &msg, flags);
9393
} while (-1 == n && UV_EINTR == get_uv_error(true));
9494

95-
if(n >= (ssize_t)_remainSize){
95+
if (n >= (ssize_t) _remainSize) {
9696
//全部写完了
9797
_iovec_off = _iovec.size();
9898
_remainSize = 0;
99+
_pkt_list.for_each([](BufferSock::Ptr &buffer){
100+
buffer->onSendSuccess();
101+
});
99102
return n;
100103
}
101104

102-
if(n > 0){
105+
if (n > 0) {
103106
//部分发送成功
104107
reOffset(n);
105108
return n;
@@ -111,10 +114,10 @@ ssize_t BufferList::send_l(int fd, int flags,bool udp) {
111114

112115
ssize_t BufferList::send(int fd, int flags, bool udp) {
113116
auto remainSize = _remainSize;
114-
while (_remainSize && send_l(fd,flags,udp) != -1);
117+
while (_remainSize && send_l(fd, flags, udp) != -1);
115118

116119
ssize_t sent = remainSize - _remainSize;
117-
if(sent > 0){
120+
if (sent > 0) {
118121
//部分或全部发送成功
119122
return sent;
120123
}
@@ -126,52 +129,60 @@ void BufferList::reOffset(size_t n) {
126129
_remainSize -= n;
127130
size_t offset = 0;
128131
auto last_off = _iovec_off;
129-
for(auto i = _iovec_off ; i != _iovec.size() ; ++i ){
132+
for (auto i = _iovec_off; i != _iovec.size(); ++i) {
130133
auto &ref = _iovec[i];
131134
offset += ref.iov_len;
132-
if(offset < n){
135+
if (offset < n) {
133136
continue;
134137
}
135138
ssize_t remain = offset - n;
136-
ref.iov_base = (char *)ref.iov_base + ref.iov_len - remain;
137-
ref.iov_len = (decltype(ref.iov_len))remain;
139+
ref.iov_base = (char *) ref.iov_base + ref.iov_len - remain;
140+
ref.iov_len = (decltype(ref.iov_len)) remain;
138141
_iovec_off = i;
139-
if(remain == 0){
142+
if (remain == 0) {
140143
_iovec_off += 1;
141144
}
142145
break;
143146
}
144147

145148
//删除已经发送的数据,节省内存
146-
for (auto i = last_off ; i < _iovec_off ; ++i){
149+
for (auto i = last_off; i < _iovec_off; ++i) {
150+
auto &front = _pkt_list.front();
151+
front->onSendSuccess();
147152
_pkt_list.pop_front();
148153
}
149154
}
150155

151-
BufferList::BufferList(List<Buffer::Ptr> &list) : _iovec(list.size()) {
156+
BufferList::BufferList(List<BufferSock::Ptr> &list) : _iovec(list.size()) {
152157
_pkt_list.swap(list);
153158
auto it = _iovec.begin();
154-
_pkt_list.for_each([&](Buffer::Ptr &buffer){
159+
_pkt_list.for_each([&](BufferSock::Ptr &buffer) {
155160
it->iov_base = buffer->data();
156-
it->iov_len = (decltype(it->iov_len))buffer->size();
161+
it->iov_len = (decltype(it->iov_len)) buffer->size();
157162
_remainSize += it->iov_len;
158163
++it;
159164
});
160165
}
161166

162-
BufferSock::BufferSock(Buffer::Ptr buffer,struct sockaddr *addr, int addr_len){
163-
if(addr && addr_len){
164-
_addr = (struct sockaddr *)malloc(addr_len);
165-
memcpy(_addr,addr,addr_len);
167+
BufferSock::BufferSock(Buffer::Ptr buffer, struct sockaddr *addr, int addr_len, onResult cb) {
168+
if (addr && addr_len) {
169+
_addr = (struct sockaddr *) malloc(addr_len);
170+
memcpy(_addr, addr, addr_len);
166171
_addr_len = addr_len;
167172
}
173+
assert(buffer);
168174
_buffer = std::move(buffer);
175+
_result = std::move(cb);
169176
}
170177

171178
BufferSock::~BufferSock(){
172-
if(_addr){
179+
if (_addr) {
173180
free(_addr);
174181
}
182+
if (_result) {
183+
_result(0);
184+
_result = nullptr;
185+
}
175186
}
176187

177188
char *BufferSock::data() const {
@@ -182,4 +193,15 @@ size_t BufferSock::size() const {
182193
return _buffer->size();
183194
}
184195

196+
void BufferSock::onSendSuccess() {
197+
if (_result) {
198+
_result(size());
199+
_result = nullptr;
200+
}
201+
}
202+
203+
void BufferSock::setSendResult(onResult cb){
204+
_result = std::move(cb);
205+
}
206+
185207
}//namespace toolkit

src/Network/Buffer.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -463,25 +463,31 @@ struct msghdr {
463463
#endif
464464

465465
class BufferList;
466-
class BufferSock : public Buffer{
466+
class BufferSock : public Buffer {
467467
public:
468-
typedef std::shared_ptr<BufferSock> Ptr;
468+
using Ptr = std::shared_ptr<BufferSock>;
469+
using onResult = function<void(size_t size)>;
469470
friend class BufferList;
470-
BufferSock(Buffer::Ptr ptr,struct sockaddr *addr = nullptr, int addr_len = 0);
471+
472+
BufferSock(Buffer::Ptr ptr, struct sockaddr *addr = nullptr, int addr_len = 0, onResult cb = nullptr);
471473
~BufferSock();
472-
char *data() const override ;
474+
475+
char *data() const override;
473476
size_t size() const override;
477+
void setSendResult(onResult cb);
478+
void onSendSuccess();
474479

475480
private:
476481
int _addr_len = 0;
477482
struct sockaddr *_addr = nullptr;
478483
Buffer::Ptr _buffer;
484+
onResult _result;
479485
};
480486

481487
class BufferList : public noncopyable {
482488
public:
483489
typedef std::shared_ptr<BufferList> Ptr;
484-
BufferList(List<Buffer::Ptr> &list);
490+
BufferList(List<BufferSock::Ptr> &list);
485491
~BufferList() {}
486492

487493
bool empty();
@@ -496,7 +502,7 @@ class BufferList : public noncopyable {
496502
size_t _iovec_off = 0;
497503
size_t _remainSize = 0;
498504
vector<struct iovec> _iovec;
499-
List<Buffer::Ptr> _pkt_list;
505+
List<BufferSock::Ptr> _pkt_list;
500506
//对象个数统计
501507
ObjectStatistic<BufferList> _statistic;
502508
};

src/Network/Server.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (c) 2021 The ZLToolKit project authors. All Rights Reserved.
3+
*
4+
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
5+
*
6+
* Use of this source code is governed by MIT license that can be found in the
7+
* LICENSE file in the root of the source tree. All contributing project authors
8+
* may be found in the AUTHORS file in the root of the source tree.
9+
*/
10+
11+
#include "Network/Server.h"
12+
13+
namespace toolkit {
14+
15+
Server::Server(EventPoller::Ptr poller) {
16+
_poller = poller ? std::move(poller) : EventPollerPool::Instance().getPoller();
17+
}
18+
19+
Server::~Server() {}
20+
21+
} // namespace toolkit

src/Network/Server.h

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright (c) 2021 The ZLToolKit project authors. All Rights Reserved.
3+
*
4+
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
5+
*
6+
* Use of this source code is governed by MIT license that can be found in the
7+
* LICENSE file in the root of the source tree. All contributing project authors
8+
* may be found in the AUTHORS file in the root of the source tree.
9+
*/
10+
11+
#ifndef ZLTOOLKIT_SERVER_H
12+
#define ZLTOOLKIT_SERVER_H
13+
14+
#include <unordered_map>
15+
16+
#include "Network/Session.h"
17+
#include "Util/mini.h"
18+
19+
namespace toolkit {
20+
21+
// 全局的 Session 记录对象, 方便后面管理
22+
// 线程安全的
23+
class SessionMap : public std::enable_shared_from_this<SessionMap> {
24+
public:
25+
friend class SessionHelper;
26+
typedef std::shared_ptr<SessionMap> Ptr;
27+
28+
//单例
29+
static SessionMap &Instance();
30+
~SessionMap() {};
31+
32+
//获取Session
33+
Session::Ptr get(const string &tag) {
34+
lock_guard<mutex> lck(_mtx_session);
35+
auto it = _map_session.find(tag);
36+
if (it == _map_session.end()) {
37+
return nullptr;
38+
}
39+
return it->second.lock();
40+
}
41+
42+
void for_each_session(const function<void(const string &id, const Session::Ptr &session)> &cb) {
43+
lock_guard<mutex> lck(_mtx_session);
44+
for (auto it = _map_session.begin(); it != _map_session.end();) {
45+
auto session = it->second.lock();
46+
if (!session) {
47+
it = _map_session.erase(it);
48+
continue;
49+
}
50+
cb(it->first, session);
51+
++it;
52+
}
53+
}
54+
55+
private:
56+
SessionMap() {};
57+
58+
//添加Session
59+
bool add(const string &tag, const Session::Ptr &session) {
60+
lock_guard<mutex> lck(_mtx_session);
61+
return _map_session.emplace(tag, session).second;
62+
}
63+
64+
//移除Session
65+
bool del(const string &tag) {
66+
lock_guard<mutex> lck(_mtx_session);
67+
return _map_session.erase(tag);
68+
}
69+
70+
private:
71+
mutex _mtx_session;
72+
unordered_map<string, weak_ptr<Session> > _map_session;
73+
};
74+
75+
class Server;
76+
class SessionHelper {
77+
public:
78+
typedef std::shared_ptr<SessionHelper> Ptr;
79+
80+
SessionHelper(const std::weak_ptr<Server> &server, Session::Ptr session) {
81+
_server = server;
82+
_session = std::move(session);
83+
//记录session至全局的map,方便后面管理
84+
_session_map = SessionMap::Instance().shared_from_this();
85+
_identifier = _session->getIdentifier();
86+
_session_map->add(_identifier, _session);
87+
}
88+
89+
~SessionHelper(){
90+
if (!_server.lock()) {
91+
//务必通知TcpSession已从TcpServer脱离
92+
_session->onError(SockException(Err_other, "Server shutdown!"));
93+
}
94+
//从全局map移除相关记录
95+
_session_map->del(_identifier);
96+
}
97+
98+
const Session::Ptr &session() const { return _session; }
99+
100+
private:
101+
string _identifier;
102+
Session::Ptr _session;
103+
SessionMap::Ptr _session_map;
104+
std::weak_ptr<Server> _server;
105+
};
106+
107+
108+
//
109+
// server 基类, 暂时仅用于剥离 SessionHelper 对 TcpServer 的依赖
110+
// 后续将 TCP 与 UDP 服务通用部分加到这里.
111+
//
112+
class Server : public std::enable_shared_from_this<Server>, public mINI {
113+
public:
114+
typedef std::shared_ptr<Server> Ptr;
115+
116+
explicit Server(EventPoller::Ptr poller = nullptr);
117+
virtual ~Server();
118+
119+
protected:
120+
EventPoller::Ptr _poller;
121+
};
122+
123+
} // namespace toolkit
124+
125+
#endif // ZLTOOLKIT_SERVER_H

src/Network/Session.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2021 The ZLToolKit project authors. All Rights Reserved.
3+
*
4+
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
5+
*
6+
* Use of this source code is governed by MIT license that can be found in the
7+
* LICENSE file in the root of the source tree. All contributing project authors
8+
* may be found in the AUTHORS file in the root of the source tree.
9+
*/
10+
11+
#include "Network/Session.h"
12+
13+
namespace toolkit {
14+
15+
Session::Session(const Socket::Ptr &sock) : SocketHelper(sock) {
16+
}
17+
18+
Session::~Session() = default;
19+
20+
string Session::getIdentifier() const{
21+
return std::to_string(reinterpret_cast<uint64_t>(this));
22+
}
23+
24+
void Session::safeShutdown(const SockException &ex) {
25+
std::weak_ptr<Session> weakSelf = shared_from_this();
26+
async_first([weakSelf,ex](){
27+
auto strongSelf = weakSelf.lock();
28+
if (strongSelf) {
29+
strongSelf->shutdown(ex);
30+
}
31+
});
32+
}
33+
34+
StatisticImp(Session)
35+
StatisticImp(UdpSession)
36+
StatisticImp(TcpSession)
37+
38+
} // namespace toolkit

0 commit comments

Comments
 (0)