728x90
2011/11/09 - [프로그래밍/boost] - boost::asio 비동기 서버/클라 step 1
에 이어서 계속 됩니다
Client -> Recv -> Recv Queue -> Worker Thread -> Send Queue -> Send Callback -> Client
분홍색 까지의 부분을 소스 예제로 봅시다!
어디까지나 예제 차원으로 (원래 asio 예제도 그렇고) class 내부에 막 때려넣었으니 따라하거나 이상한
코딩하는 사람으로 오해하지마세요 ^^;
소스 퍼가실 때 소스코드 부분을 더블 클릭해서 전체 선택 후 카피가 좋습니다 드래그 하면 한줄로 복사되요
컴파일 관련은 이전 글을 참고해 주세요
새롭게 class 가 하나 추가 됐습니다. Worker 인데요 사용된 기술은
boost::lockfree
boost::function
boost::pool::pool
이 추가 됐네요.. Woker 는 여기서는 전역으로 해놨지만 실은 singleton 으로 구현하는게 좋겠죠 :)
Worker 는 1개로 해놨지만 멀티쓰레딩을 이용해서 다수의 Worker 로 만들고 메세지 분류에 따라서
처리하는 Worker 를 다르게하면 예제치고는 무서울 정도로 좋은 성능의 처리기가 완성됩니다
서버 예제
- Worker.h
#pragma once
#include <map>
#include <boost/asio.hpp>
#include <boost/pool/pool.hpp>
#include <boost/function.hpp>
#include <boost/lockfree/fifo.hpp>
typedef boost::function<void (BYTE*)> TYPE_SEND_CB;
struct SWorkData
{
int nID;
int nSize;
BYTE* pData;
};
class CWorker
{
private:
boost::pool<> m_Pool;
boost::lockfree::fifo<SWorkData> m_WorkList;
std::map<int, TYPE_SEND_CB> m_SendAdress;
enum EEnum
{
eBufSize = 128,
};
public:
void AdrInsert(int nID, TYPE_SEND_CB cb)
{
m_SendAdress.insert(std::map<int, TYPE_SEND_CB>::value_type(nID, cb));
}
bool SendToAdr(int nID, int nSize, BYTE* pData) // 받을 id, 보낼 data
{
std::map<int, TYPE_SEND_CB>::const_iterator it = m_SendAdress.find(nID);
if(it != m_SendAdress.end())
(it->second)(pData);
else
return false;
return true;
}
void AddWork(int nID, int nSize, BYTE* pData)
{
SWorkData temp;
temp.nID = nID;
temp.nSize = nSize;
temp.pData = (BYTE*)m_Pool.ordered_malloc(nSize);
std::copy(pData, pData+nSize, temp.pData);
m_WorkList.enqueue(temp);
}
bool GetWork(SWorkData &wd)
{
return m_WorkList.dequeue(wd);
}
void Worker()
{
while(true)
{
//std::cout << "Worker !!!" << std::endl;
SWorkData temp;
if(GetWork(temp))
{
if(temp.pData != NULL)
{
std::cout << "Dequeue " << temp.nID << "'s : " << (*(char*)(temp.pData)) << std::endl;
}
}
}
}
CWorker(void) : m_Pool(sizeof(eBufSize)) {};
virtual ~CWorker(void) {};
};
- 서버 메인
#include "stdafx.h"
#include <ctime>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <ctime>
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include "Worker.h"
using boost::asio::ip::tcp;
CWorker g_Worker;
class CConnection : public boost::enable_shared_from_this<CConnection>
{
private:
tcp::socket m_Socket;
int m_nID;
enum EEnum
{
eBufSize = 128,
};
std::string m_sMsg;
boost::array<BYTE, eBufSize> m_RecvBuf;
CConnection(boost::asio::io_service& io) : m_Socket(io), m_nID(-1)
{
g_Worker.AdrInsert(m_nID, std::bind1st(std::mem_fun(&CConnection::SendTo), this));
}
// 날짜시간 메세지를 만든다.
std::string make_daytime_string()
{
time_t now = time(0);
return ctime(&now); // ctime_s 권장. 예제니까 그냥 쓰고 넘어가지만
}
void handle_Accept(const boost::system::error_code& /*error*/, size_t /*bytes_transferred*/)
{
// Recv 대기
m_Socket.async_read_some(boost::asio::buffer(m_RecvBuf),
boost::bind(&CConnection::handle_Read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_Write(const boost::system::error_code& error, size_t /*bytes_transferred*/)
{
}
void handle_Read(const boost::system::error_code& error, size_t /*bytes_transferred*/)
{
if(!error) // 0 이 성공 나머지는 오류 플러그
{
// 데이터 처리
if(m_RecvBuf.size())
{
std::cout << "Enqueue " << m_nID << "'s : " << (*(char*)(m_RecvBuf.data())) << std::endl;
g_Worker.AddWork(m_nID, m_RecvBuf.size(), m_RecvBuf.data());
m_RecvBuf.assign(NULL); // 버퍼 초기화
}
// Recv 대기
m_Socket.async_read_some(boost::asio::buffer(m_RecvBuf),
boost::bind(&CConnection::handle_Read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else
std::cout << m_nID << " Disconnect(Write) : " << error.message() << std::endl;
}
void SendTo(BYTE *pData)
{
std::cout << m_nID << " SendTo" << std::endl;
}
public:
typedef boost::shared_ptr<CConnection> pointer;
static pointer create(boost::asio::io_service& io)
{
return pointer(new CConnection(io));
}
tcp::socket& socket()
{
return m_Socket;
}
void start(int nID)
{
std::cout << "new connect id : "<< nID << " ::: Welcome !" << std::endl;
m_nID = nID;
// 접속 기념으로 접속 시간 한번 보내주고
m_sMsg = make_daytime_string();
boost::asio::async_write(m_Socket, boost::asio::buffer(m_sMsg),
boost::bind(&CConnection::handle_Accept, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
};
class CTCP_Server
{
private:
tcp::acceptor m_Acceptor;
int m_nAcceptCnt;
void WaitAccept()
{
++m_nAcceptCnt;
CConnection::pointer new_connection =
CConnection::create(m_Acceptor.get_io_service());
m_Acceptor.async_accept(new_connection->socket(),
boost::bind(&CTCP_Server::handle_Accept, this, new_connection,
boost::asio::placeholders::error));
}
void handle_Accept(CConnection::pointer new_connection, const boost::system::error_code& error)
{
if (!error)
{
new_connection->start(m_nAcceptCnt);
WaitAccept();
}
}
public:
CTCP_Server(boost::asio::io_service& io) : m_Acceptor(io, tcp::endpoint(tcp::v4(), 13)), m_nAcceptCnt(0)
{
WaitAccept();
}
};
int _tmain()
{
try
{
boost::asio::io_service io;
CTCP_Server server(io);
boost::thread_group WorkerThread;
WorkerThread.create_thread(boost::bind(&CWorker::Worker, &g_Worker));
WorkerThread.add_thread(new boost::thread(boost::bind(&CWorker::Worker, &g_Worker)));
io.run();
WorkerThread.join_all();
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
클라이언트 예제
#include "stdafx.h"
#include <ctime>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
const std::string _MY_IP("192.168.0.5"); // 접속할 서버 ip 로 바꿔줘야 합니다
using boost::asio::ip::tcp;
class CProtocol
{
private:
enum EEnum
{
eBufSize = 128,
};
tcp::socket m_Socket;
bool m_bConnect;
int m_nTestCount;
public:
CProtocol(boost::asio::io_service& io) : m_Socket(io)
{
m_bConnect = false;
m_nTestCount = 0;
}
~CProtocol() {}
void Connect()
{
// 입력 받은 host을 resolving한다.
tcp::resolver resolver(m_Socket.get_io_service());
tcp::resolver::query query(_MY_IP, "daytime");
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::resolver::iterator end;
// resolving된 endpoint로 접속을 시도한다.
boost::system::error_code error = boost::asio::error::host_not_found;
while (error && endpoint_iterator != end)
{
m_Socket.close();
m_Socket.connect(*endpoint_iterator++, error);
}
// 접속 실패인지 확인
if (error)
throw boost::system::system_error(error);
m_bConnect = true;
// 읽어올 데이터를 저장할 array를 만든다.
boost::array<CHAR, eBufSize> buf;
size_t len = m_Socket.read_some(boost::asio::buffer(buf), error);
if (error == boost::asio::error::eof)
{
m_bConnect = false;
return;
}
else if (error)
throw boost::system::system_error(error);
// 받은 데이터를 cout로 출력한다.
std::cout.write(buf.data(), len);
}
bool IsRun() { return m_bConnect; }
bool IsSocketOpen()
{
if(!m_Socket.is_open() && m_bConnect) // 커넥트 된 이후 소켓이 닫혀버렸다면
{
m_bConnect = false; // 커넥트도 끊김 판정
return false;
}
return true;
}
void handle_Recive()
{
}
void handle_Send()
{
while(m_bConnect)
{
if(!IsSocketOpen())
break;
try
{
boost::array<BYTE, eBufSize> buf = { boost::lexical_cast<BYTE>(m_nTestCount) };
boost::system::error_code error;
int len = boost::asio::write(m_Socket, boost::asio::buffer(buf, buf.size()), error);
if(len > 0)
std::cout << "> Send " << m_nTestCount << std::endl;
m_nTestCount++;
}
catch (std::exception& e)
{
m_bConnect = false;
std::cerr << e.what() << std::endl;
}
Sleep(3000);
}
}
};
int _tmain(int argc, char* argv[])
{
try
{
boost::asio::io_service io_service; // io_service를 하나 생성한다.
CProtocol Ptc(io_service);
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
Ptc.Connect(); // 접속 시도
boost::thread Recv(boost::bind(&CProtocol::handle_Recive, &Ptc));
boost::thread Send(boost::bind(&CProtocol::handle_Send, &Ptc));
io_service.run();
while(Ptc.IsRun())
{
}
Recv.join();
Send.join();
t.join(); // 쓰레드가 종료될 때까지 메인 함수의 종료를 막는다
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
int in;
std::cout << "END";
std::cin >> in ;
return 0;
}
728x90
'프로그래밍 > boost' 카테고리의 다른 글
| boost::shared_ptr 과 Virtual Function 과 boost::timer (0) | 2012.10.25 |
|---|---|
| boost::random (0) | 2012.10.24 |
| boost::asio 비동기 TCP 서버 / 클라 예제 (1) | 2012.09.03 |
| boost::Program_options (0) | 2012.05.14 |
| boost::multi_index (0) | 2012.03.30 |
댓글