profile image

L o a d i n g . . .

728x90

백엔드 개발자의 중요한 소양 중 하나는 네트워크에 대한 이해입니다. 분산환경에 대한 경험을 요구하는 회사가 많아지면서 그에 따른 네트워크에 대한 이해도 그 어느 때보다 중요하게 됐습니다. 이번 포스팅에서는 Java NIO는 TCP를 어떻게 처리하는지 살펴보겠습니다. 포스팅에 첨부된 소스코드는 아래 github 링크에서 확인할 수 있습니다. 

 

GitHub - seonwoo960000/java-nio-practice

Contribute to seonwoo960000/java-nio-practice development by creating an account on GitHub.

github.com

 

Java NIO(New I/O) 

Java NIO는 확장성 있는 Java 입출력(I/O) 기능을 구현할 수 있는 API를 제공합니다. Non-blocking I/O 작업을 지원하여 하나의 스레드에서 여러 I/O 작업을 처리(multiplexing)할 수 있어 네트워크 통신 및 파일 처리의 성능을 향상할 수 있습니다. 

https://kb.novaordis.com/index.php/Java_NIO_and_TCP_Connections

Java NIO를 사용하여 서버와 클라이언트를 구현한 구조를 도식화한 그림입니다. Java NIO에는 익숙하지 않은 개념이 등장하므로 이에 대한 설명을 우선 드리겠습니다. 

  • ServerSocketChannel: 네트워크 연결 요청을 수신 및 처리하기 위한 Java NIO 클래스입니다. Server 생성 시 사용되고 클라이언트로부터 연결 요청을 수신(accept)하고, SocketChannel을 생성하여 실제 데이터 통신을 처리합니다.  
  • SocketChannel: 네트워크로부터 데이터 입출력을 담당하는 Java NIO 클래스입니다. 클라이언트와 서버 간 양방향 통신을 지원하고 비동기 I/O 작업을 수행할 수 있습니다. 
  • Selector: 비동기 I/O를 관리하기 위한 클래스(다중 채널을 관리)입니다. ServerSocketChannel, SocketChannel과 함께 사용되며 네트워크 이벤트를 모니터링하고 이에 대한 응답으로 적절한 I/O 작업을 수행합니다. 
  • SelectionKey: Selector에 등록된 채널에 대한 관련 정보를 포함하는 클래스입니다. Selector와 함께 사용되며 채널의 상태 변화 및 이벤트를 추적하고, 채널과 관련된 연산을 수행할 수 있는 메서드를 제공합니다. 
  • InetSocketAddress: "IP 주소 + 포트"를 나타내는 클래스 
  • ByteBuffer: byte 데이터를 메모리에 임시로 저장하거나 읽는 데 사용됩니다. 네트워크 통신, 파일 입출력 등을 위해 사용됩니다. 

 

다음으로는 server와 client가 어떻게 동작하는지 단계별로 살펴보겠습니다. 

 

Server 

static void createServer() throws Exception {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false); // non-blocking 동작을 위해 설정
    InetSocketAddress address = new InetSocketAddress(8080); // localhost:8080
    ServerSocket serverSocket = serverSocketChannel.socket();
    serverSocket.bind(address);

    Selector selector = Selector.open();
    // serverSocketChannel은 OP_ACCEPT 네트워크 이벤트를 모니터링
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
        // 최소 1개 이상의 네트워크 이벤트(OP_ACCEPT, OP_READ)가 발생될때까지 blocking
        selector.select();
        Set<SelectionKey> selectedKeys = selector.selectedKeys();

        // SelectionKey를 순회하면서 SelectionKey에 적합한 로직을 처리
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
            SelectionKey k = i.next();
            if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                System.out.println("New TCP Connection.");
                i.remove();

                // 새로운 TCP connection을 처리하기 위한 별도의 SocketChannel을 할당
                ServerSocketChannel ssc = (ServerSocketChannel) k.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // 새로 생성된 socketChannel은 OP_READ 네트워크 이벤트를 모니터링
                sc.register(selector, SelectionKey.OP_READ);
            } else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                i.remove();
                
                // SocketChannel에서 읽기 작업을 수행 
                SocketChannel sc = (SocketChannel) k.channel();
                // 읽은 데이터를 Buffer에 저장 
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int bytesRead = sc.read(buffer);

                if (bytesRead == -1) {
                    System.out.println("TCP connection closed");
                    k.cancel();
                } else {
                    buffer.flip();
                    byte[] content = new byte[bytesRead];
                    buffer.get(content, 0, bytesRead);
                    System.out.println("Received: \n" + new String(content, StandardCharsets.UTF_8));

                    ByteBuffer message = ByteBuffer.wrap(content);
                    // client로부터 수신된 데이터를 그대로 응답 
                    sc.write(message);
                }
            }
        }
    }
}

