nanomsg-examples icon indicating copy to clipboard operation
nanomsg-examples copied to clipboard

Examples of different message types in C with nanomsg

#+TITLE: nanomsg #+AUTHOR: Tim Dysinger #+EMAIL: [email protected] #+DATE: <2013-08-23 Fri>

  • Pipeline

*** Code

#+begin_src c :tangle ./pipeline.c
  #include <assert.h>
  #include <libc.h>
  #include <stdio.h>
  #include <nanomsg/nn.h>
  #include <nanomsg/pipeline.h>

  #define NODE0 "node0"
  #define NODE1 "node1"

  int node0 (const char *url)
  {
    int sock = nn_socket (AF_SP, NN_PULL);
    assert (sock >= 0);
    assert (nn_bind (sock, url) >= 0);
    while (1)
      {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        assert (bytes >= 0);
        printf ("NODE0: RECEIVED \"%s\"\n", buf);
        nn_freemsg (buf);
      }
  }

  int node1 (const char *url, const char *msg)
  {
    int sz_msg = strlen (msg) + 1; // '\0' too
    int sock = nn_socket (AF_SP, NN_PUSH);
    assert (sock >= 0);
    assert (nn_connect (sock, url) >= 0);
    printf ("NODE1: SENDING \"%s\"\n", msg);
    int bytes = nn_send (sock, msg, sz_msg, 0);
    assert (bytes == sz_msg);
    return nn_shutdown (sock, 0);
  }

  int main (const int argc, const char **argv)
  {
    if (strncmp (NODE0, argv[1], strlen (NODE0)) == 0 && argc > 1)
      return node0 (argv[2]);
    else if (strncmp (NODE1, argv[1], strlen (NODE1)) == 0 && argc > 2)
      return node1 (argv[2], argv[3]);
    else
      {
        fprintf (stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n",
                 NODE0, NODE1);
        return 1;
      }
  }
#+end_src

*** Compile

#+begin_src sh :tangle ./pipeline.sh
  gcc pipeline.c /usr/local/lib/libnanomsg.a -o pipeline
#+end_src

*** Run

#+begin_src sh :tangle ./pipeline.sh
  ./pipeline node0 ipc:///tmp/pipeline.ipc & node0=$! && sleep 1
  ./pipeline node1 ipc:///tmp/pipeline.ipc "Hello, World!"
  ./pipeline node1 ipc:///tmp/pipeline.ipc "Goodbye."
  kill $node0
#+end_src

*** Results

#+begin_src text
  NODE1: SENDING "Hello, World!"
  NODE0: RECEIVED "Hello, World!"
  NODE1: SENDING "Goodbye."
  NODE0: RECEIVED "Goodbye."
#+end_src
  • Request/Reply

*** Code

#+begin_src c :tangle ./reqrep.c
  #include <assert.h>
  #include <libc.h>
  #include <stdio.h>
  #include <nanomsg/nn.h>
  #include <nanomsg/reqrep.h>

  #define NODE0 "node0"
  #define NODE1 "node1"
  #define DATE "DATE"

  char *date ()
  {
    time_t raw = time (&raw);
    struct tm *info = localtime (&raw);
    char *text = asctime (info);
    text[strlen(text)-1] = '\0'; // remove '\n'
    return text;
  }

  int node0 (const char *url)
  {
    int sz_date = strlen (DATE) + 1; // '\0' too
    int sock = nn_socket (AF_SP, NN_REP);
    assert (sock >= 0);
    assert (nn_bind (sock, url) >= 0);
    while (1)
      {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        assert (bytes >= 0);
        if (strncmp (DATE, buf, sz_date) == 0)
          {
            printf ("NODE0: RECEIVED DATE REQUEST\n");
            char *d = date();
            int sz_d = strlen(d) + 1; // '\0' too
            printf ("NODE0: SENDING DATE %s\n", d);
            bytes = nn_send (sock, d, sz_d, 0);
            assert (bytes == sz_d);
          }
        nn_freemsg (buf);
      }
    return nn_shutdown (sock, 0);
  }

  int node1 (const char *url)
  {
    int sz_date = strlen(DATE) + 1; // '\0' too
    char *buf = NULL;
    int bytes = -1;
    int sock = nn_socket (AF_SP, NN_REQ);
    assert (sock >= 0);
    assert (nn_connect (sock, url) >= 0);
    printf ("NODE1: SENDING DATE REQUEST %s\n", DATE);
    bytes = nn_send (sock, DATE, sz_date, 0);
    assert (bytes == sz_date);
    bytes = nn_recv (sock, &buf, NN_MSG, 0);
    assert (bytes >= 0);
    printf ("NODE1: RECEIVED DATE %s\n", buf, bytes);
    nn_freemsg (buf);
    return nn_shutdown (sock, 0);
  }

  int main (const int argc, const char **argv)
  {
    if (strncmp (NODE0, argv[1], strlen (NODE0)) == 0 && argc > 1)
      return node0 (argv[2]);
    else if (strncmp (NODE1, argv[1], strlen (NODE1)) == 0 && argc > 1)
      return node1 (argv[2]);
    else
      {
        fprintf (stderr, "Usage: reqrep %s|%s <URL> <ARG> ...\n",
                 NODE0, NODE1);
        return 1;
      }
  }
