본문 바로가기
프로그래밍/boost

boost::asio 비동기 서버/클라 step 2

by neive 2012. 9. 25.
728x90

 

2011/11/09 - [프로그래밍/boost] - boost::asio 비동기 서버/클라 step 1
에 이어서 계속 됩니다

Client -> Recv -> Recv Queue -> Worker Thread -> Send Queue -> Send Callback -> Client

분홍색 까지의 부분을 소스 예제로 봅시다!

어디까지나 예제 차원으로 (원래 asio 예제도 그렇고) class 내부에 막 때려넣었으니 따라하거나 이상한
코딩하는 사람으로 오해하지마세요 ^^;

소스 퍼가실 때 소스코드 부분을 더블 클릭해서 전체 선택 후 카피가 좋습니다 드래그 하면 한줄로 복사되요
컴파일 관련은 이전 글을 참고해 주세요

새롭게 class 가 하나 추가 됐습니다. Worker 인데요 사용된 기술은
boost::lockfree
boost::function
boost::pool::pool
이 추가 됐네요.. Woker 는 여기서는 전역으로 해놨지만 실은 singleton 으로 구현하는게 좋겠죠 :)

Worker 는 1개로 해놨지만 멀티쓰레딩을 이용해서 다수의 Worker 로 만들고 메세지 분류에 따라서

처리하는 Worker 를 다르게하면 예제치고는 무서울 정도로 좋은 성능의 처리기가 완성됩니다

 


서버 예제

- Worker.h

#pragma once

#include <map>
#include <boost/asio.hpp>
#include <boost/pool/pool.hpp>
#include <boost/function.hpp>
#include <boost/lockfree/fifo.hpp>
 
typedef boost::function<void (BYTE*)> TYPE_SEND_CB;

struct SWorkData
{
    int nID;
    int nSize;
    BYTE* pData;
};
 
class CWorker
{
private:
    boost::pool<> m_Pool;
    boost::lockfree::fifo<SWorkData> m_WorkList;
    std::map<int, TYPE_SEND_CB> m_SendAdress;
 
	enum EEnum
	{
		eBufSize = 128,
	};

public:
 
    void AdrInsert(int nID, TYPE_SEND_CB cb)
    {
        m_SendAdress.insert(std::map<int, TYPE_SEND_CB>::value_type(nID, cb));
    }
 
    bool SendToAdr(int nID, int nSize, BYTE* pData)     // 받을 id, 보낼 data
    {
        std::map<int, TYPE_SEND_CB>::const_iterator it = m_SendAdress.find(nID);
        if(it != m_SendAdress.end())
            (it->second)(pData);
        else
            return false;
 
        return true;
    }
 
    void AddWork(int nID, int nSize, BYTE* pData)
    {
        SWorkData temp;
        temp.nID = nID;
        temp.nSize = nSize;
        temp.pData = (BYTE*)m_Pool.ordered_malloc(nSize);
        std::copy(pData, pData+nSize, temp.pData);
        m_WorkList.enqueue(temp);
    }
 
    bool GetWork(SWorkData &wd)
    {
        return m_WorkList.dequeue(wd);
    }
 
    void Worker()
    {
        while(true)
        {
            //std::cout << "Worker !!!" << std::endl;
            SWorkData temp;
            if(GetWork(temp))
            {
                if(temp.pData != NULL)
                {
                    std::cout << "Dequeue " << temp.nID << "'s : " << (*(char*)(temp.pData)) << std::endl;
                }
            }
        }
    }
 
	CWorker(void) : m_Pool(sizeof(eBufSize)) {};
	virtual ~CWorker(void) {};
};

 

- 서버 메인

#include "stdafx.h"
 
 
#include <ctime>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <ctime>
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
 
#include "Worker.h"
 
using boost::asio::ip::tcp;
 
CWorker g_Worker;
 
class CConnection : public boost::enable_shared_from_this<CConnection>
{
private:
    tcp::socket m_Socket;
    int m_nID;

	enum EEnum
	{
		eBufSize = 128,
	};
 
    std::string m_sMsg;
    boost::array<BYTE, eBufSize> m_RecvBuf;
 
