Skip to content

合并最新代码 #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
97b90d2
Update CMakeLists.txt
musicwood May 8, 2021
3208294
Merge pull request #55 from musicwood/patch-1
xia-chu May 8, 2021
b04d132
修复编译警告
xia-chu May 12, 2021
b678e47
Merge branch 'master' of https://github.com/xia-chu/ZLToolKit
xia-chu May 12, 2021
ca4985c
修复Windows下获取errno失败的bug:https://github.com/ZLMediaKit/ZLMediaKit/issu…
xia-chu May 26, 2021
4db5673
Socket.cpp未添加bom头
baiyfcu May 27, 2021
b716c03
windows FD_SETSIZE设置未生效问题
baiyfcu May 28, 2021
7aed59e
删除注释内容
baiyfcu May 28, 2021
46b7d1c
修复指定logger对象失败的问题
xia-chu Jun 3, 2021
52afb31
设置 SO_REUSEPORT
Jun 3, 2021
c1cbab6
为 UDP socket 添加 connect 及 dissolve
Jun 3, 2021
7bde486
剥离 Session 及 Server 对 TCP 的依赖
Jun 3, 2021
751488c
添加 UDP Server
Jun 3, 2021
23b57d6
调整部分接口
Jun 4, 2021
790918e
Merge branch 'feature/udp' of https://github.com/wasphin/ZLToolKit
xia-chu Jun 4, 2021
dae306d
合并pr:https://github.com/ZLMediaKit/ZLToolKit/pull/58
xia-chu Jun 4, 2021
4d7305f
优化
xia-chu Jun 4, 2021
39a610e
bug fixed
xia-chu Jun 7, 2021
9acbc5c
修改api名
xia-chu Jun 7, 2021
ad2bcb4
完善udp 服务器
xia-chu Jun 7, 2021
95a4851
dissolveUdpSock函数完善错误提示
xia-chu Jun 7, 2021
58d0e6a
提高udp session超时管理性能
xia-chu Jun 7, 2021
0c37146
确保UdpSession构建时socket有效
xia-chu Jun 8, 2021
8a626d3
修复TcpServer不能创建非TcpSession类实例的bug
xia-chu Jun 8, 2021
3b33327
修复Windows下编译问题
xia-chu Jun 8, 2021
2dac451
新增写入socket结果回调功能
xia-chu Jun 9, 2021
a1f39d5
修复全部发送完毕时不触发回调的bug
xia-chu Jun 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ else()
endif()


set(ENABLE_OPENSSL true)
set(ENABLE_MYSQL true)
set(ENABLE_OPENSSL ON CACHE BOOL "enable openssl")
set(ENABLE_MYSQL ON CACHE BOOL "enable mysql")