#+end_src

*** Compile

#+begin_src sh :tangle ./reqrep.sh
  gcc reqrep.c /usr/local/lib/libnanomsg.a -o reqrep
#+end_src

*** Run

#+begin_src sh :tangle ./reqrep.sh
  ./reqrep node0 ipc:///tmp/reqrep.ipc & node0=$! && sleep 1
  ./reqrep node1 ipc:///tmp/reqrep.ipc
  kill $node0
#+end_src

*** Results

#+begin_src text
  NODE1: SENDING DATE REQUEST DATE
  NODE0: RECEIVED DATE REQUEST
  NODE0: SENDING DATE Sat Sep  7 17:39:01 2013
  NODE1: RECEIVED DATE Sat Sep  7 17:39:01 2013
#+end_src
  • Pair

*** Code

#+begin_src c :tangle ./pair.c
  #include <assert.h>
  #include <libc.h>
  #include <nanomsg/nn.h>
  #include <nanomsg/pair.h>
  #include <stdio.h>

  #define NODE0 "node0"
  #define NODE1 "node1"

  int send_name(int sock, const char *name)
  {
    printf ("%s: SENDING \"%s\"\n", name, name);
    int sz_n = strlen (name) + 1; // '\0' too
    return nn_send (sock, name, sz_n, 0);
  }

  int recv_name(int sock, const char *name)
  {
    char *buf = NULL;
    int result = nn_recv (sock, &buf, NN_MSG, 0);
    if (result > 0)
      {
        printf ("%s: RECEIVED \"%s\"\n", name, buf);
        nn_freemsg (buf);
      }
    return result;
  }

  int send_recv(int sock, const char *name)
  {
    int to = 100;
    assert (nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to)) >= 0);
    while(1)
      {
        recv_name(sock, name);
        sleep(1);
        send_name(sock, name);
      }
  }

  int node0 (const char *url)
  {
    int sock = nn_socket (AF_SP, NN_PAIR);
    assert (sock >= 0);
    assert (nn_bind (sock, url) >= 0);
    send_recv(sock, NODE0);
    return nn_shutdown (sock, 0);
  }

  int node1 (const char *url)
  {
    int sock = nn_socket (AF_SP, NN_PAIR);
    assert (sock >= 0);
    assert (nn_connect (sock, url) >= 0);
    send_recv(sock, NODE1);
    return nn_shutdown (sock, 0);
  }

  int main (const int argc, const char **argv)
  {
    if (strncmp (NODE0, argv[1], strlen (NODE0)) == 0 && argc > 1)
      return node0 (argv[2]);
    else if (strncmp (NODE1, argv[1], strlen (NODE1)) == 0 && argc > 1)
      return node1 (argv[2]);
    else
      {
        fprintf (stderr, "Usage: pair %s|%s <URL> <ARG> ...\n",
                 NODE0, NODE1);
        return 1;
      }
  }
