Libzmq Chapter 1 - Basics

From 탱이의 잡동사니
Jump to navigation Jump to search

Fixing the World

ZeroMQ 를 어떻게 설명할 수 있을까? 우리들 중 누군가는 끝내주는 기능들에 대해서 이야기를 하고 한다. 스테로이드를 맞은 소켓, 라우팅 기능이 있는 우편함, 빠른 속도!

Starting Assumptions

우리는 최소한 여러분이 최소한 ZeroMQ 3.2 버전 이상을 사용하고 있다고 가정한다. 그리고 Linux 혹은 그와 상응하는 운영체제를 사용하고 있다고 가정한다. 또한, 여러분이 더도 말고, 덜도 말고 딱 C example code 를 읽을 수 있다고 가정한다. 그리고 앞으로 계속해서 나올 PUSH 혹은 SUBSCRIBE 라는 말이 때때로 ZMQ_PUSH 혹은 ZMQ_SUBSCRIBE 라는 것을 이해할 수 있을 것이라고 가정한다.

Getting the Examples

예제 코드들은 아래 github 저장소에서 확인할 수 있다. 가장 쉽게 예제코드를 다운받을 수 있는 방법은 아래의 명령어로 저장소를 복사하는 것이다. <source lang=bash> git clone --depth=1 https://github.com/imatix/zguide.git </source> 다음, examples 의 하위 디렉토리를 살펴보자. 각각의 언어별로 예제가 있는 것을 확인할 수 있을 것이다. 만약 여러분이 사용하는 언어의 예제가 없다면 submit a translation 할 것을 강추한다. 이것이 바로 이 문서가 어떻게 여러 사람들에게 유용할 수 있는지의 이유이다. 모든 예제들은 MIT/X11 라이센스를 가진다.

Ask and Ye Shall Receive

먼저 아래 코드를 보자. Hello World 예제부터 시작해볼 것이다. client 와 server 를 만들고, client 가 "Hello" 라고 server 에게 보내면, server 는 "World"라고 대답할 것이다. 여기 C 로 짜여진 서버가 있다. 포트 번호 5555 를 이용하여 ZeroMQ socket 을 열고, request 를 수신하고, "World" 라는 응답을 준다. <source lang=c> // hwserver.c // Hello World server

  1. include <zmq.h>
  2. include <stdio.h>
  3. include <unistd.h>
  4. include <string.h>
  5. include <assert.h>
  6. include <string.h>

int main(int argc, char** argv) {

   // Socket to talk to clients
   void* context;
   void* responder;
   int ret;
   
   context = zmq_ctx_new();
   assert(context != NULL);
   
   responder = zmq_socket(context, ZMQ_REP);
   assert(responder != NULL);
   
   ret = zmq_bind(responder, "tcp://*:5555");
   assert(ret == 0);
   
   while(1) {
       char buffer[10];
       
       memset(buffer, 0x00, sizeof(buffer));
       zmq_recv(responder, buffer, sizeof(buffer) - 1, 0);
       printf("Received message. message[%s]\n", buffer);
       
       sleep(1);   // Do some 'work'
       zmq_send(responder, "World", 5, 0);
   }
   
   return 0;

} </source>

아래는 client 쪽 소스내용이다. <source lang=c> // hwclient.c // Hello World client

  1. include <zmq.h>
  2. include <string.h>
  3. include <stdio.h>
  4. include <unistd.h>
  5. include <assert.h>

int main(int argc, char** argv) {

   void* ctx;
   void* req;
   int req_cnt;
   int ret;
   
   printf("Connecting to hello world server..\n");
   
   ctx = zmq_ctx_new();
   assert(ctx != NULL);
   
   req = zmq_socket(ctx, ZMQ_REQ);
   assert(req != NULL);
   
   zmq_connect(req, "tcp://localhost:5555");
   
   for(req_cnt = 0; req_cnt != 10; req_cnt++) {
       char buffer[100];
       
       printf("Sending Hello %d..\n", req_cnt);
       sprintf(buffer, "Hello");
       zmq_send(req, buffer, strlen(buffer), 0);
       
       memset(buffer, 0x00, sizeof(buffer));
       ret = zmq_recv(req, buffer, sizeof(buffer) - 1, 0);
       printf("Received message. message[%s], cnt[%d], received_size[%d]\n", buffer, req_cnt, ret);
   }
   
   zmq_close(req);
   zmq_ctx_destroy(ctx);
   
   return 0;

} </source>

request-reply

REQ-REP socket pair 는 서로 굉장히 밀집되어 이뤄진다. client 는 루프를 돌면서 zmq_send() 를 호출하고, 이어 zmq_recv() 를 호출한다. 만약 한번에 두개 이상의 메시지를 전송/수신하고자 한다면 zmq_send() 혹은 zmq_recv()의 return code 는 -1(error) 를 반환할 것이다. 마찬가지로, server 쪽에서도 순서에 맞춰 zmq_recv() 후에 zmq_send() 를 호출한다.

ZeroMQ 는 C 를 reference language 로 사용하며, 앞으로 나올 예제들의 main language 가 될 것이다.

