신뢰성 있는 메시지 교환을 위한 JGroups

2024. 2. 22. 08:41프로그래밍

728x90

https://baeldung.com/jgroups

1. 간략하게 우선...

JGroups는 안정적인 메시지 교환을 위한 자바 API입니다.

다음과 같은 특정 기능에 대한 간단한 인터페이스를 제공합니다:

  • 유연한 프로토콜 스택, TCP 및 UDP 포함
  • 대규모 메시지의 분할 및 재조립
  • 신뢰할 수 있는 유니캐스트 및 멀티캐스트
  • 장애 탐지
  • 흐름 제어

뿐만 아니라 많은 다른 특징들이 있습니다.

이 튜토리얼에서는 네트워크에 가입하는 새로운 응용 프로그램에 대한 공유 상태를 제공하고 응용 프로그램 간 문자열 메시지을  교환을 위한 간단한 응용 프로그램을 만들 것입니다.

2. 설정

2.1. Maven Dependency

pom.xml에 의존성 하나 추가해야 합니다:

<dependency>
    <groupId>org.jgroups</groupId>
    <artifactId>jgroups</artifactId>
    <version>4.0.10.Final</version>
</dependency>


라이브러리의 최신 버전은 Maven Central에서 확인할 수 있습니다.

2.2. Networking

JGroups는 기본 값으로 IPV6을 사용하려고 시도 할 것입니다. 이는 시스템 구성에 따라 응용 프로그램 간 통신이 안 될 수 있을 가능성이 있습니다.
이 오류를 피하기 위해 우리는 응용 프로그램을 실행할 때 java.net.preferIPv4Stack을 true로 설정할 것입니다.

java -Djava.net.preferIPv4Stack=true com.baeldung.jgroups.JGroupsMessenger

 

3. JChannels

우리의  JGroups 네트워크로의 연결은 JChannel 클래스를 이용 합니다. 이 채널은 클러스터에 가입되고 메시지를 보내고 받으며 네트워크의 상태 정보를 전달합니다.

3.1. 하나의 Channel 생성

우리는 환경구성 파일의 경로를 입력 값으로 해서 JChannel을 생성 합니다. 만약 이 파일 이름을 생략하면 현재 작업 디렉토리에서 udp.xml 파일을 찾을 것입니다.

우리는 명시적으로 이름이 지정된 환경 구성 파일을 사용하여 채널을 생성할 것입니:

JChannel channel = new JChannel("src/main/resources/udp.xml");

 
JGroups 환경 구성이 매우 복잡해 질 수 있지만, 기본 UDP 및 TCP 환경 구성이면 대부분의 응용 프로그램을 만들기에는 충분합니다. 우리는 우리의 코드에 UDP를 위한 설정을 파일을 포함시켰고 이 설정을 튜토리얼에 사용할 것입니다.
전송 구성에 대한 자세한 정보는 여기 JGroups 메뉴얼에서 확인해 보십시오.

3.2. 채널에 연결하기

우리가 채널을 만든 후에는 클러스터에 가입해야만 합니다. 클러스터란 메시지를 교환하는 노드 그룹을 말합니다.
클러스터 가입을 위해서는 클러스터 이름이 필요합니다:

channel.connect("Baeldung");


최초의 노드가 클러스터에 가입하려고 할 때,  클러스터가 존재하지 않는다면 하나를 생성 할 것입니다. 아래에서 이러한 과정을 확인할 것입니다.

3.3. 채널 이름 지정하기

피어가 클러스터에 들어오고 나가는 것에 대한 알림을 받기 위해 노드들은 이름으로 식별됩니다. JGroups이 이름을 자동으로 할당하거나 우리가 설정할 수 있습니다:

channel.name("user1");


우리는 이러한 이름들을 아래에서 사용하여 클러스터에 노드가 들어오고 나가는 것을 추적할 것입니다.

3.4. 채널 닫기