#查找openssl是否安装
Expand Down
72 changes: 47 additions & 25 deletions src/Network/Buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,35 +71,38 @@ ssize_t BufferList::send_l(int fd, int flags,bool udp) {
ssize_t n;
do {
struct msghdr msg;
if(!udp){
msg.msg_name = NULL;
if (!udp) {
msg.msg_name = NULL;
msg.msg_namelen = 0;
}else{
BufferSock *buffer = static_cast<BufferSock *>(_pkt_list.front().get());
} else {
auto &buffer = _pkt_list.front();
msg.msg_name = buffer->_addr;
msg.msg_namelen = buffer->_addr_len;
}

msg.msg_iov = &(_iovec[_iovec_off]);
msg.msg_iovlen = (decltype(msg.msg_iovlen))(_iovec.size() - _iovec_off);
msg.msg_iovlen = (decltype(msg.msg_iovlen)) (_iovec.size() - _iovec_off);
size_t max = udp ? 1 : IOV_MAX;
if(msg.msg_iovlen > max){
if (msg.msg_iovlen > max) {
msg.msg_iovlen = max;
}
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = flags;
n = sendmsg(fd,&msg,flags);
n = sendmsg(fd, &msg, flags);
} while (-1 == n && UV_EINTR == get_uv_error(true));

if(n >= (ssize_t)_remainSize){
if (n >= (ssize_t) _remainSize) {
//全部写完了
_iovec_off = _iovec.size();
_remainSize = 0;
_pkt_list.for_each([](BufferSock::Ptr &buffer){
buffer->onSendSuccess();
});
return n;
}

if(n > 0){
if (n > 0) {
//部分发送成功
reOffset(n);
return n;
Expand All @@ -111,10 +114,10 @@ ssize_t BufferList::send_l(int fd, int flags,bool udp) {

ssize_t BufferList::send(int fd, int flags, bool udp) {
auto remainSize = _remainSize;
while (_remainSize && send_l(fd,flags,udp) != -1);
while (_remainSize && send_l(fd, flags, udp) != -1);

ssize_t sent = remainSize - _remainSize;
if(sent > 0){
if (sent > 0) {
//部分或全部发送成功
return sent;
}
Expand All @@ -126,52 +129,60 @@ void BufferList::reOffset(size_t n) {
_remainSize -= n;
size_t offset = 0;
auto last_off = _iovec_off;
for(auto i = _iovec_off ; i != _iovec.size() ; ++i ){
for (auto i = _iovec_off; i != _iovec.size(); ++i) {
auto &ref = _iovec[i];
offset += ref.iov_len;
if(offset < n){
if (offset < n) {
continue;
}
ssize_t remain = offset - n;
ref.iov_base = (char *)ref.iov_base + ref.iov_len - remain;
ref.iov_len = (decltype(ref.iov_len))remain;
ref.iov_base = (char *) ref.iov_base + ref.iov_len - remain;
ref.iov_len = (decltype(ref.iov_len)) remain;
_iovec_off = i;
if(remain == 0){
if (remain == 0) {
_iovec_off += 1;
}
break;
}

//删除已经发送的数据,节省内存
for (auto i = last_off ; i < _iovec_off ; ++i){
for (auto i = last_off; i < _iovec_off; ++i) {
auto &front = _pkt_list.front();
front->onSendSuccess();
_pkt_list.pop_front();
}
}

BufferList::BufferList(List<Buffer::Ptr> &list) : _iovec(list.size()) {
BufferList::BufferList(List<BufferSock::Ptr> &list) : _iovec(list.size()) {
_pkt_list.swap(list);
auto it = _iovec.begin();
_pkt_list.for_each([&](Buffer::Ptr &buffer){
_pkt_list.for_each([&](BufferSock::Ptr &buffer) {
it->iov_base = buffer->data();
it->iov_len = (decltype(it->iov_len))buffer->size();
it->iov_len = (decltype(it->iov_len)) buffer->size();
_remainSize += it->iov_len;
++it;
});
}

BufferSock::BufferSock(Buffer::Ptr buffer,struct sockaddr *addr, int addr_len){
if(addr && addr_len){
_addr = (struct sockaddr *)malloc(addr_len);
memcpy(_addr,addr,addr_len);
BufferSock::BufferSock(Buffer::Ptr buffer, struct sockaddr *addr, int addr_len, onResult cb) {
if (addr && addr_len) {
_addr = (struct sockaddr *) malloc(addr_len);
memcpy(_addr, addr, addr_len);
_addr_len = addr_len;
}
assert(buffer);
_buffer = std::move(buffer);
_result = std::move(cb);
}

BufferSock::~BufferSock(){
if(_addr){
if (_addr) {
free(_addr);
}
if (_result) {
_result(0);
_result = nullptr;
}
}

char *BufferSock::data() const {
Expand All @@ -182,4 +193,15 @@ size_t BufferSock::size() const {
return _buffer->size();
}

void BufferSock::onSendSuccess() {
if (_result) {
_result(size());
_result = nullptr;
}
}

void BufferSock::setSendResult(onResult cb){
_result = std::move(cb);
}

}//namespace toolkit
18 changes: 12 additions & 6 deletions src/Network/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,25 +463,31 @@ struct msghdr {
#endif

class BufferList;
class BufferSock : public Buffer{
class BufferSock : public Buffer {
public:
typedef std::shared_ptr<BufferSock> Ptr;
using Ptr = std::shared_ptr<BufferSock>;
using onResult = function<void(size_t size)>;
friend class BufferList;
BufferSock(Buffer::Ptr ptr,struct sockaddr *addr = nullptr, int addr_len = 0);

BufferSock(Buffer::Ptr ptr, struct sockaddr *addr = nullptr, int addr_len = 0, onResult cb = nullptr);
~BufferSock();
char *data() const override ;

char *data() const override;
size_t size() const override;
void setSendResult(onResult cb);
void onSendSuccess();

private:
int _addr_len = 0;
struct sockaddr *_addr = nullptr;
Buffer::Ptr _buffer;
onResult _result;
};

class BufferList : public noncopyable {
public:
typedef std::shared_ptr<BufferList> Ptr;
BufferList(List<Buffer::Ptr> &list);
BufferList(List<BufferSock::Ptr> &list);
~BufferList() {}

bool empty();
Expand All @@ -496,7 +502,7 @@ class BufferList : public noncopyable {
size_t _iovec_off = 0;
size_t _remainSize = 0;
vector<struct iovec> _iovec;
List<Buffer::Ptr> _pkt_list;
List<BufferSock::Ptr> _pkt_list;
//对象个数统计
ObjectStatistic<BufferList> _statistic;
};
Expand Down
21 changes: 21 additions & 0 deletions src/Network/Server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2021 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/

#include "Network/Server.h"

namespace toolkit {

Server::Server(EventPoller::Ptr poller) {
_poller = poller ? std::move(poller) : EventPollerPool::Instance().getPoller();
}

Server::~Server() {}

} // namespace toolkit
125 changes: 125 additions & 0 deletions src/Network/Server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (c) 2021 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/

#ifndef ZLTOOLKIT_SERVER_H
#define ZLTOOLKIT_SERVER_H

#include <unordered_map>

#include "Network/Session.h"
#include "Util/mini.h"

namespace toolkit {

// 全局的 Session 记录对象, 方便后面管理
// 线程安全的
class SessionMap : public std::enable_shared_from_this<SessionMap> {
public:
friend class SessionHelper;
typedef std::shared_ptr<SessionMap> Ptr;

//单例
static SessionMap &Instance();
~SessionMap() {};

//获取Session
Session::Ptr get(const string &tag) {
lock_guard<mutex> lck(_mtx_session);
auto it = _map_session.find(tag);
if (it == _map_session.end()) {
return nullptr;
}
return it->second.lock();
}

void for_each_session(const function<void(const string &id, const Session::Ptr &session)> &cb) {
lock_guard<mutex> lck(_mtx_session);
for (auto it = _map_session.begin(); it != _map_session.end();) {
auto session = it->second.lock();
if (!session) {
it = _map_session.erase(it);
continue;
}
cb(it->first, session);
++it;
}
}

private:
SessionMap() {};

//添加Session
bool add(const string &tag, const Session::Ptr &session) {
lock_guard<mutex> lck(_mtx_session);
return _map_session.emplace(tag, session).second;
}

//移除Session
bool del(const string &tag) {
lock_guard<mutex> lck(_mtx_session);
return _map_session.erase(tag);
}

private:
mutex _mtx_session;
unordered_map<string, weak_ptr<Session> > _map_session;
};

class Server;
class SessionHelper {
public:
typedef std::shared_ptr<SessionHelper> Ptr;

SessionHelper(const std::weak_ptr<Server> &server, Session::Ptr session) {
_server = server;
_session = std::move(session);
//记录session至全局的map,方便后面管理
_session_map = SessionMap::Instance().shared_from_this();
_identifier = _session->getIdentifier();
_session_map->add(_identifier, _session);
}

~SessionHelper(){
if (!_server.lock()) {
//务必通知TcpSession已从TcpServer脱离
_session->onError(SockException(Err_other, "Server shutdown!"));
}
//从全局map移除相关记录
_session_map->del(_identifier);
}

const Session::Ptr &session() const { return _session; }

private:
string _identifier;
Session::Ptr _session;
SessionMap::Ptr _session_map;
std::weak_ptr<Server> _server;
};


//
// server 基类, 暂时仅用于剥离 SessionHelper 对 TcpServer 的依赖
// 后续将 TCP 与 UDP 服务通用部分加到这里.
//
class Server : public std::enable_shared_from_this<Server>, public mINI {
public:
typedef std::shared_ptr<Server> Ptr;

explicit Server(EventPoller::Ptr poller = nullptr);
virtual ~Server();

protected:
EventPoller::Ptr _poller;
};

} // namespace toolkit

#endif // ZLTOOLKIT_SERVER_H
38 changes: 38 additions & 0 deletions src/Network/Session.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2021 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/

#include "Network/Session.h"

namespace toolkit {

Session::Session(const Socket::Ptr &sock) : SocketHelper(sock) {
}

Session::~Session() = default;

string Session::getIdentifier() const{
return std::to_string(reinterpret_cast<uint64_t>(this));
}

void Session::safeShutdown(const SockException &ex) {
std::weak_ptr<Session> weakSelf = shared_from_this();
async_first([weakSelf,ex](){
auto strongSelf = weakSelf.lock();
if (strongSelf) {
strongSelf->shutdown(ex);
}
});
}

StatisticImp(Session)
StatisticImp(UdpSession)
StatisticImp(TcpSession)

} // namespace toolkit
Loading