Server는 아래와 같이 동작합니다. 

  1. ServerSocketChannel을 생성 후 localhost:8080 주소에 매핑 
  2. Selector 생선 후 ServerSocketChannel과 연동 
  3. 무한 루프에 진입해서 다음과 같이 동작 
    1. selector.select()와 selector.selectedKeys() 메서드를 호출해서 네트워크 이벤트를 기다리고, 발생한 네트워크 이벤트를 조회 
    2. OP_ACCEPT 네트워크 이벤트가 발생하면(새로운 TCP connection 생성) 별도의 소켓(SocketChannel)을 생성. 생성된 SocketChannel은 OP_READ 이벤트를 추적할 수 있도록 selector 등록 
    3. OP_READ 네트워크 이벤트가 발생하면 이벤트와 관련된 SocketChannel을 조회. 조회된 SocketChannel을 활용해서 수신한 데이터를 처리 및 조회. 

 

Client 

static void startClient() throws Exception {
    // Client용 SocketChannel 생성
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.connect(new InetSocketAddress("localhost", 8080)); // localhost:8080
    socketChannel.configureBlocking(false); // non-blocking
    System.out.println("Socket connected");

    // Selector 생성 후 OP_READ 등록
    final Selector selector = Selector.open();
    socketChannel.register(selector, SelectionKey.OP_READ);

    new Thread(() -> {
        while (true) {
            try {
                // OP_READ 이벤트를 모니터링. 서버로부터의 데이터 수신 시 아래 로직 실행
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
                    SelectionKey k = i.next();

                    if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                        i.remove();
                        SocketChannel sc = (SocketChannel) k.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int bytesRead = sc.read(buffer);

                        if (bytesRead == -1) {
                            System.out.println("TCP connection closed");
                            k.cancel();
                        } else {
                            buffer.flip();
                            byte[] content = new byte[bytesRead];
                            buffer.get(content, 0, bytesRead);

                            System.out.println("Received: \n" + new String(content));
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Exception: " + e);
            }
        }
    }, "selector thread").start();

    // System.in에 입력된 메시지를 서버로 송신 
    Scanner scanner = new Scanner(System.in);
    while (true) {
        String line = scanner.nextLine();
        // exit 입력 시 종료
        if (Objects.equals(line, "exit")) break;

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.clear();
        buffer.put(line.getBytes());
        buffer.flip();

        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
    }

    socketChannel.close();
    System.out.println("Socket channel closed");
}

Client는 아래와 같이 동작합니다. 

  1. Server와 통신하기 위한 SocketChannel을 생성 후 localhost:8080에 바인딩합니다. 
  2. OP_READ 이벤트를 모니터링하기 위해 Selector을 생성해서 SocketChannel에 등록합니다. 
  3. SocketChannel은 OP_READ 이벤트가 발생하면(서버로부터 데이터가 수신되면) 수신된 데이터를 콘솔에 프린트합니다. 
  4. 콘솔에 유저가 입력한 데이터를 서버로 송신합니다. 

 

자, 여기까지 Java NIO를 활용해서 Server와 Client 애플리케이션을 어떻게 작성하는지 살펴봤습니다. 여기서 더 나아가 위에서 사용한 다양한 메서드들이(네트워크와 관련된) 내부적으로는 어떻게 동작하는지 살펴보겠습니다. 

 

Selector

Selector은 비동기 I/O 작업을 위한 Java NIO의 핵심 클래스입니다. Selector는 다수의 채널을 모니터링하고, 이벤트가 발생한 채널을 선택적으로 처리할 수 있게 합니다(위 코드에서 살펴봤습니다). 자 그럼 selector에는 어떤 메서드가 있는지 살펴보겠습니다. 

register(Selector sel, int ops) 

Selector에 채널을 등록합니다. 등록된 채널은 selector에 의해 관리되고, selector는 해당 채널에서 발생한 이벤트를 감지할 수 있습니다. 위 Server 코드에서 아래의 메서드(serverSocketChannel.register(...))가 실행됐을 때 내부적으로 selector의 register(...) 메서드가 호출됩니다. 

# 내부적으로 selector의 register 메서드가 호출됨 
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
...
sc.register(selector, SelectionKey.OP_READ);

select() 

select() 메서드를 통해 selector에 등록된 채널에서 발생한 이벤트를 감지할 수 있습니다. 만약 이벤트가 없는 경우 blocking 방식으로 동작합니다(이벤트가 발생할때까지 스레드는 대기합니다). 참고로 여기서 말하는 이벤트는 채널의 상태 변화 및 데이터 수신을 의미합니다. 내부적으로는 SelectorImpl의 doSelect라는 abstract 메서드를 호출합니다. 이 메서드는 운영체제에 맞는 구현체에서 해당 메서드를 구현하고있습니다. 

https://github.com/openjdk/jdk17/blob/master/src/java.base/share/classes/sun/nio/ch/SelectorImpl.java#L51

SelectorImpl 구현체와 관련된 개념 

  • Poll: POSIX 시스템 콜. 대기 중인 파일 디스크립터에 대해 이벤트가 발생할 때까지 블록된 프로세스를 지원하기 위해서 사용됩니다. 주로 poll보다 확장성과 성능이 좋은 epoll 및 kqueue가 사용됩니다. 
  • WePoll: Windows 운영체제에서 동작. 파일 디스크립터 및 소켓에 대한 이벤트를 모니터링하고, 해당 이벤트에 대한 알림을 받는 기능을 제공합니다. 
  • KQueue: FreeBSD, macOS에서 사용. 파일 디스크립터 및 소켓에 대한 이벤트를 모니터링하고, 해당 이벤트에 대한 알림을 받는 기능을 제공합니다. 
  • Epoll: Linux 운영체제에서 사용. 전통적인 select 및 poll 보다 더 높은 확장성과 효율성을 제공합니다. WePoll과 KQueue와 같이 이벤트를 모니터링하고 해당 이벤트에 대한 알림을 받는 기능을 제공합니다. 

ServerSocketChannel 

accept() 

Client으로부터의 커넥션을 맺기 위해 사용합니다. Connection이 맺어지면 SocketChannel을 반환합니다. ServerSocketChannelImpl의 accept() 메서드는 아래와 같습니다. 

@Override
public SocketChannel accept() throws IOException {
    acceptLock.lock();
    try {
        int n = 0;
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        boolean blocking = isBlocking();
        try {
            begin(blocking);
            do {
                n = accept(this.fd, newfd, isaa); // (1)  
            } while (n == IOStatus.INTERRUPTED && isOpen());
        } finally {
            end(blocking, n > 0);
            assert IOStatus.check(n);
        }

        if (n < 1)
            return null;

        // newly accepted socket is initially in blocking mode
        IOUtil.configureBlocking(newfd, true);

        InetSocketAddress isa = isaa[0];
        SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);

        // check permitted to accept connections from the remote address
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    } finally {
        acceptLock.unlock();
    }
}