피어가 우리가 종료했다는 시기적절한 알림을 받기를 원한다면 채널 정리가 필요합니다.
JChannel을 close 메서드로 닫습니다:

channel.close()



4. Cluster View 변경
With a JChannel created we’re now ready to see the state of peers in the cluster and exchange messages with them.
JChannel이 생성되었으므로 클러스터 내 피어의 상태를 확인하고 메시지를 교환할 준비가 되었습니다.

JGroups는 View 클래스 내에서 클러스터 상태를 유지하고 있습니다. 각 채널은 네트워크의 하나의 View를 가지고 있습니다. 뷰가 변경되면 viewAccepted() 콜백을 통해 전달됩니다.
이 튜토리얼에서는 우리는 모든 인터페이스 메서드를 구현하는 ReceiverAdaptor API 클래스를 확장(extend) 할 것입니다.
이렇게 하는 것이 권장하는 콜백 구현방법입니다.
우리의 애플리케이션에 viewAccepted를 추가해 봅시다:

public void viewAccepted(View newView) {

    private View lastView;

    if (lastView == null) {
        System.out.println("Received initial view:");
        newView.forEach(System.out::println);
    } else {
        System.out.println("Received new view.");

        List<Address> newMembers = View.newMembers(lastView, newView);
        System.out.println("New members: ");
        newMembers.forEach(System.out::println);

        List<Address> exMembers = View.leftMembers(lastView, newView);
        System.out.println("Exited members:");
        exMembers.forEach(System.out::println);
    }
    lastView = newView;
}


각 View 클래스는 클러스터의 각 멤버를 나타내는 Address 객체의 List를 포함하게 됩니다. JGroups는 클러스터내 신규 또는 나가버린 멤버를 감지하는 데 사용하는 View를 서로 비교하는 편리한 메서드를 제공합니다.

5. 메시지 보내기

이 튜토리얼에서 우리는 명령 줄에서 읽은 문자열을 사용했지만, 응용 프로그램이 다른 형식의 데이터 유형을 어떤 식으로 교환할 수 있는 지에 대해서 쉽게 파악할 수 있게 해줍니다.

5.1. 브로드캐스트 메시지

하나의 메시지는 바이트 배열과 목적지를 가지고 생성됩니다; JChannel 은 우리를 위해서 발신자를 설정해 줍니다. 만약 목적지가 null이면, 전체 클러스터가 해당 메시지를 받게 될 것입니다.
우리는 명령 줄에서 글자를 받아들여서 클러스트로 해당 글자를 보낼 것입니다:

System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);


만약 우리가 우리 프로그램에 대해 다수의 인스턴스들을 실행하고 있고 이 메시지를 보낸다면 그 인스턴스 모두가 해당 메시지를 받을 수 있을 것입니다(아래 receive()  메서드를 우리가 구현해 놓았다면). 이는 발신자도 마찬가지 입니다.

5.2. 메시지 차단

만약 우리가 우리의 메시지를 보고 싶지 않다면, 그에 대한 속성을 다음과 같이 설정할 수 있습니다:

channel.setDiscardOwnMessages(true);


앞선 테스트를 실행할 때, 메시지 발신자는 자신의 브로드캐스트 메시지를 수신하지 않습니다.

 

5.3. 다이렉트 메시지

다이렉트 메시지를 보내려면 유효한 주소가 필요합니다. 우리가 노드를 이름으로 참조하는 경우, 주소를 조회하는 방법이 필요합니다. 다행스럽게도,  이를 위해 우리는 View를 가지고 있는 것이죠.
현재 View는 항상 JChannel에서 사용 할 수 있습니다:

private Optional<address> getAddress(String name) { 
    View view = channel.view(); 
    return view.getMembers().stream()
      .filter(address -> name.equals(address.toString()))
      .findAny(); 
}