#+end_src

*** Compile

#+begin_src sh :tangle ./pair.sh
  gcc pair.c /usr/local/lib/libnanomsg.a -o pair
#+end_src

*** Run

#+begin_src sh :tangle ./pair.sh
  ./pair node0 ipc:///tmp/pair.ipc & node0=$!
  ./pair node1 ipc:///tmp/pair.ipc & node1=$!
  sleep 3
  kill $node0 $node1
#+end_src

*** Results

#+begin_src text
  NODE1: SENDING HELLO
  NODE0: RECEIVED HELLO
  NODE0: SENDING HELLO
  NODE1: RECEIVED HELLO
  NODE1: SENDING HELLO
  NODE0: RECEIVED HELLO
  NODE0: SENDING HELLO
  NODE1: RECEIVED HELLO
#+end_src
  • Pub/Sub

*** Code

#+begin_src c :tangle ./pubsub.c
  #include <assert.h>
  #include <libc.h>
  #include <stdio.h>
  #include <nanomsg/nn.h>
  #include <nanomsg/pubsub.h>

  #define SERVER "server"
  #define CLIENT "client"

  char *date ()
  {
    time_t raw = time (&raw);
    struct tm *info = localtime (&raw);
    char *text = asctime (info);
    text[strlen(text)-1] = '\0'; // remove '\n'
    return text;
  }

  int server (const char *url)
  {
    int sock = nn_socket (AF_SP, NN_PUB);
    assert (sock >= 0);
    assert (nn_bind (sock, url) >= 0);
    while (1)
      {
        char *d = date();
        int sz_d = strlen(d) + 1; // '\0' too
        printf ("SERVER: PUBLISHING DATE %s\n", d);
        int bytes = nn_send (sock, d, sz_d, 0);
        assert (bytes == sz_d);
        sleep(1);
      }
    return nn_shutdown (sock, 0);
  }

  int client (const char *url, const char *name)
  {
    int sock = nn_socket (AF_SP, NN_SUB);
    assert (sock >= 0);
    // TODO learn more about publishing/subscribe keys
    assert (nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) >= 0);
    assert (nn_connect (sock, url) >= 0);
    while (1)
      {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        assert (bytes >= 0);
        printf ("CLIENT (%s): RECEIVED %s\n", name, buf);
        nn_freemsg (buf);
      }
    return nn_shutdown (sock, 0);
  }

  int main (const int argc, const char **argv)
  {
    if (strncmp (SERVER, argv[1], strlen (SERVER)) == 0 && argc >= 2)
      return server (argv[2]);
    else if (strncmp (CLIENT, argv[1], strlen (CLIENT)) == 0 && argc >= 3)
      return client (argv[2], argv[3]);
    else
      {
        fprintf (stderr, "Usage: pubsub %s|%s <URL> <ARG> ...\n",
                 SERVER, CLIENT);
        return 1;
      }
  }
#+end_src

*** Compile

#+begin_src sh :tangle ./pubsub.sh
  gcc pubsub.c /usr/local/lib/libnanomsg.a -o pubsub
#+end_src

*** Run

#+begin_src sh :tangle ./pubsub.sh
  ./pubsub server ipc:///tmp/pubsub.ipc & server=$! && sleep 1
  ./pubsub client ipc:///tmp/pubsub.ipc client0 & client0=$!
  ./pubsub client ipc:///tmp/pubsub.ipc client1 & client1=$!
  ./pubsub client ipc:///tmp/pubsub.ipc client2 & client2=$!
  sleep 5
  kill $server $client0 $client1 $client2
#+end_src

*** Results