...

// (2)  
private int accept(FileDescriptor ssfd,
                   FileDescriptor newfd,
                   InetSocketAddress[] isaa)
    throws IOException
{
    return accept0(ssfd, newfd, isaa);
}

...

(3) 
// -- Native methods --

// Accepts a new connection, setting the given file descriptor to refer to
// the new socket and setting isaa[0] to the socket's remote address.
// Returns 1 on success, or IOStatus.UNAVAILABLE (if non-blocking and no
// connections are pending) or IOStatus.INTERRUPTED.
//
private native int accept0(FileDescriptor ssfd,
                           FileDescriptor newfd,
                           InetSocketAddress[] isaa)
    throws IOException;

 

호출순서는 (1) -> (2) -> (3)입니다. (3)의 accept0 메서드는 JNI 메서드를 호출함으로써 새로운 커넥션을 위한 socket(file descriptor)을 할당받습니다. accept0의 JNI 구현 코드는 아래와 같습니다. 

// JNIEnv *env: native code와 Java runtime의 상호작용을 위한 환경변수 
// jobject this: native 메서드를 호출한 클래스의 객체 
// jobject ssfdo: ServerSocket file descriptor 
// jobject newfdo: 커넥션 정보를 담을 새로운 (socket)file descriptor 
// jobject isaa: InetSocketAddress 객체를 담기위한 배열 
Java_sun_nio_ch_ServerSocketChannelImpl_accept0(JNIEnv *env, jobject this,
                                                jobject ssfdo, jobject newfdo,
                                                jobjectArray isaa)
{	
	// fd_dfID(key)를 활용해 ssfdo 객체로부터 server socket file descriptor 값을 추출 
    jint ssfd = (*env)->GetIntField(env, ssfdo, fd_fdID); 
    jint newfd;
    struct sockaddr *sa;
    int alloc_len;
    jobject remote_ia = 0;
    jobject isa;
    jint remote_port;
	
    // socket address 저장을 위한 메모리를 할당 후, socket address 정보를 sa에 저장 
    NET_AllocSockaddr(&sa, &alloc_len);

    /*
     * accept connection but ignore ECONNABORTED indicating that
     * a connection was eagerly accepted but was reset before
     * accept() was called.
     */
     // newfd가 생성되거나 error(ECONNABORTED)가 발생할때까지 무한루프 
    for (;;) {
        socklen_t sa_len = alloc_len;
        // ServerSocket file descriptor으로부터 새로운 커넥션을 취득 
        // 취득한 커넥션 정보를 newfd에 저장 
        newfd = accept(ssfd, sa, &sa_len);
        if (newfd >= 0) {
            break;
        }
        if (errno != ECONNABORTED) {
            break;
        }
        /* ECONNABORTED => restart accept */
    }

	// "newfd < 0"의 의미는 커넥션 accept 과정이 실패했음을 의미
	// 할당된 메모리를 해제 
    if (newfd < 0) {
        free((void *)sa);
        // java 코드의 native int accept0 메서드 주석의 IOStatus.UNAVAILABLE 참고 
        if (errno == EAGAIN)
            return IOS_UNAVAILABLE; 
        // java 코드의 native int accept0 메서드 주석의 IOStatus.INTERRUPTED 참고 
        if (errno == EINTR)
            return IOS_INTERRUPTED;
        JNU_ThrowIOExceptionWithLastError(env, "Accept failed");
        return IOS_THROWN;
    }
	
    // newfdo에 newfd를 저장 
    (*env)->SetIntField(env, newfdo, fd_fdID, newfd);
    remote_ia = NET_SockaddrToInetAddress(env, sa, (int *)&remote_port);
    free((void *)sa);
    // InetSocketAddress 객체를 생성 
    isa = (*env)->NewObject(env, isa_class, isa_ctorID,
                            remote_ia, remote_port);
    // 생성된 InetSocketAddress객체를 isaa 배열의 첫번째 인덱스 위치에 저장 
    (*env)->SetObjectArrayElement(env, isaa, 0, isa);
    return 1;
}

 