    CConnection(boost::asio::io_service& io) : m_Socket(io), m_nID(-1)
    {
        g_Worker.AdrInsert(m_nID, std::bind1st(std::mem_fun(&CConnection::SendTo), this));
    }
 
    // 날짜시간 메세지를 만든다.
    std::string make_daytime_string()
    {
        time_t now = time(0);
        return ctime(&now); // ctime_s 권장. 예제니까 그냥 쓰고 넘어가지만
    }
 
    void handle_Accept(const boost::system::error_code& /*error*/, size_t /*bytes_transferred*/)
    {
        // Recv 대기
        m_Socket.async_read_some(boost::asio::buffer(m_RecvBuf),
            boost::bind(&CConnection::handle_Read, shared_from_this(), 
            boost::asio::placeholders::error, 
            boost::asio::placeholders::bytes_transferred));
    }
 
    void handle_Write(const boost::system::error_code& error, size_t /*bytes_transferred*/)
    {
    }
 
    void handle_Read(const boost::system::error_code& error, size_t /*bytes_transferred*/)
    {
        if(!error)  // 0 이 성공 나머지는 오류 플러그 
        {
            // 데이터 처리
            if(m_RecvBuf.size())
            {
                std::cout << "Enqueue " << m_nID << "'s : " << (*(char*)(m_RecvBuf.data())) << std::endl;
                g_Worker.AddWork(m_nID, m_RecvBuf.size(), m_RecvBuf.data());
 
                m_RecvBuf.assign(NULL); // 버퍼 초기화 
            }
            // Recv 대기
            m_Socket.async_read_some(boost::asio::buffer(m_RecvBuf),
                boost::bind(&CConnection::handle_Read, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
        }
        else
            std::cout << m_nID << " Disconnect(Write) : " << error.message() << std::endl;
    }
 
    void SendTo(BYTE *pData)
    {
        std::cout << m_nID << " SendTo" << std::endl;
    }
 
public:
    typedef boost::shared_ptr<CConnection> pointer; 
 
    static pointer create(boost::asio::io_service& io) 
    { 
        return pointer(new CConnection(io)); 
    } 
 
    tcp::socket& socket() 
    {
        return m_Socket;
    }
 
    void start(int nID)
    {
        std::cout << "new connect id : "<< nID << " ::: Welcome !" << std::endl;
        m_nID = nID;
 
        // 접속 기념으로 접속 시간 한번 보내주고
        m_sMsg = make_daytime_string();
        boost::asio::async_write(m_Socket, boost::asio::buffer(m_sMsg),
            boost::bind(&CConnection::handle_Accept, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
    }
};
 
class CTCP_Server 
{ 
private: 
    tcp::acceptor m_Acceptor; 
    int m_nAcceptCnt; 
 
    void WaitAccept() 
    { 
        ++m_nAcceptCnt; 
        CConnection::pointer new_connection = 
            CConnection::create(m_Acceptor.get_io_service()); 
 
        m_Acceptor.async_accept(new_connection->socket(), 
            boost::bind(&CTCP_Server::handle_Accept, this, new_connection, 
            boost::asio::placeholders::error)); 
    } 
 
    void handle_Accept(CConnection::pointer new_connection, const boost::system::error_code& error) 
    { 
        if (!error) 
        { 
            new_connection->start(m_nAcceptCnt); 
            WaitAccept(); 
        } 
    } 
 
public: 
    CTCP_Server(boost::asio::io_service& io) : m_Acceptor(io, tcp::endpoint(tcp::v4(), 13)), m_nAcceptCnt(0) 
    { 
        WaitAccept(); 
    } 
};
 
int _tmain() 
{ 
    try
    { 
        boost::asio::io_service io; 
 
        CTCP_Server server(io); 
 
        boost::thread_group WorkerThread;
        WorkerThread.create_thread(boost::bind(&CWorker::Worker, &g_Worker));
        WorkerThread.add_thread(new boost::thread(boost::bind(&CWorker::Worker, &g_Worker)));
 
        io.run();
 
        WorkerThread.join_all();
    } 
    catch (std::exception& e) 
    { 
        std::cerr << e.what() << std::endl; 
    } 
 
    return 0; 
}

 

 

클라이언트 예제

#include "stdafx.h"
 
 
#include <ctime>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

const std::string _MY_IP("192.168.0.5"); // 접속할 서버 ip 로 바꿔줘야 합니다

using boost::asio::ip::tcp; 
 
class CProtocol 
{ 
private:

	enum EEnum
	{
		eBufSize = 128,
	};

    tcp::socket m_Socket; 
    bool m_bConnect; 
 
    int m_nTestCount; 
 
public: 
 
    CProtocol(boost::asio::io_service& io) : m_Socket(io) 
    { 
        m_bConnect = false; 
        m_nTestCount = 0; 
    } 
 
    ~CProtocol() {} 
 
    void Connect() 
    { 
        // 입력 받은 host을 resolving한다. 
        tcp::resolver resolver(m_Socket.get_io_service()); 
        tcp::resolver::query query(_MY_IP, "daytime"); 
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); 
        tcp::resolver::iterator end; 
 
        // resolving된 endpoint로 접속을 시도한다. 
        boost::system::error_code error = boost::asio::error::host_not_found; 
        while (error && endpoint_iterator != end) 
        { 
            m_Socket.close(); 
            m_Socket.connect(*endpoint_iterator++, error); 
        } 
 
        // 접속 실패인지 확인 
        if (error) 
            throw boost::system::system_error(error); 
 
        m_bConnect = true; 
 
        // 읽어올 데이터를 저장할 array를 만든다. 
        boost::array<CHAR, eBufSize> buf; 
        size_t len = m_Socket.read_some(boost::asio::buffer(buf), error); 
        if (error == boost::asio::error::eof) 
        { 
            m_bConnect = false; 
            return; 
        } 
        else if (error) 
            throw boost::system::system_error(error); 
 
        // 받은 데이터를 cout로 출력한다. 
        std::cout.write(buf.data(), len); 
    } 
 
    bool IsRun() { return m_bConnect; } 
    bool IsSocketOpen() 
    { 
        if(!m_Socket.is_open() && m_bConnect)   // 커넥트 된 이후 소켓이 닫혀버렸다면 
        { 
            m_bConnect = false;                 // 커넥트도 끊김 판정 
            return false; 
        } 
 
        return true; 
    } 
 
    void handle_Recive() 
    { 
 
    } 
 
    void handle_Send() 
    { 
        while(m_bConnect) 
        { 
            if(!IsSocketOpen()) 
                break; 
 
            try
            { 
                boost::array<BYTE, eBufSize> buf = { boost::lexical_cast<BYTE>(m_nTestCount) }; 
                boost::system::error_code error; 
                int len = boost::asio::write(m_Socket, boost::asio::buffer(buf, buf.size()), error); 
                if(len > 0) 
                    std::cout << "> Send " << m_nTestCount << std::endl; 
                m_nTestCount++;  
            } 
            catch (std::exception& e) 
            { 
                m_bConnect = false; 
                std::cerr << e.what() << std::endl; 
            } 
 
            Sleep(3000); 
        } 
    } 
}; 
 
int _tmain(int argc, char* argv[]) 
{ 
    try
    { 
        boost::asio::io_service io_service; // io_service를 하나 생성한다. 
 
        CProtocol Ptc(io_service); 
        boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service)); 
        Ptc.Connect(); // 접속 시도 
        boost::thread Recv(boost::bind(&CProtocol::handle_Recive, &Ptc)); 
        boost::thread Send(boost::bind(&CProtocol::handle_Send, &Ptc)); 
        io_service.run(); 
 
        while(Ptc.IsRun()) 
        { 
        } 
 
        Recv.join(); 
        Send.join(); 
 
        t.join();   // 쓰레드가 종료될 때까지 메인 함수의 종료를 막는다 
    } 
    catch (std::exception& e) 
    { 
        std::cerr << e.what() << std::endl; 
    } 
 
    int in; 
    std::cout << "END"; 
    std::cin >> in ; 
 
    return 0; 
}

 

728x90

'프로그래밍 > boost' 카테고리의 다른 글

boost::shared_ptr 과 Virtual Function 과 boost::timer  (0) 2012.10.25
boost::random  (0) 2012.10.24
boost::asio 비동기 TCP 서버 / 클라 예제  (1) 2012.09.03
boost::Program_options  (0) 2012.05.14
boost::multi_index  (0) 2012.03.30

댓글