#+begin_src text
  SERVER: PUBLISHING DATE Sat Sep  7 17:40:11 2013
  SERVER: PUBLISHING DATE Sat Sep  7 17:40:12 2013
  SERVER: PUBLISHING DATE Sat Sep  7 17:40:13 2013
  CLIENT (client2): RECEIVED Sat Sep  7 17:40:13 2013
  CLIENT (client0): RECEIVED Sat Sep  7 17:40:13 2013
  CLIENT (client1): RECEIVED Sat Sep  7 17:40:13 2013
  SERVER: PUBLISHING DATE Sat Sep  7 17:40:14 2013
  CLIENT (client2): RECEIVED Sat Sep  7 17:40:14 2013
  CLIENT (client1): RECEIVED Sat Sep  7 17:40:14 2013
  CLIENT (client0): RECEIVED Sat Sep  7 17:40:14 2013
  SERVER: PUBLISHING DATE Sat Sep  7 17:40:15 2013
  CLIENT (client1): RECEIVED Sat Sep  7 17:40:15 2013
  CLIENT (client2): RECEIVED Sat Sep  7 17:40:15 2013
  CLIENT (client0): RECEIVED Sat Sep  7 17:40:15 2013
  SERVER: PUBLISHING DATE Sat Sep  7 17:40:16 2013
  CLIENT (client1): RECEIVED Sat Sep  7 17:40:16 2013
  CLIENT (client2): RECEIVED Sat Sep  7 17:40:16 2013
  CLIENT (client0): RECEIVED Sat Sep  7 17:40:16 2013
#+end_src
  • Survey

*** Code