소스를 보면 알겠지만, 믿을 수 없을 정도로 굉장히 쉽고 간단하다. 게다가 앞서 이야기 했듯이, ZeroMQ 는 굉장한 힘을 가지고 있다. 하나의 server 에 1000 개 이상의 client 를 연동할 수도 있고, 정말 빠르게 동작할 것이다. 한번 재미삼아 client 를 serer 보다 먼저 실행시켜 보라. 잘 동작하지 않는가? 잠시 이게 무슨 의미인지 생각해보라.

잠시 위의 두개의 프로그램이 정확히 어떤 일을 하는지 알아보도록 하자. 작업을 위해 ZeroMQ context 를 생성하고, socket 을 생성했다. 지금은 무슨말인지 몰라도 된다. 뒷부분에가면 전부 이해가 될 것이다. 서버는 REP(reply) 소켓을 5555 포트에 bind 를 하고, loop 를 돌면서 요청을 기다리고, 요청이 들어오면 바로 응답을 전송한다. 클라이언트 는 서버로 요청을 전송하고, 응답을 수신한다.

만약 여러분이 서버를 죽이고(Ctrl + c), 재시작을 할 경우, 클라이언트가 정상적으로 작동하지 않을 수도 있다. 사실, 오류를 해결하고 복구하는 일은 그리 쉽지가 않는 일이다. 신뢰성있는 request-reply 연결을 만드는 것은 매우 복잡한 일이며, Chapter 4 - Reliable Request-Reply Patterns 에서 다룰 것이다.

실은 화면에는 안보이지만 짧은 코드 라인으로는 상상할 수 없는 많은 일들이 벌어지고 있다. 오류 없이 잘 작동하고, 빠르고, 안정적으로 동작한다. 우리가 살편본 패턴은 request-reply pattern 이었다. 아마도 가장 간단한 ZeroMQ 사용법 중의 하나일 것이다. 이 패턴은 RPC 모델과 client/server 모델과 부합되는 패턴이다.

A Minor Note on String

ZeroMQ 는 소켓에 입력되는 데이터에 대해서 데이터의 길이(크기) 말고는 아무것도 알지 못한다. 그 뜻은, 다른 프로그램에서 수신한 데이터를 읽을 수 있도록 하는 것은 순전히 여러분의 책임이라는 뜻이다. 복잡한 데이터 타입을 다루는 특수 라이브러리(프로토콜 버퍼와 같은)부터 간단한 string 까지 모두 신경을 써야 한다.

C 와 같은 언어들에서는 string 의 끝을 NULL byte 로 표현한다. 만약 "Hello"라는 문자열을 NULL byte 와 함께 전송하고자 한다면 다음과 같이 해야 한다. <source lang=c> zmq_send(requester, "Hello", 6, 0); </source>

하지만 만약 다른 프로그램 언어에서 string 을 전송한다면 NULL byte 가 포함되지 않을지도 모른다. 예를 들어, 위와 같은 문자열을 python 에서는 아래와 같이 전송한다. <source lang=python> socket.send("Hello") </source> 이제 실제 데이터가 어떤식으로 저장되는지를 확인해보자.

A ZeroMQ string

그리고 만약 이 데이터를 C 프로그램에서 수신한다면, 처음에는 마치 string 처럼 보일 것이다. 그리고 어쩌면 실제 string 처럼 동작할 지도 모른다(운이 좋아서 문자열에 뒤에 NULL byte가 붙어있을지도 모른다). 하지만 이는 C 에서 사용하는 string 형식이 아니다. 때문에 만약 Server 와 Client 에서 string 형식에 대해 사전에 협의 없다면, 프로그램이 제대로 작동하지 않을 수도 있다.

간단하게, C 에서 ZeroMQ 를 통해서 string 데이터를 수신할 경우, 문자열의 끝에 문자열 종료문자가 있는지/없는지를 정확히 예측할 수 없다. 그렇기 때무에 매번 string 을 읽을 때마다, 새로이 버퍼를 할당하고, 버퍼에 string 종료 문자를 염두해서 크기를 살짝 더 키우는 것도 잊으면 안된다.

자, 이쯤에서 한가지 규칙을 정하자. ZeroMQ string 은 길이-지정 형태이며, 자체적으로 NULL 종료 문자를 포함하지 않은 상태로 전송한다. 여기 c 에서 ZeroMQ string 을 수신할 때 어떻게 하는지 예시를 나타내었다. <source lang=c> // Receivce ZeroMQ string from socket and convert into C string // Chops string at 255 chars, if it's longer static char* s_recv(void* socket) {

 char buffer[256];
 int size = zmq_recv(socket, buffer, 255, 0);
 if(size == -1) {
   return NULL;
 }
 if(size > 255) {
   size = 255;
 }
 buffer[size] = 0;
 return strdup(buffer);

} </source> 쉽고, 간단하다. 이와 비슷하게 s_send() 함수도 만들 수 있다. 앞으로의 예제를 위해 이런 간단 함수들을 모아 놓은 파일을 따로 만들어놓았다. 아래의 링크에서 확인이 가능하다

Version Reporting