SocketChannel 

read(ByteBuffer dst) 

SocketChannel의 read(...) 메서드는 인자로 전달한 ByteBuffer에 socket으로부터 읽은 데이터를 저장합니다. Socket으로부터 데이터를 읽어 들이는 로직이 위치한 메서드의 호출 순서는 SocketChannelImpl.read -> IOUtil.read -> NativeDispatcher.pread -> FileDispatcherImpl.pread0입니다. 그럼 pread0 메서드는 JNI로 어떻게 구현하는지 살펴보겠습니다. 

Java_sun_nio_ch_FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo,
                            jlong address, jint len, jlong offset)
{
    // file descriptor 값 조회 
    jint fd = fdval(env, fdo);
    // buffer 포인터 조회 
    void *buf = (void *)jlong_to_ptr(address);

    return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}

마지막에 사용된 pread64는 POSIX 표준에 정의된 함수로 다음과 같이 정의됩니다(참고로 리눅스 커널 업데이트 과정에서 pread()는 pread64()로 변경됐습니다).

ssize_t pread(int fd, void *buf, size_t count, off_t offset);

pread64 함수를 활용해 지정된 file descriptor의 특정 offset에서 count 만큼의 byte를 buf(버퍼)로 읽어 들입니다. 

write(ByteBuffer buf) 

SocketChannel의 read 함수를 이해하셨다면, write 함수의 구현 방식에 대해서도 대략적으로 감을 잡으실 수 있을 것입니다. Socket에 데이터를 쓰는 로직이 위치한 메서드의 호출 순서는 SocketChannelImpl.write -> IOUtil.write -> NativeDispatcher.pwrite -> FileDispatcherImpl.pwrite0입니다. 마찬가지로 pwrite0 메서드는 JNI로 어떻게 구현하는지 살펴보겠습니다. 

Java_sun_nio_ch_FileDispatcherImpl_pwrite0(JNIEnv *env, jclass clazz, jobject fdo,
                            jlong address, jint len, jlong offset)
{
    // file descriptor 값 조회 
    jint fd = fdval(env, fdo);
    // buffer 포인터 조회 
    void *buf = (void *)jlong_to_ptr(address);
    
    return convertReturnVal(env, pwrite64(fd, buf, len, offset), JNI_FALSE);
}