#+begin_src c :tangle ./survey.c
  #include <assert.h>
  #include <libc.h>
  #include <stdio.h>
  #include <nanomsg/nn.h>
  #include <nanomsg/survey.h>

  #define SERVER "server"
  #define CLIENT "client"
  #define DATE   "DATE"

  char *date ()
  {
    time_t raw = time (&raw);
    struct tm *info = localtime (&raw);
    char *text = asctime (info);
    text[strlen(text)-1] = '\0'; // remove '\n'
    return text;
  }

  int server (const char *url)
  {
    int sock = nn_socket (AF_SP, NN_SURVEYOR);
    assert (sock >= 0);
    assert (nn_bind (sock, url) >= 0);
    sleep(1); // wait for connections
    int sz_d = strlen(DATE) + 1; // '\0' too
    printf ("SERVER: SENDING DATE SURVEY REQUEST\n");
    int bytes = nn_send (sock, DATE, sz_d, 0);
    assert (bytes == sz_d);
    while (1)
      {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        if (bytes == ETIMEDOUT) break;
        if (bytes >= 0)
        {
          printf ("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
          nn_freemsg (buf);
        }
      }
    return nn_shutdown (sock, 0);
  }

  int client (const char *url, const char *name)
  {
    int sock = nn_socket (AF_SP, NN_RESPONDENT);
    assert (sock >= 0);
    assert (nn_connect (sock, url) >= 0);
    while (1)
      {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        if (bytes >= 0)
          {
            printf ("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf);
            nn_freemsg (buf);
            char *d = date();
            int sz_d = strlen(d) + 1; // '\0' too
            printf ("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n", name);
            int bytes = nn_send (sock, d, sz_d, 0);
            assert (bytes == sz_d);
          }
      }
    return nn_shutdown (sock, 0);
  }

  int main (const int argc, const char **argv)
  {
    if (strncmp (SERVER, argv[1], strlen (SERVER)) == 0 && argc >= 2)
      return server (argv[2]);
    else if (strncmp (CLIENT, argv[1], strlen (CLIENT)) == 0 && argc >= 3)
      return client (argv[2], argv[3]);
    else
      {
        fprintf (stderr, "Usage: survey %s|%s <URL> <ARG> ...\n",
                 SERVER, CLIENT);
        return 1;
      }
  }
#+end_src

*** Compile

#+begin_src sh :tangle ./survey.sh
  gcc survey.c /usr/local/lib/libnanomsg.a -o survey
#+end_src

*** Run

#+begin_src sh :tangle ./survey.sh
  ./survey server ipc:///tmp/survey.ipc & server=$!
  ./survey client ipc:///tmp/survey.ipc client0 & client0=$!
  ./survey client ipc:///tmp/survey.ipc client1 & client1=$!
  ./survey client ipc:///tmp/survey.ipc client2 & client2=$!
  sleep 3
  kill $server $client0 $client1 $client2
#+end_src

*** Results

#+begin_src text
  SERVER: SENDING DATE SURVEY REQUEST
  CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
  CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
  CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
  CLIENT (client0): SENDING DATE SURVEY RESPONSE
  CLIENT (client1): SENDING DATE SURVEY RESPONSE
  CLIENT (client2): SENDING DATE SURVEY RESPONSE
  SERVER: RECEIVED "Sun Sep 15 13:39:46 2013" SURVEY RESPONSE
  SERVER: RECEIVED "Sun Sep 15 13:39:46 2013" SURVEY RESPONSE
  SERVER: RECEIVED "Sun Sep 15 13:39:46 2013" SURVEY RESPONSE
#+end_src
  • Bus

*** Code

#+begin_src c :tangle ./bus.c
  #include <assert.h>
  #include <libc.h>
  #include <stdio.h>
  #include <nanomsg/nn.h>
  #include <nanomsg/bus.h>

  int node (const int argc, const char **argv)
  {
    int sock = nn_socket (AF_SP, NN_BUS);
    assert (sock >= 0);
    assert (nn_bind (sock, argv[2]) >= 0);
    sleep (1); // wait for connections
    if (argc >= 3)
      {
        int x=3;
        for(x; x<argc; x++)
          assert (nn_connect (sock, argv[x]) >= 0);
      }
    sleep (1); // wait for connections
    int to = 100;
    assert (nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to)) >= 0);
    // SEND
    int sz_n = strlen(argv[1]) + 1; // '\0' too
    printf ("%s: SENDING '%s' ONTO BUS\n", argv[1], argv[1]);
    int send = nn_send (sock, argv[1], sz_n, 0);
    assert (send == sz_n);
    while (1)
      {
        // RECV
        char *buf = NULL;
        int recv = nn_recv (sock, &buf, NN_MSG, 0);
        if (recv >= 0)
          {
            printf ("%s: RECEIVED '%s' FROM BUS\n", argv[1], buf);
            nn_freemsg (buf);
          }
      }
    return nn_shutdown (sock, 0);
  }

  int main (const int argc, const char **argv)
  {
    if (argc >= 3) node (argc, argv);
    else
      {
        fprintf (stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...\n");
        return 1;
      }
  }
#+end_src

*** Compile

#+begin_src sh :tangle ./bus.sh
  gcc bus.c /usr/local/lib/libnanomsg.a -o bus
#+end_src

*** Run

#+begin_src sh :tangle ./bus.sh
  gcc bus.c /usr/local/lib/libnanomsg.a -o bus
  ./bus node0 ipc:///tmp/node0.ipc ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc & node0=$!
  ./bus node1 ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node1=$!
  ./bus node2 ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node2=$!
  ./bus node3 ipc:///tmp/node3.ipc ipc:///tmp/node0.ipc & node3=$!
  sleep 5
  kill $node0 $node1 $node2 $node3
#+end_src

*** Results

#+begin_src sh
  node0: SENDING 'node0' ONTO BUS
  node1: SENDING 'node1' ONTO BUS
  node2: SENDING 'node2' ONTO BUS
  node3: SENDING 'node3' ONTO BUS
  node0: RECEIVED 'node1' FROM BUS
  node0: RECEIVED 'node2' FROM BUS
  node0: RECEIVED 'node3' FROM BUS
  node1: RECEIVED 'node0' FROM BUS
  node1: RECEIVED 'node2' FROM BUS
  node1: RECEIVED 'node3' FROM BUS
  node2: RECEIVED 'node0' FROM BUS
  node2: RECEIVED 'node1' FROM BUS
  node2: RECEIVED 'node3' FROM BUS
  node3: RECEIVED 'node0' FROM BUS
  node3: RECEIVED 'node1' FROM BUS
  node3: RECEIVED 'node2' FROM BUS
#+end_src