주소 이름은 클래스 toString() 메서드를 통해 사용할 수 있으므로, 우리는 우리가 원하는 이름을 클러스터 멤버 목록에서 검색하기만 하면 됩니다.
그래서 우리는 콘솔에서 이름을 얻어서 연관이 있는 목적지을 찾아 직접 메시지를 보낼 수 있습니다:

Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName)
  .orElseThrow(() -> new Exception("Destination not found"); 
Message message = new Message(destination, "Hi there!"); 
channel.send(message);

 


6. 메시지 수신

우리는 메시지를 보낼 수 있으므로, 이제 그것들을 받아보려는 시도를 더해 보도록 하겠습니다.
ReceiverAdaptor의 빈 receive 메서드를 오버라이드합시다:

public void receive(Message message) {
    String line = Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);
}


메시지가 문자열을 포함한다는 것을 알기 때문에, 우리는 getObject()를 System.out에 안전하게 전달할 수 있습니다.

7. State Exchange - 상태 교환

노드가 네트워크에 들어오면 클러스터에 대한 상태 정보를 검색해야 할 수 있습니다. JGroups는 이를 위한 상태 전송 메커니즘을 제공합니다.
노드가 클러스터에 가입하면, 단순하게 getState()를 호출합니다. 클러스터는 일반적으로 그룹의 가장 오래된 멤버, 즉 조정자( the coordinator)부터 상태를 검색합니다.

우리의 애플리케이션에 브로드캐스트 메시지 카운트를 추가해 봅시다. 우리는 새로운 멤버 변수를 추가하고 receive() 내에서 증가시킬 것입니다:

private Integer messageCount = 0;

public void receive(Message message) {
    String line = "Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);

    if (message.getDest() == null) {
        messageCount++;
        System.out.println("Message count: " + messageCount);
    }
}


우리는 null 대상을 확인합니다. 왜냐하면 만약 다이렉트 메시지를 세어본다면, 각 노드는 틀린 번호를 가질 것입니다.
다음으로 ReceiverAdaptor의 두 가지 메서드를 더 오버라이드합니다:

public void setState(InputStream input) {
    try {
        messageCount = Util.objectFromStream(new DataInputStream(input));
    } catch (Exception e) {
        System.out.println("Error deserialing state!");
    }
    System.out.println(messageCount + " is the current messagecount.");
}

public void getState(OutputStream output) throws Exception {
    Util.objectToStream(messageCount, new DataOutputStream(output));
}


메시지와 유사하게, JGroups는 바이트 배열로 상태를 전송합니다.
JGroups는 조정자가 상태를 쓰기 위한 InputStream과 새로운 노드가 읽기 위한 OutputStream을 제공합니다. API는 데이터의 직렬화와 역직렬화를 위한 편리한 클래스를 제공합니다.
생산 코드에서 상태 정보에 액세스하는 것은 스레드로부터 안전해야 하는 것에 유의 해야 합니다.
마지막으로, 우리는 클러스터에 연결한 후에 시작시 getState()를 추가합니다:

channel.connect(clusterName);
channel.getState(null, 0);


getState()는 상태를 요청할 대상과 밀리초 단위의 타임아웃을 받습니다. null 대상은 조정자를 의미하고 0은 타임아웃되지 않음을 의미합니다.
이 앱을 두 개의 노드로 실행하고 브로드캐스트 메시지를 교환하면 메시지 카운트가 증가하는 것을 볼 수 있습니다.
그런 다음 세 번째 클라이언트를 추가하거나 하나를 중지하고 시작하면, 새로 연결된 노드가 올바른 메시지 카운트를 인쇄하는 것을 볼 수 있습니다.

 

8. 결론

이 튜토리얼에서는 JGroups를 사용하여 메시지를 교환하는 응용 프로그램을 만들어 보았습니다. 우리는 어떤 노드가 클러스터에 연결되었고 나갔는 지를 모니터링하고, 또한 클러스터에 새로운 노드가 가입할 때 클러스터 상태를 전송하기 위해 API를 사용했습니다.

코드 샘플은 언제나 GitHub에서 찾을 수 있습니다.

 

728x90