pwrite64 함수가 사용된 점을 빼면 pread0과 유사하기 때문에 다른 설명을 스킵하겠습니다. pwrite64 함수는 다음과 같이 정의됩니다(마찬가지로 리눅스 커널이 업데이트되면서 pwrite()가 pwrite64()가 변경됐습니다). 

ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset);

pwrite64 함수는 버퍼에 저장된 데이터 중 offset 위치로부터 count 바이트만큼 file(file descriptor으로 표시된)에 쓰기 작업을 수행합니다. 

 

pread, pwrite 

pread, pwrite 함수를 C/C++ 코드를 통해 살펴보겠습니다. 

#include <iostream>
#include <fcntl.h>
#include <unistd.h>

int main() {
    int fd = open("file.txt", O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
    if (fd == -1) {
        perror("open");
        return 1;
    }

    // offset 10 위치에서 "hello world" 쓰기 
    const char* message = "hello world";
    off_t writeOffset = 10;
    size_t writeCount = strlen(message);

    ssize_t bytesWritten = pwrite(fd, message, writeCount, writeOffset);
    if (bytesWritten == -1) {
        perror("pwrite");
        close(fd);
        return 1;
    }
    printf("Successfully written %ld bytes to the file at offset %ld.\n", bytesWritten, writeOffset);

    // offset 10 위치에서 "hello world" 읽기 
    char buffer[11];   
    off_t readOffset = 10;
    size_t readCount = strlen(message);

    ssize_t bytesRead = pread(fd, buffer, readCount, readOffset);
    if (bytesRead == -1) {
        perror("pread");
        close(fd);
        return 1;
    }
    buffer[bytesRead] = '\0';    // Null-terminate the buffer
    printf("Read %ld bytes at offset %ld: %s\n", bytesRead, readOffset, buffer);

    close(fd);
    return 0;
}

위 코드는 다음과 같이 동작합니다.

  • pwrite 함수를 통해 file.txt에 "hello message"를 offset 10부터 쓰기를 수행합니다. 
  • pread 함수를 file.txt의 offset 10위치로부터 "hello message" 길이만큼 데이터를 읽습니다. 

콘솔에는 아래와 같은 텍스트가 출력됩니다.

main 실행 결과

file.txt의 쓰기 결과는 아래와 같습니다(offset은 모두 Null, 10번째 offset에서부터 "hello world" 쓰기). 

file.txt

 

connect(SocketAddress sa) 

SocketChannel의 connect를 활용해 서버의 소켓에 커넥션을 맺을 수 있습니다. 실제 커넥션을 수행하는 JNI 메서드의 호출의 순서는 SocketChannel.connect -> SocketChannelImpl.connect -> Net.connect0와 같이 이루어집니다. 그럼 connect0 메서드는 JNI로 어떻게 구현되는지 살펴보겠습니다. 

Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6, jobject fdo,
                             jobject iao, jint port)
{
    SOCKETADDRESS sa;
    int rv; // return value 
    int sa_len; // socket address length 
    SOCKET s = (SOCKET)fdval(env, fdo);

    if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *)&sa, &sa_len, preferIPv6) != 0) {
        return IOS_THROWN;
    }
	
    // s(socket file descriptor)를 sa_len 길이를 가진 sa(socket address)에 커넥션 시도 
    // connect는 POSIX system call: s(socket's file descriptor)와 sa(socket address)을 연결 
    rv = connect(s, (struct sockaddr *)&sa, sa_len);
    if (rv != 0) {
        int err = WSAGetLastError();
        if (err == WSAEINPROGRESS || err == WSAEWOULDBLOCK) {
            return IOS_UNAVAILABLE;
        }
        NET_ThrowNew(env, err, "connect");
        return IOS_THROWN;
    } else {
        /* Enable WSAECONNRESET errors when a UDP socket is connected */
        int type = 0, optlen = sizeof(type);
        rv = getsockopt(s, SOL_SOCKET, SO_TYPE, (char*)&type, &optlen);
        if (rv == 0 && type == SOCK_DGRAM) {
            setConnectionReset(s, TRUE);
        }
    }
    return 1;
}

 

요약 

이번 포스팅에서는 Java NIO를 사용하여 서버와 클라이언트 애플리케이션을 만드는 방법과 그중에서 발생하는 커넥션의 생성과 데이터 송수신 과정을 JNI 코드를 통해 확인한 것을 다루었습니다. 다음 포스팅에서는 TCP 3-way handshake와 같은 더 로우 레벨의 과정이 어떻게 이루어지는지도 코드를 통해 살펴보겠습니다.

 

Reference 

 

728x90
복사했습니다!