ZeroMQ 는 매우 자주 버전이 업그레이드 되어 배포된다. 만약 사용중 문제에 부딫혔다면, 아마도 최신 버전에서는 이미 문제가 해결되어 있을 것이다. 때문에 현재 사용중인 ZeroMQ 의 정확한 버전을 안다는 것은 꽤나 유용할 수도 있다.

여기에 버전 정보를 확인하는 프로그램이 있다. <source lang=c> // version.c // Report 0MQ version

  1. include <zmq.h>

int main(int argc, char** argv) {

   int major, minor, patch;
   zmq_version(&major, &minor, &patch);
   printf("Current 0MQ version is %d.%d.%d\n", major, minor, patch);
   
   return 0;

} </source> 다음과 같이 정보를 나타낸다. <source lang=bash> $ ./main

Current 0MQ version is 3.2.5 </source>

Getting the Message Out

두번째로 알아볼 패턴은 one-way data distribution 이다. server 는 데이터를 입력하고 client 는 데이터를 수신하는 방식이다. 예제를 통해 알아보자. 서버는 지속적으로 zip 코드, 온도, 습도 등의 날씨 정보를 업데이트한다. 진짜 날씨처럼 보이도록 랜덤 변수로 변화를 주도록 한다.

먼저 서버쪽 소스를 보자. <source lang=c> // wuserver.c // Weather update server // Binds PUB socket to tcp://*:5556 // Publishes random weather updates

  1. include "zhelpers.h"

int main(int argc, char** argv) {

   // Prepare our context and publisher
   void* context;
   void* publisher;
   int ret;
   
   context = zmq_ctx_new();
   publisher = zmq_socket(context, ZMQ_PUB);
   ret = zmq_bind(publisher, "tcp://*:5556");
   assert(ret == 0);
   
   // Initialize random number generator
   srandom((unsigned)time(NULL));
   while(1) {
       // Get value that will fool the boss
       int zipcode, temperature, relhumidity;
       zipcode = randof(100000);
       temperature = randof(215) - 80;
       relhumidity = randof(50) + 10;
       
       // Send mesage to all subscribers
       char update[20];
       sprintf(update, "%05d %d %d", zipcode, temperature, relhumidity);
       s_send(publisher, update);
   }
   zmq_close(publisher);
   zmq_ctx_destroy(context);
   
   return 0;
   

} </source>

클라이언트쪽 소스이다. 기본값으로 10001 NewYork zipcode 를 설정하여 10001 zipcode 를 가진 데이터 모두를 수집한다. <source lang=c> // wuclient.c // Weather updte client // Conntes SUB socket to tcp://localhost:5556 // Collects weather updates and finds avg temp in zipcode

  1. include "zhelpers.h"

int main(int argc, char** argv) {

   void* context;
   void* subscriber;
   int ret;
   
   // Socket to talk to server
   printf("Collecting updates from weather server..\n");
   context = zmq_ctx_new();
   subscriber = zmq_socket(context, ZMQ_SUB);
   ret = zmq_connect(subscriber, "tcp://localhost:5556");
   assert(ret == 0);
   
   // Subscribe to zipcode, default is NYC. 10001
   char* filter = (argc > 1)? argv[1]:"10001";
   ret = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
   assert(ret == 0);
   
   // Process 100 updates
   int update_nbr;
   long total_temp = 0;
   for(update_nbr = 0; update_nbr < 100; update_nbr++) {
       char* string = s_recv(subscriber);
       
       int zipcode, temperature, relhumidity;
       sscanf(string, "%d %d %d", &zipcode, &temperature, &relhumidity);
       total_temp += temperature;
       free(string);
   }
   printf("Average temperature for zipcode '%s' was %dF\n", filter, (int)(total_temp / update_nbr));
   
   zmq_close(subscriber);
   zmq_ctx_destroy(context);
   
   return 0;

} </source>

Publish-Subscribe

SUB socket 을 사용한다면 반드시 위의 client 소스에서처럼 zmq_setsockopt() 함수와 SUBSCRIBE 옵션을 이용하여 subscription 을 설정해야한다는 것을 알아두자. 만약 어떠한 subscription 도 설정하지 않았다면, 아무런 메시지도 수신하지 못할 것이다. 처음이라면 누구나 흔히들 겪는 실수이다. subscriber 는 여러개의 subscription 들을 함께 추가함으로써, 한번에 여러개의 subscription 들을 설정할 수 있다. 반대로 subscriber 는 특정 subscription 을 제거할 수도 있다. subscription 은 반드시 print 가능한 string 일 필요는 없다. 자세한 내용은 zmq_setsockopt()<ref>http://api.zeromq.org/3-2:zmq_setsockopt</ref>를 참고하면 된다.

PUB-SUB socket pair 는 비동기식으로 작동한다. client 는 loop에서 zmq_recv() 만을 수행한다. 만약 SUB socket 으로 메시지를 송신하려고 한다면 에러가 발생할 것이다. 마찬가지로, 서버쪽에서는 zmq_send() 로 데이터를 송신할 뿐이다. PUB socket 으로 zmq_recv() 를 하면 안된다.

See also