1 windows命令行使用nsq

nsq官网选择GitHub下载相应的发行版本,选择Windows发行版下载

下载完成之后,解压并进入到bin目录,可以看到有如下的文件:

然后按照nsq官网的quick start快速启动nsq,首先进入到bin目录:

  1. 开启一个cmd窗口,开启nsqlookupd

    1
    nsqlookupd
  2. 再开启一个cmd窗口,开启nsqd

    1
    nsqd --lookupd-tcp-address=127.0.0.1:4160
  3. 再开启一个窗口,开启nsqadmin

    1
    nsqadmin --lookupd-http-address=127.0.0.1:4161

    此时,执行完上面的命令之后,你就可以到浏览器输入以下地址进入管理页面:http://127.0.0.1:4171/

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    Topic:主题名称
    empty Queue:清空队列
    Delete Topic : 删除主题
    Pause Topic : 暂停主题
    Memory+Disk : 内存和磁盘
    Messages : 表示消息总数
    channels : 消息通道
    In-Flight :飞行中,即将消费的消息
    Deferred : 延迟消息
    Requeued : 已请求的消息
    Time Out : 超时
    Connections : 连接数
  4. 再重新开启cmd窗口,生产一个消息,

    1
    curl -d 'hello world 1' http://127.0.0.1:4151/pub?topic=test

    这里需要注意,因为再Windows下使用curl命令的时候,网址不需要用单引号'括起来,否则会报错,错误信息解决可参考这里

  5. 然后再开启一个cmd窗口,你可以进行消费一个消息,使用nsq_to_file将某个topic的消息内容保存到文件中:

    1
    nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161

注意:这些窗口都是持续监听的过程,不能关闭窗口,一旦有消息过来,消费者就会把消息写入文件。

2 Java使用nsq

Java使用nsq,可以去nsq官网查看相应的客户端库,选择其中一个即可。这里以JavaNSQClient为例。

  1. 首先创建一个maven项目

    maven项目的目录结构一般是标准化的,大致如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    a-maven-project
    ├── pom.xml
    ├── src
    │ ├── main
    │ │ ├── java
    │ │ └── resources
    │ └── test
    │ ├── java
    │ └── resources
    └── target

    项目的根目录a-maven-project是项目名,它有一个项目描述文件pom.xml,存放Java源码的目录是src/main/java,存放资源文件的目录是src/main/resources,存放测试源码的目录是src/test/java,存放测试资源的目录是src/test/resources,最后,所有编译、打包生成的文件都放在target目录里。这些就是一个Maven项目的标准目录结构。

  2. 然后再pom.xml文件中指定相应的依赖包:

    相关的maven依赖可以参考这个网址

    1
    2
    3
    4
    5
    6
    7
    8
    9
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.github.brainlag</groupId>
    <artifactId>nsq-client</artifactId>
    <version>1.0.0.RC4</version>
    </project>
  3. 然后去到JavaNSQClient的GitHub仓库下载源代码,该源码需要放到自己的项目目录中

  4. 然后再写生产者和消费者代码:

    • 生产者:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      import com.github.brainlag.nsq.NSQProducer;
      import com.github.brainlag.nsq.exceptions.NSQException;
      import java.text.SimpleDateFormat;
      import java.util.Date;
      import java.util.concurrent.TimeoutException;

      class nsqProducer {
      public void nsqProducer() {
      NSQProducer producer = new NSQProducer();
      producer.addAddress("127.0.0.1", 4150).start();
      try {
      SimpleDateFormat sdf = new SimpleDateFormat();
      sdf.applyPattern("yyyy-MM-dd HH:mm:ss a");
      for (int i = 0; i < 5; i++) {
      Date date = new Date();
      String message = "message go go go, cur_time: " + sdf.format(date) + ", cur_No: " + i;
      producer.produce("test", message.getBytes());
      System.out.println("生产了一个消息---> " + message);
      Thread.sleep(1000);
      }
      } catch (TimeoutException | InterruptedException | NSQException e) {
      e.printStackTrace();
      }
      }
      }
    • 消费者:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      import com.github.brainlag.nsq.NSQConsumer;
      import com.github.brainlag.nsq.NSQMessage;
      import com.github.brainlag.nsq.callbacks.NSQMessageCallback;
      import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
      import com.github.brainlag.nsq.lookup.NSQLookup;
      import java.util.concurrent.Executor;

      public class nsqConsumer {
      public void nsqConsumer() {
      NSQLookup lookup = new DefaultNSQLookup();
      lookup.addLookupAddress("127.0.0.1", 4161);
      NSQConsumer consumer = new NSQConsumer(lookup, "test", "nsq_to_file", new NSQMessageCallback() {
      @Override
      public void message(NSQMessage message) {
      byte b[] = message.getMessage();
      String s = new String(b);
      System.out.println("消费了一个消息---> " + s);
      System.out.println(s);
      message.finished();
      }
      });
      consumer.start();
      //线程睡眠,让程序执行完
      try {
      Thread.sleep(1000);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      consumer.setExecutor(new Executor() {
      @Override
      public void execute(Runnable command) {
      // TODO Auto-generated method stub
      System.out.println("消费者开始消费");
      }
      });
      }
      }
    • 测试函数:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      public class Test {
      public static void main(String[] args) {
      nsqProducer producer = new nsqProducer();
      System.out.println("生产者开始运行。。。。");
      producer.nsqProducer();
      System.out.println("生产者结束运行。。。。");

      nsqConsumer consumer = new nsqConsumer();
      consumer.nsqConsumer();
      System.out.println("消费者已经运行。。。");
      }
      }

      这些代码能够正常运行的前提是nsq的监听程序已经开启。然后即可到相应的文件中查看是否有输出。

3 nsq

3.1 nsq四大组件与三大进程

三大进行:

  • nsqd是一个接收、排队、然后转发消息到客户端的进程。
  • nsqlookupd 管理拓扑信息并提供最终一致性的发现服务。
  • nsqadmin用于实时查看集群的统计数据(并且执行各种各样的管理任务)。

四大组件:

  • nsqlookupd:管理nsqd节点拓扑信息并提供最终一致性的发现服务的守护进程
  • nsqd:负责接收、排队、转发消息到客户端的守护进程,并且定时向nsqlookupd服务发送心跳
  • nsqadmin:nsq的web统计界面,可实时查看集群的统计数据和执行一些管理任务
  • utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

各个组件之间的拓扑关系如下:

3.2 nsq数据流模型

3.3 nsqd执行图

单个 nsqd 可以有多个 Topic,每个 Topic 又可以有多个 Channel。Channel 能够接收 Topic 所有消息的副本,从而实现了消息多播分发;而 Channel 上的每个消息被分发给它的订阅者,从而实现负载均衡,所有这些就组成了一个可以表示各种简单和复杂拓扑结构的强大框架。

Reference

写在最后

欢迎大家关注鄙人的公众号【麦田里的守望者zhg】,让我们一起成长,谢谢。
微信公众号